From e266d1ac800eed157ea1204648cab9eb6ee9a644 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 24 Oct 2024 12:36:18 -0600 Subject: [PATCH] reactions: poll for reactions on read receipt Signed-off-by: Sumner Evans --- go.mod | 2 +- go.sum | 4 +- pkg/connector/backfill.go | 43 +++++++------ pkg/connector/client.go | 5 ++ pkg/connector/matrix.go | 39 ++++++++++-- pkg/connector/reactions.go | 121 +++++++++++++++++++++++++++++++++---- pkg/connector/telegram.go | 3 + 7 files changed, 175 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index 71fcfaa6..b352f23f 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/net v0.30.0 - maunium.net/go/mautrix v0.21.2-0.20241022095053-8a8163106d95 + maunium.net/go/mautrix v0.21.2-0.20241023204042-6fd4b8a2132d ) require ( diff --git a/go.sum b/go.sum index b00b2f53..5881b718 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.21.2-0.20241022095053-8a8163106d95 h1:/y/+rB6JduDIjS0SD4STxwE75IcqIwFfUaWocFifws8= -maunium.net/go/mautrix v0.21.2-0.20241022095053-8a8163106d95/go.mod h1:sjCZR1R/3NET/WjkcXPL6WpAHlWKku9HjRsdOkbM8Qw= +maunium.net/go/mautrix v0.21.2-0.20241023204042-6fd4b8a2132d h1:pW2F/uX9eqziumLBDiFAx2XwfiwPuKI6XyKqOkRDNCk= +maunium.net/go/mautrix v0.21.2-0.20241023204042-6fd4b8a2132d/go.mod h1:sjCZR1R/3NET/WjkcXPL6WpAHlWKku9HjRsdOkbM8Qw= nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index c847af18..46e6dbe7 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -263,11 +263,11 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 break } - if msg.TypeID() != tg.MessageTypeID { + message, ok := msg.(*tg.Message) + if !ok { log.Warn().Str("type", msg.TypeName()).Msg("skipping backfilling unsupported message type") continue } - message := msg.(*tg.Message) sender := t.getEventSender(message) intent := portal.GetIntentFor(ctx, sender, t.userLogin, bridgev2.RemoteEventBackfill) @@ -275,10 +275,6 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 if err != nil { return nil, err } - reactionsList, _, customEmojis, err := t.computeReactionsList(ctx, message) - if err != nil { - return nil, err - } backfillMessage := bridgev2.BackfillMessage{ ConvertedMessage: converted, @@ -287,23 +283,30 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 Timestamp: time.Unix(int64(message.Date), 0), } - for _, reaction := range reactionsList { - peer, ok := reaction.PeerID.(*tg.PeerUser) - if !ok { - return nil, fmt.Errorf("unknown peer type %T", reaction.PeerID) - } - - emojiID, emoji, err := computeEmojiAndID(reaction.Reaction, customEmojis) + if reactions, ok := message.GetReactions(); ok { + reactionsList, _, customEmojis, err := t.computeReactionsList(ctx, message.PeerID, message.ID, reactions) if err != nil { - return nil, fmt.Errorf("failed to compute emoji and ID: %w", err) + return nil, err } - backfillMessage.Reactions = append(backfillMessage.Reactions, &bridgev2.BackfillReaction{ - Timestamp: time.Unix(int64(reaction.Date), 0), - Sender: t.senderForUserID(peer.UserID), - EmojiID: emojiID, - Emoji: emoji, - }) + for _, reaction := range reactionsList { + peer, ok := reaction.PeerID.(*tg.PeerUser) + if !ok { + return nil, fmt.Errorf("unknown peer type %T", reaction.PeerID) + } + + emojiID, emoji, err := computeEmojiAndID(reaction.Reaction, customEmojis) + if err != nil { + return nil, fmt.Errorf("failed to compute emoji and ID: %w", err) + } + + backfillMessage.Reactions = append(backfillMessage.Reactions, &bridgev2.BackfillReaction{ + Timestamp: time.Unix(int64(reaction.Date), 0), + Sender: t.senderForUserID(peer.UserID), + EmojiID: emojiID, + Emoji: emoji, + }) + } } backfillMessages = append(backfillMessages, &backfillMessage) diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 444941b7..a6744698 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -65,6 +65,8 @@ type TelegramClient struct { takeoutDialogsOnce sync.Once activeCalls map[int64]networkid.PortalKey + + prevReactionPoll map[networkid.PortalKey]time.Time } var ( @@ -133,6 +135,9 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge userLogin: login, takeoutAccepted: exsync.NewEvent(), + + activeCalls: map[int64]networkid.PortalKey{}, + prevReactionPoll: map[networkid.PortalKey]time.Time{}, } dispatcher := UpdateDispatcher{ UpdateDispatcher: tg.NewUpdateDispatcher(), diff --git a/pkg/connector/matrix.go b/pkg/connector/matrix.go index 508d3e89..4db72701 100644 --- a/pkg/connector/matrix.go +++ b/pkg/connector/matrix.go @@ -455,7 +455,12 @@ func (t *TelegramClient) HandleMatrixReactionRemove(ctx context.Context, msg *br } func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridgev2.MatrixReadReceipt) error { - peerType, id, parseErr := ids.ParsePortalID(msg.Portal.ID) + log := zerolog.Ctx(ctx).With(). + Str("action", "handle_matrix_read_receipt"). + Str("portal_id", string(msg.Portal.ID)). + Bool("is_supergroup", msg.Portal.Metadata.(*PortalMetadata).IsSuperGroup). + Logger() + peerType, portalID, parseErr := ids.ParsePortalID(msg.Portal.ID) if parseErr != nil { return parseErr } @@ -464,7 +469,7 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg return parseErr } - var readMentionsErr, readReactionsErr, readMessagesErr error + var readMentionsErr, readReactionsErr, readMessagesErr, reactionPollErr error var wg sync.WaitGroup // Read mentions @@ -514,22 +519,44 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg }) case ids.PeerTypeChannel: var accessHash int64 - accessHash, readMessagesErr = t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, id) + accessHash, readMessagesErr = t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, portalID) if readMessagesErr != nil { return } _, readMessagesErr = t.client.API().ChannelsReadHistory(ctx, &tg.ChannelsReadHistoryRequest{ - Channel: &tg.InputChannel{ChannelID: id, AccessHash: accessHash}, + Channel: &tg.InputChannel{ChannelID: portalID, AccessHash: accessHash}, }) + + if !msg.Portal.Metadata.(*PortalMetadata).IsSuperGroup { + // TODO handle sponsored message read receipts + } default: readMessagesErr = fmt.Errorf("unknown peer type %s", peerType) } }() - // TODO handle sponsored message read receipts + // Poll for reactions + wg.Add(1) + go func() { + defer wg.Done() + if peerType != ids.PeerTypeChannel || msg.Portal.Metadata.(*PortalMetadata).IsSuperGroup { + log.Debug().Msg("Not polling reactions because peer is not a channel or is a super-group") + return + } + + // If it hasn't been 20 seconds since the last poll, skip + now := time.Now() + if prev, ok := t.prevReactionPoll[msg.Portal.PortalKey]; ok && now.Before(prev.Add(20*time.Second)) { + log.Debug().Msg("Not polling reactions because last poll was less than 20 seconds ago") + return + } + t.prevReactionPoll[msg.Portal.PortalKey] = now + + reactionPollErr = t.pollForReactions(ctx, msg.Portal.PortalKey, inputPeer) + }() wg.Wait() - return errors.Join(readMentionsErr, readReactionsErr, readMessagesErr) + return errors.Join(readMentionsErr, readReactionsErr, readMessagesErr, reactionPollErr) } func (t *TelegramClient) HandleMatrixTyping(ctx context.Context, msg *bridgev2.MatrixTyping) error { diff --git a/pkg/connector/reactions.go b/pkg/connector/reactions.go index 4e566e47..57c6bcf7 100644 --- a/pkg/connector/reactions.go +++ b/pkg/connector/reactions.go @@ -14,19 +14,15 @@ import ( "go.mau.fi/mautrix-telegram/pkg/connector/ids" ) -func (t *TelegramClient) computeReactionsList(ctx context.Context, msg *tg.Message) (reactions []tg.MessagePeerReaction, isFull bool, customEmojis map[networkid.EmojiID]string, err error) { +func (t *TelegramClient) computeReactionsList(ctx context.Context, peer tg.PeerClass, msgID int, msgReactions tg.MessageReactions) (reactions []tg.MessagePeerReaction, isFull bool, customEmojis map[networkid.EmojiID]string, err error) { log := zerolog.Ctx(ctx).With().Str("fn", "computeReactionsList").Logger() - if _, set := msg.GetReactions(); !set { - return - } - var totalCount int - for _, r := range msg.Reactions.Results { + for _, r := range msgReactions.Results { totalCount += r.Count } - reactionsList := msg.Reactions.RecentReactions - if totalCount > 0 && len(reactionsList) == 0 && !msg.Reactions.CanSeeList { + reactionsList := msgReactions.RecentReactions + if totalCount > 0 && len(reactionsList) == 0 && !msgReactions.CanSeeList { // We don't know who reacted in a channel, so we can't bridge it properly either log.Warn().Msg("Can't see reaction list in channel") return @@ -39,8 +35,8 @@ func (t *TelegramClient) computeReactionsList(ctx context.Context, msg *tg.Messa // # return if len(reactionsList) < totalCount { - if user, ok := msg.PeerID.(*tg.PeerUser); ok { - reactionsList = splitDMReactionCounts(msg.Reactions.Results, user.UserID, t.telegramUserID) + if user, ok := peer.(*tg.PeerUser); ok { + reactionsList = splitDMReactionCounts(msgReactions.Results, user.UserID, t.telegramUserID) // TODO // } else if t.isBot { @@ -48,12 +44,12 @@ func (t *TelegramClient) computeReactionsList(ctx context.Context, msg *tg.Messa // return // TODO should calls to this be limited? - } else if peer, err := t.inputPeerForPortalID(ctx, t.makePortalKeyFromPeer(msg.PeerID).ID); err != nil { + } else if peer, err := t.inputPeerForPortalID(ctx, t.makePortalKeyFromPeer(peer).ID); err != nil { return nil, false, nil, fmt.Errorf("failed to get input peer: %w", err) } else { reactions, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesMessageReactionsList, error) { return t.client.API().MessagesGetMessageReactionsList(ctx, &tg.MessagesGetMessageReactionsListRequest{ - Peer: peer, ID: msg.ID, Limit: 100, + Peer: peer, ID: msgID, Limit: 100, }) }) if err != nil { @@ -104,7 +100,7 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me return } - reactionsList, isFull, customEmojis, err := t.computeReactionsList(ctx, msg) + reactionsList, isFull, customEmojis, err := t.computeReactionsList(ctx, msg.PeerID, msg.ID, msg.Reactions) if err != nil { log.Err(err).Msg("failed to compute reactions list") return @@ -197,3 +193,102 @@ func (t *TelegramClient) getReactionLimit(ctx context.Context, sender networkid. } } } + +func (t *TelegramClient) pollForReactions(ctx context.Context, portalKey networkid.PortalKey, inputPeer tg.InputPeerClass) error { + log := zerolog.Ctx(ctx).With(). + Stringer("portal_key", portalKey). + Str("action", "poll_for_reactions"). + Logger() + + log.Debug().Msg("Polling reactions for recent messages") + + messages, err := t.main.Bridge.DB.Message.GetLastNInPortal(ctx, portalKey, 20) + if err != nil { + return err + } + + messageIDs := make([]int, len(messages)) + for i, msg := range messages { + _, messageIDs[i], err = ids.ParseMessageID(msg.ID) + if err != nil { + return err + } + } + + updates, err := APICallWithUpdates(ctx, t, func() (*tg.Updates, error) { + u, err := t.client.API().MessagesGetMessagesReactions(ctx, &tg.MessagesGetMessagesReactionsRequest{ + Peer: inputPeer, + ID: messageIDs, + }) + if err != nil { + return nil, err + } + if updates, ok := u.(*tg.Updates); ok { + return updates, nil + } else { + return nil, fmt.Errorf("unexpected updates type %T", u) + } + }) + if err != nil { + return fmt.Errorf("failed to get messages reactions: %w", err) + } + + for _, update := range updates.Updates { + if reaction, ok := update.(*tg.UpdateMessageReactions); ok { + dbMsg, err := t.main.Bridge.DB.Message.GetFirstPartByID(ctx, t.loginID, ids.MakeMessageID(portalKey, reaction.MsgID)) + if err != nil { + return fmt.Errorf("failed to get message from database: %w", err) + } else if dbMsg == nil { + return fmt.Errorf("message not found in database: %w", err) + } + + reactionsList, isFull, customEmojis, err := t.computeReactionsList(ctx, reaction.Peer, reaction.MsgID, reaction.Reactions) + if err != nil { + return fmt.Errorf("failed to compute reactions list: %w", err) + } + + users := map[networkid.UserID]*bridgev2.ReactionSyncUser{} + for _, reaction := range reactionsList { + peer, ok := reaction.PeerID.(*tg.PeerUser) + if !ok { + return fmt.Errorf("unknown peer type %T", reaction.PeerID) + } + userID := ids.MakeUserID(peer.UserID) + reactionLimit, err := t.getReactionLimit(ctx, userID) + if err != nil { + reactionLimit = 1 + log.Err(err).Int64("id", peer.UserID).Msg("failed to get reaction limit") + } + if _, ok := users[userID]; !ok { + users[userID] = &bridgev2.ReactionSyncUser{HasAllReactions: isFull, MaxCount: reactionLimit} + } + + emojiID, emoji, err := computeEmojiAndID(reaction.Reaction, customEmojis) + if err != nil { + return fmt.Errorf("failed to compute emoji and ID: %w", err) + } + + users[userID].Reactions = append(users[userID].Reactions, &bridgev2.BackfillReaction{ + Timestamp: time.Unix(int64(reaction.Date), 0), + Sender: t.senderForUserID(peer.UserID), + EmojiID: emojiID, + Emoji: emoji, + }) + } + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventReactionSync, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Int("message_id", reaction.MsgID) + }, + PortalKey: dbMsg.Room, + }, + TargetMessage: dbMsg.ID, + Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, + }) + } else { + log.Warn().Type("update_type", update).Msg("Unexpected update type in get reactions response") + } + } + return nil +} diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index 19514e04..f570cd37 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -584,6 +584,9 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) ce.ModifiedParts = append(ce.ModifiedParts, part.ToEditPart(existing[i])) } } + if len(ce.ModifiedParts) == 0 { + return nil, bridgev2.ErrIgnoringRemoteEvent + } return &ce, nil }, })