From d5f87d2ec11fb2b6b7b3135a5d84a53978ad4902 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sat, 6 Dec 2025 14:52:39 +0200 Subject: [PATCH] all: add support for topics and refactor other things --- go.mod | 2 +- go.sum | 4 +- pkg/connector/backfill.go | 73 ++-- pkg/connector/capabilities.go | 23 +- pkg/connector/chatinfo.go | 100 ++++- pkg/connector/client.go | 45 ++- pkg/connector/directdownload.go | 2 +- pkg/connector/handlematrix.go | 157 ++++++-- pkg/connector/handletelegram.go | 377 ++++++++++-------- pkg/connector/ids.go | 8 +- pkg/connector/ids/ids.go | 54 +-- pkg/connector/media/transfer.go | 2 +- pkg/connector/metadata.go | 7 + pkg/connector/push.go | 7 +- pkg/connector/reactions.go | 12 +- pkg/connector/startchat.go | 10 +- pkg/connector/store/container.go | 7 + pkg/connector/store/phonenumber.go | 56 +++ pkg/connector/store/scoped_store.go | 64 --- pkg/connector/store/topic.go | 62 +++ pkg/connector/store/upgrades/00-latest.sql | 9 +- .../store/upgrades/06-topic-index.sql | 8 + pkg/connector/store/username.go | 70 ++++ pkg/connector/sync.go | 6 +- pkg/connector/tomatrix.go | 16 +- pkg/connector/userinfo.go | 6 +- 26 files changed, 797 insertions(+), 390 deletions(-) create mode 100644 pkg/connector/store/phonenumber.go create mode 100644 pkg/connector/store/topic.go create mode 100644 pkg/connector/store/upgrades/06-topic-index.sql create mode 100644 pkg/connector/store/username.go diff --git a/go.mod b/go.mod index 63de0f9d..e198db5a 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( golang.org/x/sync v0.18.0 golang.org/x/tools v0.39.0 gopkg.in/yaml.v3 v3.0.1 - maunium.net/go/mautrix v0.26.1-0.20251203195941-02ce6ff91851 + maunium.net/go/mautrix v0.26.1-0.20251206105112-4efa4bdac5e3 rsc.io/qr v0.2.0 ) diff --git a/go.sum b/go.sum index aaebb1ad..d12a1127 100644 --- a/go.sum +++ b/go.sum @@ -237,7 +237,7 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.26.1-0.20251203195941-02ce6ff91851 h1:5dty5IkJGxpLj0SQ2+wwKIcrPfZML1uHFcGaQIA9te0= -maunium.net/go/mautrix v0.26.1-0.20251203195941-02ce6ff91851/go.mod h1:NaesYcOQWFDbixVYywCVS+Twlzab9hOUpFNlCBlvciE= +maunium.net/go/mautrix v0.26.1-0.20251206105112-4efa4bdac5e3 h1:llPUQswvRVaWkWqwH1P6T51wkj3fWCXu4rxewN0RLsY= +maunium.net/go/mautrix v0.26.1-0.20251206105112-4efa4bdac5e3/go.mod h1:NaesYcOQWFDbixVYywCVS+Twlzab9hOUpFNlCBlvciE= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs= diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index 9b18a24e..28399755 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -29,6 +29,7 @@ import ( "maunium.net/go/mautrix/bridgev2/networkid" "go.mau.fi/mautrix-telegram/pkg/connector/ids" + "go.mau.fi/mautrix-telegram/pkg/gotd/bin" "go.mau.fi/mautrix-telegram/pkg/gotd/tg" "go.mau.fi/mautrix-telegram/pkg/gotd/tgerr" ) @@ -106,7 +107,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er } if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor != "" { var err error - req.OffsetPeer, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor) + req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor) if err != nil { return fmt.Errorf("failed to get input peer for pagination: %w", err) } @@ -150,7 +151,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er return fmt.Errorf("failed to handle dialogs: %w", err) } - portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer()) + portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), 0) if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor == portalKey.ID { t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true @@ -164,7 +165,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er return fmt.Errorf("failed to save user login: %w", err) } - req.OffsetPeer, err = t.inputPeerForPortalID(ctx, portalKey.ID) + req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, portalKey.ID) if err != nil { return fmt.Errorf("failed to get input peer for pagination: %w", err) } @@ -206,43 +207,57 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 }() } - peer, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID) + peer, topicID, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID) if err != nil { return nil, err } - req := tg.MessagesGetHistoryRequest{ - Peer: peer, - Limit: fetchParams.Count, - } + var minID, offsetID int if fetchParams.AnchorMessage != nil { if fetchParams.Forward { - _, req.MinID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID) + _, minID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID) } else { - _, req.OffsetID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID) + _, offsetID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID) } if err != nil { return nil, err } } + if fetchParams.Portal.Metadata.(*PortalMetadata).IsForumGeneral { + topicID = 1 + } + var req bin.Object + if topicID == ids.TopicIDSpaceRoom { + return nil, nil + } else if topicID > 0 { + req = &tg.MessagesGetRepliesRequest{ + Peer: peer, + MsgID: topicID, + Limit: fetchParams.Count, + MinID: minID, + OffsetID: offsetID, + } + } else { + req = &tg.MessagesGetHistoryRequest{ + Peer: peer, + Limit: fetchParams.Count, + MinID: minID, + OffsetID: offsetID, + } + } + if !fetchParams.Forward { + req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req} + } log.Info().Any("req", req).Msg("Fetching messages") msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) { - var rawMsgs tg.MessagesMessagesClass - if fetchParams.Forward { - rawMsgs, err = t.client.API().MessagesGetHistory(ctx, &req) - } else { - var messages tg.MessagesMessagesBox - err = t.client.Invoke(ctx, - &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req}, - &messages) - rawMsgs = messages.Messages - } + var box tg.MessagesMessagesBox + err = t.client.Invoke(ctx, req, &box) if err != nil { return nil, err } - msgs, ok := rawMsgs.(tg.ModifiedMessagesMessages) + msgs, ok := box.Messages.(tg.ModifiedMessagesMessages) if !ok { - return nil, fmt.Errorf("unsupported messages type %T", rawMsgs) + return nil, fmt.Errorf("unsupported messages type %T", box.Messages) } return msgs, nil }) @@ -251,11 +266,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 } messages := msgs.GetMessages() - - portal, err := t.main.Bridge.GetPortalByKey(ctx, fetchParams.Portal.PortalKey) - if err != nil { - return nil, err - } + portal := fetchParams.Portal // If the first message is the last read message, mark the chat as read // during backfill. @@ -363,7 +374,7 @@ func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *b log := zerolog.Ctx(ctx).With(). Str("method", "GetBackfillMaxBatchCount"). Logger() - peerType, _, err := ids.ParsePortalID(portal.ID) + peerType, _, topicID, err := ids.ParsePortalID(portal.ID) if err != nil { log.Err(err).Msg("failed to parse portal ID") return 0 @@ -374,7 +385,11 @@ func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *b case ids.PeerTypeChat: return c.main.Bridge.Config.Backfill.Queue.GetOverride("normal_group") case ids.PeerTypeChannel: - if portal.Metadata.(*PortalMetadata).IsSuperGroup { + if topicID == ids.TopicIDSpaceRoom { + return 0 + } else if topicID > 0 { + return c.main.Bridge.Config.Backfill.Queue.GetOverride("topic", "supergroup") + } else if portal.Metadata.(*PortalMetadata).IsSuperGroup { return c.main.Bridge.Config.Backfill.Queue.GetOverride("supergroup") } else { return c.main.Bridge.Config.Backfill.Queue.GetOverride("channel") diff --git a/pkg/connector/capabilities.go b/pkg/connector/capabilities.go index 1a1da104..7a1ec175 100644 --- a/pkg/connector/capabilities.go +++ b/pkg/connector/capabilities.go @@ -229,8 +229,9 @@ func (t *TelegramClient) GetCapabilities(ctx context.Context, portal *bridgev2.P Timers: telegramTimers, }, State: event.StateFeatureMap{ - event.StateRoomName.Type: {Level: event.CapLevelFullySupported}, - event.StateRoomAvatar.Type: {Level: event.CapLevelFullySupported}, + event.StateRoomName.Type: {Level: event.CapLevelFullySupported}, + event.StateRoomAvatar.Type: {Level: event.CapLevelFullySupported}, + event.StateBeeperDisappearingTimer.Type: {Level: event.CapLevelFullySupported}, }, } // TODO non-admins can only edit messages within 48 hours @@ -258,17 +259,29 @@ func (t *TelegramClient) GetCapabilities(ctx context.Context, portal *bridgev2.P feat.ReactionCount = 3 } portalMetadata := portal.Metadata.(*PortalMetadata) - peerType, _, _ := ids.ParsePortalID(portal.ID) + peerType, _, topicID, _ := ids.ParsePortalID(portal.ID) + if topicID > 0 { + baseID += "+topic" + // TODO do topics have other changes? + delete(feat.State, event.StateRoomAvatar.Type) + delete(feat.State, event.StateBeeperDisappearingTimer.Type) + feat.DisappearingTimer = nil + } else if topicID == ids.TopicIDSpaceRoom { + baseID += "+spaceroom" + feat = &event.RoomFeatures{} + } switch portal.RoomType { case database.RoomTypeDM: baseID += "+dm" feat.DeleteChat = true feat.DeleteChatForEveryone = true - feat.State = nil + feat.State = event.StateFeatureMap{ + event.StateBeeperDisappearingTimer.Type: {Level: event.CapLevelFullySupported}, + } default: // Group creators can delete the chat for everyone, unless it's a large channel - if peerType == ids.PeerTypeChat || portalMetadata.ParticipantsCount < 1000 { + if peerType == ids.PeerTypeChat || portalMetadata.ParticipantsCount < 1000 || topicID > 0 { baseID += "+deletablegroup" feat.DeleteChatForEveryone = true } diff --git a/pkg/connector/chatinfo.go b/pkg/connector/chatinfo.go index 08ea920f..dce850c3 100644 --- a/pkg/connector/chatinfo.go +++ b/pkg/connector/chatinfo.go @@ -40,6 +40,7 @@ var ( modPowerLevel = ptr.Ptr(50) superadminPowerLevel = ptr.Ptr(75) creatorPowerLevel = ptr.Ptr(95) + nobodyPowerLevel = ptr.Ptr(99) otherPowerLevel = ptr.Ptr(40) anonymousPowerLevel = ptr.Ptr(41) @@ -135,9 +136,10 @@ type memberFetchMeta struct { Input *tg.InputChannel IsBroadcast bool ParticipantsHidden bool + IsForum bool } -func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo, *memberFetchMeta, error) { +func (t *TelegramClient) wrapChatInfo(portalID networkid.PortalID, rawChat tg.ChatClass) (*bridgev2.ChatInfo, *memberFetchMeta, error) { info := bridgev2.ChatInfo{ Type: ptr.Ptr(database.RoomTypeDefault), CanBackfill: true, @@ -147,8 +149,8 @@ func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo, }, ExcludeChangesFromTimeline: true, } - var isMegagroup, isBroadcast, left bool - var channelInput *tg.InputChannel + var mfm memberFetchMeta + var isMegagroup, isForumGeneral, left bool var avatarErr error switch chat := rawChat.(type) { case *tg.Chat: @@ -158,13 +160,28 @@ func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo, info.Members.PowerLevels = t.getPowerLevelOverridesFromBannedRights(chat, chat.DefaultBannedRights) left = chat.Left case *tg.Channel: - channelInput = chat.AsInput() + mfm.Input = chat.AsInput() + mfm.IsBroadcast = chat.Broadcast info.Name = &chat.Title info.Members.TotalMemberCount = chat.ParticipantsCount isMegagroup = chat.Megagroup - isBroadcast = chat.Broadcast info.Avatar, avatarErr = t.convertChatPhoto(chat.AsInputPeer(), chat.Photo) info.Members.PowerLevels = t.getPowerLevelOverridesFromBannedRights(chat, chat.DefaultBannedRights) + _, _, topicID, _ := ids.ParsePortalID(portalID) + if chat.Forum { + if topicID == ids.TopicIDSpaceRoom { + info.Type = ptr.Ptr(database.RoomTypeSpace) + } else if topicID == 0 { + isForumGeneral = true + info.Name = ptr.Ptr("#General - " + *info.Name) + } + if topicID != ids.TopicIDSpaceRoom { + info.ParentID = ptr.Ptr(ids.MakeForumParentPortalID(chat.ID)) + } + mfm.IsForum = true + } else if topicID != 0 { + return nil, nil, fmt.Errorf("channel %d is not a forum, cannot have topics", chat.GetID()) + } left = chat.Left if chat.Broadcast { info.Members.MemberMap.Set(bridgev2.ChatMember{ @@ -188,13 +205,21 @@ func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo, meta := portal.Metadata.(*PortalMetadata) _ = updatePortalLastSyncAt(ctx, portal) changed := meta.SetIsSuperGroup(isMegagroup) + changed = meta.SetIsForumGeneral(isForumGeneral) || changed if info.Members.TotalMemberCount != 0 && meta.ParticipantsCount != info.Members.TotalMemberCount { meta.ParticipantsCount = info.Members.TotalMemberCount changed = true } return changed } - return &info, &memberFetchMeta{Input: channelInput, IsBroadcast: isBroadcast}, nil + return &info, &mfm, nil +} + +func (t *TelegramClient) overrideChatInfoWithTopic(info *bridgev2.ChatInfo, topic *tg.ForumTopic) { + info.Name = ptr.Ptr(topic.Title + " - " + *info.Name) + if topic.Closed { + info.Members.PowerLevels.EventsDefault = nobodyPowerLevel + } } func (t *TelegramClient) getChannelParticipants(ctx context.Context, req *tg.ChannelsGetParticipantsRequest) (*tg.ChannelsChannelParticipants, error) { @@ -271,7 +296,7 @@ func (t *TelegramClient) fillUserLocalMeta(info *bridgev2.ChatInfo, dialog *tg.D } } -func (t *TelegramClient) wrapFullChatInfo(fullChat *tg.MessagesChatFull) (*bridgev2.ChatInfo, *memberFetchMeta, error) { +func (t *TelegramClient) wrapFullChatInfo(portalID networkid.PortalID, fullChat *tg.MessagesChatFull) (*bridgev2.ChatInfo, *memberFetchMeta, error) { var chat tg.ChatClass for _, c := range fullChat.GetChats() { if c.GetID() == fullChat.FullChat.GetID() { @@ -283,7 +308,7 @@ func (t *TelegramClient) wrapFullChatInfo(fullChat *tg.MessagesChatFull) (*bridg return nil, nil, fmt.Errorf("chat ID %d not found in full chat", fullChat.FullChat.GetID()) } - info, mfm, err := t.wrapChatInfo(chat) + info, mfm, err := t.wrapChatInfo(portalID, chat) if err != nil { return nil, nil, err } @@ -437,7 +462,7 @@ func (t *TelegramClient) filterChannelParticipants(participants []tg.ChannelPart } func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) { - peerType, id, err := ids.ParsePortalID(portal.ID) + peerType, id, topicID, err := ids.ParsePortalID(portal.ID) if err != nil { return nil, err } @@ -452,21 +477,41 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta if err != nil { return nil, err } - info, _, err := t.wrapFullChatInfo(fullChat) + info, _, err := t.wrapFullChatInfo(portal.ID, fullChat) return info, err case ids.PeerTypeChannel: accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, id) if err != nil { return nil, fmt.Errorf("failed to get channel access hash: %w", err) } - inputChannel := &tg.InputChannel{ChannelID: id, AccessHash: accessHash} + if topicID > 0 { + resp, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesForumTopics, error) { + return t.client.API().MessagesGetForumTopicsByID(ctx, &tg.MessagesGetForumTopicsByIDRequest{ + Peer: &tg.InputPeerChannel{ChannelID: id, AccessHash: accessHash}, + Topics: []int{topicID}, + }) + }) + if err != nil { + return nil, err + } + channel, topic, err := getTopicInfoFromResponse(resp, id, topicID) + if err != nil { + return nil, err + } + info, _, err := t.wrapChatInfo(portal.ID, channel) + if err != nil { + return nil, err + } + t.overrideChatInfoWithTopic(info, topic) + return info, nil + } fullChat, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesChatFull, error) { - return t.client.API().ChannelsGetFullChannel(ctx, inputChannel) + return t.client.API().ChannelsGetFullChannel(ctx, &tg.InputChannel{ChannelID: id, AccessHash: accessHash}) }) if err != nil { return nil, err } - info, mfm, err := t.wrapFullChatInfo(fullChat) + info, mfm, err := t.wrapFullChatInfo(portal.ID, fullChat) if err != nil { return nil, err } @@ -480,6 +525,35 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta } } +func getTopicInfoFromResponse(resp *tg.MessagesForumTopics, channelID int64, topicID int) (channel *tg.Channel, topic *tg.ForumTopic, err error) { + var ok bool + for _, ch := range resp.GetChats() { + if ch.GetID() == channelID { + channel, ok = ch.(*tg.Channel) + if !ok { + return nil, nil, fmt.Errorf("chat ID %d is %T not *tg.Channel", channelID, ch) + } + break + } + } + if channel == nil { + return nil, nil, fmt.Errorf("channel ID %d not found in chats", channelID) + } + for _, tp := range resp.GetTopics() { + if tp.GetID() == topicID { + topic, ok = tp.(*tg.ForumTopic) + if !ok { + return nil, nil, fmt.Errorf("topic ID %d is %T not *tg.ForumTopic", topicID, tp) + } + break + } + } + if topic == nil { + return nil, nil, fmt.Errorf("topic ID %d not found in topics", topicID) + } + return +} + func (t *TelegramClient) getDMPowerLevels(ghost *bridgev2.Ghost) *bridgev2.PowerLevelOverrides { var plo bridgev2.PowerLevelOverrides if ghost.Metadata.(*GhostMetadata).Blocked { diff --git a/pkg/connector/client.go b/pkg/connector/client.go index f6a3d445..31579a49 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -138,7 +138,7 @@ func (u UpdateDispatcher) Handle(ctx context.Context, updates tg.UpdatesClass) e return u.UpdateDispatcher.Handle(ctx, updates) } -var messageLinkRegex = regexp.MustCompile(`^https?://t(?:elegram)?\.(?:me|dog)/([A-Za-z][A-Za-z0-9_]{3,31}[A-Za-z0-9]|[Cc]/[0-9]{1,20})/([0-9]{1,20})$`) +var messageLinkRegex = regexp.MustCompile(`^https?://t(?:elegram)?\.(?:me|dog)/([A-Za-z][A-Za-z0-9_]{3,31}[A-Za-z0-9]|[Cc]/[0-9]{1,20})/([0-9]{1,20})(?:/([0-9]{1,20}))?$`) func (tg *TelegramConnector) deviceConfig() telegram.DeviceConfig { return telegram.DeviceConfig{ @@ -221,24 +221,24 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge return client.onMessageEdit(ctx, update) }) dispatcher.OnUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateUserTyping) error { - return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeUser, update.UserID), client.senderForUserID(update.UserID), update.Action) + return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeUser, update.UserID, 0), client.senderForUserID(update.UserID), update.Action) }) dispatcher.OnChatUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChatUserTyping) error { if update.FromID.TypeID() != tg.PeerUserTypeID { log.Warn().Str("from_id_type", update.FromID.TypeName()).Msg("unsupported from_id type") return nil } - return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID), client.getPeerSender(update.FromID), update.Action) + return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID, 0), client.getPeerSender(update.FromID), update.Action) }) dispatcher.OnChannelUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannelUserTyping) error { - return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID), client.getPeerSender(update.FromID), update.Action) + return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, update.TopMsgID), client.getPeerSender(update.FromID), update.Action) }) dispatcher.OnReadHistoryOutbox(client.updateReadReceipt) dispatcher.OnReadHistoryInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryInbox) error { - return client.onOwnReadReceipt(client.makePortalKeyFromPeer(update.Peer), update.MaxID) + return client.onOwnReadReceipt(client.makePortalKeyFromPeer(update.Peer, update.TopMsgID), update.MaxID) }) dispatcher.OnReadChannelInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadChannelInbox) error { - return client.onOwnReadReceipt(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID), update.MaxID) + return client.onOwnReadReceipt(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0), update.MaxID) }) dispatcher.OnNotifySettings(client.onNotifySettings) dispatcher.OnPinnedDialogs(client.onPinnedDialogs) @@ -249,15 +249,18 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge client.updatesManager = updates.New(updates.Config{ OnChannelTooLong: func(channelID int64) error { + // TODO resync topics? res := tc.Bridge.QueueRemoteEvent(login, &simplevent.ChatResync{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatResync, LogContext: func(c zerolog.Context) zerolog.Context { return c.Str("update", "channel_too_long").Int64("channel_id", channelID) }, - PortalKey: client.makePortalKeyFromID(ids.PeerTypeChannel, channelID), + PortalKey: client.makePortalKeyFromID(ids.PeerTypeChannel, channelID, 0), + }, + CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { + return true, nil }, - CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { return true, nil }, }) return resultToError(res) @@ -296,7 +299,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge return userInfo, nil }, GetUserInfoByUsername: func(ctx context.Context, username string) (telegramfmt.UserInfo, error) { - if peerType, userID, err := client.ScopedStore.GetEntityIDByUsername(ctx, username); err != nil { + if peerType, userID, err := client.main.Store.Username.GetEntityID(ctx, username); err != nil { return telegramfmt.UserInfo{}, err } else if peerType != ids.PeerTypeUser { return telegramfmt.UserInfo{}, fmt.Errorf("unexpected peer type: %s", peerType) @@ -330,7 +333,17 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge log.Err(err).Msg("error parsing message ID") return url } - log = log.With().Str("group", group).Int("msg_id", msgID).Logger() + var topicID int + if len(submatches) == 4 && submatches[3] != "" { + lastID, err := strconv.Atoi(submatches[3]) + if err != nil { + log.Err(err).Msg("error parsing actual message ID") + return url + } + topicID = msgID + msgID = lastID + } + log = log.With().Str("group", group).Int("topic_id", topicID).Int("msg_id", msgID).Logger() var portalKey networkid.PortalKey if strings.HasPrefix(group, "C/") || strings.HasPrefix(group, "c/") { @@ -339,16 +352,16 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge log.Err(err).Msg("error parsing channel ID") return url } - portalKey = client.makePortalKeyFromID(ids.PeerTypeChannel, chatID) + portalKey = client.makePortalKeyFromID(ids.PeerTypeChannel, chatID, topicID) } else if submatches[1] == "premium" { - portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, 777000) + portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, 777000, 0) } else if userID, err := strconv.ParseInt(submatches[1], 10, 64); err == nil && userID > 0 { - portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, userID) - } else if peerType, peerID, err := client.ScopedStore.GetEntityIDByUsername(ctx, submatches[1]); err != nil { + portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, userID, 0) + } else if peerType, peerID, err := client.main.Store.Username.GetEntityID(ctx, submatches[1]); err != nil { log.Err(err).Msg("Failed to get entity ID by username") return url } else if peerType != "" { - portalKey = client.makePortalKeyFromID(peerType, peerID) + portalKey = client.makePortalKeyFromID(peerType, peerID, topicID) } else { return url } @@ -382,7 +395,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge return "", "", 0, false } else if accessHash, err := client.ScopedStore.GetAccessHash(ctx, peerType, telegramUserID); err != nil || accessHash == 0 { return "", "", 0, false - } else if username, err := client.ScopedStore.GetUsername(ctx, peerType, telegramUserID); err != nil { + } else if username, err := client.main.Store.Username.Get(ctx, peerType, telegramUserID); err != nil { return "", "", 0, false } else { return userID, username, accessHash, true diff --git a/pkg/connector/directdownload.go b/pkg/connector/directdownload.go index 228b0b9f..61783314 100644 --- a/pkg/connector/directdownload.go +++ b/pkg/connector/directdownload.go @@ -109,7 +109,7 @@ func (tc *TelegramConnector) Download(ctx context.Context, mediaID networkid.Med return nil, fmt.Errorf("failed to get user login: %w", err) } - logins, err := tc.Bridge.GetUserLoginsInPortal(ctx, ids.PeerTypeChannel.InternalAsPortalKey(info.PeerID, "")) + logins, err := tc.Bridge.GetUserLoginsInPortal(ctx, ids.InternalMakePortalKey(ids.PeerTypeChannel, info.PeerID, 0, "")) if err != nil { return nil, err } else if len(logins) == 0 { diff --git a/pkg/connector/handlematrix.go b/pkg/connector/handlematrix.go index b9cbfb2d..d1d7f118 100644 --- a/pkg/connector/handlematrix.go +++ b/pkg/connector/handlematrix.go @@ -94,8 +94,10 @@ func (t *TelegramClient) HandleMatrixViewingChat(ctx context.Context, msg *bridg if msg.Portal == nil { return nil } + _, _, topicID, _ := ids.ParsePortalID(msg.Portal.PortalKey.ID) + // TODO sync topic parent space meta := msg.Portal.Metadata.(*PortalMetadata) - if !meta.FullSynced || meta.LastSync.Add(24*time.Hour).Before(time.Now()) { + if (topicID == 0 && !meta.FullSynced) || meta.LastSync.Add(24*time.Hour).Before(time.Now()) { t.userLogin.QueueRemoteEvent(&simplevent.ChatResync{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatResync, @@ -255,17 +257,23 @@ func parseRandomID(txnID networkid.RawTransactionID) int64 { } func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (resp *bridgev2.MatrixMessageResponse, err error) { + if msg.Portal.RoomType == database.RoomTypeSpace { + return nil, fmt.Errorf("can't send messages to space portals") + } // Handle Matrix events only after initial connection has been established to avoid deadlocking gotd err = t.clientInitialized.Wait(ctx) if err != nil { return nil, err } - peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + peer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return nil, err } - log := zerolog.Ctx(ctx).With().Stringer("portal_key", msg.Portal.PortalKey).Any("peer_id", peer).Logger() + log := zerolog.Ctx(ctx).With(). + Stringer("portal_key", msg.Portal.PortalKey). + Any("peer_id", peer). + Logger() ctx = log.WithContext(ctx) var contentURI id.ContentURIString @@ -283,6 +291,13 @@ func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2. } replyTo = &tg.InputReplyToMessage{ReplyToMsgID: messageID} } + if topicID > 0 { + if replyTo == nil { + replyTo = &tg.InputReplyToMessage{ReplyToMsgID: topicID} + } else { + replyTo.(*tg.InputReplyToMessage).TopMsgID = topicID + } + } randomID := parseRandomID(msg.InputTransactionID) @@ -396,8 +411,6 @@ func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2. resp = &bridgev2.MatrixMessageResponse{ DB: &database.Message{ ID: messageID, - MXID: msg.Event.ID, - Room: msg.Portal.PortalKey, SenderID: t.userID, Timestamp: timestamp, Metadata: &MessageMetadata{ @@ -411,12 +424,15 @@ func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2. } func (t *TelegramClient) HandleMatrixEdit(ctx context.Context, msg *bridgev2.MatrixEdit) error { + if msg.Portal.RoomType == database.RoomTypeSpace { + return fmt.Errorf("can't send messages to space portals") + } log := zerolog.Ctx(ctx).With(). Str("conversion_direction", "to_telegram"). Str("handler", "matrix_edit"). Logger() - peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return err } @@ -483,11 +499,13 @@ func (t *TelegramClient) HandleMatrixEdit(ctx context.Context, msg *bridgev2.Mat } func (t *TelegramClient) HandleMatrixMessageRemove(ctx context.Context, msg *bridgev2.MatrixMessageRemove) error { - if dbMsg, err := t.main.Bridge.DB.Message.GetPartByMXID(ctx, msg.TargetMessage.MXID); err != nil { + if msg.Portal.RoomType == database.RoomTypeSpace { + return fmt.Errorf("can't send messages to space portals") + } else if dbMsg, err := t.main.Bridge.DB.Message.GetPartByMXID(ctx, msg.TargetMessage.MXID); err != nil { return err } else if _, messageID, err := ids.ParseMessageID(dbMsg.ID); err != nil { return err - } else if peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID); err != nil { + } else if peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID); err != nil { return err } else { _, err := message.NewSender(t.client.API()). @@ -499,6 +517,9 @@ func (t *TelegramClient) HandleMatrixMessageRemove(ctx context.Context, msg *bri } func (t *TelegramClient) PreHandleMatrixReaction(ctx context.Context, msg *bridgev2.MatrixReaction) (bridgev2.MatrixReactionPreResponse, error) { + if msg.Portal.RoomType == database.RoomTypeSpace { + return bridgev2.MatrixReactionPreResponse{}, fmt.Errorf("can't send messages to space portals") + } log := zerolog.Ctx(ctx).With(). Str("conversion_direction", "to_telegram"). Str("handler", "pre_handle_matrix_reaction"). @@ -562,7 +583,7 @@ func (t *TelegramClient) appendEmojiID(reactionList []tg.ReactionClass, emojiID } func (t *TelegramClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.MatrixReaction) (reaction *database.Reaction, err error) { - peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return nil, err } @@ -599,7 +620,10 @@ func (t *TelegramClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2 } func (t *TelegramClient) HandleMatrixReactionRemove(ctx context.Context, msg *bridgev2.MatrixReactionRemove) error { - peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + if msg.Portal.RoomType == database.RoomTypeSpace { + return fmt.Errorf("can't send messages to space portals") + } + peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return err } @@ -637,16 +661,22 @@ func (t *TelegramClient) HandleMatrixReactionRemove(ctx context.Context, msg *br } func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridgev2.MatrixReadReceipt) error { + if msg.Portal.RoomType == database.RoomTypeSpace { + return nil + } log := zerolog.Ctx(ctx).With(). Str("action", "handle_matrix_read_receipt"). Str("portal_id", string(msg.Portal.ID)). Bool("is_supergroup", msg.Portal.Metadata.(*PortalMetadata).IsSuperGroup). Logger() - peerType, portalID, parseErr := ids.ParsePortalID(msg.Portal.ID) + peerType, portalID, topicID, parseErr := ids.ParsePortalID(msg.Portal.ID) if parseErr != nil { return parseErr } - inputPeer, parseErr := t.inputPeerForPortalID(ctx, msg.Portal.ID) + if msg.Portal.Metadata.(*PortalMetadata).IsForumGeneral { + topicID = 1 + } + inputPeer, _, parseErr := t.inputPeerForPortalID(ctx, msg.Portal.ID) if parseErr != nil { return parseErr } @@ -659,7 +689,8 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg go func() { defer wg.Done() _, readMentionsErr = t.client.API().MessagesReadMentions(ctx, &tg.MessagesReadMentionsRequest{ - Peer: inputPeer, + Peer: inputPeer, + TopMsgID: topicID, }) }() @@ -668,7 +699,8 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg go func() { defer wg.Done() _, readMentionsErr = t.client.API().MessagesReadReactions(ctx, &tg.MessagesReadReactionsRequest{ - Peer: inputPeer, + Peer: inputPeer, + TopMsgID: topicID, }) }() @@ -707,6 +739,7 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg } _, readMessagesErr = t.client.API().ChannelsReadHistory(ctx, &tg.ChannelsReadHistoryRequest{ Channel: &tg.InputChannel{ChannelID: portalID, AccessHash: accessHash}, + MaxID: maxID, }) if !msg.Portal.Metadata.(*PortalMetadata).IsSuperGroup { @@ -741,21 +774,43 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg } func (t *TelegramClient) HandleMatrixTyping(ctx context.Context, msg *bridgev2.MatrixTyping) error { - inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + if msg.Portal.RoomType == database.RoomTypeSpace { + return nil + } + inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return err } + if msg.Portal.Metadata.(*PortalMetadata).IsForumGeneral { + topicID = 1 + } + var action tg.SendMessageActionClass + switch msg.Type { + case bridgev2.TypingTypeText: + action = &tg.SendMessageTypingAction{} + case bridgev2.TypingTypeRecordingMedia: + // TODO media types? + action = &tg.SendMessageRecordVideoAction{} + case bridgev2.TypingTypeUploadingMedia: + action = &tg.SendMessageUploadVideoAction{} + } + if !msg.IsTyping { + action = &tg.SendMessageCancelAction{} + } _, err = t.client.API().MessagesSetTyping(ctx, &tg.MessagesSetTypingRequest{ - Peer: inputPeer, - Action: &tg.SendMessageTypingAction{}, + Peer: inputPeer, + TopMsgID: topicID, + Action: action, }) return err } func (t *TelegramClient) HandleMatrixDisappearingTimer(ctx context.Context, msg *bridgev2.MatrixDisappearingTimer) (bool, error) { - inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return false, err + } else if topicID > 0 { + return false, fmt.Errorf("topics can't have their own disappearing timer") } _, err = t.client.API().MessagesSetHistoryTTL(ctx, &tg.MessagesSetHistoryTTLRequest{ Peer: inputPeer, @@ -771,7 +826,7 @@ func (t *TelegramClient) HandleMatrixDisappearingTimer(ctx context.Context, msg } func (t *TelegramClient) HandleMute(ctx context.Context, msg *bridgev2.MatrixMute) error { - inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return err } @@ -780,18 +835,26 @@ func (t *TelegramClient) HandleMute(ctx context.Context, msg *bridgev2.MatrixMut Silent: msg.Content.IsMuted(), MuteUntil: int(max(0, min(msg.Content.GetMutedUntilTime().Unix(), math.MaxInt32))), } + var peer tg.InputNotifyPeerClass + if topicID > 0 { + peer = &tg.InputNotifyForumTopic{Peer: inputPeer, TopMsgID: topicID} + } else { + peer = &tg.InputNotifyPeer{Peer: inputPeer} + } _, err = t.client.API().AccountUpdateNotifySettings(ctx, &tg.AccountUpdateNotifySettingsRequest{ - Peer: &tg.InputNotifyPeer{Peer: inputPeer}, + Peer: peer, Settings: settings, }) return err } func (t *TelegramClient) HandleRoomTag(ctx context.Context, msg *bridgev2.MatrixRoomTag) error { - inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) + inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID) if err != nil { return err + } else if topicID > 0 { + return fmt.Errorf("topics can't be pinned for yourself") } _, err = t.client.API().MessagesToggleDialogPin(ctx, &tg.MessagesToggleDialogPinRequest{ @@ -802,7 +865,7 @@ func (t *TelegramClient) HandleRoomTag(ctx context.Context, msg *bridgev2.Matrix } func (t *TelegramClient) HandleMatrixDeleteChat(ctx context.Context, chat *bridgev2.MatrixDeleteChat) error { - peerType, id, err := ids.ParsePortalID(chat.Portal.ID) + peerType, id, topicID, err := ids.ParsePortalID(chat.Portal.ID) if err != nil { return err } @@ -839,12 +902,21 @@ func (t *TelegramClient) HandleMatrixDeleteChat(ctx context.Context, chat *bridg if err != nil { return err } - channel := &tg.InputChannel{ - ChannelID: id, - AccessHash: accessHash, - } if chat.Content.DeleteForEveryone { - _, err := t.client.API().ChannelsDeleteChannel(ctx, channel) + if topicID > 0 { + _, err = t.client.API().MessagesDeleteTopicHistory(ctx, &tg.MessagesDeleteTopicHistoryRequest{ + Peer: &tg.InputPeerChannel{ + ChannelID: id, + AccessHash: accessHash, + }, + TopMsgID: topicID, + }) + } else { + _, err = t.client.API().ChannelsDeleteChannel(ctx, &tg.InputChannel{ + ChannelID: id, + AccessHash: accessHash, + }) + } if err != nil { return err } @@ -859,7 +931,7 @@ func (t *TelegramClient) HandleMatrixDeleteChat(ctx context.Context, chat *bridg } func (t *TelegramClient) HandleMatrixRoomName(ctx context.Context, msg *bridgev2.MatrixRoomName) (bool, error) { - peerType, id, err := ids.ParsePortalID(msg.Portal.ID) + peerType, id, topicID, err := ids.ParsePortalID(msg.Portal.ID) if err != nil { return false, err } @@ -879,13 +951,24 @@ func (t *TelegramClient) HandleMatrixRoomName(ctx context.Context, msg *bridgev2 if err != nil { return false, err } - _, err = t.client.API().ChannelsEditTitle(ctx, &tg.ChannelsEditTitleRequest{ - Channel: &tg.InputChannel{ - ChannelID: id, - AccessHash: accessHash, - }, - Title: msg.Content.Name, - }) + if topicID > 0 { + _, err = t.client.API().MessagesEditForumTopic(ctx, &tg.MessagesEditForumTopicRequest{ + Peer: &tg.InputPeerChannel{ + ChannelID: id, + AccessHash: accessHash, + }, + TopicID: topicID, + Title: msg.Content.Name, + }) + } else { + _, err = t.client.API().ChannelsEditTitle(ctx, &tg.ChannelsEditTitleRequest{ + Channel: &tg.InputChannel{ + ChannelID: id, + AccessHash: accessHash, + }, + Title: msg.Content.Name, + }) + } if err != nil { return false, err } @@ -896,13 +979,15 @@ func (t *TelegramClient) HandleMatrixRoomName(ctx context.Context, msg *bridgev2 } func (t *TelegramClient) HandleMatrixRoomAvatar(ctx context.Context, msg *bridgev2.MatrixRoomAvatar) (bool, error) { - peerType, id, err := ids.ParsePortalID(msg.Portal.ID) + peerType, id, topicID, err := ids.ParsePortalID(msg.Portal.ID) if err != nil { return false, err } if peerType == ids.PeerTypeUser { return false, fmt.Errorf("changing user avatar is not supported") + } else if topicID > 0 { + return false, fmt.Errorf("changing group topic avatar is not supported") } var photo tg.InputChatPhotoClass diff --git a/pkg/connector/handletelegram.go b/pkg/connector/handletelegram.go index 8e8cdba2..b5b93ac8 100644 --- a/pkg/connector/handletelegram.go +++ b/pkg/connector/handletelegram.go @@ -47,25 +47,63 @@ import ( type IGetMessage interface { GetMessage() tg.MessageClass + String() string } type IGetMessages interface { GetMessages() []int } -func (t *TelegramClient) selfLeaveChat(portalKey networkid.PortalKey) error { +func (t *TelegramClient) selfLeaveChat(ctx context.Context, portalKey networkid.PortalKey) error { + peerType, id, _, err := ids.ParsePortalID(portalKey.ID) + if err != nil { + return err + } + if peerType == ids.PeerTypeChannel { + topics, err := t.main.Store.Topic.GetAll(ctx, id) + if err != nil { + return err + } + for _, topicID := range topics { + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatDelete, + PortalKey: t.makePortalKeyFromID(peerType, id, topicID), + Sender: t.mySender(), + }, + OnlyForMe: true, + }) + if err = resultToError(res); err != nil { + return err + } + } + } res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{ EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatDelete, - LogContext: func(c zerolog.Context) zerolog.Context { - return c.Stringer("portal_key", portalKey) - }, + Type: bridgev2.RemoteEventChatDelete, PortalKey: portalKey, Sender: t.mySender(), }, OnlyForMe: true, }) - return resultToError(res) + if err = resultToError(res); err != nil { + return err + } + if peerType == ids.PeerTypeChannel { + // This is a no-op if there's no space portal + res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatDelete, + PortalKey: t.makePortalKeyFromID(peerType, id, ids.TopicIDSpaceRoom), + Sender: t.mySender(), + }, + OnlyForMe: true, + }) + if err = resultToError(res); err != nil { + return err + } + } + return nil } func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, update *tg.UpdateChannel) error { @@ -75,7 +113,8 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd Logger() log.Debug().Msg("Fetching channel due to UpdateChannel event") - portalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID) + // TODO resync topic portals? + portalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0) chats, err := APICallWithOnlyChatUpdates(ctx, t, func() (tg.MessagesChatsClass, error) { if accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, update.ChannelID); err != nil { @@ -88,17 +127,17 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd }) if err != nil { if tgerr.Is(err, tg.ErrChannelInvalid, tg.ErrChannelPrivate) { - return t.selfLeaveChat(portalKey) + return t.selfLeaveChat(ctx, portalKey) } log.Err(err).Msg("Failed to get channel info after UpdateChannel event") } else if len(chats.GetChats()) != 1 { log.Warn().Int("chat_count", len(chats.GetChats())).Msg("Got more than 1 chat in GetChannels response") } else if channel, ok := chats.GetChats()[0].(*tg.Channel); !ok { log.Error().Type("chat_type", chats.GetChats()[0]).Msg("Expected channel, got something else. Leaving the channel.") - return t.selfLeaveChat(portalKey) + return t.selfLeaveChat(ctx, portalKey) } else if channel.Left { log.Error().Msg("Update was for a left channel. Leaving the channel.") - return t.selfLeaveChat(portalKey) + return t.selfLeaveChat(ctx, portalKey) } else { res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ EventMeta: simplevent.EventMeta{ @@ -106,7 +145,7 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd CreatePortal: true, }, GetChatInfoFunc: func(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) { - chatInfo, mfm, err := t.wrapChatInfo(channel) + chatInfo, mfm, err := t.wrapChatInfo(portal.ID, channel) if err != nil { return nil, err } @@ -126,7 +165,7 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Entities, update IGetMessage) error { log := *zerolog.Ctx(ctx) - log.Trace().Any("message_content", update).Msg("Raw message content") + log.Trace().Stringer("message_content", update).Msg("Raw message content") switch msg := update.GetMessage().(type) { case *tg.Message: var isBroadcastChannel bool @@ -167,7 +206,7 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent Stringer("peer_id", msg.PeerID) }, Sender: sender, - PortalKey: t.makePortalKeyFromPeer(msg.PeerID), + PortalKey: t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo)), CreatePortal: true, Timestamp: time.Unix(int64(msg.Date), 0), StreamOrder: int64(msg.GetID()), @@ -193,12 +232,37 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent } } +func (t *TelegramClient) getTopicID(ctx context.Context, peerID tg.PeerClass, rawReplyTo tg.MessageReplyHeaderClass) int { + topicID := rawGetTopicID(rawReplyTo) + if topicID != 0 { + channelPeer, _ := peerID.(*tg.PeerChannel) + err := t.main.Store.Topic.Add(ctx, channelPeer.GetChannelID(), topicID) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to save topic ID") + } + } + return topicID +} + +func rawGetTopicID(rawReplyTo tg.MessageReplyHeaderClass) int { + switch replyTo := rawReplyTo.(type) { + case *tg.MessageReplyHeader: + if replyTo.ForumTopic { + if replyTo.ReplyToTopID != 0 { + return replyTo.ReplyToTopID + } + return replyTo.ReplyToMsgID + } + } + return 0 +} + func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.MessageService) error { log := zerolog.Ctx(ctx) sender := t.getEventSender(msg, false) eventMeta := simplevent.EventMeta{ - PortalKey: t.makePortalKeyFromPeer(msg.PeerID), + PortalKey: t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo)), Sender: sender, Timestamp: time.Unix(int64(msg.Date), 0), LogContext: func(c zerolog.Context) zerolog.Context { @@ -217,9 +281,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Name: &action.Title}}, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionChatEditPhoto: switch peer := msg.PeerID.(type) { case *tg.PeerChat: @@ -229,9 +291,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChat, peer.ChatID, action.Photo), }}, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.PeerChannel: res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), @@ -239,9 +299,9 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChannel, peer.ChannelID, action.Photo), }}, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) + default: + return nil } case *tg.MessageActionChatDeletePhoto: @@ -249,9 +309,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: &bridgev2.Avatar{Remove: true}}}, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionChatAddUser: memberChanges := &bridgev2.ChatMemberList{ MemberMap: map[networkid.UserID]bridgev2.ChatMember{}, @@ -266,9 +324,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), ChatInfoChange: &bridgev2.ChatInfoChange{MemberChanges: memberChanges}, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionChatJoinedByLink: res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), @@ -281,12 +337,10 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionChatDeleteUser: if action.UserID == t.telegramUserID { - return t.selfLeaveChat(eventMeta.PortalKey) + return t.selfLeaveChat(ctx, eventMeta.PortalKey) } res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), @@ -299,9 +353,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionChatCreate: res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage).WithCreatePortal(true), @@ -318,9 +370,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, nil }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionChannelCreate: res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ @@ -346,9 +396,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, nil }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionSetMessagesTTL: setting := database.DisappearingSetting{ Type: event.DisappearingTypeAfterSend, @@ -362,9 +410,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionPhoneCall: var body strings.Builder if action.Video { @@ -406,9 +452,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, nil }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionGroupCall: var body string if action.Duration == 0 { @@ -429,9 +473,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, nil }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionInviteToGroupCall: var body, html strings.Builder var mentions event.Mentions @@ -446,7 +488,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa return err } else { var name string - if username, err := t.ScopedStore.GetUsername(ctx, ids.PeerTypeUser, userID); err != nil { + if username, err := t.main.Store.Username.Get(ctx, ids.PeerTypeUser, userID); err != nil { name = "@" + username } else { name = ghost.Name @@ -477,9 +519,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, nil }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionGroupCallScheduled: start := time.Unix(int64(action.ScheduleDate), 0) res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ @@ -499,13 +539,11 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, nil }, }) - if err := resultToError(res); err != nil { - return err - } + return resultToError(res) case *tg.MessageActionChatMigrateTo: log.Debug().Int64("channel_id", action.ChannelID).Msg("MessageActionChatMigrateTo") - newPortalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, action.ChannelID) + newPortalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, action.ChannelID, 0) if err := t.migrateChat(ctx, eventMeta.PortalKey, newPortalKey); err != nil { log.Err(err).Msg("Failed to migrate chat to channel") return err @@ -528,45 +566,39 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa }, nil }, }) - if err := resultToError(res); err != nil { - return err + return resultToError(res) + + case *tg.MessageActionTopicCreate: + channelPeer, _ := msg.PeerID.(*tg.PeerChannel) + err := t.main.Store.Topic.Add(ctx, channelPeer.GetChannelID(), msg.ID) + if err != nil { + return fmt.Errorf("failed to store new topic: %w", err) } + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + EventMeta: eventMeta. + WithPortalKey(t.makePortalKeyFromPeer(msg.PeerID, msg.ID)). + WithType(bridgev2.RemoteEventChatResync). + WithCreatePortal(true), + GetChatInfoFunc: t.GetChatInfo, + }) + return resultToError(res) + case *tg.MessageActionTopicEdit: + // TODO specific changes? + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + EventMeta: eventMeta. + WithPortalKey(t.makePortalKeyFromPeer(msg.PeerID, msg.ID)). + WithType(bridgev2.RemoteEventChatResync). + WithCreatePortal(true), + GetChatInfoFunc: t.GetChatInfo, + }) + return resultToError(res) - // case *tg.MessageActionChannelMigrateFrom: - - // case *tg.MessageActionPinMessage: - // case *tg.MessageActionHistoryClear: - // case *tg.MessageActionGameScore: - // case *tg.MessageActionPaymentSentMe: - // case *tg.MessageActionPaymentSent: - // case *tg.MessageActionScreenshotTaken: - // case *tg.MessageActionCustomAction: - // case *tg.MessageActionBotAllowed: - // case *tg.MessageActionSecureValuesSentMe: - // case *tg.MessageActionSecureValuesSent: - // case *tg.MessageActionContactSignUp: - // case *tg.MessageActionGeoProximityReached: - // case *tg.MessageActionSetChatTheme: - // case *tg.MessageActionChatJoinedByRequest: - // case *tg.MessageActionWebViewDataSentMe: - // case *tg.MessageActionWebViewDataSent: - // case *tg.MessageActionGiftPremium: - // case *tg.MessageActionTopicCreate: - // case *tg.MessageActionTopicEdit: - // case *tg.MessageActionSuggestProfilePhoto: - // case *tg.MessageActionRequestedPeer: - // case *tg.MessageActionSetChatWallPaper: - // case *tg.MessageActionGiftCode: - // case *tg.MessageActionGiveawayLaunch: - // case *tg.MessageActionGiveawayResults: - // case *tg.MessageActionBoostApply: - // case *tg.MessageActionRequestedPeerSentMe: default: log.Warn(). Type("action_type", action). Msg("ignoring unknown action type") + return nil } - return nil } func (t *TelegramClient) migrateChat(ctx context.Context, oldPortalKey, newPortalKey networkid.PortalKey) error { @@ -684,25 +716,22 @@ func (t *TelegramClient) onUserName(ctx context.Context, e tg.Entities, update * func (t *TelegramClient) onDeleteMessages(ctx context.Context, channelID int64, update IGetMessages) error { for _, messageID := range update.GetMessages() { - var portalKey networkid.PortalKey - if channelID == 0 { - // TODO have mautrix-go do this part too? - parts, err := t.main.Bridge.DB.Message.GetAllPartsByID(ctx, t.loginID, ids.MakeMessageID(channelID, messageID)) - if err != nil { - return err - } - if len(parts) == 0 { - zerolog.Ctx(ctx).Debug(). - Int("message_id", messageID). - Int64("channel_id", channelID). - Msg("ignoring delete of message we have no parts for") - continue - } - // TODO can deletes happen across rooms? - portalKey = parts[0].Room - } else { - portalKey = t.makePortalKeyFromPeer(&tg.PeerChannel{ChannelID: channelID}) + // TODO have mautrix-go do this part too? + parts, err := t.main.Bridge.DB.Message.GetAllPartsByID(ctx, t.loginID, ids.MakeMessageID(channelID, messageID)) + if err != nil { + return err } + if len(parts) == 0 { + zerolog.Ctx(ctx).Debug(). + Int("message_id", messageID). + Int64("channel_id", channelID). + Msg("ignoring delete of message we have no parts for") + continue + } + // TODO can deletes happen across rooms? + portalKey := parts[0].Room + // TODO optimize non-topic portal deletion by using channel ID? + //portalKey = t.makePortalKeyFromPeer(&tg.PeerChannel{ChannelID: channelID}) res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.MessageRemove{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventMessageRemove, @@ -769,7 +798,7 @@ func (t *TelegramClient) updateChannel(ctx context.Context, channel *tg.Channel) } if username, set := channel.GetUsername(); set { - err := t.ScopedStore.SetUsername(ctx, ids.PeerTypeChannel, channel.ID, username) + err := t.main.Store.Username.Set(ctx, ids.PeerTypeChannel, channel.ID, username) if err != nil { return nil, err } @@ -796,7 +825,7 @@ func (t *TelegramClient) onEntityUpdate(ctx context.Context, e tg.Entities) erro } for chatID, chat := range e.Chats { if chat.GetLeft() { - t.selfLeaveChat(t.makePortalKeyFromID(ids.PeerTypeChat, chatID)) + t.selfLeaveChat(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, chatID, 0)) } } for _, channel := range e.Channels { @@ -821,7 +850,7 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) zerolog.Ctx(ctx).Err(err).Msg("Failed to handle reactions on edited message") } - portalKey := t.makePortalKeyFromPeer(msg.PeerID) + portalKey := t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo)) portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey) if err != nil { return err @@ -872,14 +901,12 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) } } - var ce bridgev2.ConvertedEdit - if !bytes.Equal(existingPart.Metadata.(*MessageMetadata).ContentHash, converted.Parts[0].DBMetadata.(*MessageMetadata).ContentHash) { - ce.ModifiedParts = append(ce.ModifiedParts, converted.Parts[0].ToEditPart(existingPart)) + if bytes.Equal(existingPart.Metadata.(*MessageMetadata).ContentHash, converted.Parts[0].DBMetadata.(*MessageMetadata).ContentHash) { + return nil, fmt.Errorf("%w (content hash didn't change)", bridgev2.ErrIgnoringRemoteEvent) } - if len(ce.ModifiedParts) == 0 { - return nil, bridgev2.ErrIgnoringRemoteEvent - } - return &ce, nil + return &bridgev2.ConvertedEdit{ + ModifiedParts: []*bridgev2.ConvertedEditPart{converted.Parts[0].ToEditPart(existingPart)}, + }, nil }, }) return resultToError(res) @@ -890,10 +917,19 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev return nil } timeout := time.Duration(6) * time.Second - if action.TypeID() != tg.SendMessageTypingActionTypeID { + var typingType bridgev2.TypingType + switch action.(type) { + case *tg.SendMessageTypingAction: + typingType = bridgev2.TypingTypeText + case *tg.SendMessageRecordAudioAction, *tg.SendMessageRecordRoundAction, *tg.SendMessageRecordVideoAction: + typingType = bridgev2.TypingTypeRecordingMedia + case *tg.SendMessageUploadAudioAction, *tg.SendMessageUploadDocumentAction, *tg.SendMessageUploadPhotoAction, *tg.SendMessageUploadRoundAction, *tg.SendMessageUploadVideoAction: + typingType = bridgev2.TypingTypeUploadingMedia + case *tg.SendMessageCancelAction: + timeout = 0 + default: timeout = 0 } - // TODO send proper TypingTypes res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Typing{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventTyping, @@ -901,6 +937,7 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev Sender: sender, }, Timeout: timeout, + Type: typingType, }) return resultToError(res) } @@ -915,7 +952,7 @@ func (t *TelegramClient) updateReadReceipt(ctx context.Context, e tg.Entities, u res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventReadReceipt, - PortalKey: t.makePortalKeyFromPeer(update.Peer), + PortalKey: t.makePortalKeyFromPeer(update.Peer, 0), Sender: bridgev2.EventSender{ SenderLogin: ids.MakeUserLoginID(user.UserID), Sender: ids.MakeUserID(user.UserID), @@ -940,25 +977,25 @@ func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID i return resultToError(res) } -func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, error) { - peerType, id, err := ids.ParsePortalID(portalID) +func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, int, error) { + peerType, id, topicID, err := ids.ParsePortalID(portalID) if err != nil { - return nil, err + return nil, 0, err } switch peerType { case ids.PeerTypeUser: if accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeUser, id); err != nil { - return nil, fmt.Errorf("failed to get user access hash for %d: %w", id, err) + return nil, 0, fmt.Errorf("failed to get user access hash for %d: %w", id, err) } else { - return &tg.InputPeerUser{UserID: id, AccessHash: accessHash}, nil + return &tg.InputPeerUser{UserID: id, AccessHash: accessHash}, 0, nil } case ids.PeerTypeChat: - return &tg.InputPeerChat{ChatID: id}, nil + return &tg.InputPeerChat{ChatID: id}, 0, nil case ids.PeerTypeChannel: if accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, id); err != nil { - return nil, err + return nil, 0, err } else { - return &tg.InputPeerChannel{ChannelID: id, AccessHash: accessHash}, nil + return &tg.InputPeerChannel{ChannelID: id, AccessHash: accessHash}, topicID, nil } default: panic("invalid peer type") @@ -1097,10 +1134,17 @@ func (t *TelegramClient) transferEmojisToMatrix(ctx context.Context, customEmoji } func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, update *tg.UpdateNotifySettings) error { - if update.Peer.TypeID() != tg.NotifyPeerTypeID { + var portalKey networkid.PortalKey + switch typedPeer := update.Peer.(type) { + case *tg.NotifyPeer: + portalKey = t.makePortalKeyFromPeer(typedPeer.Peer, 0) + case *tg.NotifyForumTopic: + portalKey = t.makePortalKeyFromPeer(typedPeer.Peer, typedPeer.TopMsgID) + default: zerolog.Ctx(ctx).Debug(). - Str("peer_type", update.Peer.TypeName()). - Msg("Ignoring unsupported peer type") + Type("peer_type", update.Peer). + Any("peer", update.Peer). + Msg("Ignoring unsupported notify settings peer type") return nil } @@ -1111,16 +1155,17 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up mutedUntil = &bridgev2.Unmuted } - res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ - ChatInfo: &bridgev2.ChatInfo{ - UserLocal: &bridgev2.UserLocalPortalInfo{ - MutedUntil: mutedUntil, + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + ChatInfoChange: &bridgev2.ChatInfoChange{ + ChatInfo: &bridgev2.ChatInfo{ + UserLocal: &bridgev2.UserLocalPortalInfo{ + MutedUntil: mutedUntil, + }, }, - CanBackfill: true, }, EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, - PortalKey: t.makePortalKeyFromPeer(update.Peer.(*tg.NotifyPeer).Peer), + Type: bridgev2.RemoteEventChatInfoChange, + PortalKey: portalKey, }, }) return resultToError(res) @@ -1129,11 +1174,11 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg *tg.UpdatePinnedDialogs) error { needsUnpinning := map[networkid.PortalKey]struct{}{} for _, portalID := range t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs { - pt, id, err := ids.ParsePortalID(portalID) + pt, id, _, err := ids.ParsePortalID(portalID) if err != nil { return err } - needsUnpinning[t.makePortalKeyFromID(pt, id)] = struct{}{} + needsUnpinning[t.makePortalKeyFromID(pt, id, 0)] = struct{}{} } t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = nil @@ -1142,19 +1187,20 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg if !ok { continue } - portalKey := t.makePortalKeyFromPeer(dialog.Peer) + portalKey := t.makePortalKeyFromPeer(dialog.Peer, 0) delete(needsUnpinning, portalKey) t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = append(t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs, portalKey.ID) - res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ - ChatInfo: &bridgev2.ChatInfo{ - UserLocal: &bridgev2.UserLocalPortalInfo{ - Tag: ptr.Ptr(event.RoomTagFavourite), + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + ChatInfoChange: &bridgev2.ChatInfoChange{ + ChatInfo: &bridgev2.ChatInfo{ + UserLocal: &bridgev2.UserLocalPortalInfo{ + Tag: ptr.Ptr(event.RoomTagFavourite), + }, }, - CanBackfill: true, }, EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, + Type: bridgev2.RemoteEventChatInfoChange, PortalKey: portalKey, }, }) @@ -1165,15 +1211,16 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg var empty event.RoomTag for portalKey := range needsUnpinning { - res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ - ChatInfo: &bridgev2.ChatInfo{ - UserLocal: &bridgev2.UserLocalPortalInfo{ - Tag: &empty, + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + ChatInfoChange: &bridgev2.ChatInfoChange{ + ChatInfo: &bridgev2.ChatInfo{ + UserLocal: &bridgev2.UserLocalPortalInfo{ + Tag: &empty, + }, }, - CanBackfill: true, }, EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, + Type: bridgev2.RemoteEventChatInfoChange, PortalKey: portalKey, }, }) @@ -1186,16 +1233,18 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg } func (t *TelegramClient) onChatDefaultBannedRights(ctx context.Context, entities tg.Entities, update *tg.UpdateChatDefaultBannedRights) error { - res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ - ChatInfo: &bridgev2.ChatInfo{ - Members: &bridgev2.ChatMemberList{ - PowerLevels: t.getPowerLevelOverridesFromBannedRights(entities.Chats[0], update.DefaultBannedRights), + // TODO update all topic portals + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ + ChatInfoChange: &bridgev2.ChatInfoChange{ + ChatInfo: &bridgev2.ChatInfo{ + Members: &bridgev2.ChatMemberList{ + PowerLevels: t.getPowerLevelOverridesFromBannedRights(entities.Chats[0], update.DefaultBannedRights), + }, }, - CanBackfill: true, }, EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, - PortalKey: t.makePortalKeyFromPeer(update.Peer), + Type: bridgev2.RemoteEventChatInfoChange, + PortalKey: t.makePortalKeyFromPeer(update.Peer, 0), }, }) return resultToError(res) @@ -1233,7 +1282,7 @@ func (t *TelegramClient) onPeerBlocked(ctx context.Context, e tg.Entities, updat }, EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatResync, - PortalKey: t.makePortalKeyFromPeer(update.PeerID), + PortalKey: t.makePortalKeyFromPeer(update.PeerID, 0), }, }) return resultToError(res) @@ -1264,7 +1313,7 @@ func (t *TelegramClient) onPhoneCall(ctx context.Context, e tg.Entities, update res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventMessage, - PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, call.AdminID), + PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, call.AdminID, 0), CreatePortal: true, Sender: t.senderForUserID(call.AdminID), }, diff --git a/pkg/connector/ids.go b/pkg/connector/ids.go index 0213baf0..150bcaa5 100644 --- a/pkg/connector/ids.go +++ b/pkg/connector/ids.go @@ -23,16 +23,16 @@ import ( "go.mau.fi/mautrix-telegram/pkg/gotd/tg" ) -func (t *TelegramClient) makePortalKeyFromPeer(peer tg.PeerClass) networkid.PortalKey { - key := ids.InternalMakePortalKey(peer, t.loginID) +func (t *TelegramClient) makePortalKeyFromPeer(peer tg.PeerClass, topicID int) networkid.PortalKey { + key := ids.InternalPeerToPortalKey(peer, topicID, t.loginID) if t.main.Bridge.Config.SplitPortals { key.Receiver = t.userLogin.ID } return key } -func (t *TelegramClient) makePortalKeyFromID(peerType ids.PeerType, chatID int64) networkid.PortalKey { - key := peerType.InternalAsPortalKey(chatID, t.loginID) +func (t *TelegramClient) makePortalKeyFromID(peerType ids.PeerType, chatID int64, topicID int) networkid.PortalKey { + key := ids.InternalMakePortalKey(peerType, chatID, topicID, t.loginID) if t.main.Bridge.Config.SplitPortals { key.Receiver = t.userLogin.ID } diff --git a/pkg/connector/ids/ids.go b/pkg/connector/ids/ids.go index 6a91f1f5..ac59ab15 100644 --- a/pkg/connector/ids/ids.go +++ b/pkg/connector/ids/ids.go @@ -81,7 +81,7 @@ func MakeMessageID(rawChatID any, messageID int) networkid.MessageID { var channelID int64 switch typedChatID := rawChatID.(type) { case networkid.PortalKey: - peerType, entityID, _ := ParsePortalID(typedChatID.ID) + peerType, entityID, _, _ := ParsePortalID(typedChatID.ID) if peerType == PeerTypeChannel { channelID = entityID } @@ -163,52 +163,52 @@ func (pt PeerType) AsByte() byte { } } -func (pt PeerType) InternalAsPortalKey(chatID int64, receiver networkid.UserLoginID) networkid.PortalKey { +func MakePortalID(pt PeerType, chatID int64) networkid.PortalID { + return networkid.PortalID(fmt.Sprintf("%s:%d", pt, chatID)) +} + +const TopicIDSpaceRoom = -1 + +func MakeForumParentPortalID(channelID int64) networkid.PortalID { + return MakeTopicPortalID(channelID, TopicIDSpaceRoom) +} + +func MakeTopicPortalID(channelID int64, topicID int) networkid.PortalID { + return networkid.PortalID(fmt.Sprintf("%s:%d:%d", PeerTypeChannel, channelID, topicID)) +} + +func InternalMakePortalKey(pt PeerType, chatID int64, topicID int, receiver networkid.UserLoginID) networkid.PortalKey { portalKey := networkid.PortalKey{ - ID: networkid.PortalID(fmt.Sprintf("%s:%d", pt, chatID)), + ID: MakePortalID(pt, chatID), } if pt == PeerTypeUser || pt == PeerTypeChat { portalKey.Receiver = receiver + } else if topicID != 0 { + portalKey.ID = MakeTopicPortalID(chatID, topicID) } return portalKey } -func GetChatID(peer tg.PeerClass) int64 { +func InternalPeerToPortalKey(peer tg.PeerClass, topicID int, receiver networkid.UserLoginID) networkid.PortalKey { switch v := peer.(type) { case *tg.PeerUser: - return v.UserID + return InternalMakePortalKey(PeerTypeUser, v.UserID, topicID, receiver) case *tg.PeerChat: - return v.ChatID + return InternalMakePortalKey(PeerTypeChat, v.ChatID, topicID, receiver) case *tg.PeerChannel: - return v.ChannelID + return InternalMakePortalKey(PeerTypeChannel, v.ChannelID, topicID, receiver) default: panic(fmt.Errorf("unknown peer class type %T", v)) } } -func InternalMakePortalKey(peer tg.PeerClass, receiver networkid.UserLoginID) networkid.PortalKey { - switch v := peer.(type) { - case *tg.PeerUser: - return networkid.PortalKey{ - ID: networkid.PortalID(fmt.Sprintf("%s:%d", PeerTypeUser, v.UserID)), - Receiver: receiver, - } - case *tg.PeerChat: - return networkid.PortalKey{ - ID: networkid.PortalID(fmt.Sprintf("%s:%d", PeerTypeChat, v.ChatID)), - Receiver: receiver, - } - case *tg.PeerChannel: - return networkid.PortalKey{ID: networkid.PortalID(fmt.Sprintf("%s:%d", PeerTypeChannel, v.ChannelID))} - default: - panic(fmt.Errorf("unknown peer class type %T", v)) - } -} - -func ParsePortalID(portalID networkid.PortalID) (pt PeerType, id int64, err error) { +func ParsePortalID(portalID networkid.PortalID) (pt PeerType, id int64, topicID int, err error) { parts := strings.Split(string(portalID), ":") pt = PeerType(parts[0]) id, err = strconv.ParseInt(parts[1], 10, 64) + if len(parts) == 3 && err == nil && pt == PeerTypeChannel { + topicID, err = strconv.Atoi(parts[2]) + } return } diff --git a/pkg/connector/media/transfer.go b/pkg/connector/media/transfer.go index 72bf3733..2ebc6cfa 100644 --- a/pkg/connector/media/transfer.go +++ b/pkg/connector/media/transfer.go @@ -437,7 +437,7 @@ func (t *ReadyTransferer) DownloadBytes(ctx context.Context) ([]byte, error) { // DirectDownloadURL returns the direct download URL for the media. func (t *ReadyTransferer) DirectDownloadURL(ctx context.Context, loggedInUserID int64, portal *bridgev2.Portal, msgID int, thumbnail bool, telegramMediaID int64) (id.ContentURIString, *event.FileInfo, error) { - peerType, chatID, err := ids.ParsePortalID(portal.ID) + peerType, chatID, _, err := ids.ParsePortalID(portal.ID) if err != nil { return "", nil, err } diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go index 7905223c..bb2f22df 100644 --- a/pkg/connector/metadata.go +++ b/pkg/connector/metadata.go @@ -50,6 +50,7 @@ type GhostMetadata struct { type PortalMetadata struct { IsSuperGroup bool `json:"is_supergroup,omitempty"` + IsForumGeneral bool `json:"is_forum_general,omitempty"` ReadUpTo int `json:"read_up_to,omitempty"` AllowedReactions []string `json:"allowed_reactions"` LastSync jsontime.Unix `json:"last_sync,omitempty"` @@ -63,6 +64,12 @@ func (pm *PortalMetadata) SetIsSuperGroup(isSupergroup bool) (changed bool) { return changed } +func (pm *PortalMetadata) SetIsForumGeneral(isForumGeneral bool) (changed bool) { + changed = pm.IsForumGeneral != isForumGeneral + pm.IsForumGeneral = isForumGeneral + return changed +} + type MessageMetadata struct { ContentHash []byte `json:"content_hash,omitempty"` ContentURI id.ContentURIString `json:"content_uri,omitempty"` diff --git a/pkg/connector/push.go b/pkg/connector/push.go index a8ae396f..b8561da2 100644 --- a/pkg/connector/push.go +++ b/pkg/connector/push.go @@ -50,6 +50,7 @@ type PushCustomData struct { MessageID int `json:"msg_id,string"` ChannelID int64 `json:"channel_id,string"` + TopicID int `json:"top_msg_id,string"` ChatID int64 `json:"chat_id,string"` FromID int64 `json:"from_id,string"` @@ -281,11 +282,11 @@ func (t *TelegramClient) ConnectBackground(ctx context.Context, params *bridgev2 } var err error if data.Custom.ChannelID != 0 { - relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChannel, data.Custom.ChannelID)) + relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChannel, data.Custom.ChannelID, data.Custom.TopicID)) } else if data.Custom.ChatID != 0 { - relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, data.Custom.ChatID)) + relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, data.Custom.ChatID, 0)) } else if data.Custom.FromID != 0 { - relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeUser, data.Custom.FromID)) + relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeUser, data.Custom.FromID, 0)) } if err != nil { return fmt.Errorf("failed to get related portal: %w", err) diff --git a/pkg/connector/reactions.go b/pkg/connector/reactions.go index 612d156c..8edeccc5 100644 --- a/pkg/connector/reactions.go +++ b/pkg/connector/reactions.go @@ -23,6 +23,7 @@ import ( "github.com/rs/zerolog" "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/database" "maunium.net/go/mautrix/bridgev2/networkid" "maunium.net/go/mautrix/bridgev2/simplevent" @@ -60,10 +61,11 @@ func (t *TelegramClient) computeReactionsList(ctx context.Context, peer tg.PeerC // // Can't fetch exact reaction senders as a bot // return - // TODO should calls to this be limited? - } else if peer, err := t.inputPeerForPortalID(ctx, t.makePortalKeyFromPeer(peer).ID); err != nil { + // TODO remove redundant peer roundtrip, just add a peer -> input peer helper + } else if peer, _, err := t.inputPeerForPortalID(ctx, t.makePortalKeyFromPeer(peer, 0).ID); err != nil { return nil, false, nil, fmt.Errorf("failed to get input peer: %w", err) } else { + // TODO should calls to this be limited? reactions, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesMessageReactionsList, error) { return t.client.API().MessagesGetMessageReactionsList(ctx, &tg.MessagesGetMessageReactionsListRequest{ Peer: peer, ID: msgID, Limit: 100, @@ -152,7 +154,7 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me LogContext: func(c zerolog.Context) zerolog.Context { return c.Int("message_id", msg.ID) }, - PortalKey: t.makePortalKeyFromPeer(msg.PeerID), + PortalKey: t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo)), }, TargetMessage: ids.GetMessageIDFromMessage(msg), Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull}, @@ -207,7 +209,7 @@ func (t *TelegramClient) getReactionLimit(ctx context.Context, sender networkid. func (t *TelegramClient) maybePollForReactions(ctx context.Context, portal *bridgev2.Portal) error { // Only poll for reactions in supergroups - if portal == nil || !portal.Metadata.(*PortalMetadata).IsSuperGroup { + if portal == nil || !portal.Metadata.(*PortalMetadata).IsSuperGroup || portal.RoomType == database.RoomTypeSpace { return nil } @@ -225,7 +227,7 @@ func (t *TelegramClient) maybePollForReactions(ctx context.Context, portal *brid } func (t *TelegramClient) pollForReactions(ctx context.Context, portalKey networkid.PortalKey) error { - inputPeer, parseErr := t.inputPeerForPortalID(ctx, portalKey.ID) + inputPeer, _, parseErr := t.inputPeerForPortalID(ctx, portalKey.ID) if parseErr != nil { return parseErr } diff --git a/pkg/connector/startchat.go b/pkg/connector/startchat.go index 8f26bff8..ae7ab4b1 100644 --- a/pkg/connector/startchat.go +++ b/pkg/connector/startchat.go @@ -53,7 +53,7 @@ func (t *TelegramClient) getResolveIdentifierResponseForUser(ctx context.Context UserID: networkUserID, UserInfo: userInfo, Chat: &bridgev2.CreateChatResponse{ - PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, user.GetID()), + PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, user.GetID(), 0), }, }, nil } @@ -70,7 +70,7 @@ func (t *TelegramClient) getResolveIdentifierResponseForUserID(ctx context.Conte resp = &bridgev2.ResolveIdentifierResponse{ UserID: networkUserID, Chat: &bridgev2.CreateChatResponse{ - PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, userID), + PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, userID, 0), }, } resp.Ghost, err = t.main.Bridge.GetExistingGhostByID(ctx, networkUserID) @@ -108,7 +108,7 @@ func (t *TelegramClient) ResolveIdentifier(ctx context.Context, identifier strin if identifier[0] == '+' { normalized := strings.TrimPrefix(identifier, "+") - if userID, err := t.ScopedStore.GetUserIDByPhoneNumber(ctx, normalized); err != nil { + if userID, err := t.main.Store.PhoneNumber.GetUserID(ctx, normalized); err != nil { return nil, fmt.Errorf("failed to get user ID by phone number: %w", err) } else if userID == 0 { log.Info().Msg("Phone number not found in database") @@ -121,7 +121,7 @@ func (t *TelegramClient) ResolveIdentifier(ctx context.Context, identifier strin return t.getResolveIdentifierResponseForUserID(ctx, userID) } else if match := usernameRe.FindStringSubmatch(identifier); match != nil && !strings.Contains(identifier, "__") { // This is a username - entityType, userID, err := t.ScopedStore.GetEntityIDByUsername(ctx, match[1]) + entityType, userID, err := t.main.Store.Username.GetEntityID(ctx, match[1]) if entityType == ids.PeerTypeUser && (err == nil || userID != 0) { // We know this username. resp, err := t.getResolveIdentifierResponseForUserID(ctx, userID) @@ -273,7 +273,7 @@ func (t *TelegramClient) CreateGroup(ctx context.Context, params *bridgev2.Group } else if chat, ok := chats[0].(*tg.Chat); !ok { return nil, fmt.Errorf("unexpected chat type: %T", chats[0]) } else { - portalKey := t.makePortalKeyFromID(ids.PeerTypeChat, chat.ID) + portalKey := t.makePortalKeyFromID(ids.PeerTypeChat, chat.ID, 0) if params.RoomID != "" { portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey) if err != nil { diff --git a/pkg/connector/store/container.go b/pkg/connector/store/container.go index b8892097..b18a376a 100644 --- a/pkg/connector/store/container.go +++ b/pkg/connector/store/container.go @@ -20,6 +20,7 @@ import ( "context" "go.mau.fi/util/dbutil" + "go.mau.fi/util/exsync" "go.mau.fi/mautrix-telegram/pkg/connector/store/upgrades" ) @@ -28,6 +29,9 @@ type Container struct { *dbutil.Database TelegramFile *TelegramFileQuery + Username *UsernameQuery + PhoneNumber *PhoneNumberQuery + Topic *TopicQuery } func NewStore(db *dbutil.Database, log dbutil.DatabaseLogger) *Container { @@ -35,6 +39,9 @@ func NewStore(db *dbutil.Database, log dbutil.DatabaseLogger) *Container { Database: db.Child("telegram_version", upgrades.Table, log), TelegramFile: &TelegramFileQuery{dbutil.MakeQueryHelper(db, newTelegramFile)}, + Username: &UsernameQuery{db}, + PhoneNumber: &PhoneNumberQuery{db}, + Topic: &TopicQuery{db: db, existingTopics: exsync.NewSet[topicKey]()}, } } diff --git a/pkg/connector/store/phonenumber.go b/pkg/connector/store/phonenumber.go new file mode 100644 index 00000000..cb7c674e --- /dev/null +++ b/pkg/connector/store/phonenumber.go @@ -0,0 +1,56 @@ +// mautrix-telegram - A Matrix-Telegram puppeting bridge. +// Copyright (C) 2025 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package store + +import ( + "context" + "database/sql" + "errors" + + "go.mau.fi/util/dbutil" +) + +type PhoneNumberQuery struct { + db *dbutil.Database +} + +const ( + getEntityIDForPhoneNumber = "SELECT entity_id FROM telegram_phone_number WHERE phone_number=$1" + setPhoneNumberQuery = ` + INSERT INTO telegram_phone_number (phone_number, entity_id) + VALUES ($1, $2) + ON CONFLICT (phone_number) DO UPDATE SET entity_id=excluded.entity_id + ` + clearPhoneNumberQuery = "DELETE FROM telegram_phone_number WHERE entity_id=$1" +) + +func (s *PhoneNumberQuery) GetUserID(ctx context.Context, phoneNumber string) (userID int64, err error) { + err = s.db.QueryRow(ctx, getEntityIDForPhoneNumber, phoneNumber).Scan(&userID) + if errors.Is(err, sql.ErrNoRows) { + err = nil + } + return +} + +func (s *PhoneNumberQuery) Set(ctx context.Context, userID int64, phoneNumber string) (err error) { + if phoneNumber == "" { + _, err = s.db.Exec(ctx, clearPhoneNumberQuery, userID) + } else { + _, err = s.db.Exec(ctx, setPhoneNumberQuery, phoneNumber, userID) + } + return +} diff --git a/pkg/connector/store/scoped_store.go b/pkg/connector/store/scoped_store.go index 9a799359..9ddc8741 100644 --- a/pkg/connector/store/scoped_store.go +++ b/pkg/connector/store/scoped_store.go @@ -21,7 +21,6 @@ import ( "database/sql" "errors" "fmt" - "strings" "go.mau.fi/util/dbutil" @@ -71,27 +70,6 @@ const ( ON CONFLICT (user_id, entity_type, entity_id) DO UPDATE SET access_hash=excluded.access_hash ` deleteAccessHashesForUserQuery = "DELETE FROM telegram_access_hash WHERE user_id=$1" - - // User Username Queries - getUsernameQuery = "SELECT username FROM telegram_username WHERE entity_type=$1 AND entity_id=$2" - setUsernameQuery = ` - INSERT INTO telegram_username (username, entity_type, entity_id) - VALUES ($1, $2, $3) - ON CONFLICT (username) DO UPDATE SET - entity_type=excluded.entity_type, - entity_id=excluded.entity_id - ` - getByUsernameQuery = "SELECT entity_type, entity_id FROM telegram_username WHERE LOWER(username)=$1" - clearUsernameQuery = `DELETE FROM telegram_username WHERE entity_type=$1 AND entity_id=$2` - - // User Phone Number Queries - getEntityIDForPhoneNumber = "SELECT entity_id FROM telegram_phone_number WHERE phone_number=$1" - setPhoneNumberQuery = ` - INSERT INTO telegram_phone_number (phone_number, entity_id) - VALUES ($1, $2) - ON CONFLICT (phone_number) DO UPDATE SET entity_id=excluded.entity_id - ` - clearPhoneNumberQuery = "DELETE FROM telegram_phone_number WHERE entity_id=$1" ) var _ updates.StateStorage = (*ScopedStore)(nil) @@ -242,48 +220,6 @@ func (s *ScopedStore) DeleteAccessHashesForUser(ctx context.Context) (err error) return } -func (s *ScopedStore) GetUsername(ctx context.Context, entityType ids.PeerType, userID int64) (username string, err error) { - err = s.db.QueryRow(ctx, getUsernameQuery, entityType, userID).Scan(&username) - if errors.Is(err, sql.ErrNoRows) { - err = nil - } - return -} - -func (s *ScopedStore) SetUsername(ctx context.Context, entityType ids.PeerType, entityID int64, username string) (err error) { - if username == "" { - _, err = s.db.Exec(ctx, clearUsernameQuery, entityType, entityID) - } else { - _, err = s.db.Exec(ctx, setUsernameQuery, username, entityType, entityID) - } - return -} - -func (s *ScopedStore) GetEntityIDByUsername(ctx context.Context, username string) (entityType ids.PeerType, entityID int64, err error) { - err = s.db.QueryRow(ctx, getByUsernameQuery, strings.ToLower(username)).Scan(&entityType, &entityID) - if errors.Is(err, sql.ErrNoRows) { - err = nil - } - return -} - -func (s *ScopedStore) GetUserIDByPhoneNumber(ctx context.Context, phoneNumber string) (userID int64, err error) { - err = s.db.QueryRow(ctx, getEntityIDForPhoneNumber, phoneNumber).Scan(&userID) - if errors.Is(err, sql.ErrNoRows) { - err = nil - } - return -} - -func (s *ScopedStore) SetPhoneNumber(ctx context.Context, userID int64, phoneNumber string) (err error) { - if phoneNumber == "" { - _, err = s.db.Exec(ctx, clearPhoneNumberQuery, userID) - } else { - _, err = s.db.Exec(ctx, setPhoneNumberQuery, phoneNumber, userID) - } - return -} - // Helper Functions func (s *ScopedStore) assertUserIDMatches(userID int64) { diff --git a/pkg/connector/store/topic.go b/pkg/connector/store/topic.go new file mode 100644 index 00000000..ebb8b474 --- /dev/null +++ b/pkg/connector/store/topic.go @@ -0,0 +1,62 @@ +// mautrix-telegram - A Matrix-Telegram puppeting bridge. +// Copyright (C) 2025 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package store + +import ( + "context" + + "go.mau.fi/util/dbutil" + "go.mau.fi/util/exsync" +) + +type topicKey struct { + ChannelID int64 + TopicID int +} + +type TopicQuery struct { + db *dbutil.Database + existingTopics *exsync.Set[topicKey] +} + +const ( + getAllTopicsQuery = `SELECT topic_id FROM telegram_topic WHERE channel_id=$1` + addTopicQuery = `INSERT INTO telegram_topic (channel_id, topic_id) VALUES ($1, $2) ON CONFLICT DO NOTHING` + deleteTopicQuery = `DELETE FROM telegram_topic WHERE channel_id=$1 AND topic_id=$2` +) + +func (s *TopicQuery) Add(ctx context.Context, channelID int64, topicID int) (err error) { + if channelID == 0 { + return nil + } + if s.existingTopics.Add(topicKey{ChannelID: channelID, TopicID: topicID}) { + _, err = s.db.Exec(ctx, addTopicQuery, channelID, topicID) + } + return +} + +func (s *TopicQuery) Delete(ctx context.Context, channelID int64, topicID int) (err error) { + s.existingTopics.Remove(topicKey{ChannelID: channelID, TopicID: topicID}) + _, err = s.db.Exec(ctx, deleteTopicQuery, channelID, topicID) + return +} + +var intScanner = dbutil.ConvertRowFn[int](dbutil.ScanSingleColumn[int]) + +func (s *TopicQuery) GetAll(ctx context.Context, channelID int64) (topics []int, err error) { + return intScanner.NewRowIter(s.db.Query(ctx, getAllTopicsQuery, channelID)).AsList() +} diff --git a/pkg/connector/store/upgrades/00-latest.sql b/pkg/connector/store/upgrades/00-latest.sql index 1c2ed722..556f4256 100644 --- a/pkg/connector/store/upgrades/00-latest.sql +++ b/pkg/connector/store/upgrades/00-latest.sql @@ -1,4 +1,4 @@ --- v0 -> v5 (compatible with v2+): Latest revision +-- v0 -> v6 (compatible with v2+): Latest revision CREATE TABLE telegram_user_state ( user_id BIGINT NOT NULL PRIMARY KEY, @@ -51,3 +51,10 @@ CREATE TABLE telegram_file ( mime_type TEXT, size BIGINT ); + +CREATE TABLE telegram_topic ( + channel_id BIGINT NOT NULL, + topic_id BIGINT NOT NULL, + + PRIMARY KEY (channel_id, topic_id) +); diff --git a/pkg/connector/store/upgrades/06-topic-index.sql b/pkg/connector/store/upgrades/06-topic-index.sql new file mode 100644 index 00000000..6bef3ee5 --- /dev/null +++ b/pkg/connector/store/upgrades/06-topic-index.sql @@ -0,0 +1,8 @@ +-- v6 (compatible with v2+): Add table for topics + +CREATE TABLE telegram_topic ( + channel_id BIGINT NOT NULL, + topic_id BIGINT NOT NULL, + + PRIMARY KEY (channel_id, topic_id) +); diff --git a/pkg/connector/store/username.go b/pkg/connector/store/username.go new file mode 100644 index 00000000..04fe2504 --- /dev/null +++ b/pkg/connector/store/username.go @@ -0,0 +1,70 @@ +// mautrix-telegram - A Matrix-Telegram puppeting bridge. +// Copyright (C) 2025 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package store + +import ( + "context" + "database/sql" + "errors" + "strings" + + "go.mau.fi/util/dbutil" + + "go.mau.fi/mautrix-telegram/pkg/connector/ids" +) + +type UsernameQuery struct { + db *dbutil.Database +} + +const ( + getUsernameQuery = "SELECT username FROM telegram_username WHERE entity_type=$1 AND entity_id=$2" + setUsernameQuery = ` + INSERT INTO telegram_username (username, entity_type, entity_id) + VALUES ($1, $2, $3) + ON CONFLICT (username) DO UPDATE SET + entity_type=excluded.entity_type, + entity_id=excluded.entity_id + ` + getByUsernameQuery = "SELECT entity_type, entity_id FROM telegram_username WHERE LOWER(username)=$1" + clearUsernameQuery = `DELETE FROM telegram_username WHERE entity_type=$1 AND entity_id=$2` +) + +func (s *UsernameQuery) Get(ctx context.Context, entityType ids.PeerType, userID int64) (username string, err error) { + err = s.db.QueryRow(ctx, getUsernameQuery, entityType, userID).Scan(&username) + if errors.Is(err, sql.ErrNoRows) { + err = nil + } + return +} + +func (s *UsernameQuery) Set(ctx context.Context, entityType ids.PeerType, entityID int64, username string) (err error) { + if username == "" { + _, err = s.db.Exec(ctx, clearUsernameQuery, entityType, entityID) + } else { + _, err = s.db.Exec(ctx, setUsernameQuery, username, entityType, entityID) + } + return +} + +func (s *UsernameQuery) GetEntityID(ctx context.Context, username string) (entityType ids.PeerType, entityID int64, err error) { + err = s.db.QueryRow(ctx, getByUsernameQuery, strings.ToLower(username)).Scan(&entityType, &entityID) + if errors.Is(err, sql.ErrNoRows) { + err = nil + } + return +} diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go index 970112db..207b1633 100644 --- a/pkg/connector/sync.go +++ b/pkg/connector/sync.go @@ -69,7 +69,7 @@ func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.Di t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = nil for _, dialog := range dialogs { if dialog.GetPinned() { - portalKey := t.makePortalKeyFromPeer(dialog.GetPeer()) + portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0) t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = append(t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs, portalKey.ID) } } @@ -105,7 +105,7 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM Logger() log.Debug().Msg("Syncing dialog") - portalKey := t.makePortalKeyFromPeer(dialog.GetPeer()) + portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0) portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey) if err != nil { return err @@ -168,7 +168,7 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM continue } var mfm *memberFetchMeta - chatInfo, mfm, err = t.wrapChatInfo(channel) + chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel) if err != nil { return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err) } diff --git a/pkg/connector/tomatrix.go b/pkg/connector/tomatrix.go index 89914e4b..e0088cbe 100644 --- a/pkg/connector/tomatrix.go +++ b/pkg/connector/tomatrix.go @@ -143,7 +143,7 @@ func (c *TelegramClient) convertToMatrix( } var perMessageProfile *event.BeeperPerMessageProfile - if peerType, _, err := ids.ParsePortalID(portal.ID); err != nil { + if peerType, _, _, err := ids.ParsePortalID(portal.ID); err != nil { return nil, fmt.Errorf("failed to parse portal ID: %w", err) } else if peerType == ids.PeerTypeChannel && !portal.Metadata.(*PortalMetadata).IsSuperGroup { var sender *networkid.UserID @@ -210,11 +210,13 @@ func (c *TelegramClient) convertToMatrix( if replyTo, ok := msg.GetReplyTo(); ok { switch replyTo := replyTo.(type) { case *tg.MessageReplyHeader: - cm.ReplyTo = &networkid.MessageOptionalPartID{} - if peerID, present := replyTo.GetReplyToPeerID(); present { - cm.ReplyTo.MessageID = ids.MakeMessageID(peerID, replyTo.ReplyToMsgID) - } else { - cm.ReplyTo.MessageID = ids.MakeMessageID(portal.PortalKey, replyTo.ReplyToMsgID) + if (replyTo.ReplyToTopID != 0 || !replyTo.ForumTopic) && replyTo.ReplyToTopID != replyTo.ReplyToMsgID { + cm.ReplyTo = &networkid.MessageOptionalPartID{} + if peerID, present := replyTo.GetReplyToPeerID(); present { + cm.ReplyTo.MessageID = ids.MakeMessageID(peerID, replyTo.ReplyToMsgID) + } else { + cm.ReplyTo.MessageID = ids.MakeMessageID(portal.PortalKey, replyTo.ReplyToMsgID) + } } default: log.Warn().Type("reply_to", replyTo).Msg("unhandled reply to type") @@ -529,7 +531,7 @@ func (c *TelegramClient) convertMediaRequiringUpload( if err != nil { if tgerr.Is(err, tg.ErrFileReferenceExpired) && allowRefetch { log.Warn().Err(err).Msg("Failed to transfer media, trying to refetch from message") - peerType, peerID, err := ids.ParsePortalID(portal.ID) + peerType, peerID, _, err := ids.ParsePortalID(portal.ID) if err != nil { log.Err(err).Msg("Failed to parse portal ID to refetch media") } else if msgMedia, err = c.refetchMedia(ctx, peerType, peerID, msgID); err != nil { diff --git a/pkg/connector/userinfo.go b/pkg/connector/userinfo.go index b7d3094c..f4269be7 100644 --- a/pkg/connector/userinfo.go +++ b/pkg/connector/userinfo.go @@ -56,7 +56,7 @@ func (t *TelegramClient) wrapChannelGhostInfo(ctx context.Context, channel *tg.C var identifiers []string if username, set := channel.GetUsername(); set { - err = t.ScopedStore.SetUsername(ctx, ids.PeerTypeChannel, channel.ID, username) + err = t.main.Store.Username.Set(ctx, ids.PeerTypeChannel, channel.ID, username) if err != nil { return nil, err } @@ -88,7 +88,7 @@ func (t *TelegramClient) wrapUserInfo(ctx context.Context, u tg.UserClass) (*bri } } - if err := t.ScopedStore.SetUsername(ctx, ids.PeerTypeUser, user.ID, user.Username); err != nil { + if err := t.main.Store.Username.Set(ctx, ids.PeerTypeUser, user.ID, user.Username); err != nil { return nil, err } @@ -101,7 +101,7 @@ func (t *TelegramClient) wrapUserInfo(ctx context.Context, u tg.UserClass) (*bri if phone, ok := user.GetPhone(); ok { normalized := strings.TrimPrefix(phone, "+") identifiers = append(identifiers, fmt.Sprintf("tel:+%s", normalized)) - if err := t.ScopedStore.SetPhoneNumber(ctx, user.ID, normalized); err != nil { + if err := t.main.Store.PhoneNumber.Set(ctx, user.ID, normalized); err != nil { return nil, err } }