backfill: clear saved takeout ID on takeout invalid error
This commit is contained in:
@@ -41,6 +41,10 @@ var (
|
||||
|
||||
// getTakeoutID blocks until the takeout ID is available.
|
||||
func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err error) {
|
||||
if t.metadata.TakeoutInvalidated {
|
||||
// TODO should we just backfill without takeout here?
|
||||
return 0, fmt.Errorf("takeout invalidated, cannot backfill")
|
||||
}
|
||||
// Always stop the takeout timeout timer
|
||||
if t.stopTakeoutTimer != nil {
|
||||
t.stopTakeoutTimer.Stop()
|
||||
@@ -193,6 +197,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
|
||||
|
||||
var takeoutID int64
|
||||
var err error
|
||||
// TODO use takeout for forward backfill if already available
|
||||
if !fetchParams.Forward { // Backwards
|
||||
t.takeoutLock.Lock()
|
||||
defer t.takeoutLock.Unlock()
|
||||
@@ -201,13 +206,15 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if t.stopTakeoutTimer == nil {
|
||||
t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) }))
|
||||
} else {
|
||||
t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)))
|
||||
}
|
||||
}()
|
||||
if takeoutID != 0 {
|
||||
defer func() {
|
||||
if t.stopTakeoutTimer == nil {
|
||||
t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) }))
|
||||
} else {
|
||||
t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)))
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
peer, topicID, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID)
|
||||
@@ -248,7 +255,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
|
||||
OffsetID: offsetID,
|
||||
}
|
||||
}
|
||||
if !fetchParams.Forward {
|
||||
if takeoutID != 0 {
|
||||
req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req}
|
||||
}
|
||||
log.Info().Any("req", req).Msg("Fetching messages")
|
||||
@@ -265,6 +272,16 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
|
||||
return msgs, nil
|
||||
})
|
||||
if err != nil {
|
||||
if tgerr.Is(err, tg.ErrTakeoutInvalid) {
|
||||
t.metadata.TakeoutID = 0
|
||||
t.metadata.TakeoutInvalidated = true
|
||||
err := t.userLogin.Save(ctx)
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Failed to save user login after clearing takeout ID")
|
||||
} else {
|
||||
log.Debug().Msg("Cleared invalid takeout ID")
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -80,6 +80,8 @@ type UserLoginMetadata struct {
|
||||
Session UserLoginSession `json:"session"`
|
||||
TakeoutID int64 `json:"takeout_id,omitempty"`
|
||||
|
||||
TakeoutInvalidated bool `json:"takeout_invalidated,omitempty"`
|
||||
|
||||
TakeoutDialogCrawlDone bool `json:"takeout_portal_crawl_done,omitempty"`
|
||||
TakeoutDialogCrawlCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"`
|
||||
|
||||
@@ -92,6 +94,7 @@ func (u *UserLoginMetadata) ResetOnLogout() {
|
||||
u.Session.AuthKey = nil
|
||||
u.TakeoutID = 0
|
||||
u.TakeoutDialogCrawlDone = false
|
||||
u.TakeoutInvalidated = false
|
||||
u.TakeoutDialogCrawlCursor = networkid.PortalID("")
|
||||
u.PushEncryptionKey = nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user