From 0f36833e89d3b821ea4452997983860468b669f6 Mon Sep 17 00:00:00 2001 From: Toni Spets Date: Tue, 27 May 2025 07:40:20 +0300 Subject: [PATCH] Revert "Revert "client: unblock connect without network"" This reverts commit ea4626107c56799ad3ca503e63450823a7f36546. Adds waiting support for initial connection established to avoid locking up gotd. This isn't extremely pretty but should do the job for now. --- pkg/connector/client.go | 84 ++++++++++++++++++++++------------------- pkg/connector/login.go | 6 +++ pkg/connector/matrix.go | 13 +++++++ 3 files changed, 65 insertions(+), 38 deletions(-) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 8d41eb37..69969641 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -67,6 +67,7 @@ type TelegramClient struct { clientCtx context.Context clientCancel context.CancelFunc clientCloseC <-chan struct{} + initialized chan struct{} mu sync.Mutex appConfigLock sync.Mutex @@ -164,6 +165,8 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge takeoutAccepted: exsync.NewEvent(), prevReactionPoll: map[networkid.PortalKey]time.Time{}, + + initialized: make(chan struct{}), } if !login.Metadata.(*UserLoginMetadata).Session.HasAuthKey() { @@ -498,48 +501,53 @@ func (t *TelegramClient) Connect(ctx context.Context) { var err error t.clientCtx, t.clientCancel = context.WithCancel(ctx) - if t.clientCloseC, err = connectTelegramClient(t.clientCtx, t.clientCancel, t.client); err != nil { - t.sendBadCredentialsOrUnknownError(err) - return - } t.updatesCloseC = make(chan struct{}) go func() { - defer close(t.updatesCloseC) - for { - err = t.updatesManager.Run(t.clientCtx, t.client.API(), t.telegramUserID, updates.AuthOptions{}) - if err == nil || errors.Is(err, context.Canceled) { - return - } - - zerolog.Ctx(t.clientCtx).Err(err).Msg("failed to run updates manager, retrying") - - select { - case <-t.clientCtx.Done(): - return - case <-time.After(5 * time.Second): - } + defer close(t.initialized) + if t.clientCloseC, err = connectTelegramClient(t.clientCtx, t.clientCancel, t.client); err != nil { + t.sendBadCredentialsOrUnknownError(err) + close(t.updatesCloseC) + return } + + go func() { + defer close(t.updatesCloseC) + for { + err = t.updatesManager.Run(t.clientCtx, t.client.API(), t.telegramUserID, updates.AuthOptions{}) + if err == nil || errors.Is(err, context.Canceled) { + return + } + + zerolog.Ctx(t.clientCtx).Err(err).Msg("failed to run updates manager, retrying") + + select { + case <-t.clientCtx.Done(): + return + case <-time.After(5 * time.Second): + } + } + }() + + // Update the logged-in user's ghost info (this also updates the user + // login's remote name and profile). + if me, err := t.client.Self(t.clientCtx); err != nil { + t.sendBadCredentialsOrUnknownError(err) + } else if _, err := t.updateGhost(t.clientCtx, t.telegramUserID, me); err != nil { + t.sendBadCredentialsOrUnknownError(err) + } else { + t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected}) + } + + // Fix the "Telegram Saved Messages" chat + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + ChatInfo: t.getDMChatInfo(t.telegramUserID), + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatResync, + PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, t.telegramUserID), + CreatePortal: false, // Do not create the portal if it doesn't already exist + }, + }) }() - - // Update the logged-in user's ghost info (this also updates the user - // login's remote name and profile). - if me, err := t.client.Self(t.clientCtx); err != nil { - t.sendBadCredentialsOrUnknownError(err) - } else if _, err := t.updateGhost(t.clientCtx, t.telegramUserID, me); err != nil { - t.sendBadCredentialsOrUnknownError(err) - } else { - t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected}) - } - - // Fix the "Telegram Saved Messages" chat - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ - ChatInfo: t.getDMChatInfo(t.telegramUserID), - EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, - PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, t.telegramUserID), - CreatePortal: false, // Do not create the portal if it doesn't already exist - }, - }) } func (t *TelegramClient) Disconnect() { diff --git a/pkg/connector/login.go b/pkg/connector/login.go index a3305721..02ea1b3e 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -76,6 +76,12 @@ func finalizeLogin(ctx context.Context, user *bridgev2.User, authorization *tg.A } ul.Client.Connect(ul.Log.WithContext(context.Background())) client := ul.Client.(*TelegramClient) + // Connecting is non-blocking so wait for gotd to initialize before doing anythign to avoid deadlocking + select { + case <-client.initialized: + case <-ctx.Done(): + return nil, ctx.Err() + } me, err := client.client.Self(ctx) if err != nil { return nil, err diff --git a/pkg/connector/matrix.go b/pkg/connector/matrix.go index 1800741a..24d202a4 100644 --- a/pkg/connector/matrix.go +++ b/pkg/connector/matrix.go @@ -218,6 +218,19 @@ func parseRandomID(txnID networkid.RawTransactionID) int64 { } func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (resp *bridgev2.MatrixMessageResponse, err error) { + // Handle Matrix events only after initial connection has been established to avoid deadlocking gotd + select { + case <-t.initialized: + default: + zerolog.Ctx(ctx).Warn().Msg("Got Matrix event before connected, blocking until done") + + select { + case <-t.initialized: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return nil, err