664d6050df
For some reason, even though we provide an offset, Telegram sometimes sends us more events than we request, including newer events than the offset ID. Messages beyond the offset are then chopped off by the bridgev2 code, but we continue trying to backfill the portal thinking that there is more to backfill. This causes infinite backfill loops. Signed-off-by: Sumner Evans <sumner.evans@automattic.com>
360 lines
11 KiB
Go
360 lines
11 KiB
Go
package connector
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gotd/td/tg"
|
|
"github.com/gotd/td/tgerr"
|
|
"github.com/rs/zerolog"
|
|
"maunium.net/go/mautrix/bridgev2"
|
|
"maunium.net/go/mautrix/bridgev2/database"
|
|
"maunium.net/go/mautrix/bridgev2/networkid"
|
|
|
|
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
|
|
)
|
|
|
|
// getTakeoutID blocks until the takeout ID is available.
|
|
func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err error) {
|
|
// Always stop the takeout timeout timer
|
|
if t.stopTakeoutTimer != nil {
|
|
t.stopTakeoutTimer.Stop()
|
|
}
|
|
log := zerolog.Ctx(ctx).With().Str("function", "getTakeoutID").Logger()
|
|
|
|
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID != 0 {
|
|
// Resume fetching dialogs using takeout and enqueueing them for
|
|
// backfill.
|
|
go t.takeoutDialogsOnce.Do(func() {
|
|
if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
|
|
log.Err(err).Msg("Failed to takeout dialogs")
|
|
}
|
|
})
|
|
return t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID, nil
|
|
}
|
|
|
|
t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) }))
|
|
|
|
for {
|
|
t.takeoutAccepted.Clear()
|
|
|
|
accountTakeout, err := t.client.API().AccountInitTakeoutSession(ctx, &tg.AccountInitTakeoutSessionRequest{
|
|
MessageUsers: true,
|
|
MessageChats: true,
|
|
MessageMegagroups: true,
|
|
MessageChannels: true,
|
|
Files: true,
|
|
FileMaxSize: min(t.main.maxFileSize, 2000*1024*1024),
|
|
})
|
|
if rpcErr, ok := tgerr.As(err); ok && rpcErr.IsOneOf(tg.ErrTakeoutInitDelay) {
|
|
log.Warn().
|
|
Err(err).
|
|
Int("delay", rpcErr.Argument).
|
|
Msg("Takeout requested, will wait for retry request or delay")
|
|
t.takeoutAccepted.WaitTimeout(time.Duration(rpcErr.Argument) * time.Second)
|
|
continue
|
|
} else if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Fetch all dialogs using takeout and enqueue them for backfill.
|
|
go t.takeoutDialogsOnce.Do(func() {
|
|
if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
|
|
log.Err(err).Msg("Failed to takeout dialogs")
|
|
}
|
|
})
|
|
|
|
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = accountTakeout.ID
|
|
return accountTakeout.ID, t.userLogin.Save(ctx)
|
|
}
|
|
}
|
|
|
|
func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) error {
|
|
log := zerolog.Ctx(ctx).With().Str("loop", "chat_fetch").Logger()
|
|
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone {
|
|
log.Debug().Msg("Dialogs already crawled")
|
|
return nil
|
|
}
|
|
|
|
req := tg.MessagesGetDialogsRequest{
|
|
Limit: 100,
|
|
OffsetPeer: &tg.InputPeerEmpty{},
|
|
}
|
|
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor != "" {
|
|
var err error
|
|
req.OffsetPeer, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get input peer for pagination: %w", err)
|
|
}
|
|
}
|
|
for {
|
|
log.Info().Stringer("cursor", req.OffsetPeer).Msg("Fetching dialogs")
|
|
dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
|
|
var dialogs tg.MessagesDialogsBox
|
|
err := t.client.Invoke(ctx,
|
|
&tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
|
|
&dialogs)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if modified, ok := dialogs.Dialogs.AsModified(); !ok {
|
|
return nil, fmt.Errorf("unexpected response type: %T", dialogs.Dialogs)
|
|
} else {
|
|
return modified, nil
|
|
}
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get dialogs: %w", err)
|
|
} else if len(dialogs.GetDialogs()) == 0 {
|
|
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true
|
|
if err = t.userLogin.Save(ctx); err != nil {
|
|
return fmt.Errorf("failed to save user login: %w", err)
|
|
}
|
|
log.Debug().Msg("No more dialogs found")
|
|
return nil
|
|
}
|
|
|
|
if req.OffsetPeer.TypeID() == tg.InputPeerEmptyTypeID {
|
|
// This is the first fetch of dialogs, reset the pinned dialogs
|
|
// based on the list.
|
|
if err := t.resetPinnedDialogs(ctx, dialogs.GetDialogs()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err = t.handleDialogs(ctx, dialogs, -1)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to handle dialogs: %w", err)
|
|
}
|
|
|
|
portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer())
|
|
|
|
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor == portalKey.ID {
|
|
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true
|
|
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor = ""
|
|
log.Debug().Msg("No more dialogs found")
|
|
return nil
|
|
} else {
|
|
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor = portalKey.ID
|
|
}
|
|
if err = t.userLogin.Save(ctx); err != nil {
|
|
return fmt.Errorf("failed to save user login: %w", err)
|
|
}
|
|
|
|
req.OffsetPeer, err = t.inputPeerForPortalID(ctx, portalKey.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get input peer for pagination: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *TelegramClient) stopTakeout(ctx context.Context) error {
|
|
t.takeoutLock.Lock()
|
|
defer t.takeoutLock.Unlock()
|
|
|
|
_, err := t.client.API().AccountFinishTakeoutSession(ctx, &tg.AccountFinishTakeoutSessionRequest{Success: true})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = 0
|
|
return t.userLogin.Save(ctx)
|
|
}
|
|
|
|
func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
|
|
log := zerolog.Ctx(ctx).With().Str("method", "FetchMessages").Logger()
|
|
ctx = log.WithContext(ctx)
|
|
|
|
var takeoutID int64
|
|
var err error
|
|
if !fetchParams.Forward { // Backwards
|
|
t.takeoutLock.Lock()
|
|
defer t.takeoutLock.Unlock()
|
|
takeoutID, err = t.getTakeoutID(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer func() {
|
|
if t.stopTakeoutTimer == nil {
|
|
t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) }))
|
|
} else {
|
|
t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)))
|
|
}
|
|
}()
|
|
}
|
|
|
|
peer, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req := tg.MessagesGetHistoryRequest{
|
|
Peer: peer,
|
|
Limit: fetchParams.Count,
|
|
}
|
|
if fetchParams.AnchorMessage != nil {
|
|
if fetchParams.Forward {
|
|
_, req.MinID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
|
|
} else {
|
|
_, req.OffsetID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
log.Info().Any("req", req).Msg("Fetching messages")
|
|
msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) {
|
|
var rawMsgs tg.MessagesMessagesClass
|
|
if fetchParams.Forward {
|
|
rawMsgs, err = t.client.API().MessagesGetHistory(ctx, &req)
|
|
} else {
|
|
var messages tg.MessagesMessagesBox
|
|
err = t.client.Invoke(ctx,
|
|
&tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
|
|
&messages)
|
|
rawMsgs = messages.Messages
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
msgs, ok := rawMsgs.(tg.ModifiedMessagesMessages)
|
|
if !ok {
|
|
return nil, fmt.Errorf("unsupported messages type %T", rawMsgs)
|
|
}
|
|
return msgs, nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
messages := msgs.GetMessages()
|
|
|
|
portal, err := t.main.Bridge.GetPortalByKey(ctx, fetchParams.Portal.PortalKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If the first message is the last read message, mark the chat as read
|
|
// during backfill.
|
|
markRead := fetchParams.Forward && portal.Metadata.(*PortalMetadata).ReadUpTo == messages[0].GetID()
|
|
|
|
var cursor networkid.PaginationCursor
|
|
if len(messages) > 0 {
|
|
cursor = ids.MakePaginationCursorID(messages[len(messages)-1].GetID())
|
|
}
|
|
|
|
var stopAt int
|
|
if fetchParams.AnchorMessage != nil {
|
|
_, stopAt, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log = log.With().Int("stop_at", stopAt).Logger()
|
|
}
|
|
|
|
var backfillMessages []*bridgev2.BackfillMessage
|
|
for _, msg := range messages {
|
|
log := log.With().Int("message_id", msg.GetID()).Logger()
|
|
if stopAt > 0 {
|
|
if fetchParams.Forward && msg.GetID() <= stopAt {
|
|
// If we are doing forward backfill and we get to the anchor
|
|
// message, don't convert any more messages.
|
|
log.Debug().Msg("stopping at anchor message")
|
|
break
|
|
} else if msg.GetID() >= stopAt {
|
|
// If we are doing backwards backfill and we get a message more
|
|
// recent than the anchor message, skip it.
|
|
log.Debug().Msg("skipping message past anchor message")
|
|
continue
|
|
}
|
|
}
|
|
|
|
message, ok := msg.(*tg.Message)
|
|
if !ok {
|
|
log.Warn().Str("type", msg.TypeName()).Msg("skipping backfilling unsupported message type")
|
|
continue
|
|
}
|
|
|
|
sender := t.getEventSender(message, !portal.Metadata.(*PortalMetadata).IsSuperGroup)
|
|
intent := portal.GetIntentFor(ctx, sender, t.userLogin, bridgev2.RemoteEventBackfill)
|
|
converted, err := t.convertToMatrix(ctx, portal, intent, message)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
backfillMessage := bridgev2.BackfillMessage{
|
|
ConvertedMessage: converted,
|
|
Sender: sender,
|
|
ID: ids.GetMessageIDFromMessage(message),
|
|
Timestamp: time.Unix(int64(message.Date), 0),
|
|
}
|
|
|
|
if reactions, ok := message.GetReactions(); ok {
|
|
reactionsList, _, customEmojis, err := t.computeReactionsList(ctx, message.PeerID, message.ID, reactions)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, reaction := range reactionsList {
|
|
peer, ok := reaction.PeerID.(*tg.PeerUser)
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown peer type %T", reaction.PeerID)
|
|
}
|
|
|
|
emojiID, emoji, err := computeEmojiAndID(reaction.Reaction, customEmojis)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to compute emoji and ID: %w", err)
|
|
}
|
|
|
|
backfillMessage.Reactions = append(backfillMessage.Reactions, &bridgev2.BackfillReaction{
|
|
Timestamp: time.Unix(int64(reaction.Date), 0),
|
|
Sender: t.senderForUserID(peer.UserID),
|
|
EmojiID: emojiID,
|
|
Emoji: emoji,
|
|
})
|
|
}
|
|
}
|
|
|
|
backfillMessages = append(backfillMessages, &backfillMessage)
|
|
}
|
|
|
|
// They are returned with most recent message first, so reverse the order.
|
|
slices.Reverse(backfillMessages)
|
|
|
|
return &bridgev2.FetchMessagesResponse{
|
|
Messages: backfillMessages,
|
|
Cursor: cursor,
|
|
HasMore: len(backfillMessages) > 0,
|
|
Forward: fetchParams.Forward,
|
|
MarkRead: markRead,
|
|
}, nil
|
|
}
|
|
|
|
func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *bridgev2.Portal, task *database.BackfillTask) int {
|
|
log := zerolog.Ctx(ctx).With().
|
|
Str("method", "GetBackfillMaxBatchCount").
|
|
Logger()
|
|
peerType, _, err := ids.ParsePortalID(portal.ID)
|
|
if err != nil {
|
|
log.Err(err).Msg("failed to parse portal ID")
|
|
return 0
|
|
}
|
|
switch peerType {
|
|
case ids.PeerTypeUser:
|
|
return c.main.Bridge.Config.Backfill.Queue.GetOverride("user")
|
|
case ids.PeerTypeChat:
|
|
return c.main.Bridge.Config.Backfill.Queue.GetOverride("normal_group")
|
|
case ids.PeerTypeChannel:
|
|
if portal.Metadata.(*PortalMetadata).IsSuperGroup {
|
|
return c.main.Bridge.Config.Backfill.Queue.GetOverride("supergroup")
|
|
} else {
|
|
return c.main.Bridge.Config.Backfill.Queue.GetOverride("channel")
|
|
}
|
|
default:
|
|
log.Error().Str("peer_type", string(peerType)).Msg("unknown peer type")
|
|
return 0
|
|
}
|
|
}
|