diff --git a/go.mod b/go.mod index 2b6b349a..b63b10e9 100644 --- a/go.mod +++ b/go.mod @@ -1,17 +1,17 @@ module go.mau.fi/mautrix-telegram -go 1.21 +go 1.22 require ( github.com/gotd/td v0.105.0 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.9.0 - go.mau.fi/util v0.6.1-0.20240802175451-b430ebbffc98 + go.mau.fi/util v0.7.0 go.mau.fi/zerozap v0.1.1 go.uber.org/zap v1.27.0 - golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 - golang.org/x/net v0.27.0 - maunium.net/go/mautrix v0.19.1-0.20240807155838-eabab275895d + golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa + golang.org/x/net v0.28.0 + maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa ) require ( @@ -30,6 +30,7 @@ require ( github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rs/xid v1.5.0 // indirect github.com/segmentio/asm v1.2.0 // indirect @@ -44,10 +45,10 @@ require ( go.opentelemetry.io/otel/trace v1.28.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.25.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.22.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect maunium.net/go/mauflag v1.0.0 // indirect diff --git a/go.sum b/go.sum index 3b4281d4..7bc3c9dc 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 h1:Dx7Ovyv/SFnMFw3fD4oEoeorXc6saIiQ23LrGLth0Gw= +github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -69,8 +71,8 @@ github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/yuin/goldmark v1.7.4 h1:BDXOHExt+A7gwPCJgPIIq7ENvceR7we7rOS9TNoLZeg= github.com/yuin/goldmark v1.7.4/go.mod h1:uzxRWxtg69N339t3louHJ7+O03ezfj6PlliRlaOzY1E= -go.mau.fi/util v0.6.1-0.20240802175451-b430ebbffc98 h1:gJ0peWecBm6TtlxKFVIc1KbooXSCHtPfsfb2Eha5A0A= -go.mau.fi/util v0.6.1-0.20240802175451-b430ebbffc98/go.mod h1:S1juuPWGau2GctPY3FR/4ec/MDLhAG2QPhdnUwpzWIo= +go.mau.fi/util v0.7.0 h1:l31z+ivrSQw+cv/9eFebEqtQW2zhxivGypn+JT0h/ws= +go.mau.fi/util v0.7.0/go.mod h1:bWYreIoTULL/UiRbZdfddPh7uWDFW5yX4YCv5FB0eE0= go.mau.fi/zeroconfig v0.1.3 h1:As9wYDKmktjmNZW5i1vn8zvJlmGKHeVxHVIBMXsm4kM= go.mau.fi/zeroconfig v0.1.3/go.mod h1:NcSJkf180JT+1IId76PcMuLTNa1CzsFFZ0nBygIQM70= go.mau.fi/zerozap v0.1.1 h1:mxE/dW4wtkqBYOXOEEzXldk5qKB+ahsZXjoTGnvEhZQ= @@ -87,22 +89,22 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= -golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= -golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= -golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa h1:ELnwvuAXPNtPk1TJRuGkI9fDTwym6AYBu0qzT8AcHdI= +golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= -golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -112,8 +114,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.19.1-0.20240807155838-eabab275895d h1:+PtYgqxswmN5UM9XSLKO88TjYJHApwWn0j4fAnCslxg= -maunium.net/go/mautrix v0.19.1-0.20240807155838-eabab275895d/go.mod h1:ZWyxoQxRTBxzWIMs0kQCVogZIY0clTu33h102veCT/Q= +maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa h1:um7ddCVXb4wvb0pmtgoQc8GClUpmXeVYQE1BrI7gS7g= +maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa/go.mod h1:NhWZ4jpQ2CW+t6TmGrnydAIL0htdoXmGiNTdHb2PzL4= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go new file mode 100644 index 00000000..8c30d5d5 --- /dev/null +++ b/pkg/connector/backfill.go @@ -0,0 +1,163 @@ +package connector + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/gotd/td/tg" + "github.com/rs/zerolog" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/bridgev2/networkid" + + "go.mau.fi/mautrix-telegram/pkg/connector/ids" +) + +func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) { + log := zerolog.Ctx(ctx).With(). + Str("method", "FetchMessages"). + Logger() + ctx = log.WithContext(ctx) + + peer, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID) + if err != nil { + return nil, err + } + + req := tg.MessagesGetHistoryRequest{ + Peer: peer, + Limit: fetchParams.Count, + } + if fetchParams.AnchorMessage != nil && !fetchParams.Forward { + req.MaxID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID) + if err != nil { + return nil, err + } + } + rawMsgs, err := t.client.API().MessagesGetHistory(ctx, &req) + if err != nil { + return nil, err + } + msgs, ok := rawMsgs.(interface{ GetMessages() []tg.MessageClass }) + if !ok { + return nil, fmt.Errorf("unsupported messages type %T", rawMsgs) + } + + var markRead bool // TODO implement + messages := msgs.GetMessages() + + var cursor networkid.PaginationCursor + if len(messages) > 0 { + cursor = ids.MakePaginationCursorID(messages[len(messages)-1].GetID()) + } + + var stopAt int + if fetchParams.AnchorMessage != nil && fetchParams.Forward { + stopAt, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID) + if err != nil { + return nil, err + } + } + + var backfillMessages []*bridgev2.BackfillMessage + for _, msg := range messages { + // If we are doing forward backfill and we get to the anchor message, + // don't convert any more messages. + if stopAt > 0 && msg.GetID() <= stopAt { + break + } + + if msg.TypeID() != tg.MessageTypeID { + log.Warn().Str("type", msg.TypeName()).Msg("skipping backfilling unsupported message type") + continue + } + message := msg.(*tg.Message) + + portal, err := t.main.Bridge.GetPortalByKey(ctx, fetchParams.Portal.PortalKey) + if err != nil { + return nil, err + } + + sender := t.getEventSender(message) + intent := portal.GetIntentFor(ctx, sender, t.userLogin, bridgev2.RemoteEventBackfill) + converted, err := t.convertToMatrix(ctx, portal, intent, message) + if err != nil { + return nil, err + } + reactionsList, _, customEmojis, err := t.computeReactionsList(ctx, message) + if err != nil { + return nil, err + } + + backfillMessage := bridgev2.BackfillMessage{ + ConvertedMessage: converted, + Sender: sender, + ID: ids.MakeMessageID(message.ID), + Timestamp: time.Unix(int64(message.Date), 0), + } + + for _, reaction := range reactionsList { + peer, ok := reaction.PeerID.(*tg.PeerUser) + if !ok { + return nil, fmt.Errorf("unknown peer type %T", reaction.PeerID) + } + + emojiID, emoji, err := computeEmojiAndID(reaction.Reaction, customEmojis) + if err != nil { + return nil, fmt.Errorf("failed to compute emoji and ID: %w", err) + } + + backfillMessage.Reactions = append(backfillMessage.Reactions, &bridgev2.BackfillReaction{ + Timestamp: time.Unix(int64(reaction.Date), 0), + Sender: bridgev2.EventSender{ + IsFromMe: reaction.My, + SenderLogin: ids.MakeUserLoginID(peer.UserID), + Sender: ids.MakeUserID(peer.UserID), + }, + EmojiID: emojiID, + Emoji: emoji, + }) + } + + backfillMessages = append(backfillMessages, &backfillMessage) + } + + // They are returned with most recent message first, so reverse the order. + slices.Reverse(backfillMessages) + + return &bridgev2.FetchMessagesResponse{ + Messages: backfillMessages, + Cursor: cursor, + HasMore: len(messages) == fetchParams.Count, + Forward: fetchParams.Forward, + MarkRead: markRead, + }, nil +} + +func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *bridgev2.Portal, task *database.BackfillTask) int { + log := zerolog.Ctx(ctx).With(). + Str("method", "GetBackfillMaxBatchCount"). + Logger() + peerType, _, err := ids.ParsePortalID(portal.ID) + if err != nil { + log.Err(err).Msg("failed to parse portal ID") + return 0 + } + switch peerType { + case ids.PeerTypeUser: + return c.main.Bridge.Config.Backfill.Queue.GetOverride("user") + case ids.PeerTypeChat: + return c.main.Bridge.Config.Backfill.Queue.GetOverride("normal_group") + case ids.PeerTypeChannel: + if portal.Metadata.(*PortalMetadata).IsSuperGroup { + return c.main.Bridge.Config.Backfill.Queue.GetOverride("supergroup") + } else { + return c.main.Bridge.Config.Backfill.Queue.GetOverride("channel") + } + default: + log.Error().Str("peer_type", string(peerType)).Msg("unknown peer type") + return 0 + } +} diff --git a/pkg/connector/chatinfo.go b/pkg/connector/chatinfo.go index 3e804573..30cb24b6 100644 --- a/pkg/connector/chatinfo.go +++ b/pkg/connector/chatinfo.go @@ -16,8 +16,9 @@ import ( func (t *TelegramClient) getDMChatInfo(ctx context.Context, userID int64) (*bridgev2.ChatInfo, error) { chatInfo := bridgev2.ChatInfo{ - Type: ptr.Ptr(database.RoomTypeDM), - Members: &bridgev2.ChatMemberList{IsFull: true}, + Type: ptr.Ptr(database.RoomTypeDM), + Members: &bridgev2.ChatMemberList{IsFull: true}, + CanBackfill: true, } accessHash, found, err := t.ScopedStore.GetUserAccessHash(ctx, userID) if err != nil { @@ -57,25 +58,36 @@ func (t *TelegramClient) getGroupChatInfo(ctx context.Context, fullChat *tg.Mess return nil, false, err } + var name *string + var isBroadcastChannel, isMegagroup bool + for _, c := range fullChat.GetChats() { + if c.GetID() == chatID { + switch chat := c.(type) { + case *tg.Chat: + name = &chat.Title + case *tg.Channel: + name = &chat.Title + isBroadcastChannel = chat.Broadcast + isMegagroup = chat.Megagroup + } + break + } + } + chatInfo := bridgev2.ChatInfo{ + Name: name, Type: ptr.Ptr(database.RoomTypeGroupDM), // TODO Is this correct for channels? Members: &bridgev2.ChatMemberList{ IsFull: true, Members: []bridgev2.ChatMember{{EventSender: t.mySender()}}, }, - } - var isBroadcastChannel bool - for _, c := range fullChat.GetChats() { - if c.GetID() == chatID { - switch chat := c.(type) { - case *tg.Chat: - chatInfo.Name = &chat.Title - case *tg.Channel: - chatInfo.Name = &chat.Title - isBroadcastChannel = chat.Broadcast - } - break - } + CanBackfill: true, + ExtraUpdates: func(ctx context.Context, p *bridgev2.Portal) bool { + meta := p.Metadata.(*PortalMetadata) + changed := meta.IsSuperGroup != isMegagroup + meta.IsSuperGroup = isMegagroup + return changed + }, } if ttl, ok := fullChat.FullChat.GetTTLPeriod(); ok { diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 7fc1615e..e0cff497 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -48,13 +48,15 @@ type TelegramClient struct { } var ( - _ bridgev2.NetworkAPI = (*TelegramClient)(nil) - _ bridgev2.EditHandlingNetworkAPI = (*TelegramClient)(nil) - _ bridgev2.ReactionHandlingNetworkAPI = (*TelegramClient)(nil) - _ bridgev2.RedactionHandlingNetworkAPI = (*TelegramClient)(nil) - _ bridgev2.ReadReceiptHandlingNetworkAPI = (*TelegramClient)(nil) - _ bridgev2.ReadReceiptHandlingNetworkAPI = (*TelegramClient)(nil) - _ bridgev2.TypingHandlingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.NetworkAPI = (*TelegramClient)(nil) + _ bridgev2.EditHandlingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.ReactionHandlingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.RedactionHandlingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.ReadReceiptHandlingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.ReadReceiptHandlingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.TypingHandlingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.BackfillingNetworkAPI = (*TelegramClient)(nil) + _ bridgev2.BackfillingNetworkAPIWithLimits = (*TelegramClient)(nil) // _ bridgev2.IdentifierResolvingNetworkAPI = (*TelegramClient)(nil) // _ bridgev2.GroupCreatingNetworkAPI = (*TelegramClient)(nil) // _ bridgev2.ContactListingNetworkAPI = (*TelegramClient)(nil) diff --git a/pkg/connector/config.go b/pkg/connector/config.go index 003025ac..bd759805 100644 --- a/pkg/connector/config.go +++ b/pkg/connector/config.go @@ -87,7 +87,7 @@ func (tg *TelegramConnector) ValidateConfig() error { func (tg *TelegramConnector) GetDBMetaTypes() database.MetaTypes { return database.MetaTypes{ Ghost: func() any { return &GhostMetadata{} }, - Portal: nil, + Portal: func() any { return &PortalMetadata{} }, Message: func() any { return &MessageMetadata{} }, Reaction: nil, UserLogin: func() any { return &UserLoginMetadata{} }, @@ -99,6 +99,10 @@ type GhostMetadata struct { IsBot bool `json:"is_bot,omitempty"` } +type PortalMetadata struct { + IsSuperGroup bool `json:"is_supergroup,omitempty"` +} + type MessageMetadata struct { ContentHash []byte `json:"content_hash,omitempty"` ContentURI id.ContentURIString `json:"content_uri,omitempty"` diff --git a/pkg/connector/ids/ids.go b/pkg/connector/ids/ids.go index ea84e6a8..f7e64146 100644 --- a/pkg/connector/ids/ids.go +++ b/pkg/connector/ids/ids.go @@ -30,6 +30,10 @@ func MakeMessageID(messageID int) networkid.MessageID { return networkid.MessageID(strconv.Itoa(messageID)) } +func MakePaginationCursorID(messageID int) networkid.PaginationCursor { + return networkid.PaginationCursor(strconv.Itoa(messageID)) +} + func ParseMessageID(messageID networkid.MessageID) (int, error) { return strconv.Atoi(string(messageID)) } diff --git a/pkg/connector/reactions.go b/pkg/connector/reactions.go new file mode 100644 index 00000000..73d87ef8 --- /dev/null +++ b/pkg/connector/reactions.go @@ -0,0 +1,193 @@ +package connector + +import ( + "context" + "fmt" + "time" + + "github.com/gotd/td/tg" + "github.com/rs/zerolog" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/bridgev2/simplevent" + + "go.mau.fi/mautrix-telegram/pkg/connector/ids" +) + +func (t *TelegramClient) computeReactionsList(ctx context.Context, msg *tg.Message) (reactions []tg.MessagePeerReaction, isFull bool, customEmojis map[networkid.EmojiID]string, err error) { + log := zerolog.Ctx(ctx).With().Str("fn", "computeReactionsList").Logger() + if _, set := msg.GetReactions(); !set { + return + } + + var totalCount int + for _, r := range msg.Reactions.Results { + totalCount += r.Count + } + + reactionsList := msg.Reactions.RecentReactions + if totalCount > 0 && len(reactionsList) == 0 && !msg.Reactions.CanSeeList { + // We don't know who reacted in a channel, so we can't bridge it properly either + log.Warn().Msg("Can't see reaction list in channel") + return + } + + // TODO + // if self.peer_type == "channel" and not self.megagroup: + // # This should never happen with the previous if + // self.log.warning(f"Can see reaction list in channel ({data!s})") + // # return + + if len(reactionsList) < totalCount { + if user, ok := msg.PeerID.(*tg.PeerUser); ok { + reactionsList = splitDMReactionCounts(msg.Reactions.Results, user.UserID, t.telegramUserID) + + // TODO + // } else if t.isBot { + // // Can't fetch exact reaction senders as a bot + // return + + // TODO should calls to this be limited? + } else if peer, err := t.inputPeerForPortalID(ctx, ids.MakePortalKey(msg.PeerID, t.loginID).ID); err != nil { + return nil, false, nil, fmt.Errorf("failed to get input peer: %w", err) + } else { + reactions, err := t.client.API().MessagesGetMessageReactionsList(ctx, &tg.MessagesGetMessageReactionsListRequest{ + Peer: peer, ID: msg.ID, Limit: 100, + }) + if err != nil { + return nil, false, nil, fmt.Errorf("failed to get reactions list: %w", err) + } + reactionsList = reactions.Reactions + } + } + + var customEmojiIDs []int64 + for _, reaction := range reactionsList { + if e, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok { + customEmojiIDs = append(customEmojiIDs, e.DocumentID) + } else if reaction.Reaction.TypeID() != tg.ReactionEmojiTypeID { + return nil, false, nil, fmt.Errorf("unsupported reaction type %T", reaction.Reaction) + } + } + + customEmojis, err = t.transferEmojisToMatrix(ctx, customEmojiIDs) + return reactionsList, len(reactionsList) == totalCount, customEmojis, err +} + +func computeEmojiAndID(reaction tg.ReactionClass, customEmojis map[networkid.EmojiID]string) (emojiID networkid.EmojiID, emoji string, err error) { + if r, ok := reaction.(*tg.ReactionCustomEmoji); ok { + emojiID = ids.MakeEmojiIDFromDocumentID(r.DocumentID) + emoji = customEmojis[emojiID] + } else if r, ok := reaction.(*tg.ReactionEmoji); ok { + emojiID = ids.MakeEmojiIDFromEmoticon(r.Emoticon) + emoji = r.Emoticon + } else { + return "", "", fmt.Errorf("invalid reaction type %T", reaction) + } + return +} + +func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Message) { + log := zerolog.Ctx(ctx).With(). + Str("handler", "handle_telegram_reactions"). + Int("message_id", msg.ID). + Logger() + + dbMsg, err := t.main.Bridge.DB.Message.GetFirstPartByID(ctx, t.loginID, ids.MakeMessageID(msg.ID)) + if err != nil { + log.Err(err).Msg("failed to get message from database") + return + } else if dbMsg == nil { + log.Warn().Msg("message not found in database") + return + } + + reactionsList, isFull, customEmojis, err := t.computeReactionsList(ctx, msg) + if err != nil { + log.Err(err).Msg("failed to compute reactions list") + return + } + + users := map[networkid.UserID]*bridgev2.ReactionSyncUser{} + for _, reaction := range reactionsList { + peer, ok := reaction.PeerID.(*tg.PeerUser) + if !ok { + log.Error().Type("peer_id", reaction.PeerID).Msg("unknown peer type") + return + } + userID := ids.MakeUserID(peer.UserID) + reactionLimit, err := t.getReactionLimit(ctx, userID) + if err != nil { + reactionLimit = 1 + log.Err(err).Int64("id", peer.UserID).Msg("failed to get reaction limit") + } + if _, ok := users[userID]; !ok { + users[userID] = &bridgev2.ReactionSyncUser{HasAllReactions: isFull, MaxCount: reactionLimit} + } + + emojiID, emoji, err := computeEmojiAndID(reaction.Reaction, customEmojis) + if err != nil { + log.Err(err).Msg("failed to compute emoji and ID") + return + } + + users[userID].Reactions = append(users[userID].Reactions, &bridgev2.BackfillReaction{ + Timestamp: time.Unix(int64(reaction.Date), 0), + Sender: bridgev2.EventSender{ + IsFromMe: reaction.My, + SenderLogin: ids.MakeUserLoginID(peer.UserID), + Sender: userID, + }, + EmojiID: emojiID, + Emoji: emoji, + }) + } + + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventReactionSync, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Int("message_id", msg.ID) + }, + PortalKey: dbMsg.Room, + }, + TargetMessage: dbMsg.ID, + Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, + }) +} + +func splitDMReactionCounts(res []tg.ReactionCount, theirUserID, myUserID int64) (reactions []tg.MessagePeerReaction) { + for _, item := range res { + if item.Count == 2 || item.ChosenOrder > 0 { + reactions = append(reactions, tg.MessagePeerReaction{ + Reaction: item.Reaction, + PeerID: &tg.PeerUser{UserID: myUserID}, + }) + } + + if item.Count == 2 { + reactions = append(reactions, tg.MessagePeerReaction{ + Reaction: item.Reaction, + PeerID: &tg.PeerUser{UserID: theirUserID}, + }) + } + } + return +} + +func (t *TelegramClient) getReactionLimit(ctx context.Context, sender networkid.UserID) (limit int, err error) { + config, err := t.getAppConfigCached(ctx) + if err != nil { + return 0, err + } + + ghost, err := t.main.Bridge.GetGhostByID(ctx, sender) + if err != nil { + return 0, err + } + if ghost.Metadata.(*GhostMetadata).IsPremium { + return int(config["reactions_user_max_premium"].(float64)), nil + } else { + return int(config["reactions_user_max_default"].(float64)), nil + } +} diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index c120c2ae..04b68d22 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -359,140 +359,6 @@ func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID i return nil } -func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Message) { - log := zerolog.Ctx(ctx).With(). - Str("handler", "handle_telegram_reactions"). - Int("message_id", msg.ID). - Logger() - - if _, set := msg.GetReactions(); !set { - log.Debug().Msg("no reactions set on message") - return - } - var totalCount int - for _, r := range msg.Reactions.Results { - totalCount += r.Count - } - - reactionsList := msg.Reactions.RecentReactions - if totalCount > 0 && len(reactionsList) == 0 && !msg.Reactions.CanSeeList { - // We don't know who reacted in a channel, so we can't bridge it properly either - log.Warn().Msg("Can't see reaction list in channel") - return - } - - // TODO - // if self.peer_type == "channel" and not self.megagroup: - // # This should never happen with the previous if - // self.log.warning(f"Can see reaction list in channel ({data!s})") - // # return - - dbMsg, err := t.main.Bridge.DB.Message.GetFirstPartByID(ctx, t.loginID, ids.MakeMessageID(msg.ID)) - if err != nil { - log.Err(err).Msg("failed to get message from database") - return - } else if dbMsg == nil { - log.Warn().Msg("no message found in database") - return - } - - if len(reactionsList) < totalCount { - if user, ok := msg.PeerID.(*tg.PeerUser); ok { - reactionsList = splitDMReactionCounts(msg.Reactions.Results, user.UserID, t.telegramUserID) - - // TODO - // } else if t.isBot { - // // Can't fetch exact reaction senders as a bot - // return - - // TODO should calls to this be limited? - } else if peer, err := t.inputPeerForPortalID(ctx, ids.MakePortalKey(msg.PeerID, t.loginID).ID); err != nil { - log.Err(err).Msg("failed to get input peer") - return - } else { - reactions, err := t.client.API().MessagesGetMessageReactionsList(ctx, &tg.MessagesGetMessageReactionsListRequest{ - Peer: peer, ID: msg.ID, Limit: 100, - }) - if err != nil { - log.Err(err).Msg("failed to get reactions list") - return - } - reactionsList = reactions.Reactions - } - } - - var customEmojiIDs []int64 - for _, reaction := range reactionsList { - if e, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok { - customEmojiIDs = append(customEmojiIDs, e.DocumentID) - } else if reaction.Reaction.TypeID() != tg.ReactionEmojiTypeID { - log.Error().Type("reaction", reaction.Reaction).Msg("unknown reaction type") - return - } - } - - customEmojis, err := t.transferEmojisToMatrix(ctx, customEmojiIDs) - if err != nil { - log.Err(err).Msg("failed to transfer emojis") - return - } - - isFull := len(reactionsList) == totalCount - users := map[networkid.UserID]*bridgev2.ReactionSyncUser{} - for _, reaction := range reactionsList { - peer, ok := reaction.PeerID.(*tg.PeerUser) - if !ok { - log.Error().Type("peer_id", reaction.PeerID).Msg("unknown peer type") - return - } - userID := ids.MakeUserID(peer.UserID) - reactionLimit, err := t.getReactionLimit(ctx, userID) - if err != nil { - reactionLimit = 1 - log.Err(err).Int64("id", peer.UserID).Msg("failed to get reaction limit") - } - if _, ok := users[userID]; !ok { - users[userID] = &bridgev2.ReactionSyncUser{HasAllReactions: isFull, MaxCount: reactionLimit} - } - - var emojiID networkid.EmojiID - var emoji string - if r, ok := reaction.Reaction.(*tg.ReactionCustomEmoji); ok { - emojiID = ids.MakeEmojiIDFromDocumentID(r.DocumentID) - emoji = customEmojis[emojiID] - } else if r, ok := reaction.Reaction.(*tg.ReactionEmoji); ok { - emojiID = ids.MakeEmojiIDFromEmoticon(r.Emoticon) - emoji = r.Emoticon - } else { - log.Error().Type("reaction_type", reaction.Reaction).Msg("invalid reaction type") - return - } - - users[userID].Reactions = append(users[userID].Reactions, &bridgev2.BackfillReaction{ - Timestamp: time.Unix(int64(reaction.Date), 0), - Sender: bridgev2.EventSender{ - IsFromMe: reaction.My, - SenderLogin: ids.MakeUserLoginID(peer.UserID), - Sender: userID, - }, - EmojiID: emojiID, - Emoji: emoji, - }) - } - - t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ReactionSync{ - EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventReactionSync, - LogContext: func(c zerolog.Context) zerolog.Context { - return c.Int("message_id", msg.ID) - }, - PortalKey: dbMsg.Room, - }, - TargetMessage: dbMsg.ID, - Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, - }) -} - func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, error) { peerType, id, err := ids.ParsePortalID(portalID) if err != nil { @@ -522,25 +388,6 @@ func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID netw } } -func splitDMReactionCounts(res []tg.ReactionCount, theirUserID, myUserID int64) (reactions []tg.MessagePeerReaction) { - for _, item := range res { - if item.Count == 2 || item.ChosenOrder > 0 { - reactions = append(reactions, tg.MessagePeerReaction{ - Reaction: item.Reaction, - PeerID: &tg.PeerUser{UserID: myUserID}, - }) - } - - if item.Count == 2 { - reactions = append(reactions, tg.MessagePeerReaction{ - Reaction: item.Reaction, - PeerID: &tg.PeerUser{UserID: theirUserID}, - }) - } - } - return -} - func (t *TelegramClient) getAppConfigCached(ctx context.Context) (map[string]any, error) { if t.appConfig == nil { cfg, err := t.client.API().HelpGetAppConfig(ctx, t.appConfigHash) @@ -564,23 +411,6 @@ func (t *TelegramClient) getAppConfigCached(ctx context.Context) (map[string]any return t.appConfig, nil } -func (t *TelegramClient) getReactionLimit(ctx context.Context, sender networkid.UserID) (limit int, err error) { - config, err := t.getAppConfigCached(ctx) - if err != nil { - return 0, err - } - - ghost, err := t.main.Bridge.GetGhostByID(ctx, sender) - if err != nil { - return 0, err - } - if ghost.Metadata.(*GhostMetadata).IsPremium { - return int(config["reactions_user_max_premium"].(float64)), nil - } else { - return int(config["reactions_user_max_default"].(float64)), nil - } -} - func (t *TelegramClient) transferEmojisToMatrix(ctx context.Context, customEmojiIDs []int64) (result map[networkid.EmojiID]string, err error) { result, customEmojiIDs = emojis.ConvertKnownEmojis(customEmojiIDs)