diff --git a/go.mod b/go.mod
index 63de0f9d..e198db5a 100644
--- a/go.mod
+++ b/go.mod
@@ -41,7 +41,7 @@ require (
golang.org/x/sync v0.18.0
golang.org/x/tools v0.39.0
gopkg.in/yaml.v3 v3.0.1
- maunium.net/go/mautrix v0.26.1-0.20251203195941-02ce6ff91851
+ maunium.net/go/mautrix v0.26.1-0.20251206105112-4efa4bdac5e3
rsc.io/qr v0.2.0
)
diff --git a/go.sum b/go.sum
index aaebb1ad..d12a1127 100644
--- a/go.sum
+++ b/go.sum
@@ -237,7 +237,7 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
-maunium.net/go/mautrix v0.26.1-0.20251203195941-02ce6ff91851 h1:5dty5IkJGxpLj0SQ2+wwKIcrPfZML1uHFcGaQIA9te0=
-maunium.net/go/mautrix v0.26.1-0.20251203195941-02ce6ff91851/go.mod h1:NaesYcOQWFDbixVYywCVS+Twlzab9hOUpFNlCBlvciE=
+maunium.net/go/mautrix v0.26.1-0.20251206105112-4efa4bdac5e3 h1:llPUQswvRVaWkWqwH1P6T51wkj3fWCXu4rxewN0RLsY=
+maunium.net/go/mautrix v0.26.1-0.20251206105112-4efa4bdac5e3/go.mod h1:NaesYcOQWFDbixVYywCVS+Twlzab9hOUpFNlCBlvciE=
rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY=
rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs=
diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go
index 9b18a24e..28399755 100644
--- a/pkg/connector/backfill.go
+++ b/pkg/connector/backfill.go
@@ -29,6 +29,7 @@ import (
"maunium.net/go/mautrix/bridgev2/networkid"
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
+ "go.mau.fi/mautrix-telegram/pkg/gotd/bin"
"go.mau.fi/mautrix-telegram/pkg/gotd/tg"
"go.mau.fi/mautrix-telegram/pkg/gotd/tgerr"
)
@@ -106,7 +107,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er
}
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor != "" {
var err error
- req.OffsetPeer, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor)
+ req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor)
if err != nil {
return fmt.Errorf("failed to get input peer for pagination: %w", err)
}
@@ -150,7 +151,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er
return fmt.Errorf("failed to handle dialogs: %w", err)
}
- portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer())
+ portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), 0)
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor == portalKey.ID {
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true
@@ -164,7 +165,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er
return fmt.Errorf("failed to save user login: %w", err)
}
- req.OffsetPeer, err = t.inputPeerForPortalID(ctx, portalKey.ID)
+ req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, portalKey.ID)
if err != nil {
return fmt.Errorf("failed to get input peer for pagination: %w", err)
}
@@ -206,43 +207,57 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
}()
}
- peer, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID)
+ peer, topicID, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID)
if err != nil {
return nil, err
}
- req := tg.MessagesGetHistoryRequest{
- Peer: peer,
- Limit: fetchParams.Count,
- }
+ var minID, offsetID int
if fetchParams.AnchorMessage != nil {
if fetchParams.Forward {
- _, req.MinID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
+ _, minID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
} else {
- _, req.OffsetID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
+ _, offsetID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
}
if err != nil {
return nil, err
}
}
+ if fetchParams.Portal.Metadata.(*PortalMetadata).IsForumGeneral {
+ topicID = 1
+ }
+ var req bin.Object
+ if topicID == ids.TopicIDSpaceRoom {
+ return nil, nil
+ } else if topicID > 0 {
+ req = &tg.MessagesGetRepliesRequest{
+ Peer: peer,
+ MsgID: topicID,
+ Limit: fetchParams.Count,
+ MinID: minID,
+ OffsetID: offsetID,
+ }
+ } else {
+ req = &tg.MessagesGetHistoryRequest{
+ Peer: peer,
+ Limit: fetchParams.Count,
+ MinID: minID,
+ OffsetID: offsetID,
+ }
+ }
+ if !fetchParams.Forward {
+ req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req}
+ }
log.Info().Any("req", req).Msg("Fetching messages")
msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) {
- var rawMsgs tg.MessagesMessagesClass
- if fetchParams.Forward {
- rawMsgs, err = t.client.API().MessagesGetHistory(ctx, &req)
- } else {
- var messages tg.MessagesMessagesBox
- err = t.client.Invoke(ctx,
- &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
- &messages)
- rawMsgs = messages.Messages
- }
+ var box tg.MessagesMessagesBox
+ err = t.client.Invoke(ctx, req, &box)
if err != nil {
return nil, err
}
- msgs, ok := rawMsgs.(tg.ModifiedMessagesMessages)
+ msgs, ok := box.Messages.(tg.ModifiedMessagesMessages)
if !ok {
- return nil, fmt.Errorf("unsupported messages type %T", rawMsgs)
+ return nil, fmt.Errorf("unsupported messages type %T", box.Messages)
}
return msgs, nil
})
@@ -251,11 +266,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
}
messages := msgs.GetMessages()
-
- portal, err := t.main.Bridge.GetPortalByKey(ctx, fetchParams.Portal.PortalKey)
- if err != nil {
- return nil, err
- }
+ portal := fetchParams.Portal
// If the first message is the last read message, mark the chat as read
// during backfill.
@@ -363,7 +374,7 @@ func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *b
log := zerolog.Ctx(ctx).With().
Str("method", "GetBackfillMaxBatchCount").
Logger()
- peerType, _, err := ids.ParsePortalID(portal.ID)
+ peerType, _, topicID, err := ids.ParsePortalID(portal.ID)
if err != nil {
log.Err(err).Msg("failed to parse portal ID")
return 0
@@ -374,7 +385,11 @@ func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *b
case ids.PeerTypeChat:
return c.main.Bridge.Config.Backfill.Queue.GetOverride("normal_group")
case ids.PeerTypeChannel:
- if portal.Metadata.(*PortalMetadata).IsSuperGroup {
+ if topicID == ids.TopicIDSpaceRoom {
+ return 0
+ } else if topicID > 0 {
+ return c.main.Bridge.Config.Backfill.Queue.GetOverride("topic", "supergroup")
+ } else if portal.Metadata.(*PortalMetadata).IsSuperGroup {
return c.main.Bridge.Config.Backfill.Queue.GetOverride("supergroup")
} else {
return c.main.Bridge.Config.Backfill.Queue.GetOverride("channel")
diff --git a/pkg/connector/capabilities.go b/pkg/connector/capabilities.go
index 1a1da104..7a1ec175 100644
--- a/pkg/connector/capabilities.go
+++ b/pkg/connector/capabilities.go
@@ -229,8 +229,9 @@ func (t *TelegramClient) GetCapabilities(ctx context.Context, portal *bridgev2.P
Timers: telegramTimers,
},
State: event.StateFeatureMap{
- event.StateRoomName.Type: {Level: event.CapLevelFullySupported},
- event.StateRoomAvatar.Type: {Level: event.CapLevelFullySupported},
+ event.StateRoomName.Type: {Level: event.CapLevelFullySupported},
+ event.StateRoomAvatar.Type: {Level: event.CapLevelFullySupported},
+ event.StateBeeperDisappearingTimer.Type: {Level: event.CapLevelFullySupported},
},
}
// TODO non-admins can only edit messages within 48 hours
@@ -258,17 +259,29 @@ func (t *TelegramClient) GetCapabilities(ctx context.Context, portal *bridgev2.P
feat.ReactionCount = 3
}
portalMetadata := portal.Metadata.(*PortalMetadata)
- peerType, _, _ := ids.ParsePortalID(portal.ID)
+ peerType, _, topicID, _ := ids.ParsePortalID(portal.ID)
+ if topicID > 0 {
+ baseID += "+topic"
+ // TODO do topics have other changes?
+ delete(feat.State, event.StateRoomAvatar.Type)
+ delete(feat.State, event.StateBeeperDisappearingTimer.Type)
+ feat.DisappearingTimer = nil
+ } else if topicID == ids.TopicIDSpaceRoom {
+ baseID += "+spaceroom"
+ feat = &event.RoomFeatures{}
+ }
switch portal.RoomType {
case database.RoomTypeDM:
baseID += "+dm"
feat.DeleteChat = true
feat.DeleteChatForEveryone = true
- feat.State = nil
+ feat.State = event.StateFeatureMap{
+ event.StateBeeperDisappearingTimer.Type: {Level: event.CapLevelFullySupported},
+ }
default:
// Group creators can delete the chat for everyone, unless it's a large channel
- if peerType == ids.PeerTypeChat || portalMetadata.ParticipantsCount < 1000 {
+ if peerType == ids.PeerTypeChat || portalMetadata.ParticipantsCount < 1000 || topicID > 0 {
baseID += "+deletablegroup"
feat.DeleteChatForEveryone = true
}
diff --git a/pkg/connector/chatinfo.go b/pkg/connector/chatinfo.go
index 08ea920f..dce850c3 100644
--- a/pkg/connector/chatinfo.go
+++ b/pkg/connector/chatinfo.go
@@ -40,6 +40,7 @@ var (
modPowerLevel = ptr.Ptr(50)
superadminPowerLevel = ptr.Ptr(75)
creatorPowerLevel = ptr.Ptr(95)
+ nobodyPowerLevel = ptr.Ptr(99)
otherPowerLevel = ptr.Ptr(40)
anonymousPowerLevel = ptr.Ptr(41)
@@ -135,9 +136,10 @@ type memberFetchMeta struct {
Input *tg.InputChannel
IsBroadcast bool
ParticipantsHidden bool
+ IsForum bool
}
-func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo, *memberFetchMeta, error) {
+func (t *TelegramClient) wrapChatInfo(portalID networkid.PortalID, rawChat tg.ChatClass) (*bridgev2.ChatInfo, *memberFetchMeta, error) {
info := bridgev2.ChatInfo{
Type: ptr.Ptr(database.RoomTypeDefault),
CanBackfill: true,
@@ -147,8 +149,8 @@ func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo,
},
ExcludeChangesFromTimeline: true,
}
- var isMegagroup, isBroadcast, left bool
- var channelInput *tg.InputChannel
+ var mfm memberFetchMeta
+ var isMegagroup, isForumGeneral, left bool
var avatarErr error
switch chat := rawChat.(type) {
case *tg.Chat:
@@ -158,13 +160,28 @@ func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo,
info.Members.PowerLevels = t.getPowerLevelOverridesFromBannedRights(chat, chat.DefaultBannedRights)
left = chat.Left
case *tg.Channel:
- channelInput = chat.AsInput()
+ mfm.Input = chat.AsInput()
+ mfm.IsBroadcast = chat.Broadcast
info.Name = &chat.Title
info.Members.TotalMemberCount = chat.ParticipantsCount
isMegagroup = chat.Megagroup
- isBroadcast = chat.Broadcast
info.Avatar, avatarErr = t.convertChatPhoto(chat.AsInputPeer(), chat.Photo)
info.Members.PowerLevels = t.getPowerLevelOverridesFromBannedRights(chat, chat.DefaultBannedRights)
+ _, _, topicID, _ := ids.ParsePortalID(portalID)
+ if chat.Forum {
+ if topicID == ids.TopicIDSpaceRoom {
+ info.Type = ptr.Ptr(database.RoomTypeSpace)
+ } else if topicID == 0 {
+ isForumGeneral = true
+ info.Name = ptr.Ptr("#General - " + *info.Name)
+ }
+ if topicID != ids.TopicIDSpaceRoom {
+ info.ParentID = ptr.Ptr(ids.MakeForumParentPortalID(chat.ID))
+ }
+ mfm.IsForum = true
+ } else if topicID != 0 {
+ return nil, nil, fmt.Errorf("channel %d is not a forum, cannot have topics", chat.GetID())
+ }
left = chat.Left
if chat.Broadcast {
info.Members.MemberMap.Set(bridgev2.ChatMember{
@@ -188,13 +205,21 @@ func (t *TelegramClient) wrapChatInfo(rawChat tg.ChatClass) (*bridgev2.ChatInfo,
meta := portal.Metadata.(*PortalMetadata)
_ = updatePortalLastSyncAt(ctx, portal)
changed := meta.SetIsSuperGroup(isMegagroup)
+ changed = meta.SetIsForumGeneral(isForumGeneral) || changed
if info.Members.TotalMemberCount != 0 && meta.ParticipantsCount != info.Members.TotalMemberCount {
meta.ParticipantsCount = info.Members.TotalMemberCount
changed = true
}
return changed
}
- return &info, &memberFetchMeta{Input: channelInput, IsBroadcast: isBroadcast}, nil
+ return &info, &mfm, nil
+}
+
+func (t *TelegramClient) overrideChatInfoWithTopic(info *bridgev2.ChatInfo, topic *tg.ForumTopic) {
+ info.Name = ptr.Ptr(topic.Title + " - " + *info.Name)
+ if topic.Closed {
+ info.Members.PowerLevels.EventsDefault = nobodyPowerLevel
+ }
}
func (t *TelegramClient) getChannelParticipants(ctx context.Context, req *tg.ChannelsGetParticipantsRequest) (*tg.ChannelsChannelParticipants, error) {
@@ -271,7 +296,7 @@ func (t *TelegramClient) fillUserLocalMeta(info *bridgev2.ChatInfo, dialog *tg.D
}
}
-func (t *TelegramClient) wrapFullChatInfo(fullChat *tg.MessagesChatFull) (*bridgev2.ChatInfo, *memberFetchMeta, error) {
+func (t *TelegramClient) wrapFullChatInfo(portalID networkid.PortalID, fullChat *tg.MessagesChatFull) (*bridgev2.ChatInfo, *memberFetchMeta, error) {
var chat tg.ChatClass
for _, c := range fullChat.GetChats() {
if c.GetID() == fullChat.FullChat.GetID() {
@@ -283,7 +308,7 @@ func (t *TelegramClient) wrapFullChatInfo(fullChat *tg.MessagesChatFull) (*bridg
return nil, nil, fmt.Errorf("chat ID %d not found in full chat", fullChat.FullChat.GetID())
}
- info, mfm, err := t.wrapChatInfo(chat)
+ info, mfm, err := t.wrapChatInfo(portalID, chat)
if err != nil {
return nil, nil, err
}
@@ -437,7 +462,7 @@ func (t *TelegramClient) filterChannelParticipants(participants []tg.ChannelPart
}
func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) {
- peerType, id, err := ids.ParsePortalID(portal.ID)
+ peerType, id, topicID, err := ids.ParsePortalID(portal.ID)
if err != nil {
return nil, err
}
@@ -452,21 +477,41 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta
if err != nil {
return nil, err
}
- info, _, err := t.wrapFullChatInfo(fullChat)
+ info, _, err := t.wrapFullChatInfo(portal.ID, fullChat)
return info, err
case ids.PeerTypeChannel:
accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, id)
if err != nil {
return nil, fmt.Errorf("failed to get channel access hash: %w", err)
}
- inputChannel := &tg.InputChannel{ChannelID: id, AccessHash: accessHash}
+ if topicID > 0 {
+ resp, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesForumTopics, error) {
+ return t.client.API().MessagesGetForumTopicsByID(ctx, &tg.MessagesGetForumTopicsByIDRequest{
+ Peer: &tg.InputPeerChannel{ChannelID: id, AccessHash: accessHash},
+ Topics: []int{topicID},
+ })
+ })
+ if err != nil {
+ return nil, err
+ }
+ channel, topic, err := getTopicInfoFromResponse(resp, id, topicID)
+ if err != nil {
+ return nil, err
+ }
+ info, _, err := t.wrapChatInfo(portal.ID, channel)
+ if err != nil {
+ return nil, err
+ }
+ t.overrideChatInfoWithTopic(info, topic)
+ return info, nil
+ }
fullChat, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesChatFull, error) {
- return t.client.API().ChannelsGetFullChannel(ctx, inputChannel)
+ return t.client.API().ChannelsGetFullChannel(ctx, &tg.InputChannel{ChannelID: id, AccessHash: accessHash})
})
if err != nil {
return nil, err
}
- info, mfm, err := t.wrapFullChatInfo(fullChat)
+ info, mfm, err := t.wrapFullChatInfo(portal.ID, fullChat)
if err != nil {
return nil, err
}
@@ -480,6 +525,35 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta
}
}
+func getTopicInfoFromResponse(resp *tg.MessagesForumTopics, channelID int64, topicID int) (channel *tg.Channel, topic *tg.ForumTopic, err error) {
+ var ok bool
+ for _, ch := range resp.GetChats() {
+ if ch.GetID() == channelID {
+ channel, ok = ch.(*tg.Channel)
+ if !ok {
+ return nil, nil, fmt.Errorf("chat ID %d is %T not *tg.Channel", channelID, ch)
+ }
+ break
+ }
+ }
+ if channel == nil {
+ return nil, nil, fmt.Errorf("channel ID %d not found in chats", channelID)
+ }
+ for _, tp := range resp.GetTopics() {
+ if tp.GetID() == topicID {
+ topic, ok = tp.(*tg.ForumTopic)
+ if !ok {
+ return nil, nil, fmt.Errorf("topic ID %d is %T not *tg.ForumTopic", topicID, tp)
+ }
+ break
+ }
+ }
+ if topic == nil {
+ return nil, nil, fmt.Errorf("topic ID %d not found in topics", topicID)
+ }
+ return
+}
+
func (t *TelegramClient) getDMPowerLevels(ghost *bridgev2.Ghost) *bridgev2.PowerLevelOverrides {
var plo bridgev2.PowerLevelOverrides
if ghost.Metadata.(*GhostMetadata).Blocked {
diff --git a/pkg/connector/client.go b/pkg/connector/client.go
index f6a3d445..31579a49 100644
--- a/pkg/connector/client.go
+++ b/pkg/connector/client.go
@@ -138,7 +138,7 @@ func (u UpdateDispatcher) Handle(ctx context.Context, updates tg.UpdatesClass) e
return u.UpdateDispatcher.Handle(ctx, updates)
}
-var messageLinkRegex = regexp.MustCompile(`^https?://t(?:elegram)?\.(?:me|dog)/([A-Za-z][A-Za-z0-9_]{3,31}[A-Za-z0-9]|[Cc]/[0-9]{1,20})/([0-9]{1,20})$`)
+var messageLinkRegex = regexp.MustCompile(`^https?://t(?:elegram)?\.(?:me|dog)/([A-Za-z][A-Za-z0-9_]{3,31}[A-Za-z0-9]|[Cc]/[0-9]{1,20})/([0-9]{1,20})(?:/([0-9]{1,20}))?$`)
func (tg *TelegramConnector) deviceConfig() telegram.DeviceConfig {
return telegram.DeviceConfig{
@@ -221,24 +221,24 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
return client.onMessageEdit(ctx, update)
})
dispatcher.OnUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateUserTyping) error {
- return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeUser, update.UserID), client.senderForUserID(update.UserID), update.Action)
+ return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeUser, update.UserID, 0), client.senderForUserID(update.UserID), update.Action)
})
dispatcher.OnChatUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChatUserTyping) error {
if update.FromID.TypeID() != tg.PeerUserTypeID {
log.Warn().Str("from_id_type", update.FromID.TypeName()).Msg("unsupported from_id type")
return nil
}
- return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID), client.getPeerSender(update.FromID), update.Action)
+ return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChat, update.ChatID, 0), client.getPeerSender(update.FromID), update.Action)
})
dispatcher.OnChannelUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannelUserTyping) error {
- return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID), client.getPeerSender(update.FromID), update.Action)
+ return client.handleTyping(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, update.TopMsgID), client.getPeerSender(update.FromID), update.Action)
})
dispatcher.OnReadHistoryOutbox(client.updateReadReceipt)
dispatcher.OnReadHistoryInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryInbox) error {
- return client.onOwnReadReceipt(client.makePortalKeyFromPeer(update.Peer), update.MaxID)
+ return client.onOwnReadReceipt(client.makePortalKeyFromPeer(update.Peer, update.TopMsgID), update.MaxID)
})
dispatcher.OnReadChannelInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadChannelInbox) error {
- return client.onOwnReadReceipt(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID), update.MaxID)
+ return client.onOwnReadReceipt(client.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0), update.MaxID)
})
dispatcher.OnNotifySettings(client.onNotifySettings)
dispatcher.OnPinnedDialogs(client.onPinnedDialogs)
@@ -249,15 +249,18 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
client.updatesManager = updates.New(updates.Config{
OnChannelTooLong: func(channelID int64) error {
+ // TODO resync topics?
res := tc.Bridge.QueueRemoteEvent(login, &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.Str("update", "channel_too_long").Int64("channel_id", channelID)
},
- PortalKey: client.makePortalKeyFromID(ids.PeerTypeChannel, channelID),
+ PortalKey: client.makePortalKeyFromID(ids.PeerTypeChannel, channelID, 0),
+ },
+ CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) {
+ return true, nil
},
- CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { return true, nil },
})
return resultToError(res)
@@ -296,7 +299,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
return userInfo, nil
},
GetUserInfoByUsername: func(ctx context.Context, username string) (telegramfmt.UserInfo, error) {
- if peerType, userID, err := client.ScopedStore.GetEntityIDByUsername(ctx, username); err != nil {
+ if peerType, userID, err := client.main.Store.Username.GetEntityID(ctx, username); err != nil {
return telegramfmt.UserInfo{}, err
} else if peerType != ids.PeerTypeUser {
return telegramfmt.UserInfo{}, fmt.Errorf("unexpected peer type: %s", peerType)
@@ -330,7 +333,17 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
log.Err(err).Msg("error parsing message ID")
return url
}
- log = log.With().Str("group", group).Int("msg_id", msgID).Logger()
+ var topicID int
+ if len(submatches) == 4 && submatches[3] != "" {
+ lastID, err := strconv.Atoi(submatches[3])
+ if err != nil {
+ log.Err(err).Msg("error parsing actual message ID")
+ return url
+ }
+ topicID = msgID
+ msgID = lastID
+ }
+ log = log.With().Str("group", group).Int("topic_id", topicID).Int("msg_id", msgID).Logger()
var portalKey networkid.PortalKey
if strings.HasPrefix(group, "C/") || strings.HasPrefix(group, "c/") {
@@ -339,16 +352,16 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
log.Err(err).Msg("error parsing channel ID")
return url
}
- portalKey = client.makePortalKeyFromID(ids.PeerTypeChannel, chatID)
+ portalKey = client.makePortalKeyFromID(ids.PeerTypeChannel, chatID, topicID)
} else if submatches[1] == "premium" {
- portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, 777000)
+ portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, 777000, 0)
} else if userID, err := strconv.ParseInt(submatches[1], 10, 64); err == nil && userID > 0 {
- portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, userID)
- } else if peerType, peerID, err := client.ScopedStore.GetEntityIDByUsername(ctx, submatches[1]); err != nil {
+ portalKey = client.makePortalKeyFromID(ids.PeerTypeUser, userID, 0)
+ } else if peerType, peerID, err := client.main.Store.Username.GetEntityID(ctx, submatches[1]); err != nil {
log.Err(err).Msg("Failed to get entity ID by username")
return url
} else if peerType != "" {
- portalKey = client.makePortalKeyFromID(peerType, peerID)
+ portalKey = client.makePortalKeyFromID(peerType, peerID, topicID)
} else {
return url
}
@@ -382,7 +395,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
return "", "", 0, false
} else if accessHash, err := client.ScopedStore.GetAccessHash(ctx, peerType, telegramUserID); err != nil || accessHash == 0 {
return "", "", 0, false
- } else if username, err := client.ScopedStore.GetUsername(ctx, peerType, telegramUserID); err != nil {
+ } else if username, err := client.main.Store.Username.Get(ctx, peerType, telegramUserID); err != nil {
return "", "", 0, false
} else {
return userID, username, accessHash, true
diff --git a/pkg/connector/directdownload.go b/pkg/connector/directdownload.go
index 228b0b9f..61783314 100644
--- a/pkg/connector/directdownload.go
+++ b/pkg/connector/directdownload.go
@@ -109,7 +109,7 @@ func (tc *TelegramConnector) Download(ctx context.Context, mediaID networkid.Med
return nil, fmt.Errorf("failed to get user login: %w", err)
}
- logins, err := tc.Bridge.GetUserLoginsInPortal(ctx, ids.PeerTypeChannel.InternalAsPortalKey(info.PeerID, ""))
+ logins, err := tc.Bridge.GetUserLoginsInPortal(ctx, ids.InternalMakePortalKey(ids.PeerTypeChannel, info.PeerID, 0, ""))
if err != nil {
return nil, err
} else if len(logins) == 0 {
diff --git a/pkg/connector/handlematrix.go b/pkg/connector/handlematrix.go
index b9cbfb2d..d1d7f118 100644
--- a/pkg/connector/handlematrix.go
+++ b/pkg/connector/handlematrix.go
@@ -94,8 +94,10 @@ func (t *TelegramClient) HandleMatrixViewingChat(ctx context.Context, msg *bridg
if msg.Portal == nil {
return nil
}
+ _, _, topicID, _ := ids.ParsePortalID(msg.Portal.PortalKey.ID)
+ // TODO sync topic parent space
meta := msg.Portal.Metadata.(*PortalMetadata)
- if !meta.FullSynced || meta.LastSync.Add(24*time.Hour).Before(time.Now()) {
+ if (topicID == 0 && !meta.FullSynced) || meta.LastSync.Add(24*time.Hour).Before(time.Now()) {
t.userLogin.QueueRemoteEvent(&simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
@@ -255,17 +257,23 @@ func parseRandomID(txnID networkid.RawTransactionID) int64 {
}
func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (resp *bridgev2.MatrixMessageResponse, err error) {
+ if msg.Portal.RoomType == database.RoomTypeSpace {
+ return nil, fmt.Errorf("can't send messages to space portals")
+ }
// Handle Matrix events only after initial connection has been established to avoid deadlocking gotd
err = t.clientInitialized.Wait(ctx)
if err != nil {
return nil, err
}
- peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ peer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return nil, err
}
- log := zerolog.Ctx(ctx).With().Stringer("portal_key", msg.Portal.PortalKey).Any("peer_id", peer).Logger()
+ log := zerolog.Ctx(ctx).With().
+ Stringer("portal_key", msg.Portal.PortalKey).
+ Any("peer_id", peer).
+ Logger()
ctx = log.WithContext(ctx)
var contentURI id.ContentURIString
@@ -283,6 +291,13 @@ func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.
}
replyTo = &tg.InputReplyToMessage{ReplyToMsgID: messageID}
}
+ if topicID > 0 {
+ if replyTo == nil {
+ replyTo = &tg.InputReplyToMessage{ReplyToMsgID: topicID}
+ } else {
+ replyTo.(*tg.InputReplyToMessage).TopMsgID = topicID
+ }
+ }
randomID := parseRandomID(msg.InputTransactionID)
@@ -396,8 +411,6 @@ func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.
resp = &bridgev2.MatrixMessageResponse{
DB: &database.Message{
ID: messageID,
- MXID: msg.Event.ID,
- Room: msg.Portal.PortalKey,
SenderID: t.userID,
Timestamp: timestamp,
Metadata: &MessageMetadata{
@@ -411,12 +424,15 @@ func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.
}
func (t *TelegramClient) HandleMatrixEdit(ctx context.Context, msg *bridgev2.MatrixEdit) error {
+ if msg.Portal.RoomType == database.RoomTypeSpace {
+ return fmt.Errorf("can't send messages to space portals")
+ }
log := zerolog.Ctx(ctx).With().
Str("conversion_direction", "to_telegram").
Str("handler", "matrix_edit").
Logger()
- peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return err
}
@@ -483,11 +499,13 @@ func (t *TelegramClient) HandleMatrixEdit(ctx context.Context, msg *bridgev2.Mat
}
func (t *TelegramClient) HandleMatrixMessageRemove(ctx context.Context, msg *bridgev2.MatrixMessageRemove) error {
- if dbMsg, err := t.main.Bridge.DB.Message.GetPartByMXID(ctx, msg.TargetMessage.MXID); err != nil {
+ if msg.Portal.RoomType == database.RoomTypeSpace {
+ return fmt.Errorf("can't send messages to space portals")
+ } else if dbMsg, err := t.main.Bridge.DB.Message.GetPartByMXID(ctx, msg.TargetMessage.MXID); err != nil {
return err
} else if _, messageID, err := ids.ParseMessageID(dbMsg.ID); err != nil {
return err
- } else if peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID); err != nil {
+ } else if peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID); err != nil {
return err
} else {
_, err := message.NewSender(t.client.API()).
@@ -499,6 +517,9 @@ func (t *TelegramClient) HandleMatrixMessageRemove(ctx context.Context, msg *bri
}
func (t *TelegramClient) PreHandleMatrixReaction(ctx context.Context, msg *bridgev2.MatrixReaction) (bridgev2.MatrixReactionPreResponse, error) {
+ if msg.Portal.RoomType == database.RoomTypeSpace {
+ return bridgev2.MatrixReactionPreResponse{}, fmt.Errorf("can't send messages to space portals")
+ }
log := zerolog.Ctx(ctx).With().
Str("conversion_direction", "to_telegram").
Str("handler", "pre_handle_matrix_reaction").
@@ -562,7 +583,7 @@ func (t *TelegramClient) appendEmojiID(reactionList []tg.ReactionClass, emojiID
}
func (t *TelegramClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.MatrixReaction) (reaction *database.Reaction, err error) {
- peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return nil, err
}
@@ -599,7 +620,10 @@ func (t *TelegramClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2
}
func (t *TelegramClient) HandleMatrixReactionRemove(ctx context.Context, msg *bridgev2.MatrixReactionRemove) error {
- peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ if msg.Portal.RoomType == database.RoomTypeSpace {
+ return fmt.Errorf("can't send messages to space portals")
+ }
+ peer, _, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return err
}
@@ -637,16 +661,22 @@ func (t *TelegramClient) HandleMatrixReactionRemove(ctx context.Context, msg *br
}
func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridgev2.MatrixReadReceipt) error {
+ if msg.Portal.RoomType == database.RoomTypeSpace {
+ return nil
+ }
log := zerolog.Ctx(ctx).With().
Str("action", "handle_matrix_read_receipt").
Str("portal_id", string(msg.Portal.ID)).
Bool("is_supergroup", msg.Portal.Metadata.(*PortalMetadata).IsSuperGroup).
Logger()
- peerType, portalID, parseErr := ids.ParsePortalID(msg.Portal.ID)
+ peerType, portalID, topicID, parseErr := ids.ParsePortalID(msg.Portal.ID)
if parseErr != nil {
return parseErr
}
- inputPeer, parseErr := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ if msg.Portal.Metadata.(*PortalMetadata).IsForumGeneral {
+ topicID = 1
+ }
+ inputPeer, _, parseErr := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if parseErr != nil {
return parseErr
}
@@ -659,7 +689,8 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg
go func() {
defer wg.Done()
_, readMentionsErr = t.client.API().MessagesReadMentions(ctx, &tg.MessagesReadMentionsRequest{
- Peer: inputPeer,
+ Peer: inputPeer,
+ TopMsgID: topicID,
})
}()
@@ -668,7 +699,8 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg
go func() {
defer wg.Done()
_, readMentionsErr = t.client.API().MessagesReadReactions(ctx, &tg.MessagesReadReactionsRequest{
- Peer: inputPeer,
+ Peer: inputPeer,
+ TopMsgID: topicID,
})
}()
@@ -707,6 +739,7 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg
}
_, readMessagesErr = t.client.API().ChannelsReadHistory(ctx, &tg.ChannelsReadHistoryRequest{
Channel: &tg.InputChannel{ChannelID: portalID, AccessHash: accessHash},
+ MaxID: maxID,
})
if !msg.Portal.Metadata.(*PortalMetadata).IsSuperGroup {
@@ -741,21 +774,43 @@ func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridg
}
func (t *TelegramClient) HandleMatrixTyping(ctx context.Context, msg *bridgev2.MatrixTyping) error {
- inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ if msg.Portal.RoomType == database.RoomTypeSpace {
+ return nil
+ }
+ inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return err
}
+ if msg.Portal.Metadata.(*PortalMetadata).IsForumGeneral {
+ topicID = 1
+ }
+ var action tg.SendMessageActionClass
+ switch msg.Type {
+ case bridgev2.TypingTypeText:
+ action = &tg.SendMessageTypingAction{}
+ case bridgev2.TypingTypeRecordingMedia:
+ // TODO media types?
+ action = &tg.SendMessageRecordVideoAction{}
+ case bridgev2.TypingTypeUploadingMedia:
+ action = &tg.SendMessageUploadVideoAction{}
+ }
+ if !msg.IsTyping {
+ action = &tg.SendMessageCancelAction{}
+ }
_, err = t.client.API().MessagesSetTyping(ctx, &tg.MessagesSetTypingRequest{
- Peer: inputPeer,
- Action: &tg.SendMessageTypingAction{},
+ Peer: inputPeer,
+ TopMsgID: topicID,
+ Action: action,
})
return err
}
func (t *TelegramClient) HandleMatrixDisappearingTimer(ctx context.Context, msg *bridgev2.MatrixDisappearingTimer) (bool, error) {
- inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return false, err
+ } else if topicID > 0 {
+ return false, fmt.Errorf("topics can't have their own disappearing timer")
}
_, err = t.client.API().MessagesSetHistoryTTL(ctx, &tg.MessagesSetHistoryTTLRequest{
Peer: inputPeer,
@@ -771,7 +826,7 @@ func (t *TelegramClient) HandleMatrixDisappearingTimer(ctx context.Context, msg
}
func (t *TelegramClient) HandleMute(ctx context.Context, msg *bridgev2.MatrixMute) error {
- inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return err
}
@@ -780,18 +835,26 @@ func (t *TelegramClient) HandleMute(ctx context.Context, msg *bridgev2.MatrixMut
Silent: msg.Content.IsMuted(),
MuteUntil: int(max(0, min(msg.Content.GetMutedUntilTime().Unix(), math.MaxInt32))),
}
+ var peer tg.InputNotifyPeerClass
+ if topicID > 0 {
+ peer = &tg.InputNotifyForumTopic{Peer: inputPeer, TopMsgID: topicID}
+ } else {
+ peer = &tg.InputNotifyPeer{Peer: inputPeer}
+ }
_, err = t.client.API().AccountUpdateNotifySettings(ctx, &tg.AccountUpdateNotifySettingsRequest{
- Peer: &tg.InputNotifyPeer{Peer: inputPeer},
+ Peer: peer,
Settings: settings,
})
return err
}
func (t *TelegramClient) HandleRoomTag(ctx context.Context, msg *bridgev2.MatrixRoomTag) error {
- inputPeer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
+ inputPeer, topicID, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if err != nil {
return err
+ } else if topicID > 0 {
+ return fmt.Errorf("topics can't be pinned for yourself")
}
_, err = t.client.API().MessagesToggleDialogPin(ctx, &tg.MessagesToggleDialogPinRequest{
@@ -802,7 +865,7 @@ func (t *TelegramClient) HandleRoomTag(ctx context.Context, msg *bridgev2.Matrix
}
func (t *TelegramClient) HandleMatrixDeleteChat(ctx context.Context, chat *bridgev2.MatrixDeleteChat) error {
- peerType, id, err := ids.ParsePortalID(chat.Portal.ID)
+ peerType, id, topicID, err := ids.ParsePortalID(chat.Portal.ID)
if err != nil {
return err
}
@@ -839,12 +902,21 @@ func (t *TelegramClient) HandleMatrixDeleteChat(ctx context.Context, chat *bridg
if err != nil {
return err
}
- channel := &tg.InputChannel{
- ChannelID: id,
- AccessHash: accessHash,
- }
if chat.Content.DeleteForEveryone {
- _, err := t.client.API().ChannelsDeleteChannel(ctx, channel)
+ if topicID > 0 {
+ _, err = t.client.API().MessagesDeleteTopicHistory(ctx, &tg.MessagesDeleteTopicHistoryRequest{
+ Peer: &tg.InputPeerChannel{
+ ChannelID: id,
+ AccessHash: accessHash,
+ },
+ TopMsgID: topicID,
+ })
+ } else {
+ _, err = t.client.API().ChannelsDeleteChannel(ctx, &tg.InputChannel{
+ ChannelID: id,
+ AccessHash: accessHash,
+ })
+ }
if err != nil {
return err
}
@@ -859,7 +931,7 @@ func (t *TelegramClient) HandleMatrixDeleteChat(ctx context.Context, chat *bridg
}
func (t *TelegramClient) HandleMatrixRoomName(ctx context.Context, msg *bridgev2.MatrixRoomName) (bool, error) {
- peerType, id, err := ids.ParsePortalID(msg.Portal.ID)
+ peerType, id, topicID, err := ids.ParsePortalID(msg.Portal.ID)
if err != nil {
return false, err
}
@@ -879,13 +951,24 @@ func (t *TelegramClient) HandleMatrixRoomName(ctx context.Context, msg *bridgev2
if err != nil {
return false, err
}
- _, err = t.client.API().ChannelsEditTitle(ctx, &tg.ChannelsEditTitleRequest{
- Channel: &tg.InputChannel{
- ChannelID: id,
- AccessHash: accessHash,
- },
- Title: msg.Content.Name,
- })
+ if topicID > 0 {
+ _, err = t.client.API().MessagesEditForumTopic(ctx, &tg.MessagesEditForumTopicRequest{
+ Peer: &tg.InputPeerChannel{
+ ChannelID: id,
+ AccessHash: accessHash,
+ },
+ TopicID: topicID,
+ Title: msg.Content.Name,
+ })
+ } else {
+ _, err = t.client.API().ChannelsEditTitle(ctx, &tg.ChannelsEditTitleRequest{
+ Channel: &tg.InputChannel{
+ ChannelID: id,
+ AccessHash: accessHash,
+ },
+ Title: msg.Content.Name,
+ })
+ }
if err != nil {
return false, err
}
@@ -896,13 +979,15 @@ func (t *TelegramClient) HandleMatrixRoomName(ctx context.Context, msg *bridgev2
}
func (t *TelegramClient) HandleMatrixRoomAvatar(ctx context.Context, msg *bridgev2.MatrixRoomAvatar) (bool, error) {
- peerType, id, err := ids.ParsePortalID(msg.Portal.ID)
+ peerType, id, topicID, err := ids.ParsePortalID(msg.Portal.ID)
if err != nil {
return false, err
}
if peerType == ids.PeerTypeUser {
return false, fmt.Errorf("changing user avatar is not supported")
+ } else if topicID > 0 {
+ return false, fmt.Errorf("changing group topic avatar is not supported")
}
var photo tg.InputChatPhotoClass
diff --git a/pkg/connector/handletelegram.go b/pkg/connector/handletelegram.go
index 8e8cdba2..b5b93ac8 100644
--- a/pkg/connector/handletelegram.go
+++ b/pkg/connector/handletelegram.go
@@ -47,25 +47,63 @@ import (
type IGetMessage interface {
GetMessage() tg.MessageClass
+ String() string
}
type IGetMessages interface {
GetMessages() []int
}
-func (t *TelegramClient) selfLeaveChat(portalKey networkid.PortalKey) error {
+func (t *TelegramClient) selfLeaveChat(ctx context.Context, portalKey networkid.PortalKey) error {
+ peerType, id, _, err := ids.ParsePortalID(portalKey.ID)
+ if err != nil {
+ return err
+ }
+ if peerType == ids.PeerTypeChannel {
+ topics, err := t.main.Store.Topic.GetAll(ctx, id)
+ if err != nil {
+ return err
+ }
+ for _, topicID := range topics {
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{
+ EventMeta: simplevent.EventMeta{
+ Type: bridgev2.RemoteEventChatDelete,
+ PortalKey: t.makePortalKeyFromID(peerType, id, topicID),
+ Sender: t.mySender(),
+ },
+ OnlyForMe: true,
+ })
+ if err = resultToError(res); err != nil {
+ return err
+ }
+ }
+ }
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{
EventMeta: simplevent.EventMeta{
- Type: bridgev2.RemoteEventChatDelete,
- LogContext: func(c zerolog.Context) zerolog.Context {
- return c.Stringer("portal_key", portalKey)
- },
+ Type: bridgev2.RemoteEventChatDelete,
PortalKey: portalKey,
Sender: t.mySender(),
},
OnlyForMe: true,
})
- return resultToError(res)
+ if err = resultToError(res); err != nil {
+ return err
+ }
+ if peerType == ids.PeerTypeChannel {
+ // This is a no-op if there's no space portal
+ res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{
+ EventMeta: simplevent.EventMeta{
+ Type: bridgev2.RemoteEventChatDelete,
+ PortalKey: t.makePortalKeyFromID(peerType, id, ids.TopicIDSpaceRoom),
+ Sender: t.mySender(),
+ },
+ OnlyForMe: true,
+ })
+ if err = resultToError(res); err != nil {
+ return err
+ }
+ }
+ return nil
}
func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, update *tg.UpdateChannel) error {
@@ -75,7 +113,8 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd
Logger()
log.Debug().Msg("Fetching channel due to UpdateChannel event")
- portalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID)
+ // TODO resync topic portals?
+ portalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID, 0)
chats, err := APICallWithOnlyChatUpdates(ctx, t, func() (tg.MessagesChatsClass, error) {
if accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, update.ChannelID); err != nil {
@@ -88,17 +127,17 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd
})
if err != nil {
if tgerr.Is(err, tg.ErrChannelInvalid, tg.ErrChannelPrivate) {
- return t.selfLeaveChat(portalKey)
+ return t.selfLeaveChat(ctx, portalKey)
}
log.Err(err).Msg("Failed to get channel info after UpdateChannel event")
} else if len(chats.GetChats()) != 1 {
log.Warn().Int("chat_count", len(chats.GetChats())).Msg("Got more than 1 chat in GetChannels response")
} else if channel, ok := chats.GetChats()[0].(*tg.Channel); !ok {
log.Error().Type("chat_type", chats.GetChats()[0]).Msg("Expected channel, got something else. Leaving the channel.")
- return t.selfLeaveChat(portalKey)
+ return t.selfLeaveChat(ctx, portalKey)
} else if channel.Left {
log.Error().Msg("Update was for a left channel. Leaving the channel.")
- return t.selfLeaveChat(portalKey)
+ return t.selfLeaveChat(ctx, portalKey)
} else {
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
@@ -106,7 +145,7 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd
CreatePortal: true,
},
GetChatInfoFunc: func(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) {
- chatInfo, mfm, err := t.wrapChatInfo(channel)
+ chatInfo, mfm, err := t.wrapChatInfo(portal.ID, channel)
if err != nil {
return nil, err
}
@@ -126,7 +165,7 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd
func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Entities, update IGetMessage) error {
log := *zerolog.Ctx(ctx)
- log.Trace().Any("message_content", update).Msg("Raw message content")
+ log.Trace().Stringer("message_content", update).Msg("Raw message content")
switch msg := update.GetMessage().(type) {
case *tg.Message:
var isBroadcastChannel bool
@@ -167,7 +206,7 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent
Stringer("peer_id", msg.PeerID)
},
Sender: sender,
- PortalKey: t.makePortalKeyFromPeer(msg.PeerID),
+ PortalKey: t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo)),
CreatePortal: true,
Timestamp: time.Unix(int64(msg.Date), 0),
StreamOrder: int64(msg.GetID()),
@@ -193,12 +232,37 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, entities tg.Ent
}
}
+func (t *TelegramClient) getTopicID(ctx context.Context, peerID tg.PeerClass, rawReplyTo tg.MessageReplyHeaderClass) int {
+ topicID := rawGetTopicID(rawReplyTo)
+ if topicID != 0 {
+ channelPeer, _ := peerID.(*tg.PeerChannel)
+ err := t.main.Store.Topic.Add(ctx, channelPeer.GetChannelID(), topicID)
+ if err != nil {
+ zerolog.Ctx(ctx).Err(err).Msg("Failed to save topic ID")
+ }
+ }
+ return topicID
+}
+
+func rawGetTopicID(rawReplyTo tg.MessageReplyHeaderClass) int {
+ switch replyTo := rawReplyTo.(type) {
+ case *tg.MessageReplyHeader:
+ if replyTo.ForumTopic {
+ if replyTo.ReplyToTopID != 0 {
+ return replyTo.ReplyToTopID
+ }
+ return replyTo.ReplyToMsgID
+ }
+ }
+ return 0
+}
+
func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.MessageService) error {
log := zerolog.Ctx(ctx)
sender := t.getEventSender(msg, false)
eventMeta := simplevent.EventMeta{
- PortalKey: t.makePortalKeyFromPeer(msg.PeerID),
+ PortalKey: t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo)),
Sender: sender,
Timestamp: time.Unix(int64(msg.Date), 0),
LogContext: func(c zerolog.Context) zerolog.Context {
@@ -217,9 +281,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange),
ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Name: &action.Title}},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionChatEditPhoto:
switch peer := msg.PeerID.(type) {
case *tg.PeerChat:
@@ -229,9 +291,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChat, peer.ChatID, action.Photo),
}},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.PeerChannel:
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{
EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange),
@@ -239,9 +299,9 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
Avatar: t.avatarFromPhoto(ctx, ids.PeerTypeChannel, peer.ChannelID, action.Photo),
}},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
+ default:
+ return nil
}
case *tg.MessageActionChatDeletePhoto:
@@ -249,9 +309,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange),
ChatInfoChange: &bridgev2.ChatInfoChange{ChatInfo: &bridgev2.ChatInfo{Avatar: &bridgev2.Avatar{Remove: true}}},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionChatAddUser:
memberChanges := &bridgev2.ChatMemberList{
MemberMap: map[networkid.UserID]bridgev2.ChatMember{},
@@ -266,9 +324,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange),
ChatInfoChange: &bridgev2.ChatInfoChange{MemberChanges: memberChanges},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionChatJoinedByLink:
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{
EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange),
@@ -281,12 +337,10 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
},
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionChatDeleteUser:
if action.UserID == t.telegramUserID {
- return t.selfLeaveChat(eventMeta.PortalKey)
+ return t.selfLeaveChat(ctx, eventMeta.PortalKey)
}
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{
EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange),
@@ -299,9 +353,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
},
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionChatCreate:
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{
EventMeta: eventMeta.WithType(bridgev2.RemoteEventMessage).WithCreatePortal(true),
@@ -318,9 +370,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
}, nil
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionChannelCreate:
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
@@ -346,9 +396,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
}, nil
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionSetMessagesTTL:
setting := database.DisappearingSetting{
Type: event.DisappearingTypeAfterSend,
@@ -362,9 +410,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
},
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionPhoneCall:
var body strings.Builder
if action.Video {
@@ -406,9 +452,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
}, nil
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionGroupCall:
var body string
if action.Duration == 0 {
@@ -429,9 +473,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
}, nil
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionInviteToGroupCall:
var body, html strings.Builder
var mentions event.Mentions
@@ -446,7 +488,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
return err
} else {
var name string
- if username, err := t.ScopedStore.GetUsername(ctx, ids.PeerTypeUser, userID); err != nil {
+ if username, err := t.main.Store.Username.Get(ctx, ids.PeerTypeUser, userID); err != nil {
name = "@" + username
} else {
name = ghost.Name
@@ -477,9 +519,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
}, nil
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionGroupCallScheduled:
start := time.Unix(int64(action.ScheduleDate), 0)
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{
@@ -499,13 +539,11 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
}, nil
},
})
- if err := resultToError(res); err != nil {
- return err
- }
+ return resultToError(res)
case *tg.MessageActionChatMigrateTo:
log.Debug().Int64("channel_id", action.ChannelID).Msg("MessageActionChatMigrateTo")
- newPortalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, action.ChannelID)
+ newPortalKey := t.makePortalKeyFromID(ids.PeerTypeChannel, action.ChannelID, 0)
if err := t.migrateChat(ctx, eventMeta.PortalKey, newPortalKey); err != nil {
log.Err(err).Msg("Failed to migrate chat to channel")
return err
@@ -528,45 +566,39 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa
}, nil
},
})
- if err := resultToError(res); err != nil {
- return err
+ return resultToError(res)
+
+ case *tg.MessageActionTopicCreate:
+ channelPeer, _ := msg.PeerID.(*tg.PeerChannel)
+ err := t.main.Store.Topic.Add(ctx, channelPeer.GetChannelID(), msg.ID)
+ if err != nil {
+ return fmt.Errorf("failed to store new topic: %w", err)
}
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
+ EventMeta: eventMeta.
+ WithPortalKey(t.makePortalKeyFromPeer(msg.PeerID, msg.ID)).
+ WithType(bridgev2.RemoteEventChatResync).
+ WithCreatePortal(true),
+ GetChatInfoFunc: t.GetChatInfo,
+ })
+ return resultToError(res)
+ case *tg.MessageActionTopicEdit:
+ // TODO specific changes?
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
+ EventMeta: eventMeta.
+ WithPortalKey(t.makePortalKeyFromPeer(msg.PeerID, msg.ID)).
+ WithType(bridgev2.RemoteEventChatResync).
+ WithCreatePortal(true),
+ GetChatInfoFunc: t.GetChatInfo,
+ })
+ return resultToError(res)
- // case *tg.MessageActionChannelMigrateFrom:
-
- // case *tg.MessageActionPinMessage:
- // case *tg.MessageActionHistoryClear:
- // case *tg.MessageActionGameScore:
- // case *tg.MessageActionPaymentSentMe:
- // case *tg.MessageActionPaymentSent:
- // case *tg.MessageActionScreenshotTaken:
- // case *tg.MessageActionCustomAction:
- // case *tg.MessageActionBotAllowed:
- // case *tg.MessageActionSecureValuesSentMe:
- // case *tg.MessageActionSecureValuesSent:
- // case *tg.MessageActionContactSignUp:
- // case *tg.MessageActionGeoProximityReached:
- // case *tg.MessageActionSetChatTheme:
- // case *tg.MessageActionChatJoinedByRequest:
- // case *tg.MessageActionWebViewDataSentMe:
- // case *tg.MessageActionWebViewDataSent:
- // case *tg.MessageActionGiftPremium:
- // case *tg.MessageActionTopicCreate:
- // case *tg.MessageActionTopicEdit:
- // case *tg.MessageActionSuggestProfilePhoto:
- // case *tg.MessageActionRequestedPeer:
- // case *tg.MessageActionSetChatWallPaper:
- // case *tg.MessageActionGiftCode:
- // case *tg.MessageActionGiveawayLaunch:
- // case *tg.MessageActionGiveawayResults:
- // case *tg.MessageActionBoostApply:
- // case *tg.MessageActionRequestedPeerSentMe:
default:
log.Warn().
Type("action_type", action).
Msg("ignoring unknown action type")
+ return nil
}
- return nil
}
func (t *TelegramClient) migrateChat(ctx context.Context, oldPortalKey, newPortalKey networkid.PortalKey) error {
@@ -684,25 +716,22 @@ func (t *TelegramClient) onUserName(ctx context.Context, e tg.Entities, update *
func (t *TelegramClient) onDeleteMessages(ctx context.Context, channelID int64, update IGetMessages) error {
for _, messageID := range update.GetMessages() {
- var portalKey networkid.PortalKey
- if channelID == 0 {
- // TODO have mautrix-go do this part too?
- parts, err := t.main.Bridge.DB.Message.GetAllPartsByID(ctx, t.loginID, ids.MakeMessageID(channelID, messageID))
- if err != nil {
- return err
- }
- if len(parts) == 0 {
- zerolog.Ctx(ctx).Debug().
- Int("message_id", messageID).
- Int64("channel_id", channelID).
- Msg("ignoring delete of message we have no parts for")
- continue
- }
- // TODO can deletes happen across rooms?
- portalKey = parts[0].Room
- } else {
- portalKey = t.makePortalKeyFromPeer(&tg.PeerChannel{ChannelID: channelID})
+ // TODO have mautrix-go do this part too?
+ parts, err := t.main.Bridge.DB.Message.GetAllPartsByID(ctx, t.loginID, ids.MakeMessageID(channelID, messageID))
+ if err != nil {
+ return err
}
+ if len(parts) == 0 {
+ zerolog.Ctx(ctx).Debug().
+ Int("message_id", messageID).
+ Int64("channel_id", channelID).
+ Msg("ignoring delete of message we have no parts for")
+ continue
+ }
+ // TODO can deletes happen across rooms?
+ portalKey := parts[0].Room
+ // TODO optimize non-topic portal deletion by using channel ID?
+ //portalKey = t.makePortalKeyFromPeer(&tg.PeerChannel{ChannelID: channelID})
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.MessageRemove{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventMessageRemove,
@@ -769,7 +798,7 @@ func (t *TelegramClient) updateChannel(ctx context.Context, channel *tg.Channel)
}
if username, set := channel.GetUsername(); set {
- err := t.ScopedStore.SetUsername(ctx, ids.PeerTypeChannel, channel.ID, username)
+ err := t.main.Store.Username.Set(ctx, ids.PeerTypeChannel, channel.ID, username)
if err != nil {
return nil, err
}
@@ -796,7 +825,7 @@ func (t *TelegramClient) onEntityUpdate(ctx context.Context, e tg.Entities) erro
}
for chatID, chat := range e.Chats {
if chat.GetLeft() {
- t.selfLeaveChat(t.makePortalKeyFromID(ids.PeerTypeChat, chatID))
+ t.selfLeaveChat(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, chatID, 0))
}
}
for _, channel := range e.Channels {
@@ -821,7 +850,7 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage)
zerolog.Ctx(ctx).Err(err).Msg("Failed to handle reactions on edited message")
}
- portalKey := t.makePortalKeyFromPeer(msg.PeerID)
+ portalKey := t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo))
portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
if err != nil {
return err
@@ -872,14 +901,12 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage)
}
}
- var ce bridgev2.ConvertedEdit
- if !bytes.Equal(existingPart.Metadata.(*MessageMetadata).ContentHash, converted.Parts[0].DBMetadata.(*MessageMetadata).ContentHash) {
- ce.ModifiedParts = append(ce.ModifiedParts, converted.Parts[0].ToEditPart(existingPart))
+ if bytes.Equal(existingPart.Metadata.(*MessageMetadata).ContentHash, converted.Parts[0].DBMetadata.(*MessageMetadata).ContentHash) {
+ return nil, fmt.Errorf("%w (content hash didn't change)", bridgev2.ErrIgnoringRemoteEvent)
}
- if len(ce.ModifiedParts) == 0 {
- return nil, bridgev2.ErrIgnoringRemoteEvent
- }
- return &ce, nil
+ return &bridgev2.ConvertedEdit{
+ ModifiedParts: []*bridgev2.ConvertedEditPart{converted.Parts[0].ToEditPart(existingPart)},
+ }, nil
},
})
return resultToError(res)
@@ -890,10 +917,19 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev
return nil
}
timeout := time.Duration(6) * time.Second
- if action.TypeID() != tg.SendMessageTypingActionTypeID {
+ var typingType bridgev2.TypingType
+ switch action.(type) {
+ case *tg.SendMessageTypingAction:
+ typingType = bridgev2.TypingTypeText
+ case *tg.SendMessageRecordAudioAction, *tg.SendMessageRecordRoundAction, *tg.SendMessageRecordVideoAction:
+ typingType = bridgev2.TypingTypeRecordingMedia
+ case *tg.SendMessageUploadAudioAction, *tg.SendMessageUploadDocumentAction, *tg.SendMessageUploadPhotoAction, *tg.SendMessageUploadRoundAction, *tg.SendMessageUploadVideoAction:
+ typingType = bridgev2.TypingTypeUploadingMedia
+ case *tg.SendMessageCancelAction:
+ timeout = 0
+ default:
timeout = 0
}
- // TODO send proper TypingTypes
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Typing{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventTyping,
@@ -901,6 +937,7 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev
Sender: sender,
},
Timeout: timeout,
+ Type: typingType,
})
return resultToError(res)
}
@@ -915,7 +952,7 @@ func (t *TelegramClient) updateReadReceipt(ctx context.Context, e tg.Entities, u
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventReadReceipt,
- PortalKey: t.makePortalKeyFromPeer(update.Peer),
+ PortalKey: t.makePortalKeyFromPeer(update.Peer, 0),
Sender: bridgev2.EventSender{
SenderLogin: ids.MakeUserLoginID(user.UserID),
Sender: ids.MakeUserID(user.UserID),
@@ -940,25 +977,25 @@ func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID i
return resultToError(res)
}
-func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, error) {
- peerType, id, err := ids.ParsePortalID(portalID)
+func (t *TelegramClient) inputPeerForPortalID(ctx context.Context, portalID networkid.PortalID) (tg.InputPeerClass, int, error) {
+ peerType, id, topicID, err := ids.ParsePortalID(portalID)
if err != nil {
- return nil, err
+ return nil, 0, err
}
switch peerType {
case ids.PeerTypeUser:
if accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeUser, id); err != nil {
- return nil, fmt.Errorf("failed to get user access hash for %d: %w", id, err)
+ return nil, 0, fmt.Errorf("failed to get user access hash for %d: %w", id, err)
} else {
- return &tg.InputPeerUser{UserID: id, AccessHash: accessHash}, nil
+ return &tg.InputPeerUser{UserID: id, AccessHash: accessHash}, 0, nil
}
case ids.PeerTypeChat:
- return &tg.InputPeerChat{ChatID: id}, nil
+ return &tg.InputPeerChat{ChatID: id}, 0, nil
case ids.PeerTypeChannel:
if accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, id); err != nil {
- return nil, err
+ return nil, 0, err
} else {
- return &tg.InputPeerChannel{ChannelID: id, AccessHash: accessHash}, nil
+ return &tg.InputPeerChannel{ChannelID: id, AccessHash: accessHash}, topicID, nil
}
default:
panic("invalid peer type")
@@ -1097,10 +1134,17 @@ func (t *TelegramClient) transferEmojisToMatrix(ctx context.Context, customEmoji
}
func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, update *tg.UpdateNotifySettings) error {
- if update.Peer.TypeID() != tg.NotifyPeerTypeID {
+ var portalKey networkid.PortalKey
+ switch typedPeer := update.Peer.(type) {
+ case *tg.NotifyPeer:
+ portalKey = t.makePortalKeyFromPeer(typedPeer.Peer, 0)
+ case *tg.NotifyForumTopic:
+ portalKey = t.makePortalKeyFromPeer(typedPeer.Peer, typedPeer.TopMsgID)
+ default:
zerolog.Ctx(ctx).Debug().
- Str("peer_type", update.Peer.TypeName()).
- Msg("Ignoring unsupported peer type")
+ Type("peer_type", update.Peer).
+ Any("peer", update.Peer).
+ Msg("Ignoring unsupported notify settings peer type")
return nil
}
@@ -1111,16 +1155,17 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up
mutedUntil = &bridgev2.Unmuted
}
- res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
- ChatInfo: &bridgev2.ChatInfo{
- UserLocal: &bridgev2.UserLocalPortalInfo{
- MutedUntil: mutedUntil,
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{
+ ChatInfoChange: &bridgev2.ChatInfoChange{
+ ChatInfo: &bridgev2.ChatInfo{
+ UserLocal: &bridgev2.UserLocalPortalInfo{
+ MutedUntil: mutedUntil,
+ },
},
- CanBackfill: true,
},
EventMeta: simplevent.EventMeta{
- Type: bridgev2.RemoteEventChatResync,
- PortalKey: t.makePortalKeyFromPeer(update.Peer.(*tg.NotifyPeer).Peer),
+ Type: bridgev2.RemoteEventChatInfoChange,
+ PortalKey: portalKey,
},
})
return resultToError(res)
@@ -1129,11 +1174,11 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up
func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg *tg.UpdatePinnedDialogs) error {
needsUnpinning := map[networkid.PortalKey]struct{}{}
for _, portalID := range t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs {
- pt, id, err := ids.ParsePortalID(portalID)
+ pt, id, _, err := ids.ParsePortalID(portalID)
if err != nil {
return err
}
- needsUnpinning[t.makePortalKeyFromID(pt, id)] = struct{}{}
+ needsUnpinning[t.makePortalKeyFromID(pt, id, 0)] = struct{}{}
}
t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = nil
@@ -1142,19 +1187,20 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg
if !ok {
continue
}
- portalKey := t.makePortalKeyFromPeer(dialog.Peer)
+ portalKey := t.makePortalKeyFromPeer(dialog.Peer, 0)
delete(needsUnpinning, portalKey)
t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = append(t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs, portalKey.ID)
- res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
- ChatInfo: &bridgev2.ChatInfo{
- UserLocal: &bridgev2.UserLocalPortalInfo{
- Tag: ptr.Ptr(event.RoomTagFavourite),
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{
+ ChatInfoChange: &bridgev2.ChatInfoChange{
+ ChatInfo: &bridgev2.ChatInfo{
+ UserLocal: &bridgev2.UserLocalPortalInfo{
+ Tag: ptr.Ptr(event.RoomTagFavourite),
+ },
},
- CanBackfill: true,
},
EventMeta: simplevent.EventMeta{
- Type: bridgev2.RemoteEventChatResync,
+ Type: bridgev2.RemoteEventChatInfoChange,
PortalKey: portalKey,
},
})
@@ -1165,15 +1211,16 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg
var empty event.RoomTag
for portalKey := range needsUnpinning {
- res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
- ChatInfo: &bridgev2.ChatInfo{
- UserLocal: &bridgev2.UserLocalPortalInfo{
- Tag: &empty,
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{
+ ChatInfoChange: &bridgev2.ChatInfoChange{
+ ChatInfo: &bridgev2.ChatInfo{
+ UserLocal: &bridgev2.UserLocalPortalInfo{
+ Tag: &empty,
+ },
},
- CanBackfill: true,
},
EventMeta: simplevent.EventMeta{
- Type: bridgev2.RemoteEventChatResync,
+ Type: bridgev2.RemoteEventChatInfoChange,
PortalKey: portalKey,
},
})
@@ -1186,16 +1233,18 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg
}
func (t *TelegramClient) onChatDefaultBannedRights(ctx context.Context, entities tg.Entities, update *tg.UpdateChatDefaultBannedRights) error {
- res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
- ChatInfo: &bridgev2.ChatInfo{
- Members: &bridgev2.ChatMemberList{
- PowerLevels: t.getPowerLevelOverridesFromBannedRights(entities.Chats[0], update.DefaultBannedRights),
+ // TODO update all topic portals
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{
+ ChatInfoChange: &bridgev2.ChatInfoChange{
+ ChatInfo: &bridgev2.ChatInfo{
+ Members: &bridgev2.ChatMemberList{
+ PowerLevels: t.getPowerLevelOverridesFromBannedRights(entities.Chats[0], update.DefaultBannedRights),
+ },
},
- CanBackfill: true,
},
EventMeta: simplevent.EventMeta{
- Type: bridgev2.RemoteEventChatResync,
- PortalKey: t.makePortalKeyFromPeer(update.Peer),
+ Type: bridgev2.RemoteEventChatInfoChange,
+ PortalKey: t.makePortalKeyFromPeer(update.Peer, 0),
},
})
return resultToError(res)
@@ -1233,7 +1282,7 @@ func (t *TelegramClient) onPeerBlocked(ctx context.Context, e tg.Entities, updat
},
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
- PortalKey: t.makePortalKeyFromPeer(update.PeerID),
+ PortalKey: t.makePortalKeyFromPeer(update.PeerID, 0),
},
})
return resultToError(res)
@@ -1264,7 +1313,7 @@ func (t *TelegramClient) onPhoneCall(ctx context.Context, e tg.Entities, update
res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[any]{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventMessage,
- PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, call.AdminID),
+ PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, call.AdminID, 0),
CreatePortal: true,
Sender: t.senderForUserID(call.AdminID),
},
diff --git a/pkg/connector/ids.go b/pkg/connector/ids.go
index 0213baf0..150bcaa5 100644
--- a/pkg/connector/ids.go
+++ b/pkg/connector/ids.go
@@ -23,16 +23,16 @@ import (
"go.mau.fi/mautrix-telegram/pkg/gotd/tg"
)
-func (t *TelegramClient) makePortalKeyFromPeer(peer tg.PeerClass) networkid.PortalKey {
- key := ids.InternalMakePortalKey(peer, t.loginID)
+func (t *TelegramClient) makePortalKeyFromPeer(peer tg.PeerClass, topicID int) networkid.PortalKey {
+ key := ids.InternalPeerToPortalKey(peer, topicID, t.loginID)
if t.main.Bridge.Config.SplitPortals {
key.Receiver = t.userLogin.ID
}
return key
}
-func (t *TelegramClient) makePortalKeyFromID(peerType ids.PeerType, chatID int64) networkid.PortalKey {
- key := peerType.InternalAsPortalKey(chatID, t.loginID)
+func (t *TelegramClient) makePortalKeyFromID(peerType ids.PeerType, chatID int64, topicID int) networkid.PortalKey {
+ key := ids.InternalMakePortalKey(peerType, chatID, topicID, t.loginID)
if t.main.Bridge.Config.SplitPortals {
key.Receiver = t.userLogin.ID
}
diff --git a/pkg/connector/ids/ids.go b/pkg/connector/ids/ids.go
index 6a91f1f5..ac59ab15 100644
--- a/pkg/connector/ids/ids.go
+++ b/pkg/connector/ids/ids.go
@@ -81,7 +81,7 @@ func MakeMessageID(rawChatID any, messageID int) networkid.MessageID {
var channelID int64
switch typedChatID := rawChatID.(type) {
case networkid.PortalKey:
- peerType, entityID, _ := ParsePortalID(typedChatID.ID)
+ peerType, entityID, _, _ := ParsePortalID(typedChatID.ID)
if peerType == PeerTypeChannel {
channelID = entityID
}
@@ -163,52 +163,52 @@ func (pt PeerType) AsByte() byte {
}
}
-func (pt PeerType) InternalAsPortalKey(chatID int64, receiver networkid.UserLoginID) networkid.PortalKey {
+func MakePortalID(pt PeerType, chatID int64) networkid.PortalID {
+ return networkid.PortalID(fmt.Sprintf("%s:%d", pt, chatID))
+}
+
+const TopicIDSpaceRoom = -1
+
+func MakeForumParentPortalID(channelID int64) networkid.PortalID {
+ return MakeTopicPortalID(channelID, TopicIDSpaceRoom)
+}
+
+func MakeTopicPortalID(channelID int64, topicID int) networkid.PortalID {
+ return networkid.PortalID(fmt.Sprintf("%s:%d:%d", PeerTypeChannel, channelID, topicID))
+}
+
+func InternalMakePortalKey(pt PeerType, chatID int64, topicID int, receiver networkid.UserLoginID) networkid.PortalKey {
portalKey := networkid.PortalKey{
- ID: networkid.PortalID(fmt.Sprintf("%s:%d", pt, chatID)),
+ ID: MakePortalID(pt, chatID),
}
if pt == PeerTypeUser || pt == PeerTypeChat {
portalKey.Receiver = receiver
+ } else if topicID != 0 {
+ portalKey.ID = MakeTopicPortalID(chatID, topicID)
}
return portalKey
}
-func GetChatID(peer tg.PeerClass) int64 {
+func InternalPeerToPortalKey(peer tg.PeerClass, topicID int, receiver networkid.UserLoginID) networkid.PortalKey {
switch v := peer.(type) {
case *tg.PeerUser:
- return v.UserID
+ return InternalMakePortalKey(PeerTypeUser, v.UserID, topicID, receiver)
case *tg.PeerChat:
- return v.ChatID
+ return InternalMakePortalKey(PeerTypeChat, v.ChatID, topicID, receiver)
case *tg.PeerChannel:
- return v.ChannelID
+ return InternalMakePortalKey(PeerTypeChannel, v.ChannelID, topicID, receiver)
default:
panic(fmt.Errorf("unknown peer class type %T", v))
}
}
-func InternalMakePortalKey(peer tg.PeerClass, receiver networkid.UserLoginID) networkid.PortalKey {
- switch v := peer.(type) {
- case *tg.PeerUser:
- return networkid.PortalKey{
- ID: networkid.PortalID(fmt.Sprintf("%s:%d", PeerTypeUser, v.UserID)),
- Receiver: receiver,
- }
- case *tg.PeerChat:
- return networkid.PortalKey{
- ID: networkid.PortalID(fmt.Sprintf("%s:%d", PeerTypeChat, v.ChatID)),
- Receiver: receiver,
- }
- case *tg.PeerChannel:
- return networkid.PortalKey{ID: networkid.PortalID(fmt.Sprintf("%s:%d", PeerTypeChannel, v.ChannelID))}
- default:
- panic(fmt.Errorf("unknown peer class type %T", v))
- }
-}
-
-func ParsePortalID(portalID networkid.PortalID) (pt PeerType, id int64, err error) {
+func ParsePortalID(portalID networkid.PortalID) (pt PeerType, id int64, topicID int, err error) {
parts := strings.Split(string(portalID), ":")
pt = PeerType(parts[0])
id, err = strconv.ParseInt(parts[1], 10, 64)
+ if len(parts) == 3 && err == nil && pt == PeerTypeChannel {
+ topicID, err = strconv.Atoi(parts[2])
+ }
return
}
diff --git a/pkg/connector/media/transfer.go b/pkg/connector/media/transfer.go
index 72bf3733..2ebc6cfa 100644
--- a/pkg/connector/media/transfer.go
+++ b/pkg/connector/media/transfer.go
@@ -437,7 +437,7 @@ func (t *ReadyTransferer) DownloadBytes(ctx context.Context) ([]byte, error) {
// DirectDownloadURL returns the direct download URL for the media.
func (t *ReadyTransferer) DirectDownloadURL(ctx context.Context, loggedInUserID int64, portal *bridgev2.Portal, msgID int, thumbnail bool, telegramMediaID int64) (id.ContentURIString, *event.FileInfo, error) {
- peerType, chatID, err := ids.ParsePortalID(portal.ID)
+ peerType, chatID, _, err := ids.ParsePortalID(portal.ID)
if err != nil {
return "", nil, err
}
diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go
index 7905223c..bb2f22df 100644
--- a/pkg/connector/metadata.go
+++ b/pkg/connector/metadata.go
@@ -50,6 +50,7 @@ type GhostMetadata struct {
type PortalMetadata struct {
IsSuperGroup bool `json:"is_supergroup,omitempty"`
+ IsForumGeneral bool `json:"is_forum_general,omitempty"`
ReadUpTo int `json:"read_up_to,omitempty"`
AllowedReactions []string `json:"allowed_reactions"`
LastSync jsontime.Unix `json:"last_sync,omitempty"`
@@ -63,6 +64,12 @@ func (pm *PortalMetadata) SetIsSuperGroup(isSupergroup bool) (changed bool) {
return changed
}
+func (pm *PortalMetadata) SetIsForumGeneral(isForumGeneral bool) (changed bool) {
+ changed = pm.IsForumGeneral != isForumGeneral
+ pm.IsForumGeneral = isForumGeneral
+ return changed
+}
+
type MessageMetadata struct {
ContentHash []byte `json:"content_hash,omitempty"`
ContentURI id.ContentURIString `json:"content_uri,omitempty"`
diff --git a/pkg/connector/push.go b/pkg/connector/push.go
index a8ae396f..b8561da2 100644
--- a/pkg/connector/push.go
+++ b/pkg/connector/push.go
@@ -50,6 +50,7 @@ type PushCustomData struct {
MessageID int `json:"msg_id,string"`
ChannelID int64 `json:"channel_id,string"`
+ TopicID int `json:"top_msg_id,string"`
ChatID int64 `json:"chat_id,string"`
FromID int64 `json:"from_id,string"`
@@ -281,11 +282,11 @@ func (t *TelegramClient) ConnectBackground(ctx context.Context, params *bridgev2
}
var err error
if data.Custom.ChannelID != 0 {
- relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChannel, data.Custom.ChannelID))
+ relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChannel, data.Custom.ChannelID, data.Custom.TopicID))
} else if data.Custom.ChatID != 0 {
- relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, data.Custom.ChatID))
+ relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, data.Custom.ChatID, 0))
} else if data.Custom.FromID != 0 {
- relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeUser, data.Custom.FromID))
+ relatedPortal, err = t.main.Bridge.GetPortalByKey(ctx, t.makePortalKeyFromID(ids.PeerTypeUser, data.Custom.FromID, 0))
}
if err != nil {
return fmt.Errorf("failed to get related portal: %w", err)
diff --git a/pkg/connector/reactions.go b/pkg/connector/reactions.go
index 612d156c..8edeccc5 100644
--- a/pkg/connector/reactions.go
+++ b/pkg/connector/reactions.go
@@ -23,6 +23,7 @@ import (
"github.com/rs/zerolog"
"maunium.net/go/mautrix/bridgev2"
+ "maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/bridgev2/simplevent"
@@ -60,10 +61,11 @@ func (t *TelegramClient) computeReactionsList(ctx context.Context, peer tg.PeerC
// // Can't fetch exact reaction senders as a bot
// return
- // TODO should calls to this be limited?
- } else if peer, err := t.inputPeerForPortalID(ctx, t.makePortalKeyFromPeer(peer).ID); err != nil {
+ // TODO remove redundant peer roundtrip, just add a peer -> input peer helper
+ } else if peer, _, err := t.inputPeerForPortalID(ctx, t.makePortalKeyFromPeer(peer, 0).ID); err != nil {
return nil, false, nil, fmt.Errorf("failed to get input peer: %w", err)
} else {
+ // TODO should calls to this be limited?
reactions, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesMessageReactionsList, error) {
return t.client.API().MessagesGetMessageReactionsList(ctx, &tg.MessagesGetMessageReactionsListRequest{
Peer: peer, ID: msgID, Limit: 100,
@@ -152,7 +154,7 @@ func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Me
LogContext: func(c zerolog.Context) zerolog.Context {
return c.Int("message_id", msg.ID)
},
- PortalKey: t.makePortalKeyFromPeer(msg.PeerID),
+ PortalKey: t.makePortalKeyFromPeer(msg.PeerID, t.getTopicID(ctx, msg.PeerID, msg.ReplyTo)),
},
TargetMessage: ids.GetMessageIDFromMessage(msg),
Reactions: &bridgev2.ReactionSyncData{Users: users, HasAllUsers: isFull},
@@ -207,7 +209,7 @@ func (t *TelegramClient) getReactionLimit(ctx context.Context, sender networkid.
func (t *TelegramClient) maybePollForReactions(ctx context.Context, portal *bridgev2.Portal) error {
// Only poll for reactions in supergroups
- if portal == nil || !portal.Metadata.(*PortalMetadata).IsSuperGroup {
+ if portal == nil || !portal.Metadata.(*PortalMetadata).IsSuperGroup || portal.RoomType == database.RoomTypeSpace {
return nil
}
@@ -225,7 +227,7 @@ func (t *TelegramClient) maybePollForReactions(ctx context.Context, portal *brid
}
func (t *TelegramClient) pollForReactions(ctx context.Context, portalKey networkid.PortalKey) error {
- inputPeer, parseErr := t.inputPeerForPortalID(ctx, portalKey.ID)
+ inputPeer, _, parseErr := t.inputPeerForPortalID(ctx, portalKey.ID)
if parseErr != nil {
return parseErr
}
diff --git a/pkg/connector/startchat.go b/pkg/connector/startchat.go
index 8f26bff8..ae7ab4b1 100644
--- a/pkg/connector/startchat.go
+++ b/pkg/connector/startchat.go
@@ -53,7 +53,7 @@ func (t *TelegramClient) getResolveIdentifierResponseForUser(ctx context.Context
UserID: networkUserID,
UserInfo: userInfo,
Chat: &bridgev2.CreateChatResponse{
- PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, user.GetID()),
+ PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, user.GetID(), 0),
},
}, nil
}
@@ -70,7 +70,7 @@ func (t *TelegramClient) getResolveIdentifierResponseForUserID(ctx context.Conte
resp = &bridgev2.ResolveIdentifierResponse{
UserID: networkUserID,
Chat: &bridgev2.CreateChatResponse{
- PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, userID),
+ PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, userID, 0),
},
}
resp.Ghost, err = t.main.Bridge.GetExistingGhostByID(ctx, networkUserID)
@@ -108,7 +108,7 @@ func (t *TelegramClient) ResolveIdentifier(ctx context.Context, identifier strin
if identifier[0] == '+' {
normalized := strings.TrimPrefix(identifier, "+")
- if userID, err := t.ScopedStore.GetUserIDByPhoneNumber(ctx, normalized); err != nil {
+ if userID, err := t.main.Store.PhoneNumber.GetUserID(ctx, normalized); err != nil {
return nil, fmt.Errorf("failed to get user ID by phone number: %w", err)
} else if userID == 0 {
log.Info().Msg("Phone number not found in database")
@@ -121,7 +121,7 @@ func (t *TelegramClient) ResolveIdentifier(ctx context.Context, identifier strin
return t.getResolveIdentifierResponseForUserID(ctx, userID)
} else if match := usernameRe.FindStringSubmatch(identifier); match != nil && !strings.Contains(identifier, "__") {
// This is a username
- entityType, userID, err := t.ScopedStore.GetEntityIDByUsername(ctx, match[1])
+ entityType, userID, err := t.main.Store.Username.GetEntityID(ctx, match[1])
if entityType == ids.PeerTypeUser && (err == nil || userID != 0) {
// We know this username.
resp, err := t.getResolveIdentifierResponseForUserID(ctx, userID)
@@ -273,7 +273,7 @@ func (t *TelegramClient) CreateGroup(ctx context.Context, params *bridgev2.Group
} else if chat, ok := chats[0].(*tg.Chat); !ok {
return nil, fmt.Errorf("unexpected chat type: %T", chats[0])
} else {
- portalKey := t.makePortalKeyFromID(ids.PeerTypeChat, chat.ID)
+ portalKey := t.makePortalKeyFromID(ids.PeerTypeChat, chat.ID, 0)
if params.RoomID != "" {
portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
if err != nil {
diff --git a/pkg/connector/store/container.go b/pkg/connector/store/container.go
index b8892097..b18a376a 100644
--- a/pkg/connector/store/container.go
+++ b/pkg/connector/store/container.go
@@ -20,6 +20,7 @@ import (
"context"
"go.mau.fi/util/dbutil"
+ "go.mau.fi/util/exsync"
"go.mau.fi/mautrix-telegram/pkg/connector/store/upgrades"
)
@@ -28,6 +29,9 @@ type Container struct {
*dbutil.Database
TelegramFile *TelegramFileQuery
+ Username *UsernameQuery
+ PhoneNumber *PhoneNumberQuery
+ Topic *TopicQuery
}
func NewStore(db *dbutil.Database, log dbutil.DatabaseLogger) *Container {
@@ -35,6 +39,9 @@ func NewStore(db *dbutil.Database, log dbutil.DatabaseLogger) *Container {
Database: db.Child("telegram_version", upgrades.Table, log),
TelegramFile: &TelegramFileQuery{dbutil.MakeQueryHelper(db, newTelegramFile)},
+ Username: &UsernameQuery{db},
+ PhoneNumber: &PhoneNumberQuery{db},
+ Topic: &TopicQuery{db: db, existingTopics: exsync.NewSet[topicKey]()},
}
}
diff --git a/pkg/connector/store/phonenumber.go b/pkg/connector/store/phonenumber.go
new file mode 100644
index 00000000..cb7c674e
--- /dev/null
+++ b/pkg/connector/store/phonenumber.go
@@ -0,0 +1,56 @@
+// mautrix-telegram - A Matrix-Telegram puppeting bridge.
+// Copyright (C) 2025 Tulir Asokan
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package store
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+
+ "go.mau.fi/util/dbutil"
+)
+
+type PhoneNumberQuery struct {
+ db *dbutil.Database
+}
+
+const (
+ getEntityIDForPhoneNumber = "SELECT entity_id FROM telegram_phone_number WHERE phone_number=$1"
+ setPhoneNumberQuery = `
+ INSERT INTO telegram_phone_number (phone_number, entity_id)
+ VALUES ($1, $2)
+ ON CONFLICT (phone_number) DO UPDATE SET entity_id=excluded.entity_id
+ `
+ clearPhoneNumberQuery = "DELETE FROM telegram_phone_number WHERE entity_id=$1"
+)
+
+func (s *PhoneNumberQuery) GetUserID(ctx context.Context, phoneNumber string) (userID int64, err error) {
+ err = s.db.QueryRow(ctx, getEntityIDForPhoneNumber, phoneNumber).Scan(&userID)
+ if errors.Is(err, sql.ErrNoRows) {
+ err = nil
+ }
+ return
+}
+
+func (s *PhoneNumberQuery) Set(ctx context.Context, userID int64, phoneNumber string) (err error) {
+ if phoneNumber == "" {
+ _, err = s.db.Exec(ctx, clearPhoneNumberQuery, userID)
+ } else {
+ _, err = s.db.Exec(ctx, setPhoneNumberQuery, phoneNumber, userID)
+ }
+ return
+}
diff --git a/pkg/connector/store/scoped_store.go b/pkg/connector/store/scoped_store.go
index 9a799359..9ddc8741 100644
--- a/pkg/connector/store/scoped_store.go
+++ b/pkg/connector/store/scoped_store.go
@@ -21,7 +21,6 @@ import (
"database/sql"
"errors"
"fmt"
- "strings"
"go.mau.fi/util/dbutil"
@@ -71,27 +70,6 @@ const (
ON CONFLICT (user_id, entity_type, entity_id) DO UPDATE SET access_hash=excluded.access_hash
`
deleteAccessHashesForUserQuery = "DELETE FROM telegram_access_hash WHERE user_id=$1"
-
- // User Username Queries
- getUsernameQuery = "SELECT username FROM telegram_username WHERE entity_type=$1 AND entity_id=$2"
- setUsernameQuery = `
- INSERT INTO telegram_username (username, entity_type, entity_id)
- VALUES ($1, $2, $3)
- ON CONFLICT (username) DO UPDATE SET
- entity_type=excluded.entity_type,
- entity_id=excluded.entity_id
- `
- getByUsernameQuery = "SELECT entity_type, entity_id FROM telegram_username WHERE LOWER(username)=$1"
- clearUsernameQuery = `DELETE FROM telegram_username WHERE entity_type=$1 AND entity_id=$2`
-
- // User Phone Number Queries
- getEntityIDForPhoneNumber = "SELECT entity_id FROM telegram_phone_number WHERE phone_number=$1"
- setPhoneNumberQuery = `
- INSERT INTO telegram_phone_number (phone_number, entity_id)
- VALUES ($1, $2)
- ON CONFLICT (phone_number) DO UPDATE SET entity_id=excluded.entity_id
- `
- clearPhoneNumberQuery = "DELETE FROM telegram_phone_number WHERE entity_id=$1"
)
var _ updates.StateStorage = (*ScopedStore)(nil)
@@ -242,48 +220,6 @@ func (s *ScopedStore) DeleteAccessHashesForUser(ctx context.Context) (err error)
return
}
-func (s *ScopedStore) GetUsername(ctx context.Context, entityType ids.PeerType, userID int64) (username string, err error) {
- err = s.db.QueryRow(ctx, getUsernameQuery, entityType, userID).Scan(&username)
- if errors.Is(err, sql.ErrNoRows) {
- err = nil
- }
- return
-}
-
-func (s *ScopedStore) SetUsername(ctx context.Context, entityType ids.PeerType, entityID int64, username string) (err error) {
- if username == "" {
- _, err = s.db.Exec(ctx, clearUsernameQuery, entityType, entityID)
- } else {
- _, err = s.db.Exec(ctx, setUsernameQuery, username, entityType, entityID)
- }
- return
-}
-
-func (s *ScopedStore) GetEntityIDByUsername(ctx context.Context, username string) (entityType ids.PeerType, entityID int64, err error) {
- err = s.db.QueryRow(ctx, getByUsernameQuery, strings.ToLower(username)).Scan(&entityType, &entityID)
- if errors.Is(err, sql.ErrNoRows) {
- err = nil
- }
- return
-}
-
-func (s *ScopedStore) GetUserIDByPhoneNumber(ctx context.Context, phoneNumber string) (userID int64, err error) {
- err = s.db.QueryRow(ctx, getEntityIDForPhoneNumber, phoneNumber).Scan(&userID)
- if errors.Is(err, sql.ErrNoRows) {
- err = nil
- }
- return
-}
-
-func (s *ScopedStore) SetPhoneNumber(ctx context.Context, userID int64, phoneNumber string) (err error) {
- if phoneNumber == "" {
- _, err = s.db.Exec(ctx, clearPhoneNumberQuery, userID)
- } else {
- _, err = s.db.Exec(ctx, setPhoneNumberQuery, phoneNumber, userID)
- }
- return
-}
-
// Helper Functions
func (s *ScopedStore) assertUserIDMatches(userID int64) {
diff --git a/pkg/connector/store/topic.go b/pkg/connector/store/topic.go
new file mode 100644
index 00000000..ebb8b474
--- /dev/null
+++ b/pkg/connector/store/topic.go
@@ -0,0 +1,62 @@
+// mautrix-telegram - A Matrix-Telegram puppeting bridge.
+// Copyright (C) 2025 Tulir Asokan
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package store
+
+import (
+ "context"
+
+ "go.mau.fi/util/dbutil"
+ "go.mau.fi/util/exsync"
+)
+
+type topicKey struct {
+ ChannelID int64
+ TopicID int
+}
+
+type TopicQuery struct {
+ db *dbutil.Database
+ existingTopics *exsync.Set[topicKey]
+}
+
+const (
+ getAllTopicsQuery = `SELECT topic_id FROM telegram_topic WHERE channel_id=$1`
+ addTopicQuery = `INSERT INTO telegram_topic (channel_id, topic_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`
+ deleteTopicQuery = `DELETE FROM telegram_topic WHERE channel_id=$1 AND topic_id=$2`
+)
+
+func (s *TopicQuery) Add(ctx context.Context, channelID int64, topicID int) (err error) {
+ if channelID == 0 {
+ return nil
+ }
+ if s.existingTopics.Add(topicKey{ChannelID: channelID, TopicID: topicID}) {
+ _, err = s.db.Exec(ctx, addTopicQuery, channelID, topicID)
+ }
+ return
+}
+
+func (s *TopicQuery) Delete(ctx context.Context, channelID int64, topicID int) (err error) {
+ s.existingTopics.Remove(topicKey{ChannelID: channelID, TopicID: topicID})
+ _, err = s.db.Exec(ctx, deleteTopicQuery, channelID, topicID)
+ return
+}
+
+var intScanner = dbutil.ConvertRowFn[int](dbutil.ScanSingleColumn[int])
+
+func (s *TopicQuery) GetAll(ctx context.Context, channelID int64) (topics []int, err error) {
+ return intScanner.NewRowIter(s.db.Query(ctx, getAllTopicsQuery, channelID)).AsList()
+}
diff --git a/pkg/connector/store/upgrades/00-latest.sql b/pkg/connector/store/upgrades/00-latest.sql
index 1c2ed722..556f4256 100644
--- a/pkg/connector/store/upgrades/00-latest.sql
+++ b/pkg/connector/store/upgrades/00-latest.sql
@@ -1,4 +1,4 @@
--- v0 -> v5 (compatible with v2+): Latest revision
+-- v0 -> v6 (compatible with v2+): Latest revision
CREATE TABLE telegram_user_state (
user_id BIGINT NOT NULL PRIMARY KEY,
@@ -51,3 +51,10 @@ CREATE TABLE telegram_file (
mime_type TEXT,
size BIGINT
);
+
+CREATE TABLE telegram_topic (
+ channel_id BIGINT NOT NULL,
+ topic_id BIGINT NOT NULL,
+
+ PRIMARY KEY (channel_id, topic_id)
+);
diff --git a/pkg/connector/store/upgrades/06-topic-index.sql b/pkg/connector/store/upgrades/06-topic-index.sql
new file mode 100644
index 00000000..6bef3ee5
--- /dev/null
+++ b/pkg/connector/store/upgrades/06-topic-index.sql
@@ -0,0 +1,8 @@
+-- v6 (compatible with v2+): Add table for topics
+
+CREATE TABLE telegram_topic (
+ channel_id BIGINT NOT NULL,
+ topic_id BIGINT NOT NULL,
+
+ PRIMARY KEY (channel_id, topic_id)
+);
diff --git a/pkg/connector/store/username.go b/pkg/connector/store/username.go
new file mode 100644
index 00000000..04fe2504
--- /dev/null
+++ b/pkg/connector/store/username.go
@@ -0,0 +1,70 @@
+// mautrix-telegram - A Matrix-Telegram puppeting bridge.
+// Copyright (C) 2025 Tulir Asokan
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package store
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "strings"
+
+ "go.mau.fi/util/dbutil"
+
+ "go.mau.fi/mautrix-telegram/pkg/connector/ids"
+)
+
+type UsernameQuery struct {
+ db *dbutil.Database
+}
+
+const (
+ getUsernameQuery = "SELECT username FROM telegram_username WHERE entity_type=$1 AND entity_id=$2"
+ setUsernameQuery = `
+ INSERT INTO telegram_username (username, entity_type, entity_id)
+ VALUES ($1, $2, $3)
+ ON CONFLICT (username) DO UPDATE SET
+ entity_type=excluded.entity_type,
+ entity_id=excluded.entity_id
+ `
+ getByUsernameQuery = "SELECT entity_type, entity_id FROM telegram_username WHERE LOWER(username)=$1"
+ clearUsernameQuery = `DELETE FROM telegram_username WHERE entity_type=$1 AND entity_id=$2`
+)
+
+func (s *UsernameQuery) Get(ctx context.Context, entityType ids.PeerType, userID int64) (username string, err error) {
+ err = s.db.QueryRow(ctx, getUsernameQuery, entityType, userID).Scan(&username)
+ if errors.Is(err, sql.ErrNoRows) {
+ err = nil
+ }
+ return
+}
+
+func (s *UsernameQuery) Set(ctx context.Context, entityType ids.PeerType, entityID int64, username string) (err error) {
+ if username == "" {
+ _, err = s.db.Exec(ctx, clearUsernameQuery, entityType, entityID)
+ } else {
+ _, err = s.db.Exec(ctx, setUsernameQuery, username, entityType, entityID)
+ }
+ return
+}
+
+func (s *UsernameQuery) GetEntityID(ctx context.Context, username string) (entityType ids.PeerType, entityID int64, err error) {
+ err = s.db.QueryRow(ctx, getByUsernameQuery, strings.ToLower(username)).Scan(&entityType, &entityID)
+ if errors.Is(err, sql.ErrNoRows) {
+ err = nil
+ }
+ return
+}
diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go
index 970112db..207b1633 100644
--- a/pkg/connector/sync.go
+++ b/pkg/connector/sync.go
@@ -69,7 +69,7 @@ func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.Di
t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = nil
for _, dialog := range dialogs {
if dialog.GetPinned() {
- portalKey := t.makePortalKeyFromPeer(dialog.GetPeer())
+ portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs = append(t.userLogin.Metadata.(*UserLoginMetadata).PinnedDialogs, portalKey.ID)
}
}
@@ -105,7 +105,7 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM
Logger()
log.Debug().Msg("Syncing dialog")
- portalKey := t.makePortalKeyFromPeer(dialog.GetPeer())
+ portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
if err != nil {
return err
@@ -168,7 +168,7 @@ func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedM
continue
}
var mfm *memberFetchMeta
- chatInfo, mfm, err = t.wrapChatInfo(channel)
+ chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel)
if err != nil {
return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
}
diff --git a/pkg/connector/tomatrix.go b/pkg/connector/tomatrix.go
index 89914e4b..e0088cbe 100644
--- a/pkg/connector/tomatrix.go
+++ b/pkg/connector/tomatrix.go
@@ -143,7 +143,7 @@ func (c *TelegramClient) convertToMatrix(
}
var perMessageProfile *event.BeeperPerMessageProfile
- if peerType, _, err := ids.ParsePortalID(portal.ID); err != nil {
+ if peerType, _, _, err := ids.ParsePortalID(portal.ID); err != nil {
return nil, fmt.Errorf("failed to parse portal ID: %w", err)
} else if peerType == ids.PeerTypeChannel && !portal.Metadata.(*PortalMetadata).IsSuperGroup {
var sender *networkid.UserID
@@ -210,11 +210,13 @@ func (c *TelegramClient) convertToMatrix(
if replyTo, ok := msg.GetReplyTo(); ok {
switch replyTo := replyTo.(type) {
case *tg.MessageReplyHeader:
- cm.ReplyTo = &networkid.MessageOptionalPartID{}
- if peerID, present := replyTo.GetReplyToPeerID(); present {
- cm.ReplyTo.MessageID = ids.MakeMessageID(peerID, replyTo.ReplyToMsgID)
- } else {
- cm.ReplyTo.MessageID = ids.MakeMessageID(portal.PortalKey, replyTo.ReplyToMsgID)
+ if (replyTo.ReplyToTopID != 0 || !replyTo.ForumTopic) && replyTo.ReplyToTopID != replyTo.ReplyToMsgID {
+ cm.ReplyTo = &networkid.MessageOptionalPartID{}
+ if peerID, present := replyTo.GetReplyToPeerID(); present {
+ cm.ReplyTo.MessageID = ids.MakeMessageID(peerID, replyTo.ReplyToMsgID)
+ } else {
+ cm.ReplyTo.MessageID = ids.MakeMessageID(portal.PortalKey, replyTo.ReplyToMsgID)
+ }
}
default:
log.Warn().Type("reply_to", replyTo).Msg("unhandled reply to type")
@@ -529,7 +531,7 @@ func (c *TelegramClient) convertMediaRequiringUpload(
if err != nil {
if tgerr.Is(err, tg.ErrFileReferenceExpired) && allowRefetch {
log.Warn().Err(err).Msg("Failed to transfer media, trying to refetch from message")
- peerType, peerID, err := ids.ParsePortalID(portal.ID)
+ peerType, peerID, _, err := ids.ParsePortalID(portal.ID)
if err != nil {
log.Err(err).Msg("Failed to parse portal ID to refetch media")
} else if msgMedia, err = c.refetchMedia(ctx, peerType, peerID, msgID); err != nil {
diff --git a/pkg/connector/userinfo.go b/pkg/connector/userinfo.go
index b7d3094c..f4269be7 100644
--- a/pkg/connector/userinfo.go
+++ b/pkg/connector/userinfo.go
@@ -56,7 +56,7 @@ func (t *TelegramClient) wrapChannelGhostInfo(ctx context.Context, channel *tg.C
var identifiers []string
if username, set := channel.GetUsername(); set {
- err = t.ScopedStore.SetUsername(ctx, ids.PeerTypeChannel, channel.ID, username)
+ err = t.main.Store.Username.Set(ctx, ids.PeerTypeChannel, channel.ID, username)
if err != nil {
return nil, err
}
@@ -88,7 +88,7 @@ func (t *TelegramClient) wrapUserInfo(ctx context.Context, u tg.UserClass) (*bri
}
}
- if err := t.ScopedStore.SetUsername(ctx, ids.PeerTypeUser, user.ID, user.Username); err != nil {
+ if err := t.main.Store.Username.Set(ctx, ids.PeerTypeUser, user.ID, user.Username); err != nil {
return nil, err
}
@@ -101,7 +101,7 @@ func (t *TelegramClient) wrapUserInfo(ctx context.Context, u tg.UserClass) (*bri
if phone, ok := user.GetPhone(); ok {
normalized := strings.TrimPrefix(phone, "+")
identifiers = append(identifiers, fmt.Sprintf("tel:+%s", normalized))
- if err := t.ScopedStore.SetPhoneNumber(ctx, user.ID, normalized); err != nil {
+ if err := t.main.Store.PhoneNumber.Set(ctx, user.ID, normalized); err != nil {
return nil, err
}
}