diff --git a/go.mod b/go.mod index 50d595b8..b0918308 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 golang.org/x/net v0.27.0 - maunium.net/go/mautrix v0.19.1-0.20240805194656-9fffe6e54d7e + maunium.net/go/mautrix v0.19.1-0.20240806155836-f6b0feab9566 ) require ( diff --git a/go.sum b/go.sum index 01ea65f9..ee2d430f 100644 --- a/go.sum +++ b/go.sum @@ -112,8 +112,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.19.1-0.20240805194656-9fffe6e54d7e h1:OE04/iUnv8oG7UvjzMk40vkkrJvXs3NfbgvdJjAbsFg= -maunium.net/go/mautrix v0.19.1-0.20240805194656-9fffe6e54d7e/go.mod h1:ZWyxoQxRTBxzWIMs0kQCVogZIY0clTu33h102veCT/Q= +maunium.net/go/mautrix v0.19.1-0.20240806155836-f6b0feab9566 h1:3cp7ffpnUyViQDaXoPvw0Pq+0ax4toN4J4OLPLJs59Q= +maunium.net/go/mautrix v0.19.1-0.20240806155836-f6b0feab9566/go.mod h1:ZWyxoQxRTBxzWIMs0kQCVogZIY0clTu33h102veCT/Q= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 63e012e0..d02f4b11 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -8,7 +8,6 @@ import ( "slices" "strconv" "strings" - "sync" "github.com/gotd/td/telegram" "github.com/gotd/td/telegram/updates" @@ -39,8 +38,6 @@ type TelegramClient struct { client *telegram.Client clientCancel context.CancelFunc - reactionMessageLocks map[int]*sync.Mutex - appConfig map[string]any appConfigHash int @@ -151,7 +148,6 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge UpdateHandler: updatesManager, }) client.clientCancel, err = connectTelegramClient(ctx, client.client) - client.reactionMessageLocks = map[int]*sync.Mutex{} client.telegramFmtParams = &telegramfmt.FormatParams{ GetUserInfoByID: func(ctx context.Context, id int64) (telegramfmt.UserInfo, error) { diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index 19fcf20a..6ea7eb78 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "fmt" - "slices" - "sync" "time" "github.com/gotd/td/tg" @@ -13,6 +11,7 @@ import ( "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/bridgev2/simplevent" "go.mau.fi/mautrix-telegram/pkg/connector/emojis" "go.mau.fi/mautrix-telegram/pkg/connector/ids" @@ -314,14 +313,6 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me } } - if _, ok := t.reactionMessageLocks[msg.ID]; !ok { - t.reactionMessageLocks[msg.ID] = &sync.Mutex{} - } - t.reactionMessageLocks[msg.ID].Lock() - defer t.reactionMessageLocks[msg.ID].Unlock() - - isFull := len(reactionsList) == totalCount - reactions := map[networkid.UserID][]tg.MessagePeerReaction{} var customEmojiIDs []int64 for _, reaction := range reactionsList { if e, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok { @@ -330,19 +321,68 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me log.Error().Type("reaction", reaction.Reaction).Msg("unknown reaction type") return } + } - if p, ok := reaction.PeerID.(*tg.PeerUser); !ok { + customEmojis, err := t.transferEmojisToMatrix(ctx, customEmojiIDs) + if err != nil { + log.Err(err).Msg("failed to transfer emojis") + return + } + + isFull := len(reactionsList) == totalCount + users := map[networkid.UserID]*bridgev2.ReactionSyncUser{} + for _, reaction := range reactionsList { + peer, ok := reaction.PeerID.(*tg.PeerUser) + if !ok { log.Error().Type("peer_id", reaction.PeerID).Msg("unknown peer type") return - } else { - reactions[ids.MakeUserID(p.UserID)] = append(reactions[ids.MakeUserID(p.UserID)], reaction) } + userID := ids.MakeUserID(peer.UserID) + reactionLimit, err := t.getReactionLimit(ctx, userID) + if err != nil { + reactionLimit = 1 + log.Err(err).Int64("id", peer.UserID).Msg("failed to get reaction limit") + } + if _, ok := users[userID]; !ok { + users[userID] = &bridgev2.ReactionSyncUser{HasAllReactions: isFull, MaxCount: reactionLimit} + } + + var emojiID networkid.EmojiID + var emoji string + if r, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok { + emojiID = ids.MakeEmojiIDFromDocumentID(r.DocumentID) + emoji = customEmojis[emojiID] + } else if r, ok := reaction.Reaction.(*tg.ReactionEmoji); ok { + emojiID = ids.MakeEmojiIDFromEmoticon(r.Emoticon) + emoji = r.Emoticon + } else { + log.Error().Type("reaction_type", reaction.Reaction).Msg("invalid reaction type") + return + } + + users[userID].Reactions = append(users[userID].Reactions, &bridgev2.BackfillReaction{ + Timestamp: time.Unix(int64(reaction.Date), 0), + Sender: bridgev2.EventSender{ + IsFromMe: reaction.My, + SenderLogin: ids.MakeUserLoginID(peer.UserID), + Sender: userID, + }, + EmojiID: emojiID, + Emoji: emoji, + }) } - err = t.handleTelegramParsedReactionsLocked(ctx, dbMsg, reactions, customEmojiIDs, isFull, nil, nil) - if err != nil { - log.Err(err).Msg("failed to handle reactions") - } + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventReactionSync, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("message_id", string(msg.ID)) + }, + PortalKey: dbMsg.Room, + }, + TargetMessage: dbMsg.ID, + Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, + }) } func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, error) { @@ -451,130 +491,3 @@ func (t *TelegramClient) transferEmojisToMatrix(ctx context.Context, customEmoji } return } - -func (t *TelegramClient) handleTelegramParsedReactionsLocked(ctx context.Context, msg *database.Message, reactions map[networkid.UserID][]tg.MessagePeerReaction, customEmojiIDs []int64, isFull bool, onlyUserID *networkid.UserID, timestamp *time.Time) error { - customEmojis, err := t.transferEmojisToMatrix(ctx, customEmojiIDs) - if err != nil { - return err - } - - existingReactions, err := t.main.Bridge.DB.Reaction.GetAllToMessage(ctx, msg.ID) - if err != nil { - return err - } - - var removed []*database.Reaction - for _, existing := range existingReactions { - if onlyUserID != nil && existing.SenderID != *onlyUserID { - continue - } - var matched bool - reactions[existing.SenderID], matched, err = reactionsFilter(reactions[existing.SenderID], existing) - if err != nil { - return err - } else if !matched { - if isFull { - removed = append(removed, existing) - } else if reactionLimit, err := t.getReactionLimit(ctx, existing.SenderID); err != nil { - return err - } else if len(reactions[existing.SenderID]) >= reactionLimit { - removed = append(removed, existing) - } - } - } - - for sender, reactions := range reactions { - senderID, err := ids.ParseUserID(sender) - if err != nil { - return err - } - - for _, reaction := range reactions { - var emojiID networkid.EmojiID - var emoji string - if r, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok { - emojiID = ids.MakeEmojiIDFromDocumentID(r.DocumentID) - emoji = customEmojis[emojiID] - } else if r, ok := reaction.Reaction.(*tg.ReactionEmoji); ok { - emojiID = ids.MakeEmojiIDFromEmoticon(r.Emoticon) - emoji = r.Emoticon - } else { - return fmt.Errorf("invalid reaction type %T", reaction.Reaction) - } - - evt := &bridgev2.SimpleRemoteEvent[any]{ - Type: bridgev2.RemoteEventReaction, - LogContext: func(c zerolog.Context) zerolog.Context { - return c. - Any("reaction", reaction.Reaction). - Str("sender_id", string(sender)). - Str("message_id", string(msg.ID)) - }, - Sender: bridgev2.EventSender{ - IsFromMe: reaction.My, - SenderLogin: ids.MakeUserLoginID(senderID), - Sender: sender, - }, - PortalKey: msg.Room, - TargetMessage: msg.ID, - EmojiID: emojiID, - Emoji: emoji, - } - if timestamp != nil { - evt.Timestamp = *timestamp - } - t.main.Bridge.QueueRemoteEvent(t.userLogin, evt) - } - } - - for _, r := range removed { - senderID, err := ids.ParseUserID(r.SenderID) - if err != nil { - return err - } - evt := &bridgev2.SimpleRemoteEvent[any]{ - Type: bridgev2.RemoteEventReactionRemove, - LogContext: func(c zerolog.Context) zerolog.Context { - return c. - Str("emoji_id", string(r.EmojiID)). - Str("sender_id", string(r.SenderID)). - Str("message_id", string(msg.ID)) - }, - Sender: bridgev2.EventSender{ - IsFromMe: t.userID == r.SenderID, - SenderLogin: ids.MakeUserLoginID(senderID), - Sender: r.SenderID, - }, - PortalKey: msg.Room, - TargetMessage: r.MessageID, - EmojiID: r.EmojiID, - } - if timestamp != nil { - evt.Timestamp = *timestamp - } - t.main.Bridge.QueueRemoteEvent(t.userLogin, evt) - } - - return nil -} - -func reactionsFilter(reactions []tg.MessagePeerReaction, existing *database.Reaction) (newReactions []tg.MessagePeerReaction, matched bool, err error) { - if len(reactions) == 0 { - return nil, false, nil - } - - documentID, emoticon, err := ids.ParseEmojiID(existing.EmojiID) - if err != nil { - return nil, false, err - } - - newReactions = slices.DeleteFunc(reactions, func(r tg.MessagePeerReaction) bool { - if rce, ok := r.Reaction.(*tg.ReactionCustomEmoji); ok { - return documentID == rce.DocumentID - } else if r, ok := r.Reaction.(*tg.ReactionEmoji); ok { - return emoticon == r.Emoticon - } - return false - }) - return newReactions, len(newReactions) < len(reactions), nil -}