diff --git a/pkg/gotd/telegram/updates/access_hash_feeder.go b/pkg/gotd/telegram/updates/access_hash_feeder.go index eb522eb6..a42665d2 100644 --- a/pkg/gotd/telegram/updates/access_hash_feeder.go +++ b/pkg/gotd/telegram/updates/access_hash_feeder.go @@ -72,14 +72,10 @@ func (s *internalState) handleDifference(ctx context.Context, date int) (chats [ ctx, span := s.tracer.Start(ctx, "updates.handleDifference") defer span.End() - diff, err := s.client.UpdatesGetDifference(ctx, &tg.UpdatesGetDifferenceRequest{ - Pts: s.pts.State(), - Qts: s.qts.State(), - Date: date, - }) + diff, err := s.getDifferenceWithRetry(ctx, date) if err != nil { s.log.Error("UpdatesGetDifference error", zap.Error(err)) - return nil, nil, fmt.Errorf("get difference: %w", err) + return nil, nil, fmt.Errorf("get difference failed in handleDifference: %w", err) } switch diff := diff.(type) { diff --git a/pkg/gotd/telegram/updates/state.go b/pkg/gotd/telegram/updates/state.go index 710f77c5..e5acfb32 100644 --- a/pkg/gotd/telegram/updates/state.go +++ b/pkg/gotd/telegram/updates/state.go @@ -165,10 +165,12 @@ func (s *internalState) Run(ctx context.Context) error { } s.log.Info("Starting updates handler") defer s.log.Info("Updates handler stopped") - s.getDifferenceLogger(ctx) + err := s.getDifferenceLogger(ctx) + if err != nil { + return fmt.Errorf("initial getDifference failed: %w", err) + } for { - var err error select { case <-ctx.Done(): return fmt.Errorf("parent context cancelled: %w", ctx.Err()) @@ -471,6 +473,31 @@ func (s *internalState) createAndRunChannelState(ctx context.Context, channelID, return state } +func (s *internalState) getDifferenceWithRetry(ctx context.Context, date int) (diff tg.UpdatesDifferenceClass, err error) { + for { + diff, err = s.client.UpdatesGetDifference(ctx, &tg.UpdatesGetDifferenceRequest{ + Pts: s.pts.State(), + Qts: s.qts.State(), + Date: date, + }) + if err == nil || isFatalError(err) { + return diff, err + } + dur, ok := tgerr.AsFloodWait(err) + if ok { + s.log.Warn("Flood wait error while getting difference", zap.Duration("wait", dur)) + } else { + s.log.Error("Failed to get difference, retrying in 5 seconds...", zap.Error(err)) + dur = 5 * time.Second + } + select { + case <-time.After(dur): + case <-ctx.Done(): + return nil, fmt.Errorf("context canceled while waiting to retry: %w", ctx.Err()) + } + } +} + func (s *internalState) getDifference(ctx context.Context) error { ctx, span := s.tracer.Start(ctx, "getDifference") defer span.End() @@ -493,33 +520,7 @@ func (s *internalState) getDifference(ctx context.Context) error { s.date = state.Date } - var diff tg.UpdatesDifferenceClass - var err error - for { - diff, err = s.client.UpdatesGetDifference(ctx, &tg.UpdatesGetDifferenceRequest{ - Pts: s.pts.State(), - Qts: s.qts.State(), - Date: s.date, - }) - if isFatalError(err) { - return err - } else if err != nil { - dur, ok := tgerr.AsFloodWait(err) - if ok { - s.log.Warn("Flood wait error while getting difference", zap.Duration("wait", dur)) - } else { - s.log.Error("Failed to get difference, retrying in 5 seconds...", zap.Error(err)) - dur = 5 * time.Second - } - select { - case <-time.After(dur): - case <-ctx.Done(): - return fmt.Errorf("context canceled while waiting to retry: %w", ctx.Err()) - } - } else { - break - } - } + diff, err := s.getDifferenceWithRetry(ctx, s.date) if err != nil { return errors.Wrap(err, "get difference") }