From a84dd2f30ccdbc153dd71e214412bd0650350d94 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 3 Mar 2026 16:22:41 +0200 Subject: [PATCH] handletelegram: add log for stuck update handlers --- pkg/connector/client.go | 51 ++---------------- pkg/connector/handletelegram.go | 91 ++++++++++++++++++++++++++++++++- 2 files changed, 92 insertions(+), 50 deletions(-) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 969ac328..de00d6a3 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -178,53 +178,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge } dispatcher := tg.NewUpdateDispatcher() - dispatcher.OnFallback(client.onEntityUpdate) - dispatcher.OnNewMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateNewMessage) error { - return client.onUpdateNewMessage(ctx, e, update) - }) - dispatcher.OnNewChannelMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateNewChannelMessage) error { - return client.onUpdateNewMessage(ctx, e, update) - }) - dispatcher.OnChannel(client.onUpdateChannel) - dispatcher.OnUserName(client.onUserName) - dispatcher.OnDeleteMessages(func(ctx context.Context, e tg.Entities, update *tg.UpdateDeleteMessages) error { - return client.onDeleteMessages(ctx, 0, update) - }) - dispatcher.OnDeleteChannelMessages(func(ctx context.Context, e tg.Entities, update *tg.UpdateDeleteChannelMessages) error { - return client.onDeleteMessages(ctx, update.ChannelID, update) - }) - dispatcher.OnEditMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateEditMessage) error { - return client.onMessageEdit(ctx, update) - }) - dispatcher.OnEditChannelMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateEditChannelMessage) error { - return client.onMessageEdit(ctx, update) - }) - dispatcher.OnUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateUserTyping) error { - return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeUser, update.UserID, 0), client.senderForUserID(update.UserID), update.Action) - }) - dispatcher.OnChatUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChatUserTyping) error { - if update.FromID.TypeID() != tg.PeerUserTypeID { - log.Warn().Str("from_id_type", update.FromID.TypeName()).Msg("unsupported from_id type") - return nil - } - return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID, 0), client.getPeerSender(update.FromID), update.Action) - }) - dispatcher.OnChannelUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannelUserTyping) error { - return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, update.TopMsgID), client.getPeerSender(update.FromID), update.Action) - }) - dispatcher.OnReadHistoryOutbox(client.updateReadReceipt) - dispatcher.OnReadHistoryInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryInbox) error { - return client.onOwnReadReceipt(client.makePortalKeyFromPeer(update.Peer, update.TopMsgID), update.MaxID) - }) - dispatcher.OnReadChannelInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadChannelInbox) error { - return client.onOwnReadReceipt(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0), update.MaxID) - }) - dispatcher.OnNotifySettings(client.onNotifySettings) - dispatcher.OnPinnedDialogs(client.onPinnedDialogs) - dispatcher.OnChatDefaultBannedRights(client.onChatDefaultBannedRights) - dispatcher.OnPeerBlocked(client.onPeerBlocked) - dispatcher.OnChat(client.onChat) - dispatcher.OnPhoneCall(client.onPhoneCall) + dispatcher.OnFallback(client.onUpdateWrapper) client.updatesManager = updates.New(updates.Config{ OnNotChannelMember: client.onNotChannelMember, @@ -426,7 +380,8 @@ func (t *TelegramClient) sendBadCredentialsOrUnknownError(err error) { } func (t *TelegramClient) onPing() { - if t.userLogin.BridgeState.GetPrev().StateEvent == status.StateConnected { + prev := t.userLogin.BridgeState.GetPrev() + if prev.StateEvent == status.StateConnected || prev.Error == updateHandlerStuck { return } ctx := t.userLogin.Log.WithContext(t.main.Bridge.BackgroundCtx) diff --git a/pkg/connector/handletelegram.go b/pkg/connector/handletelegram.go index 0eaab578..f635d13b 100644 --- a/pkg/connector/handletelegram.go +++ b/pkg/connector/handletelegram.go @@ -860,7 +860,46 @@ func (t *TelegramClient) updateChannel(ctx context.Context, channel *tg.Channel) return userInfo, nil } -func (t *TelegramClient) onEntityUpdate(ctx context.Context, e tg.Entities, upd tg.UpdateClass) error { +const updateHandlerStuck status.BridgeStateErrorCode = "tg-update-handler-stuck" + +func (t *TelegramClient) onUpdateWrapper(ctx context.Context, e tg.Entities, upd tg.UpdateClass) error { + doneChan := make(chan error, 1) + go func() { + doneChan <- t.onUpdate(ctx, e, upd) + }() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + startedAt := time.Now() + bridgeStateUpdated := false + for { + select { + case <-ticker.C: + zerolog.Ctx(ctx).Warn(). + Time("started_at", startedAt). + Msg("Telegram update handling is taking long") + if time.Since(startedAt) > 3*time.Minute && !bridgeStateUpdated { + bridgeStateUpdated = true + t.userLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateUnknownError, + Error: updateHandlerStuck, + Message: "Processing messages from Telegram is stuck", + }) + } + case err := <-doneChan: + if bridgeStateUpdated && t.userLogin.BridgeState.GetPrevUnsent().Error == updateHandlerStuck { + t.userLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateConnected, + Info: map[string]any{ + "update_reason": "finished processing slow update", + }, + }) + } + return err + } + } +} + +func (t *TelegramClient) onUpdate(ctx context.Context, e tg.Entities, upd tg.UpdateClass) error { zerolog.Ctx(ctx).Trace().Stringer("update", upd).Msg("Raw update") for userID, user := range e.Users { zerolog.Ctx(ctx).Trace().Stringer("user", user).Msg("Raw user info in update") @@ -883,7 +922,55 @@ func (t *TelegramClient) onEntityUpdate(ctx context.Context, e tg.Entities, upd return err } } - return nil + switch update := upd.(type) { + case *tg.UpdateNewMessage: + return t.onUpdateNewMessage(ctx, e, update) + case *tg.UpdateNewChannelMessage: + return t.onUpdateNewMessage(ctx, e, update) + case *tg.UpdateChannel: + return t.onUpdateChannel(ctx, e, update) + case *tg.UpdateUserName: + return t.onUserName(ctx, e, update) + case *tg.UpdateDeleteMessages: + return t.onDeleteMessages(ctx, 0, update) + case *tg.UpdateDeleteChannelMessages: + return t.onDeleteMessages(ctx, update.ChannelID, update) + case *tg.UpdateEditMessage: + return t.onMessageEdit(ctx, update) + case *tg.UpdateEditChannelMessage: + return t.onMessageEdit(ctx, update) + case *tg.UpdateUserTyping: + return t.handleTyping(t.makePortalKeyFromID(ids.PeerTypeUser, update.UserID, 0), t.senderForUserID(update.UserID), update.Action) + case *tg.UpdateChatUserTyping: + if update.FromID.TypeID() != tg.PeerUserTypeID { + zerolog.Ctx(ctx).Warn().Str("from_id_type", update.FromID.TypeName()).Msg("unsupported from_id type") + return nil + } + return t.handleTyping(t.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID, 0), t.getPeerSender(update.FromID), update.Action) + case *tg.UpdateChannelUserTyping: + return t.handleTyping(t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, update.TopMsgID), t.getPeerSender(update.FromID), update.Action) + case *tg.UpdateReadHistoryOutbox: + return t.updateReadReceipt(ctx, e, update) + case *tg.UpdateReadHistoryInbox: + return t.onOwnReadReceipt(t.makePortalKeyFromPeer(update.Peer, update.TopMsgID), update.MaxID) + case *tg.UpdateReadChannelInbox: + return t.onOwnReadReceipt(t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0), update.MaxID) + case *tg.UpdateNotifySettings: + return t.onNotifySettings(ctx, e, update) + case *tg.UpdatePinnedDialogs: + return t.onPinnedDialogs(ctx, e, update) + case *tg.UpdateChatDefaultBannedRights: + return t.onChatDefaultBannedRights(ctx, e, update) + case *tg.UpdatePeerBlocked: + return t.onPeerBlocked(ctx, e, update) + case *tg.UpdateChat: + return t.onChat(ctx, e, update) + case *tg.UpdatePhoneCall: + return t.onPhoneCall(ctx, e, update) + default: + zerolog.Ctx(ctx).Debug().Type("update_type", update).Msg("Unhandled update type") + return nil + } } func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) error {