From fab98cfdeaca6882b04ab125d23122080dd4c490 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Tue, 10 Sep 2024 00:06:02 -0600 Subject: [PATCH] takeout: use takeout to list dialogs once permission granted Signed-off-by: Sumner Evans --- pkg/connector/backfill.go | 91 ++++++++++++++++++++++++++++++++++++--- pkg/connector/client.go | 8 ++-- pkg/connector/config.go | 4 ++ pkg/connector/sync.go | 10 +++-- 4 files changed, 101 insertions(+), 12 deletions(-) diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index fd9021ff..ab8fbd2b 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -10,6 +10,7 @@ import ( "github.com/gotd/td/tg" "github.com/gotd/td/tgerr" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" "maunium.net/go/mautrix/bridgev2/networkid" @@ -19,11 +20,25 @@ import ( // getTakeoutID blocks until the takeout ID is available. func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err error) { - log := zerolog.Ctx(ctx).With().Str("function", "getTakeoutID").Logger() + // Always stop the takeout timeout timer + if t.stopTakeoutTimer != nil { + t.stopTakeoutTimer.Stop() + } + if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID != 0 { + // Resume fetching dialogs using takeout and enqueueing them for + // backfill. + go t.takeoutDialogsOnce.Do(func() { + if err = t.takeoutDialogs(ctx, takeoutID); err != nil { + log.Err(err).Msg("Failed to takeout dialogs") + } + }) return t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID, nil } + t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) })) + + log := zerolog.Ctx(ctx).With().Str("function", "getTakeoutID").Logger() for { t.takeoutAccepted.Clear() @@ -46,16 +61,81 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err return 0, err } - if t.stopTakeoutTimer != nil { - t.stopTakeoutTimer.Stop() - } - t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) })) + // Fetch all dialogs using takeout and enqueue them for backfill. + go t.takeoutDialogsOnce.Do(func() { + if err = t.takeoutDialogs(ctx, takeoutID); err != nil { + log.Err(err).Msg("Failed to takeout dialogs") + } + }) t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = accountTakeout.ID return accountTakeout.ID, t.userLogin.Save(ctx) } } +func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) error { + log := zerolog.Ctx(ctx).With().Str("loop", "chat_fetch").Logger() + if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone { + log.Debug().Msg("Dialogs already crawled") + return nil + } + + req := tg.MessagesGetDialogsRequest{ + Limit: 100, + OffsetPeer: &tg.InputPeerEmpty{}, + } + if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor != "" { + var err error + req.OffsetPeer, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor) + if err != nil { + return fmt.Errorf("failed to get input peer for pagination: %w", err) + } + } + for { + log.Info().Stringer("cursor", req.OffsetPeer).Msg("Fetching dialogs") + dialogs, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesDialogs, error) { + var dialogs tg.MessagesDialogs + return &dialogs, t.client.Invoke(ctx, + &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req}, + &dialogs) + }) + if err != nil { + return fmt.Errorf("failed to get dialogs: %w", err) + } else if len(dialogs.GetDialogs()) == 0 { + t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true + if err = t.userLogin.Save(ctx); err != nil { + return fmt.Errorf("failed to save user login: %w", err) + } + log.Debug().Msg("No more dialogs found") + return nil + } + + err = t.handleDialogs(ctx, dialogs, -1) + if err != nil { + return fmt.Errorf("failed to handle dialogs: %w", err) + } + + portalKey := ids.MakePortalKey(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), t.userLogin.ID) + + if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor == portalKey.ID { + t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true + t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor = "" + log.Debug().Msg("No more dialogs found") + return nil + } else { + t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor = portalKey.ID + } + if err = t.userLogin.Save(ctx); err != nil { + return fmt.Errorf("failed to save user login: %w", err) + } + + req.OffsetPeer, err = t.inputPeerForPortalID(ctx, portalKey.ID) + if err != nil { + return fmt.Errorf("failed to get input peer for pagination: %w", err) + } + } +} + func (t *TelegramClient) stopTakeout(ctx context.Context) error { t.takeoutLock.Lock() defer t.takeoutLock.Unlock() @@ -82,7 +162,6 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 return nil, err } - t.stopTakeoutTimer.Stop() defer t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2))) } diff --git a/pkg/connector/client.go b/pkg/connector/client.go index bbb5c9ba..52c32f00 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -53,9 +53,10 @@ type TelegramClient struct { cachedContacts *tg.ContactsContacts cachedContactsHash int64 - takeoutLock sync.Mutex - takeoutAccepted *exsync.Event - stopTakeoutTimer *time.Timer + takeoutLock sync.Mutex + takeoutAccepted *exsync.Event + stopTakeoutTimer *time.Timer + takeoutDialogsOnce sync.Once } var ( @@ -418,6 +419,7 @@ func (t *TelegramClient) getSingleUser(ctx context.Context, id int64) (tg.UserCl } else if users, err := t.client.API().UsersGetUsers(ctx, []tg.InputUserClass{inputUser}); err != nil { return nil, err } else if len(users) == 0 { + // TODO does this mean the user is deleted? Need to handle this a bit better return nil, fmt.Errorf("failed to get user info for user %d", id) } else { return users[0], nil diff --git a/pkg/connector/config.go b/pkg/connector/config.go index 21fffdd6..5cda2536 100644 --- a/pkg/connector/config.go +++ b/pkg/connector/config.go @@ -12,6 +12,7 @@ import ( "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/bridgeconfig" "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/bridgev2/networkid" "maunium.net/go/mautrix/id" "go.mau.fi/mautrix-telegram/pkg/connector/media" @@ -161,6 +162,9 @@ type UserLoginMetadata struct { Phone string `json:"phone"` Session UserLoginSession `json:"session"` TakeoutID int64 `json:"takeout_id,omitempty"` + + TakeoutDialogCrawlDone bool `json:"takeout_portal_crawl_done,omitempty"` + TakeoutDialogCrawlCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"` } func (s *UserLoginSession) Load(_ context.Context) (*session.Data, error) { diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go index df73b4bb..eac7bed7 100644 --- a/pkg/connector/sync.go +++ b/pkg/connector/sync.go @@ -16,8 +16,6 @@ import ( ) func (t *TelegramClient) SyncChats(ctx context.Context) error { - log := zerolog.Ctx(ctx) - limit := t.main.Config.Sync.UpdateLimit if limit <= 0 { limit = math.MaxInt32 @@ -40,6 +38,12 @@ func (t *TelegramClient) SyncChats(ctx context.Context) error { return err } + return t.handleDialogs(ctx, dialogs, t.main.Config.Sync.CreateLimit) +} + +func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedMessagesDialogs, createLimit int) error { + log := zerolog.Ctx(ctx) + users := map[networkid.UserID]tg.UserClass{} for _, user := range dialogs.GetUsers() { users[ids.MakeUserID(user.GetID())] = user @@ -89,7 +93,7 @@ func (t *TelegramClient) SyncChats(ctx context.Context) error { } created++ // The portal will have to be created - if created > t.main.Config.Sync.CreateLimit { + if createLimit >= 0 && created > createLimit { break } }