From a31787f8949b83a1120927bc3cd1066ebe85bccd Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 31 Jul 2025 13:46:24 +0300 Subject: [PATCH] client: include event handling error in returns --- pkg/connector/client.go | 15 +++-- pkg/connector/reactions.go | 10 +-- pkg/connector/sync.go | 4 +- pkg/connector/telegram.go | 130 +++++++++++++++---------------------- 4 files changed, 67 insertions(+), 92 deletions(-) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index d7e629c4..1e4192df 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -59,6 +59,16 @@ var ( ErrFailToQueueEvent = errors.New("failed to queue event") ) +func resultToError(res bridgev2.EventHandlingResult) error { + if !res.Success { + if res.Error != nil { + return fmt.Errorf("%w: %w", ErrFailToQueueEvent, res.Error) + } + return ErrFailToQueueEvent + } + return nil +} + type TelegramClient struct { main *TelegramConnector ScopedStore *store.ScopedStore @@ -252,10 +262,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { return true, nil }, }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) }, Handler: dispatcher, Logger: zaplog.Named("gaps"), diff --git a/pkg/connector/reactions.go b/pkg/connector/reactions.go index a3228fb9..f0f5deea 100644 --- a/pkg/connector/reactions.go +++ b/pkg/connector/reactions.go @@ -158,11 +158,7 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, }) - if !res.Success { - return ErrFailToQueueEvent - } - - return nil + return resultToError(res) } func splitDMReactionCounts(res []tg.ReactionCount, theirUserID, myUserID int64) (reactions []tg.MessagePeerReaction) { @@ -301,8 +297,8 @@ func (t *TelegramClient) pollForReactions(ctx context.Context, portalKey network TargetMessage: dbMsg.ID, Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } } else { log.Warn().Type("update_type", update).Msg("Unexpected update type in get reactions response") diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go index 8d875e4f..3c643a73 100644 --- a/pkg/connector/sync.go +++ b/pkg/connector/sync.go @@ -283,8 +283,8 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM }, }) - if !res.Success { - return ErrFailToQueueEvent + if err = resultToError(res); err != nil { + return err } } return nil diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index fa2ac8a2..f5aff3b1 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -67,10 +67,7 @@ func (t *TelegramClient) selfLeaveChat(portalKey networkid.PortalKey) error { }, OnlyForMe: true, }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) } func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, update *tg.UpdateChannel) error { @@ -162,8 +159,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent ConvertMessageFunc: t.convertToMatrixWithRefetch, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } return t.handleTelegramReactions(ctx, msg) @@ -190,8 +187,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Name: &action.Title}}, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionChatEditPhoto: switch peer := msg.PeerID.(type) { @@ -200,16 +197,16 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChat, peer.ChatID, action.Photo)}}, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.PeerChannel: res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChannel, peer.ChannelID, action.Photo)}}, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } } @@ -218,8 +215,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: &bridgev2.Avatar{Remove: true}}}, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionChatAddUser: memberChanges := &bridgev2.ChatMemberList{ @@ -235,8 +232,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{MemberChanges: memberChanges}, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionChatJoinedByLink: res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ @@ -249,8 +246,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionChatDeleteUser: if action.UserID == t.telegramUserID { @@ -269,8 +266,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionChatCreate: memberMap := map[networkid.UserID]bridgev2.ChatMember{} @@ -295,8 +292,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent CanBackfill: true, }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), @@ -312,8 +309,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionChannelCreate: @@ -339,8 +336,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent CanBackfill: true, }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), @@ -356,8 +353,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionSetMessagesTTL: res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ @@ -370,8 +367,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } // Send a notice about the TTL change @@ -387,8 +384,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionPhoneCall: var body strings.Builder @@ -430,8 +427,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionGroupCall: var body strings.Builder @@ -458,8 +455,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionInviteToGroupCall: var body, html strings.Builder @@ -508,8 +505,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } case *tg.MessageActionGroupCallScheduled: start := time.Unix(int64(action.ScheduleDate), 0) @@ -532,8 +529,8 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } // case *tg.MessageActionChatMigrateTo: @@ -728,8 +725,8 @@ func (t *TelegramClient) onDeleteMessages(ctx context.Context, channelID int64, }, TargetMessage: ids.MakeMessageID(channelID, messageID), }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } } return nil @@ -884,11 +881,7 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) return &ce, nil }, }) - if !res.Success { - return ErrFailToQueueEvent - } - - return nil + return resultToError(res) } func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev2.EventSender, action tg.SendMessageActionClass) error { @@ -908,10 +901,7 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev }, Timeout: timeout, }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) } func (t *TelegramClient) updateReadReceipt(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryOutbox) error { @@ -933,10 +923,7 @@ func (t *TelegramClient) updateReadReceipt(ctx context.Context, e tg.Entities, u LastTarget: ids.MakeMessageID(update.Peer, update.MaxID), ReadUpToStreamOrder: int64(update.MaxID), }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) } func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID int) error { @@ -949,10 +936,7 @@ func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID i LastTarget: ids.MakeMessageID(portalKey, maxID), ReadUpToStreamOrder: int64(maxID), }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) } func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, error) { @@ -1138,10 +1122,7 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up PortalKey: t.makePortalKeyFromPeer(update.Peer.(*tg.NotifyPeer).Peer), }, }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) } func (t *TelegramClient) HandleMute(ctx context.Context, msg *bridgev2.MatrixMute) error { @@ -1194,8 +1175,8 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg PortalKey: portalKey, }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } } @@ -1213,8 +1194,8 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg PortalKey: portalKey, }, }) - if !res.Success { - return ErrFailToQueueEvent + if err := resultToError(res); err != nil { + return err } } @@ -1247,10 +1228,7 @@ func (t *TelegramClient) onChatDefaultBannedRights(ctx context.Context, entities PortalKey: t.makePortalKeyFromPeer(update.Peer), }, }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) } func (t *TelegramClient) onPeerBlocked(ctx context.Context, e tg.Entities, update *tg.UpdatePeerBlocked) error { @@ -1287,10 +1265,7 @@ func (t *TelegramClient) onPeerBlocked(ctx context.Context, e tg.Entities, updat PortalKey: t.makePortalKeyFromPeer(update.PeerID), }, }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) } func (t *TelegramClient) onChat(ctx context.Context, e tg.Entities, update *tg.UpdateChat) error { @@ -1333,8 +1308,5 @@ func (t *TelegramClient) onPhoneCall(ctx context.Context, e tg.Entities, update }, nil }, }) - if !res.Success { - return ErrFailToQueueEvent - } - return nil + return resultToError(res) }