From 0952df024406b34c16f84887099cf6a4ff2203bf Mon Sep 17 00:00:00 2001 From: Adam Van Ymeren Date: Thu, 26 Jun 2025 13:43:35 -0700 Subject: [PATCH] all: respect/propagate errors from QueueRemoteEvent (#110) --- go.mod | 4 +- go.sum | 8 +- pkg/connector/client.go | 14 +++- pkg/connector/reactions.go | 24 ++++-- pkg/connector/sync.go | 18 ++-- pkg/connector/telegram.go | 166 ++++++++++++++++++++++++++++--------- 6 files changed, 170 insertions(+), 64 deletions(-) diff --git a/go.mod b/go.mod index 00671e58..421d822a 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 golang.org/x/net v0.41.0 - maunium.net/go/mautrix v0.24.2-0.20250617163829-26da46dbbf6e + maunium.net/go/mautrix v0.24.2-0.20250625103518-3a135b6b1586 ) require ( @@ -60,4 +60,4 @@ require ( rsc.io/qr v0.2.0 // indirect ) -replace github.com/gotd/td => github.com/beeper/td v0.107.1-0.20250606104440-4708e4d69efd +replace github.com/gotd/td => github.com/beeper/td v0.107.1-0.20250626202922-648509b041ae diff --git a/go.sum b/go.sum index 42bd52c4..f6d1b9ce 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= -github.com/beeper/td v0.107.1-0.20250606104440-4708e4d69efd h1:/j6+mjxV9m+jJUFOZ5eqKQKr6Jc6Un5Zf8ty+qCOYBk= -github.com/beeper/td v0.107.1-0.20250606104440-4708e4d69efd/go.mod h1:5Db4K0d5B+vGIBpNVv2F0ABM14f0PcRJ+RQlKRnUcZQ= +github.com/beeper/td v0.107.1-0.20250626202922-648509b041ae h1:RYdVR4MoYZdIpLztC+t0wDCwwRK95FbRl0+Ll6WCsFo= +github.com/beeper/td v0.107.1-0.20250626202922-648509b041ae/go.mod h1:5Db4K0d5B+vGIBpNVv2F0ABM14f0PcRJ+RQlKRnUcZQ= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= @@ -121,8 +121,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.24.2-0.20250617163829-26da46dbbf6e h1:Y8kbRpPcKMZn2gIjUFd15xzMB5GJ2bS6ZcOfvlx4KnE= -maunium.net/go/mautrix v0.24.2-0.20250617163829-26da46dbbf6e/go.mod h1:Xy6o+pXmbqmgWsUWh15EQ1eozjC+k/VT/7kloByv9PI= +maunium.net/go/mautrix v0.24.2-0.20250625103518-3a135b6b1586 h1:l8X1JAvBV5E36nDmOEtB4LO6ZcbC3gV5rXeDGBYd2wc= +maunium.net/go/mautrix v0.24.2-0.20250625103518-3a135b6b1586/go.mod h1:Xy6o+pXmbqmgWsUWh15EQ1eozjC+k/VT/7kloByv9PI= nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= diff --git a/pkg/connector/client.go b/pkg/connector/client.go index e7b926d3..61fabbea 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -53,7 +53,10 @@ import ( "go.mau.fi/mautrix-telegram/pkg/connector/util" ) -var ErrNoAuthKey = errors.New("user does not have auth key") +var ( + ErrNoAuthKey = errors.New("user does not have auth key") + ErrFailToQueueEvent = errors.New("failed to queue event") +) type TelegramClient struct { main *TelegramConnector @@ -237,8 +240,8 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge dispatcher.OnPhoneCall(client.onPhoneCall) client.updatesManager = updates.New(updates.Config{ - OnChannelTooLong: func(channelID int64) { - tc.Bridge.QueueRemoteEvent(login, &simplevent.ChatResync{ + OnChannelTooLong: func(channelID int64) error { + res := tc.Bridge.QueueRemoteEvent(login, &simplevent.ChatResync{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatResync, LogContext: func(c zerolog.Context) zerolog.Context { @@ -248,6 +251,11 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge }, CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { return true, nil }, }) + + if !res.Success { + return ErrFailToQueueEvent + } + return nil }, Handler: dispatcher, Logger: zaplog.Named("gaps"), diff --git a/pkg/connector/reactions.go b/pkg/connector/reactions.go index d0a8efe1..2e5930d0 100644 --- a/pkg/connector/reactions.go +++ b/pkg/connector/reactions.go @@ -105,7 +105,7 @@ func computeEmojiAndID(reaction tg.ReactionClass, customEmojis map[networkid.Emo return } -func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Message) { +func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Message) error { log := zerolog.Ctx(ctx).With(). Str("handler", "handle_telegram_reactions"). Int("message_id", msg.ID). @@ -113,16 +113,14 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me reactionsList, isFull, customEmojis, err := t.computeReactionsList(ctx, msg.PeerID, msg.ID, msg.Reactions) if err != nil { - log.Err(err).Msg("failed to compute reactions list") - return + return fmt.Errorf("failed to compute reactions: %w", err) } users := map[networkid.UserID]*bridgev2.ReactionSyncUser{} for _, reaction := range reactionsList { peer, ok := reaction.PeerID.(*tg.PeerUser) if !ok { - log.Error().Type("peer_id", reaction.PeerID).Msg("unknown peer type") - return + return fmt.Errorf("unknown peer type: %T", reaction.PeerID) } userID := ids.MakeUserID(peer.UserID) reactionLimit, err := t.getReactionLimit(ctx, userID) @@ -136,8 +134,7 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me emojiID, emoji, err := computeEmojiAndID(reaction.Reaction, customEmojis) if err != nil { - log.Err(err).Msg("failed to compute emoji and ID") - return + return fmt.Errorf("failed to compute emoji and ID: %w", err) } users[userID].Reactions = append(users[userID].Reactions, &bridgev2.BackfillReaction{ @@ -148,7 +145,7 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me }) } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventReactionSync, LogContext: func(c zerolog.Context) zerolog.Context { @@ -159,6 +156,12 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me TargetMessage: ids.GetMessageIDFromMessage(msg), Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, }) + + if !res.Success { + return ErrFailToQueueEvent + } + + return nil } func splitDMReactionCounts(res []tg.ReactionCount, theirUserID, myUserID int64) (reactions []tg.MessagePeerReaction) { @@ -286,7 +289,7 @@ func (t *TelegramClient) pollForReactions(ctx context.Context, portalKey network Emoji: emoji, }) } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventReactionSync, LogContext: func(c zerolog.Context) zerolog.Context { @@ -297,6 +300,9 @@ func (t *TelegramClient) pollForReactions(ctx context.Context, portalKey network TargetMessage: dbMsg.ID, Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, }) + if !res.Success { + return ErrFailToQueueEvent + } } else { log.Warn().Type("update_type", update).Msg("Unexpected update type in get reactions response") } diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go index 2a7eeb85..cd6c7285 100644 --- a/pkg/connector/sync.go +++ b/pkg/connector/sync.go @@ -111,8 +111,7 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM portalKey := t.makePortalKeyFromPeer(dialog.GetPeer()) portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey) if err != nil { - log.Err(err).Msg("Failed to get portal") - continue + return err } if dialog.UnreadCount == 0 && !dialog.UnreadMark { portal.Metadata.(*PortalMetadata).ReadUpTo = dialog.TopMessage @@ -153,15 +152,14 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM return t.client.API().MessagesGetFullChat(ctx, chat.GetID()) }) if err != nil { - log.Err(err).Msg("Failed to get full chat") - continue + return err } chatFull, ok := fullChat.FullChat.(*tg.ChatFull) var avatar *bridgev2.Avatar - if ok { + if ok && chatFull.ChatPhoto != nil { avatar, err = t.convertPhoto(ctx, ids.PeerTypeChat, chatFull.ID, chatFull.ChatPhoto) if err != nil { - log.Err(err).Msg("Failed to convert group avatar") + return err } } @@ -198,7 +196,7 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM if photo, ok := fullChannel.GetPhoto().(*tg.ChatPhoto); ok { avatar, err = t.convertChatPhoto(ctx, fullChannel.ID, fullChannel.AccessHash, photo) if err != nil { - log.Err(err).Msg("Failed to convert channel avatar") + return err } } chatInfo = &bridgev2.ChatInfo{ @@ -255,7 +253,7 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM chatInfo.UserLocal.Tag = ptr.Ptr(event.RoomTagFavourite) } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ ChatInfo: chatInfo, EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatResync, @@ -276,6 +274,10 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM return dialog.TopMessage > latestMessageID, nil }, }) + + if !res.Success { + return ErrFailToQueueEvent + } } return nil } diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index eb5c6b00..c5125b27 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -53,8 +53,8 @@ type IGetMessages interface { GetMessages() []int } -func (t *TelegramClient) selfLeaveChat(portalKey networkid.PortalKey) { - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{ +func (t *TelegramClient) selfLeaveChat(portalKey networkid.PortalKey) error { + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatDelete, LogContext: func(c zerolog.Context) zerolog.Context { @@ -65,6 +65,10 @@ func (t *TelegramClient) selfLeaveChat(portalKey networkid.PortalKey) { }, OnlyForMe: true, }) + if !res.Success { + return ErrFailToQueueEvent + } + return nil } func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, update *tg.UpdateChannel) error { @@ -87,18 +91,17 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd }) if err != nil { if tgerr.Is(err, tg.ErrChannelInvalid, tg.ErrChannelPrivate) { - t.selfLeaveChat(portalKey) - return nil + return t.selfLeaveChat(portalKey) } return fmt.Errorf("failed to get channel: %w", err) } else if len(chats.GetChats()) != 1 { return fmt.Errorf("expected 1 chat, got %d", len(chats.GetChats())) } else if channel, ok := chats.GetChats()[0].(*tg.Channel); !ok { log.Error().Type("chat_type", chats.GetChats()[0]).Msg("Expected channel, got something else. Leaving the channel.") - t.selfLeaveChat(portalKey) + return t.selfLeaveChat(portalKey) } else if channel.Left { log.Error().Msg("Update was for a left channel. Leaving the channel.") - t.selfLeaveChat(portalKey) + return t.selfLeaveChat(portalKey) } else { // TODO update the channel info } @@ -141,7 +144,7 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent log.Info().Int64("user_id", contact.UserID).Msg("received contact") } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[*tg.Message]{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[*tg.Message]{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventMessage, LogContext: func(c zerolog.Context) zerolog.Context { @@ -163,7 +166,11 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent ConvertMessageFunc: t.convertToMatrixWithRefetch, }) - t.handleTelegramReactions(ctx, msg) + if !res.Success { + return ErrFailToQueueEvent + } + + return t.handleTelegramReactions(ctx, msg) case *tg.MessageService: sender := t.getEventSender(msg, false) @@ -183,29 +190,41 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent } switch action := msg.Action.(type) { case *tg.MessageActionChatEditTitle: - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Name: &action.Title}}, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionChatEditPhoto: switch peer := msg.PeerID.(type) { case *tg.PeerChat: - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChat, peer.ChatID, action.Photo)}}, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.PeerChannel: - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChannel, peer.ChannelID, action.Photo)}}, }) + if !res.Success { + return ErrFailToQueueEvent + } } case *tg.MessageActionChatDeletePhoto: - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: &bridgev2.Avatar{Remove: true}}}, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionChatAddUser: memberChanges := &bridgev2.ChatMemberList{ MemberMap: map[networkid.UserID]bridgev2.ChatMember{}, @@ -216,12 +235,15 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent Membership: event.MembershipJoin, } } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{MemberChanges: memberChanges}, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionChatJoinedByLink: - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ MemberChanges: &bridgev2.ChatMemberList{ @@ -231,12 +253,14 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionChatDeleteUser: if action.UserID == t.telegramUserID { - t.selfLeaveChat(eventMeta.PortalKey) - return nil + return t.selfLeaveChat(eventMeta.PortalKey) } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ MemberChanges: &bridgev2.ChatMemberList{ @@ -249,6 +273,9 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionChatCreate: memberMap := map[networkid.UserID]bridgev2.ChatMember{} for _, userID := range action.Users { @@ -258,7 +285,7 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent } } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ EventMeta: eventMeta. WithType(bridgev2.RemoteEventChatResync). WithCreatePortal(true), @@ -272,7 +299,10 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent CanBackfill: true, }, }) - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + if !res.Success { + return ErrFailToQueueEvent + } + res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), ID: ids.GetMessageIDFromMessage(msg), ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data any) (*bridgev2.ConvertedMessage, error) { @@ -286,10 +316,13 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionChannelCreate: modLevel := 50 - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ EventMeta: eventMeta. WithType(bridgev2.RemoteEventChatResync). WithCreatePortal(true), @@ -310,7 +343,10 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent CanBackfill: true, }, }) - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + if !res.Success { + return ErrFailToQueueEvent + } + res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), ID: ids.GetMessageIDFromMessage(msg), ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data any) (*bridgev2.ConvertedMessage, error) { @@ -324,8 +360,11 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionSetMessagesTTL: - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatResync), ChatInfo: &bridgev2.ChatInfo{ ExtraUpdates: func(ctx context.Context, p *bridgev2.Portal) bool { @@ -335,10 +374,13 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, }, }) + if !res.Success { + return ErrFailToQueueEvent + } // Send a notice about the TTL change content := bridgev2.DisappearingMessageNotice(time.Duration(action.Period)*time.Second, false) - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), ID: ids.GetMessageIDFromMessage(msg), ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data any) (*bridgev2.ConvertedMessage, error) { @@ -349,6 +391,9 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionPhoneCall: var body strings.Builder if action.Video { @@ -375,7 +420,7 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent body.WriteString(")") } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), ID: ids.GetMessageIDFromMessage(msg), ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data any) (*bridgev2.ConvertedMessage, error) { @@ -389,6 +434,9 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionGroupCall: var body strings.Builder if action.Duration == 0 { @@ -400,7 +448,7 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent body.WriteString(")") } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), ID: ids.GetMessageIDFromMessage(msg), ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data any) (*bridgev2.ConvertedMessage, error) { @@ -414,6 +462,9 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionInviteToGroupCall: var body, html strings.Builder var mentions event.Mentions @@ -441,7 +492,7 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent } body.WriteString(" to the video chat") html.WriteString(" to the video chat") - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage), ID: ids.GetMessageIDFromMessage(msg), ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data any) (*bridgev2.ConvertedMessage, error) { @@ -461,9 +512,12 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } case *tg.MessageActionGroupCallScheduled: start := time.Unix(int64(action.ScheduleDate), 0) - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta. WithType(bridgev2.RemoteEventMessage). WithSender(bridgev2.EventSender{}), // Telegram shows it as not coming from a specific user @@ -482,6 +536,9 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } // case *tg.MessageActionChatMigrateTo: // case *tg.MessageActionChannelMigrateFrom: @@ -652,7 +709,7 @@ func (t *TelegramClient) onDeleteMessages(ctx context.Context, channelID int64, } else { portalKey = t.makePortalKeyFromPeer(&tg.PeerChannel{ChannelID: channelID}) } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.MessageRemove{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.MessageRemove{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventMessageRemove, LogContext: func(c zerolog.Context) zerolog.Context { @@ -665,6 +722,9 @@ func (t *TelegramClient) onDeleteMessages(ctx context.Context, channelID int64, }, TargetMessage: ids.MakeMessageID(channelID, messageID), }) + if !res.Success { + return ErrFailToQueueEvent + } } return nil } @@ -774,7 +834,7 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) } } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[*tg.Message]{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[*tg.Message]{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventEdit, LogContext: func(c zerolog.Context) zerolog.Context { @@ -818,6 +878,9 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) return &ce, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } return nil } @@ -831,7 +894,7 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev timeout = 0 } // TODO send proper TypingTypes - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Typing{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Typing{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventTyping, PortalKey: portal, @@ -839,6 +902,9 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev }, Timeout: timeout, }) + if !res.Success { + return ErrFailToQueueEvent + } return nil } @@ -849,7 +915,7 @@ func (t *TelegramClient) updateReadReceipt(ctx context.Context, e tg.Entities, u // (they only say "someone read the message" and not who) return nil } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventReadReceipt, PortalKey: t.makePortalKeyFromPeer(update.Peer), @@ -861,11 +927,14 @@ func (t *TelegramClient) updateReadReceipt(ctx context.Context, e tg.Entities, u LastTarget: ids.MakeMessageID(update.Peer, update.MaxID), ReadUpToStreamOrder: int64(update.MaxID), }) + if !res.Success { + return ErrFailToQueueEvent + } return nil } func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID int) error { - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventReadReceipt, PortalKey: portalKey, @@ -874,6 +943,9 @@ func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID i LastTarget: ids.MakeMessageID(portalKey, maxID), ReadUpToStreamOrder: int64(maxID), }) + if !res.Success { + return ErrFailToQueueEvent + } return nil } @@ -1025,7 +1097,7 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up mutedUntil = &bridgev2.Unmuted } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ ChatInfo: &bridgev2.ChatInfo{ UserLocal: &bridgev2.UserLocalPortalInfo{ MutedUntil: mutedUntil, @@ -1037,6 +1109,9 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up PortalKey: t.makePortalKeyFromPeer(update.Peer.(*tg.NotifyPeer).Peer), }, }) + if !res.Success { + return ErrFailToQueueEvent + } return nil } @@ -1074,7 +1149,7 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg delete(needsUnpinning, portalKey) t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = append(t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs, portalKey.ID) - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ ChatInfo: &bridgev2.ChatInfo{ UserLocal: &bridgev2.UserLocalPortalInfo{ Tag: ptr.Ptr(event.RoomTagFavourite), @@ -1086,11 +1161,14 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg PortalKey: portalKey, }, }) + if !res.Success { + return ErrFailToQueueEvent + } } var empty event.RoomTag for portalKey := range needsUnpinning { - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ ChatInfo: &bridgev2.ChatInfo{ UserLocal: &bridgev2.UserLocalPortalInfo{ Tag: &empty, @@ -1102,6 +1180,9 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg PortalKey: portalKey, }, }) + if !res.Success { + return ErrFailToQueueEvent + } } return t.userLogin.Save(ctx) @@ -1121,7 +1202,7 @@ func (t *TelegramClient) HandleRoomTag(ctx context.Context, msg *bridgev2.Matrix } func (t *TelegramClient) onChatDefaultBannedRights(ctx context.Context, entities tg.Entities, update *tg.UpdateChatDefaultBannedRights) error { - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ ChatInfo: &bridgev2.ChatInfo{ Members: &bridgev2.ChatMemberList{ PowerLevels: t.getPowerLevelOverridesFromBannedRights(entities.Chats[0], update.DefaultBannedRights), @@ -1133,6 +1214,9 @@ func (t *TelegramClient) onChatDefaultBannedRights(ctx context.Context, entities PortalKey: t.makePortalKeyFromPeer(update.Peer), }, }) + if !res.Success { + return ErrFailToQueueEvent + } return nil } @@ -1158,7 +1242,7 @@ func (t *TelegramClient) onPeerBlocked(ctx context.Context, e tg.Entities, updat }) // Find portals that are DMs with the user - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ ChatInfo: &bridgev2.ChatInfo{ Members: &bridgev2.ChatMemberList{ PowerLevels: t.getDMPowerLevels(ghost), @@ -1170,6 +1254,9 @@ func (t *TelegramClient) onPeerBlocked(ctx context.Context, e tg.Entities, updat PortalKey: t.makePortalKeyFromPeer(update.PeerID), }, }) + if !res.Success { + return ErrFailToQueueEvent + } return nil } @@ -1198,7 +1285,7 @@ func (t *TelegramClient) onPhoneCall(ctx context.Context, e tg.Entities, update } else { body.WriteString("call") } - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventMessage, PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, call.AdminID), @@ -1217,5 +1304,8 @@ func (t *TelegramClient) onPhoneCall(ctx context.Context, e tg.Entities, update }, nil }, }) + if !res.Success { + return ErrFailToQueueEvent + } return nil }