diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go
index 86b93967..1db18ead 100644
--- a/pkg/connector/backfill.go
+++ b/pkg/connector/backfill.go
@@ -55,7 +55,7 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err
// Resume fetching dialogs using takeout and enqueueing them for
// backfill.
go t.takeoutDialogsOnce.Do(func() {
- if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
+ if err = t.syncChats(ctx, takeoutID, false); err != nil {
log.Err(err).Msg("Failed to takeout dialogs")
}
})
@@ -88,7 +88,7 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err
// Fetch all dialogs using takeout and enqueue them for backfill.
go t.takeoutDialogsOnce.Do(func() {
- if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
+ if err = t.syncChats(ctx, takeoutID, false); err != nil {
log.Err(err).Msg("Failed to takeout dialogs")
}
})
@@ -98,84 +98,6 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err
}
}
-func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) error {
- log := zerolog.Ctx(ctx).With().Str("loop", "chat_fetch").Logger()
- if t.metadata.TakeoutDialogCrawlDone {
- log.Debug().Msg("Dialogs already crawled")
- return nil
- }
-
- req := tg.MessagesGetDialogsRequest{
- Limit: 100,
- OffsetPeer: &tg.InputPeerEmpty{},
- }
- if t.metadata.TakeoutDialogCrawlCursor != "" {
- var err error
- req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.metadata.TakeoutDialogCrawlCursor)
- if err != nil {
- return fmt.Errorf("failed to get input peer for pagination: %w", err)
- }
- }
- for {
- log.Info().Stringer("cursor", req.OffsetPeer).Msg("Fetching dialogs")
- dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
- var dialogs tg.MessagesDialogsBox
- err := t.client.Invoke(ctx,
- &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
- &dialogs)
- if err != nil {
- return nil, err
- } else if modified, ok := dialogs.Dialogs.AsModified(); !ok {
- return nil, fmt.Errorf("unexpected response type: %T", dialogs.Dialogs)
- } else {
- return modified, nil
- }
- })
- if err != nil {
- return fmt.Errorf("failed to get dialogs: %w", err)
- } else if len(dialogs.GetDialogs()) == 0 {
- t.metadata.TakeoutDialogCrawlDone = true
- if err = t.userLogin.Save(ctx); err != nil {
- return fmt.Errorf("failed to save user login: %w", err)
- }
- log.Debug().Msg("No more dialogs found")
- return nil
- }
-
- if req.OffsetPeer.TypeID() == tg.InputPeerEmptyTypeID {
- // This is the first fetch of dialogs, reset the pinned dialogs
- // based on the list.
- if err := t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil {
- return err
- }
- }
-
- err = t.handleDialogs(ctx, dialogs, -1)
- if err != nil {
- return fmt.Errorf("failed to handle dialogs: %w", err)
- }
-
- portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), 0)
-
- if t.metadata.TakeoutDialogCrawlCursor == portalKey.ID {
- t.metadata.TakeoutDialogCrawlDone = true
- t.metadata.TakeoutDialogCrawlCursor = ""
- log.Debug().Msg("No more dialogs found")
- return nil
- } else {
- t.metadata.TakeoutDialogCrawlCursor = portalKey.ID
- }
- if err = t.userLogin.Save(ctx); err != nil {
- return fmt.Errorf("failed to save user login: %w", err)
- }
-
- req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, portalKey.ID)
- if err != nil {
- return fmt.Errorf("failed to get input peer for pagination: %w", err)
- }
- }
-}
-
func (t *TelegramClient) stopTakeout(ctx context.Context) error {
t.takeoutLock.Lock()
defer t.takeoutLock.Unlock()
@@ -197,8 +119,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
var takeoutID int64
var err error
- // TODO use takeout for forward backfill if already available
- if !fetchParams.Forward { // Backwards
+ if (t.main.Config.Takeout.ForwardBackfill && fetchParams.Forward) || (t.main.Config.Takeout.BackwardBackfill && !fetchParams.Forward) {
t.takeoutLock.Lock()
defer t.takeoutLock.Unlock()
takeoutID, err = t.getTakeoutID(ctx)
@@ -261,6 +182,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
log.Info().Any("req", req).Msg("Fetching messages")
msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) {
var box tg.MessagesMessagesBox
+ // TODO a single request can only fetch 100 messages, use multiple requests if the requested count is higher
err = t.client.Invoke(ctx, req, &box)
if err != nil {
return nil, err
diff --git a/pkg/connector/chatsync.go b/pkg/connector/chatsync.go
new file mode 100644
index 00000000..42e87674
--- /dev/null
+++ b/pkg/connector/chatsync.go
@@ -0,0 +1,356 @@
+// mautrix-telegram - A Matrix-Telegram puppeting bridge.
+// Copyright (C) 2025 Sumner Evans
+// Copyright (C) 2026 Tulir Asokan
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package connector
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/rs/zerolog"
+ "maunium.net/go/mautrix/bridgev2"
+ "maunium.net/go/mautrix/bridgev2/database"
+ "maunium.net/go/mautrix/bridgev2/networkid"
+ "maunium.net/go/mautrix/bridgev2/simplevent"
+
+ "go.mau.fi/mautrix-telegram/pkg/connector/ids"
+ "go.mau.fi/mautrix-telegram/pkg/gotd/bin"
+ "go.mau.fi/mautrix-telegram/pkg/gotd/tg"
+ "go.mau.fi/mautrix-telegram/pkg/gotd/tgerr"
+)
+
+func (t *TelegramClient) syncChats(ctx context.Context, takeoutID int64, onLogin bool) error {
+ if takeoutID != 0 && !t.main.Config.Takeout.DialogSync {
+ return nil
+ }
+ logWith := zerolog.Ctx(ctx).With().Str("loop", "chat sync")
+ if onLogin {
+ logWith = logWith.Bool("on_login", true)
+ }
+ if takeoutID != 0 {
+ logWith = logWith.Int64("takeout_id", takeoutID)
+ }
+ log := logWith.Logger()
+
+ if !t.syncChatsLock.TryLock() {
+ log.Warn().Msg("Waiting for chat sync lock")
+ t.syncChatsLock.Lock()
+ log.Debug().Msg("Acquired chat sync lock after waiting")
+ }
+ defer t.syncChatsLock.Unlock()
+
+ if t.metadata.DialogSyncComplete {
+ log.Debug().Msg("Dialogs already synced")
+ return nil
+ }
+
+ isFullSync := true
+ updateLimit := subtractLimit(t.main.Config.Sync.UpdateLimit, t.metadata.DialogSyncCount)
+ if onLogin && t.main.Config.Takeout.DialogSync {
+ updateLimit = t.main.Config.Sync.LoginLimit
+ isFullSync = false
+ }
+ createLimit := subtractLimit(t.main.Config.Sync.CreateLimit, t.metadata.DialogSyncCount)
+
+ var req tg.MessagesGetDialogsRequest
+ isFirst := true
+ if t.metadata.DialogSyncCursor != "" {
+ isFirst = false
+ var err error
+ req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.metadata.DialogSyncCursor)
+ if err != nil {
+ return fmt.Errorf("failed to get input peer for pagination: %w", err)
+ }
+ } else {
+ req.OffsetPeer = &tg.InputPeerEmpty{}
+ }
+ var wrappedReq bin.Object
+ if takeoutID != 0 {
+ wrappedReq = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req}
+ } else {
+ wrappedReq = &req
+ }
+ for updateLimit < 0 || updateLimit > 0 {
+ if updateLimit < 0 {
+ req.Limit = 100
+ } else {
+ req.Limit = min(100, updateLimit)
+ }
+ log.Info().
+ Stringer("request", &req).
+ Int("update_limit", updateLimit).
+ Int("create_limit", createLimit).
+ Msg("Fetching dialogs")
+ dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
+ var dialogs tg.MessagesDialogsBox
+ retry := true
+ var err error
+ for retry {
+ retry, err = tgerr.FloodWait(ctx, t.client.Invoke(ctx, wrappedReq, &dialogs))
+ }
+ if err != nil {
+ return nil, err
+ } else if modified, ok := dialogs.Dialogs.AsModified(); !ok {
+ return nil, fmt.Errorf("unexpected response type: %T", dialogs.Dialogs)
+ } else {
+ return modified, nil
+ }
+ })
+ if err != nil {
+ return fmt.Errorf("failed to get dialogs: %w", err)
+ } else if len(dialogs.GetDialogs()) == 0 {
+ log.Debug().Msg("No more dialogs found (empty response)")
+ break
+ }
+
+ if isFirst {
+ // This is the first fetch of dialogs, reset the pinned dialogs based on the list.
+ if err = t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil {
+ return fmt.Errorf("failed to save pinned dialogs: %w", err)
+ }
+ }
+ isFirst = false
+
+ dialogList := dialogs.GetDialogs()
+ if updateLimit > 0 && len(dialogList) > updateLimit {
+ dialogList = dialogList[:updateLimit]
+ }
+ err = t.handleDialogs(ctx, dialogList, dialogs, createLimit)
+ if err != nil {
+ return fmt.Errorf("failed to handle dialogs: %w", err)
+ }
+ updateLimit = subtractLimit(updateLimit, len(dialogList))
+ createLimit = subtractLimit(createLimit, len(dialogList))
+
+ cursorPortalKey := t.makePortalKeyFromPeer(dialogList[len(dialogList)-1].GetPeer(), 0)
+ if t.metadata.DialogSyncCursor == cursorPortalKey.ID {
+ log.Debug().Msg("No more dialogs found (last dialog is same as old cursor)")
+ break
+ }
+ t.metadata.DialogSyncCursor = cursorPortalKey.ID
+ t.metadata.DialogSyncCount += len(dialogList)
+ if err = t.userLogin.Save(ctx); err != nil {
+ return fmt.Errorf("failed to save user login to update cursor: %w", err)
+ }
+
+ req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, cursorPortalKey.ID)
+ if err != nil {
+ return fmt.Errorf("failed to get input peer for pagination: %w", err)
+ }
+ }
+ if isFullSync {
+ t.metadata.DialogSyncComplete = true
+ t.metadata.DialogSyncCursor = ""
+ t.metadata.DialogSyncCount = 0
+ if err := t.userLogin.Save(ctx); err != nil {
+ return fmt.Errorf("failed to save user login after successful sync: %w", err)
+ }
+ }
+ log.Info().Msg("Finished dialog sync")
+ return nil
+}
+
+func subtractLimit(limit, count int) int {
+ if limit < 0 {
+ return limit
+ }
+ limit -= count
+ if limit < 0 {
+ return 0
+ }
+ return limit
+}
+
+func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.DialogClass) error {
+ t.metadata.PinnedDialogs = nil
+ for _, dialog := range dialogs {
+ if dialog.GetPinned() {
+ portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
+ t.metadata.PinnedDialogs = append(t.metadata.PinnedDialogs, portalKey.ID)
+ }
+ }
+ return t.userLogin.Save(ctx)
+}
+
+func (t *TelegramClient) handleDialogs(ctx context.Context, dialogList []tg.DialogClass, meta tg.ModifiedMessagesDialogs, createLimit int) error {
+ log := zerolog.Ctx(ctx)
+
+ users := map[int64]tg.UserClass{}
+ for _, user := range meta.GetUsers() {
+ users[user.GetID()] = user
+ }
+ chats := map[int64]tg.ChatClass{}
+ for _, chat := range meta.GetChats() {
+ chats[chat.GetID()] = chat
+ }
+ messages := map[networkid.MessageID]tg.MessageClass{}
+ for _, message := range meta.GetMessages() {
+ messages[ids.GetMessageIDFromMessage(message)] = message
+ }
+
+ for i, d := range dialogList {
+ dialog, ok := d.(*tg.Dialog)
+ if !ok {
+ continue
+ }
+
+ log := log.With().
+ Stringer("peer", dialog.Peer).
+ Int("top_message", dialog.TopMessage).
+ Logger()
+ log.Debug().Msg("Syncing dialog")
+
+ portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
+ portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
+ if err != nil {
+ return err
+ }
+ if dialog.UnreadCount == 0 && !dialog.UnreadMark {
+ portal.Metadata.(*PortalMetadata).ReadUpTo = dialog.TopMessage
+ }
+
+ var chatInfo *bridgev2.ChatInfo
+ switch peer := dialog.Peer.(type) {
+ case *tg.PeerUser:
+ switch user := users[peer.UserID].(type) {
+ case *tg.User:
+ if user.GetDeleted() {
+ log.Debug().Int64("user_id", peer.UserID).Msg("Not syncing portal because user is deleted")
+ continue
+ }
+ chatInfo, err = t.getDMChatInfo(ctx, peer.UserID)
+ if err != nil {
+ return fmt.Errorf("failed to get dm info for %d: %w", peer.UserID, err)
+ }
+ default:
+ log.Debug().
+ Int64("user_id", peer.UserID).
+ Type("user_type", user).
+ Msg("Not syncing portal because user type is unsupported")
+ continue
+ }
+ case *tg.PeerChat:
+ switch chat := chats[peer.ChatID].(type) {
+ case *tg.Chat:
+ // Need to get full chat info to get the member list
+ chatInfo, err = t.GetChatInfo(ctx, portal)
+ if err != nil {
+ return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
+ }
+ case *tg.ChatForbidden:
+ log.Debug().
+ Int64("chat_id", peer.ChatID).
+ Msg("Not syncing portal because chat is forbidden")
+ continue
+ default:
+ log.Debug().
+ Int64("chat_id", peer.ChatID).
+ Type("chat_type", chat).
+ Msg("Not syncing portal because chat type is unsupported")
+ continue
+ }
+ case *tg.PeerChannel:
+ switch channel := chats[peer.ChannelID].(type) {
+ case *tg.Channel:
+ var mfm *memberFetchMeta
+ chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel)
+ if err != nil {
+ return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
+ }
+ err = t.fillChannelMembers(ctx, mfm, chatInfo.Members)
+ if err != nil {
+ log.Err(err).Msg("Failed to get channel members")
+ }
+ case *tg.ChannelForbidden:
+ log.Debug().
+ Int64("channel_id", peer.ChannelID).
+ Msg("Not syncing portal because channel is forbidden")
+ continue
+ default:
+ log.Debug().
+ Int64("channel_id", peer.ChannelID).
+ Type("channel_type", channel).
+ Msg("Not syncing portal because channel type is unsupported")
+ continue
+ }
+ }
+
+ if portal.MXID == "" {
+ // Check what the latest message is
+ topMessage := messages[ids.MakeMessageID(dialog.Peer, dialog.TopMessage)]
+ if topMessage == nil {
+ if dialog.TopMessage == 0 {
+ log.Debug().Msg("Not syncing portal because there are no messages")
+ continue
+ }
+ log.Warn().Msg("TopMessage of dialog not in messages map")
+ } else if topMessage.TypeID() == tg.MessageServiceTypeID {
+ action := topMessage.(*tg.MessageService).Action
+ if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID {
+ log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear")
+ continue
+ }
+ }
+
+ if createLimit >= 0 && i >= createLimit {
+ continue
+ }
+ }
+
+ t.fillUserLocalMeta(chatInfo, dialog)
+
+ res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
+ ChatInfo: chatInfo,
+ EventMeta: simplevent.EventMeta{
+ Type: bridgev2.RemoteEventChatResync,
+ LogContext: func(c zerolog.Context) zerolog.Context {
+ return c.Str("update", "sync")
+ },
+ PortalKey: portalKey,
+ CreatePortal: true,
+ },
+ CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) {
+ if latestMessage == nil {
+ return true, nil
+ }
+ _, latestMessageID, err := ids.ParseMessageID(latestMessage.ID)
+ if err != nil {
+ panic(err)
+ }
+ return dialog.TopMessage > latestMessageID, nil
+ },
+ })
+ if err = resultToError(res); err != nil {
+ return err
+ }
+
+ // Generate a read receipt from the last known read message id
+ res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{
+ EventMeta: simplevent.EventMeta{
+ Type: bridgev2.RemoteEventReadReceipt,
+ PortalKey: portalKey,
+ Sender: t.mySender(),
+ },
+ LastTarget: ids.MakeMessageID(portalKey, dialog.ReadInboxMaxID),
+ ReadUpToStreamOrder: int64(dialog.ReadInboxMaxID),
+ })
+ if err = resultToError(res); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/pkg/connector/client.go b/pkg/connector/client.go
index 1ab5499f..8b4e3f27 100644
--- a/pkg/connector/client.go
+++ b/pkg/connector/client.go
@@ -110,6 +110,7 @@ type TelegramClient struct {
takeoutAccepted *exsync.Event
stopTakeoutTimer *time.Timer
takeoutDialogsOnce sync.Once
+ syncChatsLock sync.Mutex
prevReactionPoll map[networkid.PortalKey]time.Time
prevReactionPollLock sync.Mutex
diff --git a/pkg/connector/commands.go b/pkg/connector/commands.go
index f7571fab..0cbb9fec 100644
--- a/pkg/connector/commands.go
+++ b/pkg/connector/commands.go
@@ -54,7 +54,7 @@ func fnSync(ce *commands.Event) {
wg.Add(1)
go func() {
defer wg.Done()
- if err := client.SyncChats(ce.Ctx); err != nil {
+ if err := client.syncChats(ce.Ctx, 0, false); err != nil {
ce.Reply("Failed to synchronize chats for %s: %v", login.ID, err)
}
}()
diff --git a/pkg/connector/config.go b/pkg/connector/config.go
index 29f77625..5d14bd34 100644
--- a/pkg/connector/config.go
+++ b/pkg/connector/config.go
@@ -68,9 +68,16 @@ type TelegramConfig struct {
Sync struct {
UpdateLimit int `yaml:"update_limit"`
CreateLimit int `yaml:"create_limit"`
+ LoginLimit int `yaml:"login_sync_limit"`
DirectChats bool `yaml:"direct_chats"`
} `yaml:"sync"`
+ Takeout struct {
+ DialogSync bool `yaml:"dialog_sync"`
+ ForwardBackfill bool `yaml:"forward_backfill"`
+ BackwardBackfill bool `yaml:"backward_backfill"`
+ } `yaml:"takeout"`
+
ContactAvatars bool `yaml:"contact_avatars"`
ContactNames bool `yaml:"contact_names"`
MaxMemberCount int `yaml:"max_member_count"`
@@ -110,7 +117,11 @@ func upgradeConfig(helper up.Helper) {
helper.Copy(up.Int, "ping", "timeout_seconds")
helper.Copy(up.Int, "sync", "update_limit")
helper.Copy(up.Int, "sync", "create_limit")
+ helper.Copy(up.Int, "sync", "login_sync_limit")
helper.Copy(up.Bool, "sync", "direct_chats")
+ helper.Copy(up.Bool, "takeout", "dialog_sync")
+ helper.Copy(up.Bool, "takeout", "forward_backfill")
+ helper.Copy(up.Bool, "takeout", "backward_backfill")
helper.Copy(up.Bool, "contact_avatars")
helper.Copy(up.Bool, "contact_names")
helper.Copy(up.Int, "max_member_count")
@@ -130,6 +141,7 @@ func (tg *TelegramConnector) GetConfig() (example string, data any, upgrader up.
{"member_list"},
{"ping"},
{"sync"},
+ {"takeout"},
{"max_member_count"},
},
Base: ExampleConfig,
diff --git a/pkg/connector/example-config.yaml b/pkg/connector/example-config.yaml
index acf3d8b7..7d829638 100644
--- a/pkg/connector/example-config.yaml
+++ b/pkg/connector/example-config.yaml
@@ -56,14 +56,29 @@ ping:
sync:
# Number of most recently active dialogs to check when syncing chats.
- # Set to 0 to remove limit.
- update_limit: 0
- # Number of most recently active dialogs to create portals for when syncing
- # chats.
- # Set to 0 to remove limit.
+ # Set to -1 to remove limit.
+ update_limit: 100
+ # Number of most recently active dialogs to create portals for when syncing chats.
+ # Set to -1 to remove limit.
create_limit: 15
+ # Number of chats to sync immediately on login before the data export is accepted.
+ # The create_limit above still applies. This is ignored if takeout.dialog_sync is false.
+ login_sync_limit: 15
# Whether or not to sync and create portals for direct chats at startup.
- direct_chats: false
+ direct_chats: true
+
+takeout:
+ # Should the bridge use the data export mode for syncing the full chat list?
+ # If true, login_sync_limit of chats is synced immediately on login,
+ # then the rest are synced after the takeout is accepted.
+ dialog_sync: false
+ # Should the bridge use the data export mode for forward backfilling messages?
+ # This should be set to true if the forward backfill limits are set to high values,
+ # but is probably not necessary otherwise.
+ forward_backfill: false
+ # Should the bridge use the data export mode for backward backfilling messages?
+ # This only affects the backfill queue, which is only available on Beeper.
+ backward_backfill: false
# Maximum number of participants in chats to bridge. Only applies when the
# portal is being created. If there are more members when trying to create a
diff --git a/pkg/connector/login.go b/pkg/connector/login.go
index b01151de..7495a3b9 100644
--- a/pkg/connector/login.go
+++ b/pkg/connector/login.go
@@ -241,7 +241,7 @@ func (bl *baseLogin) finalizeLogin(
err := client.clientInitialized.Wait(bgCtx)
if err != nil {
log.Err(err).Msg("Failed to wait for client init to sync chats after login")
- } else if err = client.SyncChats(log.WithContext(client.clientCtx)); err != nil {
+ } else if err = client.syncChats(log.WithContext(client.clientCtx), 0, true); err != nil {
log.Err(err).Msg("Failed to sync chats")
}
}()
@@ -250,6 +250,9 @@ func (bl *baseLogin) finalizeLogin(
if metadata.IsBot {
return
}
+ if !bl.main.Config.Takeout.BackwardBackfill && !bl.main.Config.Takeout.ForwardBackfill && !bl.main.Config.Takeout.DialogSync {
+ return
+ }
log := ul.Log.With().Str("component", "post-login takeout").Logger()
client.takeoutLock.Lock()
defer client.takeoutLock.Unlock()
diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go
index 33799180..fa87c85d 100644
--- a/pkg/connector/metadata.go
+++ b/pkg/connector/metadata.go
@@ -57,7 +57,7 @@ func (gm *GhostMetadata) IsMin() bool {
type PortalMetadata struct {
IsSuperGroup bool `json:"is_supergroup,omitempty"`
IsForumGeneral bool `json:"is_forum_general,omitempty"`
- ReadUpTo int `json:"read_up_to,omitempty"`
+ ReadUpTo int `json:"read_up_to,omitempty"` // FIXME this shouldn't be here
AllowedReactions []string `json:"allowed_reactions"`
LastSync jsontime.Unix `json:"last_sync,omitempty"`
FullSynced bool `json:"full_synced,omitempty"`
@@ -90,8 +90,9 @@ type UserLoginMetadata struct {
TakeoutInvalidated bool `json:"takeout_invalidated,omitempty"`
- TakeoutDialogCrawlDone bool `json:"takeout_portal_crawl_done,omitempty"`
- TakeoutDialogCrawlCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"`
+ DialogSyncComplete bool `json:"takeout_portal_crawl_done,omitempty"`
+ DialogSyncCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"`
+ DialogSyncCount int `json:"dialog_sync_count,omitempty"`
PinnedDialogs []networkid.PortalID `json:"pinned_dialogs,omitempty"`
@@ -101,9 +102,10 @@ type UserLoginMetadata struct {
func (u *UserLoginMetadata) ResetOnLogout() {
u.Session.AuthKey = nil
u.TakeoutID = 0
- u.TakeoutDialogCrawlDone = false
u.TakeoutInvalidated = false
- u.TakeoutDialogCrawlCursor = networkid.PortalID("")
+ u.DialogSyncComplete = false
+ u.DialogSyncCursor = networkid.PortalID("")
+ u.DialogSyncCount = 0
u.PushEncryptionKey = nil
}
diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go
deleted file mode 100644
index d456c4fe..00000000
--- a/pkg/connector/sync.go
+++ /dev/null
@@ -1,249 +0,0 @@
-// mautrix-telegram - A Matrix-Telegram puppeting bridge.
-// Copyright (C) 2025 Sumner Evans
-//
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see .
-
-package connector
-
-import (
- "context"
- "fmt"
- "math"
-
- "github.com/rs/zerolog"
- "maunium.net/go/mautrix/bridgev2"
- "maunium.net/go/mautrix/bridgev2/database"
- "maunium.net/go/mautrix/bridgev2/networkid"
- "maunium.net/go/mautrix/bridgev2/simplevent"
-
- "go.mau.fi/mautrix-telegram/pkg/connector/ids"
- "go.mau.fi/mautrix-telegram/pkg/gotd/tg"
-)
-
-func (t *TelegramClient) SyncChats(ctx context.Context) error {
- limit := t.main.Config.Sync.UpdateLimit
- if limit <= 0 {
- limit = math.MaxInt32
- }
- zerolog.Ctx(ctx).Info().
- Int("update_limit", limit).
- Int("create_limit", t.main.Config.Sync.CreateLimit).
- Msg("syncing chats")
-
- dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
- d, err := t.client.API().MessagesGetDialogs(ctx, &tg.MessagesGetDialogsRequest{
- Limit: limit,
- OffsetPeer: &tg.InputPeerEmpty{},
- })
- if err != nil {
- return nil, err
- } else if dialogs, ok := d.(tg.ModifiedMessagesDialogs); !ok {
- return nil, fmt.Errorf("unexpected dialogs type %T", d)
- } else {
- return dialogs, nil
- }
- })
- if err != nil {
- return err
- }
-
- if err := t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil {
- return err
- }
-
- return t.handleDialogs(ctx, dialogs, t.main.Config.Sync.CreateLimit)
-}
-
-func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.DialogClass) error {
- t.metadata.PinnedDialogs = nil
- for _, dialog := range dialogs {
- if dialog.GetPinned() {
- portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
- t.metadata.PinnedDialogs = append(t.metadata.PinnedDialogs, portalKey.ID)
- }
- }
- return t.userLogin.Save(ctx)
-}
-
-func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedMessagesDialogs, createLimit int) error {
- log := zerolog.Ctx(ctx)
-
- users := map[networkid.UserID]tg.UserClass{}
- for _, user := range dialogs.GetUsers() {
- users[ids.MakeUserID(user.GetID())] = user
- }
- chats := map[int64]tg.ChatClass{}
- for _, chat := range dialogs.GetChats() {
- chats[chat.GetID()] = chat
- }
- messages := map[networkid.MessageID]tg.MessageClass{}
- for _, message := range dialogs.GetMessages() {
- messages[ids.GetMessageIDFromMessage(message)] = message
- }
-
- var created int
- for _, d := range dialogs.GetDialogs() {
- dialog, ok := d.(*tg.Dialog)
- if !ok {
- continue
- }
-
- log := log.With().
- Stringer("peer", dialog.Peer).
- Int("top_message", dialog.TopMessage).
- Logger()
- log.Debug().Msg("Syncing dialog")
-
- portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0)
- portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
- if err != nil {
- return err
- }
- if dialog.UnreadCount == 0 && !dialog.UnreadMark {
- portal.Metadata.(*PortalMetadata).ReadUpTo = dialog.TopMessage
- }
-
- // If this is a DM, make sure that the user isn't deleted.
- if user, ok := dialog.Peer.(*tg.PeerUser); ok {
- if users[ids.MakeUserID(user.UserID)].(*tg.User).GetDeleted() {
- log.Debug().Msg("Not syncing portal because user is deleted")
- continue
- }
- }
-
- var chatInfo *bridgev2.ChatInfo
- switch peer := dialog.Peer.(type) {
- case *tg.PeerUser:
- userID := ids.MakeUserID(peer.UserID)
- if users[userID].(*tg.User).GetDeleted() {
- log.Debug().Int64("user_id", peer.UserID).Msg("Not syncing portal because user is deleted")
- continue
- }
- chatInfo, err = t.getDMChatInfo(ctx, peer.UserID)
- if err != nil {
- return fmt.Errorf("failed to get dm info for %d: %w", peer.UserID, err)
- }
- case *tg.PeerChat:
- chat := chats[peer.ChatID]
- if chat.TypeID() == tg.ChatForbiddenTypeID {
- log.Debug().
- Int64("chat_id", peer.ChatID).
- Msg("Not syncing portal because chat is forbidden")
- continue
- } else if chat.TypeID() != tg.ChatTypeID {
- log.Debug().
- Int64("chat_id", peer.ChatID).
- Type("chat_type", chat).
- Msg("Not syncing portal because chat type is unsupported")
- continue
- }
- // Need to get full chat info to get the member list
- chatInfo, err = t.GetChatInfo(ctx, portal)
- if err != nil {
- return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
- }
- case *tg.PeerChannel:
- channel := chats[peer.ChannelID]
- if channel.TypeID() == tg.ChannelForbiddenTypeID {
- log.Debug().
- Int64("channel_id", peer.ChannelID).
- Msg("Not syncing portal because channel is forbidden")
- continue
- } else if channel.TypeID() != tg.ChannelTypeID {
- log.Debug().
- Int64("channel_id", peer.ChannelID).
- Type("channel_type", channel).
- Msg("Not syncing portal because channel type is unsupported")
- continue
- }
- var mfm *memberFetchMeta
- chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel)
- if err != nil {
- return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err)
- }
- err = t.fillChannelMembers(ctx, mfm, chatInfo.Members)
- if err != nil {
- log.Err(err).Msg("Failed to get channel members")
- }
- }
-
- if portal == nil || portal.MXID == "" {
- // Check what the latest message is
- topMessage := messages[ids.MakeMessageID(dialog.Peer, dialog.TopMessage)]
- if topMessage == nil {
- if dialog.TopMessage == 0 {
- log.Debug().Msg("Not syncing portal because there are no messages")
- continue
- } else {
- log.Warn().Msg("TopMessage of dialog not in messages map")
- }
- } else if topMessage.TypeID() == tg.MessageServiceTypeID {
- action := topMessage.(*tg.MessageService).Action
- if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID {
- log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear")
- continue
- }
- }
-
- created++ // The portal will have to be created
- if createLimit >= 0 && created > createLimit {
- break
- }
- }
-
- t.fillUserLocalMeta(chatInfo, dialog)
-
- res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
- ChatInfo: chatInfo,
- EventMeta: simplevent.EventMeta{
- Type: bridgev2.RemoteEventChatResync,
- LogContext: func(c zerolog.Context) zerolog.Context {
- return c.Str("update", "sync")
- },
- PortalKey: portalKey,
- CreatePortal: true,
- },
- CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) {
- if latestMessage == nil {
- return true, nil
- }
- _, latestMessageID, err := ids.ParseMessageID(latestMessage.ID)
- if err != nil {
- panic(err)
- }
- return dialog.TopMessage > latestMessageID, nil
- },
- })
-
- if err = resultToError(res); err != nil {
- return err
- }
-
- // Generate a read receipt from the last known read message id
- res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{
- EventMeta: simplevent.EventMeta{
- Type: bridgev2.RemoteEventReadReceipt,
- PortalKey: portalKey,
- Sender: t.mySender(),
- },
- LastTarget: ids.MakeMessageID(portalKey, dialog.ReadInboxMaxID),
- ReadUpToStreamOrder: int64(dialog.ReadInboxMaxID),
- })
-
- if err = resultToError(res); err != nil {
- return err
- }
- }
- return nil
-}
diff --git a/pkg/gotd/tgerr/flood_wait.go b/pkg/gotd/tgerr/flood_wait.go
index 87398774..b3fab8ca 100644
--- a/pkg/gotd/tgerr/flood_wait.go
+++ b/pkg/gotd/tgerr/flood_wait.go
@@ -74,7 +74,7 @@ func FloodWait(ctx context.Context, err error, opts ...FloodWaitOption) (bool, e
if d, ok := AsFloodWait(err); ok && d < opt.maxDuration {
timer := opt.clock.Timer(d + 1*time.Second)
defer clock.StopTimer(timer)
- zerolog.Ctx(ctx).Debug().Dur("duration", d).Msg("Waiting on flood wait")
+ zerolog.Ctx(ctx).Warn().Dur("duration", d).Msg("Waiting on flood wait")
select {
case <-timer.C():
return true, err