From 390f9f422e2310cde527155dd1627087744932a4 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Wed, 10 Dec 2025 19:40:39 +0200 Subject: [PATCH] backfill: clear saved takeout ID on takeout invalid error --- pkg/connector/backfill.go | 33 +++++++++++++++++++++++++-------- pkg/connector/metadata.go | 3 +++ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index 3e159ca9..86b93967 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -41,6 +41,10 @@ var ( // getTakeoutID blocks until the takeout ID is available. func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err error) { + if t.metadata.TakeoutInvalidated { + // TODO should we just backfill without takeout here? + return 0, fmt.Errorf("takeout invalidated, cannot backfill") + } // Always stop the takeout timeout timer if t.stopTakeoutTimer != nil { t.stopTakeoutTimer.Stop() @@ -193,6 +197,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 var takeoutID int64 var err error + // TODO use takeout for forward backfill if already available if !fetchParams.Forward { // Backwards t.takeoutLock.Lock() defer t.takeoutLock.Unlock() @@ -201,13 +206,15 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 return nil, err } - defer func() { - if t.stopTakeoutTimer == nil { - t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) })) - } else { - t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2))) - } - }() + if takeoutID != 0 { + defer func() { + if t.stopTakeoutTimer == nil { + t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) })) + } else { + t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2))) + } + }() + } } peer, topicID, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID) @@ -248,7 +255,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 OffsetID: offsetID, } } - if !fetchParams.Forward { + if takeoutID != 0 { req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req} } log.Info().Any("req", req).Msg("Fetching messages") @@ -265,6 +272,16 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 return msgs, nil }) if err != nil { + if tgerr.Is(err, tg.ErrTakeoutInvalid) { + t.metadata.TakeoutID = 0 + t.metadata.TakeoutInvalidated = true + err := t.userLogin.Save(ctx) + if err != nil { + log.Err(err).Msg("Failed to save user login after clearing takeout ID") + } else { + log.Debug().Msg("Cleared invalid takeout ID") + } + } return nil, err } diff --git a/pkg/connector/metadata.go b/pkg/connector/metadata.go index debde235..a708ada5 100644 --- a/pkg/connector/metadata.go +++ b/pkg/connector/metadata.go @@ -80,6 +80,8 @@ type UserLoginMetadata struct { Session UserLoginSession `json:"session"` TakeoutID int64 `json:"takeout_id,omitempty"` + TakeoutInvalidated bool `json:"takeout_invalidated,omitempty"` + TakeoutDialogCrawlDone bool `json:"takeout_portal_crawl_done,omitempty"` TakeoutDialogCrawlCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"` @@ -92,6 +94,7 @@ func (u *UserLoginMetadata) ResetOnLogout() { u.Session.AuthKey = nil u.TakeoutID = 0 u.TakeoutDialogCrawlDone = false + u.TakeoutInvalidated = false u.TakeoutDialogCrawlCursor = networkid.PortalID("") u.PushEncryptionKey = nil }