From bfe59999510a3985c94956aec06b9cfc4aea1445 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 19 Mar 2026 01:36:30 +0200 Subject: [PATCH] chatsync: merge post-login and takeout syncs and refactor everything --- pkg/connector/backfill.go | 86 +------- pkg/connector/chatsync.go | 356 ++++++++++++++++++++++++++++++ pkg/connector/client.go | 1 + pkg/connector/commands.go | 2 +- pkg/connector/config.go | 12 + pkg/connector/example-config.yaml | 27 ++- pkg/connector/login.go | 5 +- pkg/connector/metadata.go | 12 +- pkg/connector/sync.go | 249 --------------------- pkg/gotd/tgerr/flood_wait.go | 2 +- 10 files changed, 407 insertions(+), 345 deletions(-) create mode 100644 pkg/connector/chatsync.go delete mode 100644 pkg/connector/sync.go diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index 86b93967..1db18ead 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -55,7 +55,7 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err // Resume fetching dialogs using takeout and enqueueing them for // backfill. go t.takeoutDialogsOnce.Do(func() { - if err = t.takeoutDialogs(ctx, takeoutID); err != nil { + if err = t.syncChats(ctx, takeoutID, false); err != nil { log.Err(err).Msg("Failed to takeout dialogs") } }) @@ -88,7 +88,7 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err // Fetch all dialogs using takeout and enqueue them for backfill. go t.takeoutDialogsOnce.Do(func() { - if err = t.takeoutDialogs(ctx, takeoutID); err != nil { + if err = t.syncChats(ctx, takeoutID, false); err != nil { log.Err(err).Msg("Failed to takeout dialogs") } }) @@ -98,84 +98,6 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err } } -func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) error { - log := zerolog.Ctx(ctx).With().Str("loop", "chat_fetch").Logger() - if t.metadata.TakeoutDialogCrawlDone { - log.Debug().Msg("Dialogs already crawled") - return nil - } - - req := tg.MessagesGetDialogsRequest{ - Limit: 100, - OffsetPeer: &tg.InputPeerEmpty{}, - } - if t.metadata.TakeoutDialogCrawlCursor != "" { - var err error - req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.metadata.TakeoutDialogCrawlCursor) - if err != nil { - return fmt.Errorf("failed to get input peer for pagination: %w", err) - } - } - for { - log.Info().Stringer("cursor", req.OffsetPeer).Msg("Fetching dialogs") - dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) { - var dialogs tg.MessagesDialogsBox - err := t.client.Invoke(ctx, - &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req}, - &dialogs) - if err != nil { - return nil, err - } else if modified, ok := dialogs.Dialogs.AsModified(); !ok { - return nil, fmt.Errorf("unexpected response type: %T", dialogs.Dialogs) - } else { - return modified, nil - } - }) - if err != nil { - return fmt.Errorf("failed to get dialogs: %w", err) - } else if len(dialogs.GetDialogs()) == 0 { - t.metadata.TakeoutDialogCrawlDone = true - if err = t.userLogin.Save(ctx); err != nil { - return fmt.Errorf("failed to save user login: %w", err) - } - log.Debug().Msg("No more dialogs found") - return nil - } - - if req.OffsetPeer.TypeID() == tg.InputPeerEmptyTypeID { - // This is the first fetch of dialogs, reset the pinned dialogs - // based on the list. - if err := t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil { - return err - } - } - - err = t.handleDialogs(ctx, dialogs, -1) - if err != nil { - return fmt.Errorf("failed to handle dialogs: %w", err) - } - - portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), 0) - - if t.metadata.TakeoutDialogCrawlCursor == portalKey.ID { - t.metadata.TakeoutDialogCrawlDone = true - t.metadata.TakeoutDialogCrawlCursor = "" - log.Debug().Msg("No more dialogs found") - return nil - } else { - t.metadata.TakeoutDialogCrawlCursor = portalKey.ID - } - if err = t.userLogin.Save(ctx); err != nil { - return fmt.Errorf("failed to save user login: %w", err) - } - - req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, portalKey.ID) - if err != nil { - return fmt.Errorf("failed to get input peer for pagination: %w", err) - } - } -} - func (t *TelegramClient) stopTakeout(ctx context.Context) error { t.takeoutLock.Lock() defer t.takeoutLock.Unlock() @@ -197,8 +119,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 var takeoutID int64 var err error - // TODO use takeout for forward backfill if already available - if !fetchParams.Forward { // Backwards + if (t.main.Config.Takeout.ForwardBackfill && fetchParams.Forward) || (t.main.Config.Takeout.BackwardBackfill && !fetchParams.Forward) { t.takeoutLock.Lock() defer t.takeoutLock.Unlock() takeoutID, err = t.getTakeoutID(ctx) @@ -261,6 +182,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 log.Info().Any("req", req).Msg("Fetching messages") msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) { var box tg.MessagesMessagesBox + // TODO a single request can only fetch 100 messages, use multiple requests if the requested count is higher err = t.client.Invoke(ctx, req, &box) if err != nil { return nil, err diff --git a/pkg/connector/chatsync.go b/pkg/connector/chatsync.go new file mode 100644 index 00000000..42e87674 --- /dev/null +++ b/pkg/connector/chatsync.go @@ -0,0 +1,356 @@ +// mautrix-telegram - A Matrix-Telegram puppeting bridge. +// Copyright (C) 2025 Sumner Evans +// Copyright (C) 2026 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package connector + +import ( + "context" + "fmt" + + "github.com/rs/zerolog" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/bridgev2/simplevent" + + "go.mau.fi/mautrix-telegram/pkg/connector/ids" + "go.mau.fi/mautrix-telegram/pkg/gotd/bin" + "go.mau.fi/mautrix-telegram/pkg/gotd/tg" + "go.mau.fi/mautrix-telegram/pkg/gotd/tgerr" +) + +func (t *TelegramClient) syncChats(ctx context.Context, takeoutID int64, onLogin bool) error { + if takeoutID != 0 && !t.main.Config.Takeout.DialogSync { + return nil + } + logWith := zerolog.Ctx(ctx).With().Str("loop", "chat sync") + if onLogin { + logWith = logWith.Bool("on_login", true) + } + if takeoutID != 0 { + logWith = logWith.Int64("takeout_id", takeoutID) + } + log := logWith.Logger() + + if !t.syncChatsLock.TryLock() { + log.Warn().Msg("Waiting for chat sync lock") + t.syncChatsLock.Lock() + log.Debug().Msg("Acquired chat sync lock after waiting") + } + defer t.syncChatsLock.Unlock() + + if t.metadata.DialogSyncComplete { + log.Debug().Msg("Dialogs already synced") + return nil + } + + isFullSync := true + updateLimit := subtractLimit(t.main.Config.Sync.UpdateLimit, t.metadata.DialogSyncCount) + if onLogin && t.main.Config.Takeout.DialogSync { + updateLimit = t.main.Config.Sync.LoginLimit + isFullSync = false + } + createLimit := subtractLimit(t.main.Config.Sync.CreateLimit, t.metadata.DialogSyncCount) + + var req tg.MessagesGetDialogsRequest + isFirst := true + if t.metadata.DialogSyncCursor != "" { + isFirst = false + var err error + req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.metadata.DialogSyncCursor) + if err != nil { + return fmt.Errorf("failed to get input peer for pagination: %w", err) + } + } else { + req.OffsetPeer = &tg.InputPeerEmpty{} + } + var wrappedReq bin.Object + if takeoutID != 0 { + wrappedReq = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req} + } else { + wrappedReq = &req + } + for updateLimit < 0 || updateLimit > 0 { + if updateLimit < 0 { + req.Limit = 100 + } else { + req.Limit = min(100, updateLimit) + } + log.Info(). + Stringer("request", &req). + Int("update_limit", updateLimit). + Int("create_limit", createLimit). + Msg("Fetching dialogs") + dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) { + var dialogs tg.MessagesDialogsBox + retry := true + var err error + for retry { + retry, err = tgerr.FloodWait(ctx, t.client.Invoke(ctx, wrappedReq, &dialogs)) + } + if err != nil { + return nil, err + } else if modified, ok := dialogs.Dialogs.AsModified(); !ok { + return nil, fmt.Errorf("unexpected response type: %T", dialogs.Dialogs) + } else { + return modified, nil + } + }) + if err != nil { + return fmt.Errorf("failed to get dialogs: %w", err) + } else if len(dialogs.GetDialogs()) == 0 { + log.Debug().Msg("No more dialogs found (empty response)") + break + } + + if isFirst { + // This is the first fetch of dialogs, reset the pinned dialogs based on the list. + if err = t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil { + return fmt.Errorf("failed to save pinned dialogs: %w", err) + } + } + isFirst = false + + dialogList := dialogs.GetDialogs() + if updateLimit > 0 && len(dialogList) > updateLimit { + dialogList = dialogList[:updateLimit] + } + err = t.handleDialogs(ctx, dialogList, dialogs, createLimit) + if err != nil { + return fmt.Errorf("failed to handle dialogs: %w", err) + } + updateLimit = subtractLimit(updateLimit, len(dialogList)) + createLimit = subtractLimit(createLimit, len(dialogList)) + + cursorPortalKey := t.makePortalKeyFromPeer(dialogList[len(dialogList)-1].GetPeer(), 0) + if t.metadata.DialogSyncCursor == cursorPortalKey.ID { + log.Debug().Msg("No more dialogs found (last dialog is same as old cursor)") + break + } + t.metadata.DialogSyncCursor = cursorPortalKey.ID + t.metadata.DialogSyncCount += len(dialogList) + if err = t.userLogin.Save(ctx); err != nil { + return fmt.Errorf("failed to save user login to update cursor: %w", err) + } + + req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, cursorPortalKey.ID) + if err != nil { + return fmt.Errorf("failed to get input peer for pagination: %w", err) + } + } + if isFullSync { + t.metadata.DialogSyncComplete = true + t.metadata.DialogSyncCursor = "" + t.metadata.DialogSyncCount = 0 + if err := t.userLogin.Save(ctx); err != nil { + return fmt.Errorf("failed to save user login after successful sync: %w", err) + } + } + log.Info().Msg("Finished dialog sync") + return nil +} + +func subtractLimit(limit, count int) int { + if limit < 0 { + return limit + } + limit -= count + if limit < 0 { + return 0 + } + return limit +} + +func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.DialogClass) error { + t.metadata.PinnedDialogs = nil + for _, dialog := range dialogs { + if dialog.GetPinned() { + portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0) + t.metadata.PinnedDialogs = append(t.metadata.PinnedDialogs, portalKey.ID) + } + } + return t.userLogin.Save(ctx) +} + +func (t *TelegramClient) handleDialogs(ctx context.Context, dialogList []tg.DialogClass, meta tg.ModifiedMessagesDialogs, createLimit int) error { + log := zerolog.Ctx(ctx) + + users := map[int64]tg.UserClass{} + for _, user := range meta.GetUsers() { + users[user.GetID()] = user + } + chats := map[int64]tg.ChatClass{} + for _, chat := range meta.GetChats() { + chats[chat.GetID()] = chat + } + messages := map[networkid.MessageID]tg.MessageClass{} + for _, message := range meta.GetMessages() { + messages[ids.GetMessageIDFromMessage(message)] = message + } + + for i, d := range dialogList { + dialog, ok := d.(*tg.Dialog) + if !ok { + continue + } + + log := log.With(). + Stringer("peer", dialog.Peer). + Int("top_message", dialog.TopMessage). + Logger() + log.Debug().Msg("Syncing dialog") + + portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0) + portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey) + if err != nil { + return err + } + if dialog.UnreadCount == 0 && !dialog.UnreadMark { + portal.Metadata.(*PortalMetadata).ReadUpTo = dialog.TopMessage + } + + var chatInfo *bridgev2.ChatInfo + switch peer := dialog.Peer.(type) { + case *tg.PeerUser: + switch user := users[peer.UserID].(type) { + case *tg.User: + if user.GetDeleted() { + log.Debug().Int64("user_id", peer.UserID).Msg("Not syncing portal because user is deleted") + continue + } + chatInfo, err = t.getDMChatInfo(ctx, peer.UserID) + if err != nil { + return fmt.Errorf("failed to get dm info for %d: %w", peer.UserID, err) + } + default: + log.Debug(). + Int64("user_id", peer.UserID). + Type("user_type", user). + Msg("Not syncing portal because user type is unsupported") + continue + } + case *tg.PeerChat: + switch chat := chats[peer.ChatID].(type) { + case *tg.Chat: + // Need to get full chat info to get the member list + chatInfo, err = t.GetChatInfo(ctx, portal) + if err != nil { + return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err) + } + case *tg.ChatForbidden: + log.Debug(). + Int64("chat_id", peer.ChatID). + Msg("Not syncing portal because chat is forbidden") + continue + default: + log.Debug(). + Int64("chat_id", peer.ChatID). + Type("chat_type", chat). + Msg("Not syncing portal because chat type is unsupported") + continue + } + case *tg.PeerChannel: + switch channel := chats[peer.ChannelID].(type) { + case *tg.Channel: + var mfm *memberFetchMeta + chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel) + if err != nil { + return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err) + } + err = t.fillChannelMembers(ctx, mfm, chatInfo.Members) + if err != nil { + log.Err(err).Msg("Failed to get channel members") + } + case *tg.ChannelForbidden: + log.Debug(). + Int64("channel_id", peer.ChannelID). + Msg("Not syncing portal because channel is forbidden") + continue + default: + log.Debug(). + Int64("channel_id", peer.ChannelID). + Type("channel_type", channel). + Msg("Not syncing portal because channel type is unsupported") + continue + } + } + + if portal.MXID == "" { + // Check what the latest message is + topMessage := messages[ids.MakeMessageID(dialog.Peer, dialog.TopMessage)] + if topMessage == nil { + if dialog.TopMessage == 0 { + log.Debug().Msg("Not syncing portal because there are no messages") + continue + } + log.Warn().Msg("TopMessage of dialog not in messages map") + } else if topMessage.TypeID() == tg.MessageServiceTypeID { + action := topMessage.(*tg.MessageService).Action + if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID { + log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear") + continue + } + } + + if createLimit >= 0 && i >= createLimit { + continue + } + } + + t.fillUserLocalMeta(chatInfo, dialog) + + res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + ChatInfo: chatInfo, + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatResync, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("update", "sync") + }, + PortalKey: portalKey, + CreatePortal: true, + }, + CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { + if latestMessage == nil { + return true, nil + } + _, latestMessageID, err := ids.ParseMessageID(latestMessage.ID) + if err != nil { + panic(err) + } + return dialog.TopMessage > latestMessageID, nil + }, + }) + if err = resultToError(res); err != nil { + return err + } + + // Generate a read receipt from the last known read message id + res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventReadReceipt, + PortalKey: portalKey, + Sender: t.mySender(), + }, + LastTarget: ids.MakeMessageID(portalKey, dialog.ReadInboxMaxID), + ReadUpToStreamOrder: int64(dialog.ReadInboxMaxID), + }) + if err = resultToError(res); err != nil { + return err + } + } + return nil +} diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 1ab5499f..8b4e3f27 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -110,6 +110,7 @@ type TelegramClient struct { takeoutAccepted *exsync.Event stopTakeoutTimer *time.Timer takeoutDialogsOnce sync.Once + syncChatsLock sync.Mutex prevReactionPoll map[networkid.PortalKey]time.Time prevReactionPollLock sync.Mutex diff --git a/pkg/connector/commands.go b/pkg/connector/commands.go index f7571fab..0cbb9fec 100644 --- a/pkg/connector/commands.go +++ b/pkg/connector/commands.go @@ -54,7 +54,7 @@ func fnSync(ce *commands.Event) { wg.Add(1) go func() { defer wg.Done() - if err := client.SyncChats(ce.Ctx); err != nil { + if err := client.syncChats(ce.Ctx, 0, false); err != nil { ce.Reply("Failed to synchronize chats for %s: %v", login.ID, err) } }() diff --git a/pkg/connector/config.go b/pkg/connector/config.go index 29f77625..5d14bd34 100644 --- a/pkg/connector/config.go +++ b/pkg/connector/config.go @@ -68,9 +68,16 @@ type TelegramConfig struct { Sync struct { UpdateLimit int `yaml:"update_limit"` CreateLimit int `yaml:"create_limit"` + LoginLimit int `yaml:"login_sync_limit"` DirectChats bool `yaml:"direct_chats"` } `yaml:"sync"` + Takeout struct { + DialogSync bool `yaml:"dialog_sync"` + ForwardBackfill bool `yaml:"forward_backfill"` + BackwardBackfill bool `yaml:"backward_backfill"` + } `yaml:"takeout"` + ContactAvatars bool `yaml:"contact_avatars"` ContactNames bool `yaml:"contact_names"` MaxMemberCount int `yaml:"max_member_count"` @@ -110,7 +117,11 @@ func upgradeConfig(helper up.Helper) { helper.Copy(up.Int, "ping", "timeout_seconds") helper.Copy(up.Int, "sync", "update_limit") helper.Copy(up.Int, "sync", "create_limit") + helper.Copy(up.Int, "sync", "login_sync_limit") helper.Copy(up.Bool, "sync", "direct_chats") + helper.Copy(up.Bool, "takeout", "dialog_sync") + helper.Copy(up.Bool, "takeout", "forward_backfill") + helper.Copy(up.Bool, "takeout", "backward_backfill") helper.Copy(up.Bool, "contact_avatars") helper.Copy(up.Bool, "contact_names") helper.Copy(up.Int, "max_member_count") @@ -130,6 +141,7 @@ func (tg *TelegramConnector) GetConfig() (example string, data any, upgrader up. {"member_list"}, {"ping"}, {"sync"}, + {"takeout"}, {"max_member_count"}, }, Base: ExampleConfig, diff --git a/pkg/connector/example-config.yaml b/pkg/connector/example-config.yaml index acf3d8b7..7d829638 100644 --- a/pkg/connector/example-config.yaml +++ b/pkg/connector/example-config.yaml @@ -56,14 +56,29 @@ ping: sync: # Number of most recently active dialogs to check when syncing chats. - # Set to 0 to remove limit. - update_limit: 0 - # Number of most recently active dialogs to create portals for when syncing - # chats. - # Set to 0 to remove limit. + # Set to -1 to remove limit. + update_limit: 100 + # Number of most recently active dialogs to create portals for when syncing chats. + # Set to -1 to remove limit. create_limit: 15 + # Number of chats to sync immediately on login before the data export is accepted. + # The create_limit above still applies. This is ignored if takeout.dialog_sync is false. + login_sync_limit: 15 # Whether or not to sync and create portals for direct chats at startup. - direct_chats: false + direct_chats: true + +takeout: + # Should the bridge use the data export mode for syncing the full chat list? + # If true, login_sync_limit of chats is synced immediately on login, + # then the rest are synced after the takeout is accepted. + dialog_sync: false + # Should the bridge use the data export mode for forward backfilling messages? + # This should be set to true if the forward backfill limits are set to high values, + # but is probably not necessary otherwise. + forward_backfill: false + # Should the bridge use the data export mode for backward backfilling messages? + # This only affects the backfill queue, which is only available on Beeper. + backward_backfill: false # Maximum number of participants in chats to bridge. Only applies when the # portal is being created. If there are more members when trying to create a diff --git a/pkg/connector/login.go b/pkg/connector/login.go index b01151de..7495a3b9 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -241,7 +241,7 @@ func (bl *baseLogin) finalizeLogin( err := client.clientInitialized.Wait(bgCtx) if err != nil { log.Err(err).Msg("Failed to wait for client init to sync chats after login") - } else if err = client.SyncChats(log.WithContext(client.clientCtx)); err != nil { + } else if err = client.syncChats(log.WithContext(client.clientCtx), 0, true); err != nil { log.Err(err).Msg("Failed to sync chats") } }() @@ -250,6 +250,9 @@ func (bl *baseLogin) finalizeLogin( if metadata.IsBot { return } + if !bl.main.Config.Takeout.BackwardBackfill && !bl.main.Config.Takeout.ForwardBackfill && !bl.main.Config.Takeout.DialogSync { + return + } log := ul.Log.With().Str("component", "post-login takeout").Logger() client.takeoutLock.Lock() defer client.takeoutLock.Unlock() diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go index 33799180..fa87c85d 100644 --- a/pkg/connector/metadata.go +++ b/pkg/connector/metadata.go @@ -57,7 +57,7 @@ func (gm *GhostMetadata) IsMin() bool { type PortalMetadata struct { IsSuperGroup bool `json:"is_supergroup,omitempty"` IsForumGeneral bool `json:"is_forum_general,omitempty"` - ReadUpTo int `json:"read_up_to,omitempty"` + ReadUpTo int `json:"read_up_to,omitempty"` // FIXME this shouldn't be here AllowedReactions []string `json:"allowed_reactions"` LastSync jsontime.Unix `json:"last_sync,omitempty"` FullSynced bool `json:"full_synced,omitempty"` @@ -90,8 +90,9 @@ type UserLoginMetadata struct { TakeoutInvalidated bool `json:"takeout_invalidated,omitempty"` - TakeoutDialogCrawlDone bool `json:"takeout_portal_crawl_done,omitempty"` - TakeoutDialogCrawlCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"` + DialogSyncComplete bool `json:"takeout_portal_crawl_done,omitempty"` + DialogSyncCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"` + DialogSyncCount int `json:"dialog_sync_count,omitempty"` PinnedDialogs []networkid.PortalID `json:"pinned_dialogs,omitempty"` @@ -101,9 +102,10 @@ type UserLoginMetadata struct { func (u *UserLoginMetadata) ResetOnLogout() { u.Session.AuthKey = nil u.TakeoutID = 0 - u.TakeoutDialogCrawlDone = false u.TakeoutInvalidated = false - u.TakeoutDialogCrawlCursor = networkid.PortalID("") + u.DialogSyncComplete = false + u.DialogSyncCursor = networkid.PortalID("") + u.DialogSyncCount = 0 u.PushEncryptionKey = nil } diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go deleted file mode 100644 index d456c4fe..00000000 --- a/pkg/connector/sync.go +++ /dev/null @@ -1,249 +0,0 @@ -// mautrix-telegram - A Matrix-Telegram puppeting bridge. -// Copyright (C) 2025 Sumner Evans -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package connector - -import ( - "context" - "fmt" - "math" - - "github.com/rs/zerolog" - "maunium.net/go/mautrix/bridgev2" - "maunium.net/go/mautrix/bridgev2/database" - "maunium.net/go/mautrix/bridgev2/networkid" - "maunium.net/go/mautrix/bridgev2/simplevent" - - "go.mau.fi/mautrix-telegram/pkg/connector/ids" - "go.mau.fi/mautrix-telegram/pkg/gotd/tg" -) - -func (t *TelegramClient) SyncChats(ctx context.Context) error { - limit := t.main.Config.Sync.UpdateLimit - if limit <= 0 { - limit = math.MaxInt32 - } - zerolog.Ctx(ctx).Info(). - Int("update_limit", limit). - Int("create_limit", t.main.Config.Sync.CreateLimit). - Msg("syncing chats") - - dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) { - d, err := t.client.API().MessagesGetDialogs(ctx, &tg.MessagesGetDialogsRequest{ - Limit: limit, - OffsetPeer: &tg.InputPeerEmpty{}, - }) - if err != nil { - return nil, err - } else if dialogs, ok := d.(tg.ModifiedMessagesDialogs); !ok { - return nil, fmt.Errorf("unexpected dialogs type %T", d) - } else { - return dialogs, nil - } - }) - if err != nil { - return err - } - - if err := t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil { - return err - } - - return t.handleDialogs(ctx, dialogs, t.main.Config.Sync.CreateLimit) -} - -func (t *TelegramClient) resetPinnedDialogs(ctx context.Context, dialogs []tg.DialogClass) error { - t.metadata.PinnedDialogs = nil - for _, dialog := range dialogs { - if dialog.GetPinned() { - portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0) - t.metadata.PinnedDialogs = append(t.metadata.PinnedDialogs, portalKey.ID) - } - } - return t.userLogin.Save(ctx) -} - -func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedMessagesDialogs, createLimit int) error { - log := zerolog.Ctx(ctx) - - users := map[networkid.UserID]tg.UserClass{} - for _, user := range dialogs.GetUsers() { - users[ids.MakeUserID(user.GetID())] = user - } - chats := map[int64]tg.ChatClass{} - for _, chat := range dialogs.GetChats() { - chats[chat.GetID()] = chat - } - messages := map[networkid.MessageID]tg.MessageClass{} - for _, message := range dialogs.GetMessages() { - messages[ids.GetMessageIDFromMessage(message)] = message - } - - var created int - for _, d := range dialogs.GetDialogs() { - dialog, ok := d.(*tg.Dialog) - if !ok { - continue - } - - log := log.With(). - Stringer("peer", dialog.Peer). - Int("top_message", dialog.TopMessage). - Logger() - log.Debug().Msg("Syncing dialog") - - portalKey := t.makePortalKeyFromPeer(dialog.GetPeer(), 0) - portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey) - if err != nil { - return err - } - if dialog.UnreadCount == 0 && !dialog.UnreadMark { - portal.Metadata.(*PortalMetadata).ReadUpTo = dialog.TopMessage - } - - // If this is a DM, make sure that the user isn't deleted. - if user, ok := dialog.Peer.(*tg.PeerUser); ok { - if users[ids.MakeUserID(user.UserID)].(*tg.User).GetDeleted() { - log.Debug().Msg("Not syncing portal because user is deleted") - continue - } - } - - var chatInfo *bridgev2.ChatInfo - switch peer := dialog.Peer.(type) { - case *tg.PeerUser: - userID := ids.MakeUserID(peer.UserID) - if users[userID].(*tg.User).GetDeleted() { - log.Debug().Int64("user_id", peer.UserID).Msg("Not syncing portal because user is deleted") - continue - } - chatInfo, err = t.getDMChatInfo(ctx, peer.UserID) - if err != nil { - return fmt.Errorf("failed to get dm info for %d: %w", peer.UserID, err) - } - case *tg.PeerChat: - chat := chats[peer.ChatID] - if chat.TypeID() == tg.ChatForbiddenTypeID { - log.Debug(). - Int64("chat_id", peer.ChatID). - Msg("Not syncing portal because chat is forbidden") - continue - } else if chat.TypeID() != tg.ChatTypeID { - log.Debug(). - Int64("chat_id", peer.ChatID). - Type("chat_type", chat). - Msg("Not syncing portal because chat type is unsupported") - continue - } - // Need to get full chat info to get the member list - chatInfo, err = t.GetChatInfo(ctx, portal) - if err != nil { - return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err) - } - case *tg.PeerChannel: - channel := chats[peer.ChannelID] - if channel.TypeID() == tg.ChannelForbiddenTypeID { - log.Debug(). - Int64("channel_id", peer.ChannelID). - Msg("Not syncing portal because channel is forbidden") - continue - } else if channel.TypeID() != tg.ChannelTypeID { - log.Debug(). - Int64("channel_id", peer.ChannelID). - Type("channel_type", channel). - Msg("Not syncing portal because channel type is unsupported") - continue - } - var mfm *memberFetchMeta - chatInfo, mfm, err = t.wrapChatInfo(portal.ID, channel) - if err != nil { - return fmt.Errorf("failed to get chat info for %s: %w", portalKey, err) - } - err = t.fillChannelMembers(ctx, mfm, chatInfo.Members) - if err != nil { - log.Err(err).Msg("Failed to get channel members") - } - } - - if portal == nil || portal.MXID == "" { - // Check what the latest message is - topMessage := messages[ids.MakeMessageID(dialog.Peer, dialog.TopMessage)] - if topMessage == nil { - if dialog.TopMessage == 0 { - log.Debug().Msg("Not syncing portal because there are no messages") - continue - } else { - log.Warn().Msg("TopMessage of dialog not in messages map") - } - } else if topMessage.TypeID() == tg.MessageServiceTypeID { - action := topMessage.(*tg.MessageService).Action - if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID { - log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear") - continue - } - } - - created++ // The portal will have to be created - if createLimit >= 0 && created > createLimit { - break - } - } - - t.fillUserLocalMeta(chatInfo, dialog) - - res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ - ChatInfo: chatInfo, - EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventChatResync, - LogContext: func(c zerolog.Context) zerolog.Context { - return c.Str("update", "sync") - }, - PortalKey: portalKey, - CreatePortal: true, - }, - CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { - if latestMessage == nil { - return true, nil - } - _, latestMessageID, err := ids.ParseMessageID(latestMessage.ID) - if err != nil { - panic(err) - } - return dialog.TopMessage > latestMessageID, nil - }, - }) - - if err = resultToError(res); err != nil { - return err - } - - // Generate a read receipt from the last known read message id - res = t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ - EventMeta: simplevent.EventMeta{ - Type: bridgev2.RemoteEventReadReceipt, - PortalKey: portalKey, - Sender: t.mySender(), - }, - LastTarget: ids.MakeMessageID(portalKey, dialog.ReadInboxMaxID), - ReadUpToStreamOrder: int64(dialog.ReadInboxMaxID), - }) - - if err = resultToError(res); err != nil { - return err - } - } - return nil -} diff --git a/pkg/gotd/tgerr/flood_wait.go b/pkg/gotd/tgerr/flood_wait.go index 87398774..b3fab8ca 100644 --- a/pkg/gotd/tgerr/flood_wait.go +++ b/pkg/gotd/tgerr/flood_wait.go @@ -74,7 +74,7 @@ func FloodWait(ctx context.Context, err error, opts ...FloodWaitOption) (bool, e if d, ok := AsFloodWait(err); ok && d < opt.maxDuration { timer := opt.clock.Timer(d + 1*time.Second) defer clock.StopTimer(timer) - zerolog.Ctx(ctx).Debug().Dur("duration", d).Msg("Waiting on flood wait") + zerolog.Ctx(ctx).Warn().Dur("duration", d).Msg("Waiting on flood wait") select { case <-timer.C(): return true, err