From e9abeda9162c2b19e327d205c420fef8255922ec Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Thu, 17 Jul 2025 09:56:11 -0400 Subject: [PATCH] Rework telegram client lifecycle to hopefully fix not stopping issues (#112) --- pkg/connector/client.go | 156 ++++++++++-------------------------- pkg/connector/future.go | 51 ++++++++++++ pkg/connector/login.go | 13 +-- pkg/connector/loginphone.go | 22 +++-- pkg/connector/loginqr.go | 21 +++-- pkg/connector/matrix.go | 13 +-- 6 files changed, 132 insertions(+), 144 deletions(-) create mode 100644 pkg/connector/future.go diff --git a/pkg/connector/client.go b/pkg/connector/client.go index c8210648..d7e629c4 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -60,20 +60,19 @@ var ( ) type TelegramClient struct { - main *TelegramConnector - ScopedStore *store.ScopedStore - telegramUserID int64 - loginID networkid.UserLoginID - userID networkid.UserID - userLogin *bridgev2.UserLogin - client *telegram.Client - updatesManager *updates.Manager - updatesCloseC chan struct{} - clientCtx context.Context - clientCancel context.CancelFunc - clientCloseC chan struct{} - initialized chan struct{} - mu sync.Mutex + main *TelegramConnector + ScopedStore *store.ScopedStore + telegramUserID int64 + loginID networkid.UserLoginID + userID networkid.UserID + userLogin *bridgev2.UserLogin + client *telegram.Client + updatesManager *updates.Manager + clientCtx context.Context + clientCancel context.CancelFunc + clientDone *Future[error] + clientInitialized *exsync.Event + mu sync.Mutex appConfigLock sync.Mutex appConfig map[string]any @@ -182,7 +181,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge prevReactionPoll: map[networkid.PortalKey]time.Time{}, - initialized: make(chan struct{}), + clientInitialized: exsync.NewEvent(), } if !login.Metadata.(*UserLoginMetadata).Session.HasAuthKey() { @@ -394,38 +393,6 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login 
*bridge return &client, err } -// connectTelegramClient blocks until client is connected, calling Run -// internally. -// Technique from: https://github.com/gotd/contrib/blob/master/bg/connect.go -func connectTelegramClient(ctx context.Context, cancel context.CancelFunc, client *telegram.Client) (<-chan struct{}, error) { - errC := make(chan error, 1) - initDone := make(chan struct{}) - closeC := make(chan struct{}) - go func() { - defer close(errC) - defer close(closeC) - errC <- client.Run(ctx, func(ctx context.Context) error { - close(initDone) - <-ctx.Done() - if errors.Is(ctx.Err(), context.Canceled) { - return nil - } - return ctx.Err() - }) - }() - - select { - case <-ctx.Done(): // context canceled - cancel() - return nil, fmt.Errorf("context cancelled before init done: %w", ctx.Err()) - case err := <-errC: // startup timeout - cancel() - return nil, fmt.Errorf("client connection timeout: %w", err) - case <-initDone: // init done - } - return closeC, nil -} - func (t *TelegramClient) onDead() { prevState := t.userLogin.BridgeState.GetPrev().StateEvent if slices.Contains([]status.BridgeStateEvent{ @@ -504,18 +471,20 @@ func (t *TelegramClient) onAuthError(err error) { t.sendBadCredentialsOrUnknownError(err) t.userLogin.Metadata.(*UserLoginMetadata).ResetOnLogout() go func() { - t.Disconnect() if err := t.userLogin.Save(context.Background()); err != nil { t.main.Bridge.Log.Err(err).Msg("failed to save user login") } }() } -func (t *TelegramClient) Connect(ctx context.Context) { +func (t *TelegramClient) Connect(_ context.Context) { t.mu.Lock() defer t.mu.Unlock() - log := zerolog.Ctx(ctx).With().Int64("user_id", t.telegramUserID).Logger() + ctx := context.Background() + + log := zerolog.Ctx(context.Background()).With().Int64("user_id", t.telegramUserID).Logger() + ctx = log.WithContext(ctx) if !t.userLogin.Metadata.(*UserLoginMetadata).Session.HasAuthKey() { log.Warn().Msg("user does not have an auth key, sending bad credentials state") @@ -525,61 +494,28 @@ 
func (t *TelegramClient) Connect(ctx context.Context) { log.Info().Msg("Connecting client") - t.clientCtx, t.clientCancel = context.WithCancel(ctx) - t.clientCloseC = make(chan struct{}) - t.updatesCloseC = make(chan struct{}) + // Add a cancellation layer we can use for explicit Disconnect + + ctx, cancel := context.WithCancel(ctx) + t.clientCtx = ctx + t.clientCancel = cancel + t.clientDone = NewFuture[error]() + t.clientInitialized.Clear() + + runTelegramClient(ctx, t.client, t.clientInitialized, t.clientDone, func(ctx context.Context) error { + log.Info().Msg("Client running starting updates") + return t.updatesManager.Run(ctx, t.client.API(), t.telegramUserID, updates.AuthOptions{}) + }) +} + +func runTelegramClient(ctx context.Context, client *telegram.Client, initialized *exsync.Event, done *Future[error], callback func(ctx context.Context) error) { go func() { - defer close(t.initialized) - connectClientCloseC, err := connectTelegramClient(t.clientCtx, t.clientCancel, t.client) - if err != nil { - t.sendBadCredentialsOrUnknownError(err) - close(t.updatesCloseC) - return - } - - // awful hack to prevent assigning clientCloseC from racing Disconnect() - go func() { - <-connectClientCloseC - close(t.clientCloseC) - }() - - go func() { - defer close(t.updatesCloseC) - for { - err = t.updatesManager.Run(t.clientCtx, t.client.API(), t.telegramUserID, updates.AuthOptions{}) - if err == nil || errors.Is(err, context.Canceled) { - return - } - - zerolog.Ctx(t.clientCtx).Err(err).Msg("failed to run updates manager, retrying") - - select { - case <-t.clientCtx.Done(): - return - case <-time.After(5 * time.Second): - } - } - }() - - // Update the logged-in user's ghost info (this also updates the user - // login's remote name and profile). 
- if me, err := t.client.Self(t.clientCtx); err != nil { - t.sendBadCredentialsOrUnknownError(err) - } else if _, err := t.updateGhost(t.clientCtx, t.telegramUserID, me); err != nil { - t.sendBadCredentialsOrUnknownError(err) - } else { - t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected}) - } - - // Fix the "Telegram Saved Messages" chat - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ - ChatInfo: t.getDMChatInfo(t.telegramUserID), - EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, - PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, t.telegramUserID), - CreatePortal: false, // Do not create the portal if it doesn't already exist - }, + err := client.Run(ctx, func(ctx context.Context) error { + initialized.Set() + return callback(ctx) }) + initialized.Set() + done.Set(err) }() } @@ -591,17 +527,9 @@ func (t *TelegramClient) Disconnect() { if t.clientCancel != nil { t.clientCancel() - t.clientCancel = nil - } - if t.clientCloseC != nil { - t.userLogin.Log.Debug().Msg("Waiting for client to finish") - <-t.clientCloseC - t.clientCloseC = nil - } - if t.updatesCloseC != nil { - t.userLogin.Log.Debug().Msg("Waiting for updates to finish") - <-t.updatesCloseC - t.updatesCloseC = nil + t.userLogin.Log.Info().Msg("Waiting for client") + err, _ := t.clientDone.Get(context.Background()) + t.userLogin.Log.Info().Err(err).Msg("Client done") } t.userLogin.Log.Info().Msg("Disconnect complete") diff --git a/pkg/connector/future.go b/pkg/connector/future.go new file mode 100644 index 00000000..f9624786 --- /dev/null +++ b/pkg/connector/future.go @@ -0,0 +1,51 @@ +// mautrix-telegram - A Matrix-Telegram puppeting bridge. +// Copyright (C) 2025 Automattic Inc. 
+// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +package connector + +import ( + "context" + "sync" +) + +type Future[T any] struct { + value T + err error + ready chan struct{} + once sync.Once +} + +func NewFuture[T any]() *Future[T] { + return &Future[T]{ + ready: make(chan struct{}), + } +} + +func (f *Future[T]) Set(value T) { + f.once.Do(func() { + f.value = value + close(f.ready) + }) +} + +func (f *Future[T]) Get(ctx context.Context) (T, error) { + select { + case <-f.ready: + return f.value, nil + case <-ctx.Done(): + return f.value, ctx.Err() + } +} diff --git a/pkg/connector/login.go b/pkg/connector/login.go index 3d763d26..b515ee05 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -75,21 +75,22 @@ func finalizeLogin(ctx context.Context, user *bridgev2.User, authorization *tg.A if err != nil { return nil, fmt.Errorf("failed to save new login: %w", err) } - ul.Client.Connect(ul.Log.WithContext(context.Background())) + ul.Client.Connect(ul.Log.WithContext(ctx)) client := ul.Client.(*TelegramClient) + // Connecting is non-blocking so wait for gotd to initialize before doing anything to avoid deadlocking - select { - case <-client.initialized: - case <-ctx.Done(): - return nil, ctx.Err() + err = client.clientInitialized.Wait(ctx) + if err != nil { + return nil, err } + me, err := client.client.Self(ctx) if err != nil { return nil, err } go func() 
{ log := ul.Log.With().Str("component", "login_sync_chats").Logger() - if err := client.SyncChats(log.WithContext(context.Background())); err != nil { + if err := client.SyncChats(log.WithContext(client.clientCtx)); err != nil { log.Err(err).Msg("Failed to sync chats") } }() diff --git a/pkg/connector/loginphone.go b/pkg/connector/loginphone.go index 4e22ddb3..72614c5d 100644 --- a/pkg/connector/loginphone.go +++ b/pkg/connector/loginphone.go @@ -23,6 +23,7 @@ import ( "time" "github.com/rs/zerolog" + "go.mau.fi/util/exsync" "go.mau.fi/zerozap" "go.uber.org/zap" "maunium.net/go/mautrix/bridgev2" @@ -45,7 +46,6 @@ type PhoneLogin struct { authClient *telegram.Client authClientCtx context.Context authClientCancel context.CancelFunc - authClientCloseC <-chan struct{} phone string hash string @@ -56,9 +56,7 @@ var _ bridgev2.LoginProcessUserInput = (*PhoneLogin)(nil) func (p *PhoneLogin) Cancel() { if p.authClientCancel != nil { p.authClientCancel() - } - if p.authClientCloseC != nil { - <-p.authClientCloseC + <-p.authClientCtx.Done() } } @@ -88,11 +86,21 @@ func (p *PhoneLogin) SubmitUserInput(ctx context.Context, input map[string]strin CustomSessionStorage: &p.authData, Logger: zap.New(zerozap.New(zerolog.Ctx(ctx).With().Str("component", "telegram_phone_login_client").Logger())), }) - var err error - p.authClientCtx, p.authClientCancel = context.WithTimeoutCause(log.WithContext(context.Background()), time.Hour, errors.New("phone login took over one hour")) - if p.authClientCloseC, err = connectTelegramClient(p.authClientCtx, p.authClientCancel, p.authClient); err != nil { + + p.authClientCtx, p.authClientCancel = context.WithTimeoutCause(log.WithContext(ctx), time.Hour, errors.New("phone login took over one hour")) + initialized := exsync.NewEvent() + done := NewFuture[error]() + runTelegramClient(p.authClientCtx, p.authClient, initialized, done, func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + }) + + log.Info().Msg("Waiting for client to 
connect.") + err := initialized.Wait(ctx) + if err != nil { return nil, err } + sentCode, err := p.authClient.Auth().SendCode(p.authClientCtx, p.phone, auth.SendCodeOptions{}) if err != nil { return nil, err diff --git a/pkg/connector/loginqr.go b/pkg/connector/loginqr.go index 21986c14..026fd135 100644 --- a/pkg/connector/loginqr.go +++ b/pkg/connector/loginqr.go @@ -23,6 +23,7 @@ import ( "time" "github.com/rs/zerolog" + "go.mau.fi/util/exsync" "go.mau.fi/zerozap" "go.uber.org/zap" "maunium.net/go/mautrix/bridgev2" @@ -48,7 +49,6 @@ type QRLogin struct { authClientCtx context.Context authClientCancel context.CancelFunc - authClientCloseC <-chan struct{} auth chan qrAuthResult qrToken chan qrlogin.Token @@ -62,9 +62,7 @@ var _ bridgev2.LoginProcessUserInput = (*QRLogin)(nil) // For asking for pa func (q *QRLogin) Cancel() { if q.authClientCancel != nil { q.authClientCancel() - } - if q.authClientCloseC != nil { - <-q.authClientCloseC + <-q.authClientCtx.Done() } } @@ -88,9 +86,18 @@ func (q *QRLogin) Start(ctx context.Context) (*bridgev2.LoginStep, error) { Logger: zaplog, }) - var err error - q.authClientCtx, q.authClientCancel = context.WithTimeoutCause(log.WithContext(context.Background()), time.Hour, errors.New("phone login took over one hour")) - if q.authClientCloseC, err = connectTelegramClient(q.authClientCtx, q.authClientCancel, q.authClient); err != nil { + q.authClientCtx, q.authClientCancel = context.WithTimeoutCause(log.WithContext(ctx), time.Hour, errors.New("phone login took over one hour")) + + initialized := exsync.NewEvent() + done := NewFuture[error]() + runTelegramClient(q.authClientCtx, q.authClient, initialized, done, func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + }) + + log.Info().Msg("Waiting for client to connect.") + err := initialized.Wait(ctx) + if err != nil { return nil, err } diff --git a/pkg/connector/matrix.go b/pkg/connector/matrix.go index 0bfa961e..52426646 100644 --- a/pkg/connector/matrix.go +++ 
b/pkg/connector/matrix.go @@ -220,16 +220,9 @@ func parseRandomID(txnID networkid.RawTransactionID) int64 { func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (resp *bridgev2.MatrixMessageResponse, err error) { // Handle Matrix events only after initial connection has been established to avoid deadlocking gotd - select { - case <-t.initialized: - default: - zerolog.Ctx(ctx).Warn().Msg("Got Matrix event before connected, blocking until done") - - select { - case <-t.initialized: - case <-ctx.Done(): - return nil, ctx.Err() - } + err = t.clientInitialized.Wait(ctx) + if err != nil { + return nil, err } peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)