diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index 11bed9f6..b0787f78 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -153,56 +153,91 @@ func (tc *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev if fetchParams.Portal.Metadata.(*PortalMetadata).IsForumGeneral { topicID = 1 } - var req bin.Object if topicID == ids.TopicIDSpaceRoom { return nil, nil - } else if topicID > 0 { - req = &tg.MessagesGetRepliesRequest{ - Peer: peer, - MsgID: topicID, - Limit: fetchParams.Count, - MinID: minID, - OffsetID: offsetID, - } - } else { - req = &tg.MessagesGetHistoryRequest{ - Peer: peer, - Limit: fetchParams.Count, - MinID: minID, - OffsetID: offsetID, - } } - if takeoutID != 0 { - req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req} - } - log.Info().Any("req", req).Msg("Fetching messages") - msgs, err := APICallWithUpdates(ctx, tc, func() (tg.ModifiedMessagesMessages, error) { - var box tg.MessagesMessagesBox - // TODO a single request can only fetch 100 messages, use multiple requests if the requested count is higher - err = tc.client.Invoke(ctx, req, &box) - if err != nil { - return nil, err - } - msgs, ok := box.Messages.(tg.ModifiedMessagesMessages) - if !ok { - return nil, fmt.Errorf("unsupported messages type %T", box.Messages) - } - return msgs, nil - }) - if err != nil { - if tgerr.Is(err, tg.ErrTakeoutInvalid) { - tc.metadata.TakeoutID = 0 - err := tc.userLogin.Save(ctx) - if err != nil { - log.Err(err).Msg("Failed to save user login after clearing takeout ID") - } else { - log.Debug().Msg("Cleared invalid takeout ID") + limit := fetchParams.Count + const chunkLimit = 100 + makeReq := func() bin.Object { + if topicID > 0 { + return &tg.MessagesGetRepliesRequest{ + Peer: peer, + MsgID: topicID, + Limit: min(limit, chunkLimit), + MinID: minID, + OffsetID: offsetID, + } + } + return &tg.MessagesGetHistoryRequest{ + Peer: peer, + Limit: min(limit, chunkLimit), + MinID: minID, + OffsetID: offsetID, + } + } + var messages []tg.MessageClass + requestCount := 0 + for limit > 0 { + requestCount++ + req := makeReq() + if takeoutID != 0 { + req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req} + } + log.Info().Any("req", req).Msg("Fetching messages") + resp, err := APICallWithUpdates(ctx, tc, func() (tg.ModifiedMessagesMessages, error) { + var box tg.MessagesMessagesBox + retry := true + attempts := 0 + var err error + for retry && attempts < 5 { + retry, err = tgerr.FloodWait(ctx, tc.client.Invoke(ctx, req, &box)) + attempts++ + } + if err != nil { + return nil, err + } + msgs, ok := box.Messages.(tg.ModifiedMessagesMessages) + if !ok { + return nil, fmt.Errorf("unsupported messages type %T", box.Messages) + } + return msgs, nil + }) + if err != nil { + if tgerr.Is(err, tg.ErrTakeoutInvalid) { + tc.metadata.TakeoutID = 0 + err := tc.userLogin.Save(ctx) + if err != nil { + log.Err(err).Msg("Failed to save user login after clearing takeout ID") + } else { + log.Debug().Msg("Cleared invalid takeout ID") + } + } + return nil, err + } + newMessages := resp.GetMessages() + if messages == nil { + messages = newMessages + } else { + messages = append(messages, resp.GetMessages()...) + } + if len(newMessages) < chunkLimit || !fetchParams.Forward { + break + } + limit -= len(newMessages) + offsetID = newMessages[len(newMessages)-1].GetID() + if takeoutID == 0 { + waitTime := time.Duration(min(requestCount*2, 15)) * time.Second + log.Debug(). + Dur("wait_time", waitTime). + Msg("Not using takeout, waiting before requesting another batch of messages") + select { + case <-time.After(waitTime): + case <-ctx.Done(): + return nil, ctx.Err() } } - return nil, err } - messages := msgs.GetMessages() portal := fetchParams.Portal // If the first message is the last read message, mark the chat as read