reactions: use ReactionSync event

Signed-off-by: Sumner Evans <sumner.evans@automattic.com>
This commit is contained in:
Sumner Evans
2024-08-05 14:45:15 -06:00
parent f56f520308
commit 18337c6941
4 changed files with 60 additions and 151 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/net v0.27.0
maunium.net/go/mautrix v0.19.1-0.20240805194656-9fffe6e54d7e
maunium.net/go/mautrix v0.19.1-0.20240806155836-f6b0feab9566
)
require (
+2 -2
View File
@@ -112,8 +112,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/mautrix v0.19.1-0.20240805194656-9fffe6e54d7e h1:OE04/iUnv8oG7UvjzMk40vkkrJvXs3NfbgvdJjAbsFg=
maunium.net/go/mautrix v0.19.1-0.20240805194656-9fffe6e54d7e/go.mod h1:ZWyxoQxRTBxzWIMs0kQCVogZIY0clTu33h102veCT/Q=
maunium.net/go/mautrix v0.19.1-0.20240806155836-f6b0feab9566 h1:3cp7ffpnUyViQDaXoPvw0Pq+0ax4toN4J4OLPLJs59Q=
maunium.net/go/mautrix v0.19.1-0.20240806155836-f6b0feab9566/go.mod h1:ZWyxoQxRTBxzWIMs0kQCVogZIY0clTu33h102veCT/Q=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY=
-4
View File
@@ -8,7 +8,6 @@ import (
"slices"
"strconv"
"strings"
"sync"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/updates"
@@ -39,8 +38,6 @@ type TelegramClient struct {
client *telegram.Client
clientCancel context.CancelFunc
reactionMessageLocks map[int]*sync.Mutex
appConfig map[string]any
appConfigHash int
@@ -151,7 +148,6 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
UpdateHandler: updatesManager,
})
client.clientCancel, err = connectTelegramClient(ctx, client.client)
client.reactionMessageLocks = map[int]*sync.Mutex{}
client.telegramFmtParams = &telegramfmt.FormatParams{
GetUserInfoByID: func(ctx context.Context, id int64) (telegramfmt.UserInfo, error) {
+57 -144
View File
@@ -4,8 +4,6 @@ import (
"bytes"
"context"
"fmt"
"slices"
"sync"
"time"
"github.com/gotd/td/tg"
@@ -13,6 +11,7 @@ import (
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/bridgev2/simplevent"
"go.mau.fi/mautrix-telegram/pkg/connector/emojis"
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
@@ -314,14 +313,6 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me
}
}
if _, ok := t.reactionMessageLocks[msg.ID]; !ok {
t.reactionMessageLocks[msg.ID] = &sync.Mutex{}
}
t.reactionMessageLocks[msg.ID].Lock()
defer t.reactionMessageLocks[msg.ID].Unlock()
isFull := len(reactionsList) == totalCount
reactions := map[networkid.UserID][]tg.MessagePeerReaction{}
var customEmojiIDs []int64
for _, reaction := range reactionsList {
if e, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok {
@@ -330,19 +321,68 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me
log.Error().Type("reaction", reaction.Reaction).Msg("unknown reaction type")
return
}
}
if p, ok := reaction.PeerID.(*tg.PeerUser); !ok {
customEmojis, err := t.transferEmojisToMatrix(ctx, customEmojiIDs)
if err != nil {
log.Err(err).Msg("failed to transfer emojis")
return
}
isFull := len(reactionsList) == totalCount
users := map[networkid.UserID]*bridgev2.ReactionSyncUser{}
for _, reaction := range reactionsList {
peer, ok := reaction.PeerID.(*tg.PeerUser)
if !ok {
log.Error().Type("peer_id", reaction.PeerID).Msg("unknown peer type")
return
} else {
reactions[ids.MakeUserID(p.UserID)] = append(reactions[ids.MakeUserID(p.UserID)], reaction)
}
userID := ids.MakeUserID(peer.UserID)
reactionLimit, err := t.getReactionLimit(ctx, userID)
if err != nil {
reactionLimit = 1
log.Err(err).Int64("id", peer.UserID).Msg("failed to get reaction limit")
}
if _, ok := users[userID]; !ok {
users[userID] = &bridgev2.ReactionSyncUser{HasAllReactions: isFull, MaxCount: reactionLimit}
}
var emojiID networkid.EmojiID
var emoji string
if r, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok {
emojiID = ids.MakeEmojiIDFromDocumentID(r.DocumentID)
emoji = customEmojis[emojiID]
} else if r, ok := reaction.Reaction.(*tg.ReactionEmoji); ok {
emojiID = ids.MakeEmojiIDFromEmoticon(r.Emoticon)
emoji = r.Emoticon
} else {
log.Error().Type("reaction_type", reaction.Reaction).Msg("invalid reaction type")
return
}
users[userID].Reactions = append(users[userID].Reactions, &bridgev2.BackfillReaction{
Timestamp: time.Unix(int64(reaction.Date), 0),
Sender: bridgev2.EventSender{
IsFromMe: reaction.My,
SenderLogin: ids.MakeUserLoginID(peer.UserID),
Sender: userID,
},
EmojiID: emojiID,
Emoji: emoji,
})
}
err = t.handleTelegramParsedReactionsLocked(ctx, dbMsg, reactions, customEmojiIDs, isFull, nil, nil)
if err != nil {
log.Err(err).Msg("failed to handle reactions")
}
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventReactionSync,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.Str("message_id", string(msg.ID))
},
PortalKey: dbMsg.Room,
},
TargetMessage: dbMsg.ID,
Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull},
})
}
func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, error) {
@@ -451,130 +491,3 @@ func (t *TelegramClient) transferEmojisToMatrix(ctx context.Context, customEmoji
}
return
}
func (t *TelegramClient) handleTelegramParsedReactionsLocked(ctx context.Context, msg *database.Message, reactions map[networkid.UserID][]tg.MessagePeerReaction, customEmojiIDs []int64, isFull bool, onlyUserID *networkid.UserID, timestamp *time.Time) error {
customEmojis, err := t.transferEmojisToMatrix(ctx, customEmojiIDs)
if err != nil {
return err
}
existingReactions, err := t.main.Bridge.DB.Reaction.GetAllToMessage(ctx, msg.ID)
if err != nil {
return err
}
var removed []*database.Reaction
for _, existing := range existingReactions {
if onlyUserID != nil && existing.SenderID != *onlyUserID {
continue
}
var matched bool
reactions[existing.SenderID], matched, err = reactionsFilter(reactions[existing.SenderID], existing)
if err != nil {
return err
} else if !matched {
if isFull {
removed = append(removed, existing)
} else if reactionLimit, err := t.getReactionLimit(ctx, existing.SenderID); err != nil {
return err
} else if len(reactions[existing.SenderID]) >= reactionLimit {
removed = append(removed, existing)
}
}
}
for sender, reactions := range reactions {
senderID, err := ids.ParseUserID(sender)
if err != nil {
return err
}
for _, reaction := range reactions {
var emojiID networkid.EmojiID
var emoji string
if r, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok {
emojiID = ids.MakeEmojiIDFromDocumentID(r.DocumentID)
emoji = customEmojis[emojiID]
} else if r, ok := reaction.Reaction.(*tg.ReactionEmoji); ok {
emojiID = ids.MakeEmojiIDFromEmoticon(r.Emoticon)
emoji = r.Emoticon
} else {
return fmt.Errorf("invalid reaction type %T", reaction.Reaction)
}
evt := &bridgev2.SimpleRemoteEvent[any]{
Type: bridgev2.RemoteEventReaction,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.
Any("reaction", reaction.Reaction).
Str("sender_id", string(sender)).
Str("message_id", string(msg.ID))
},
Sender: bridgev2.EventSender{
IsFromMe: reaction.My,
SenderLogin: ids.MakeUserLoginID(senderID),
Sender: sender,
},
PortalKey: msg.Room,
TargetMessage: msg.ID,
EmojiID: emojiID,
Emoji: emoji,
}
if timestamp != nil {
evt.Timestamp = *timestamp
}
t.main.Bridge.QueueRemoteEvent(t.userLogin, evt)
}
}
for _, r := range removed {
senderID, err := ids.ParseUserID(r.SenderID)
if err != nil {
return err
}
evt := &bridgev2.SimpleRemoteEvent[any]{
Type: bridgev2.RemoteEventReactionRemove,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.
Str("emoji_id", string(r.EmojiID)).
Str("sender_id", string(r.SenderID)).
Str("message_id", string(msg.ID))
},
Sender: bridgev2.EventSender{
IsFromMe: t.userID == r.SenderID,
SenderLogin: ids.MakeUserLoginID(senderID),
Sender: r.SenderID,
},
PortalKey: msg.Room,
TargetMessage: r.MessageID,
EmojiID: r.EmojiID,
}
if timestamp != nil {
evt.Timestamp = *timestamp
}
t.main.Bridge.QueueRemoteEvent(t.userLogin, evt)
}
return nil
}
func reactionsFilter(reactions []tg.MessagePeerReaction, existing *database.Reaction) (newReactions []tg.MessagePeerReaction, matched bool, err error) {
if len(reactions) == 0 {
return nil, false, nil
}
documentID, emoticon, err := ids.ParseEmojiID(existing.EmojiID)
if err != nil {
return nil, false, err
}
newReactions = slices.DeleteFunc(reactions, func(r tg.MessagePeerReaction) bool {
if rce, ok := r.Reaction.(*tg.ReactionCustomEmoji); ok {
return documentID == rce.DocumentID
} else if r, ok := r.Reaction.(*tg.ReactionEmoji); ok {
return emoticon == r.Emoticon
}
return false
})
return newReactions, len(newReactions) < len(reactions), nil
}