From 89b1caadbf3af79368981c462326bcf05c92f10b Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 4 Sep 2024 15:18:25 -0600 Subject: [PATCH] takeout: use takeout for backwards backfill Signed-off-by: Sumner Evans --- pkg/connector/backfill.go | 82 ++++++++++++++++++++++++++++++++++++-- pkg/connector/client.go | 8 ++++ pkg/connector/config.go | 5 ++- pkg/connector/connector.go | 10 ++--- pkg/connector/telegram.go | 10 +++++ 5 files changed, 104 insertions(+), 11 deletions(-) diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index 6aa65ec0..fd9021ff 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -4,9 +4,11 @@ import ( "context" "fmt" "slices" + "sync" "time" "github.com/gotd/td/tg" + "github.com/gotd/td/tgerr" "github.com/rs/zerolog" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" @@ -15,12 +17,75 @@ import ( "go.mau.fi/mautrix-telegram/pkg/connector/ids" ) +// getTakeoutID blocks until the takeout ID is available. +func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err error) { + log := zerolog.Ctx(ctx).With().Str("function", "getTakeoutID").Logger() + if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID != 0 { + return t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID, nil + } + + for { + t.takeoutAccepted.Clear() + + accountTakeout, err := t.client.API().AccountInitTakeoutSession(ctx, &tg.AccountInitTakeoutSessionRequest{ + MessageUsers: true, + MessageChats: true, + MessageMegagroups: true, + MessageChannels: true, + Files: true, + FileMaxSize: min(t.main.maxFileSize, 2000*1024*1024), + }) + if rpcErr, ok := tgerr.As(err); ok && rpcErr.IsOneOf(tg.ErrTakeoutInitDelay) { + log.Warn(). + Err(err). + Int("delay", rpcErr.Argument). + Msg("Takeout requested, will wait for retry request or delay") + t.takeoutAccepted.WaitTimeout(time.Duration(rpcErr.Argument) * time.Second) + continue + } else if err != nil { + return 0, err + } + + if t.stopTakeoutTimer != nil { + t.stopTakeoutTimer.Stop() + } + t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) })) + + t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = accountTakeout.ID + return accountTakeout.ID, t.userLogin.Save(ctx) + } +} + +func (t *TelegramClient) stopTakeout(ctx context.Context) error { + t.takeoutLock.Lock() + defer t.takeoutLock.Unlock() + + _, err := t.client.API().AccountFinishTakeoutSession(ctx, &tg.AccountFinishTakeoutSessionRequest{Success: true}) + if err != nil { + return err + } + t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = 0 + return t.userLogin.Save(ctx) +} + func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) { - log := zerolog.Ctx(ctx).With(). - Str("method", "FetchMessages"). - Logger() + log := zerolog.Ctx(ctx).With().Str("method", "FetchMessages").Logger() ctx = log.WithContext(ctx) + var takeoutID int64 + var err error + if !fetchParams.Forward { // Backwards + t.takeoutLock.Lock() + defer t.takeoutLock.Unlock() + takeoutID, err = t.getTakeoutID(ctx) + if err != nil { + return nil, err + } + + t.stopTakeoutTimer.Stop() + defer t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2))) + } + peer, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID) if err != nil { return nil, err @@ -37,7 +102,16 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2 } } msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) { - rawMsgs, err := t.client.API().MessagesGetHistory(ctx, &req) + var rawMsgs tg.MessagesMessagesClass + if fetchParams.Forward { + rawMsgs, err = t.client.API().MessagesGetHistory(ctx, &req) + } else { + var messages tg.MessagesMessagesBox + err = t.client.Invoke(ctx, + &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req}, + &messages) + rawMsgs = messages.Messages + } if err != nil { return nil, err } diff --git a/pkg/connector/client.go b/pkg/connector/client.go index eb81da2c..3c453615 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -8,12 +8,14 @@ import ( "slices" "strconv" "strings" + "sync" "time" "github.com/gotd/td/telegram" "github.com/gotd/td/telegram/updates" "github.com/gotd/td/tg" "github.com/rs/zerolog" + "go.mau.fi/util/exsync" "go.mau.fi/zerozap" "go.uber.org/zap" "maunium.net/go/mautrix/bridge/status" @@ -50,6 +52,10 @@ type TelegramClient struct { cachedContacts *tg.ContactsContacts cachedContactsHash int64 + + takeoutLock sync.Mutex + takeoutAccepted *exsync.Event + stopTakeoutTimer *time.Timer } var ( @@ -114,6 +120,8 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge loginID: login.ID, userID: networkid.UserID(login.ID), userLogin: login, + + takeoutAccepted: exsync.NewEvent(), } dispatcher := UpdateDispatcher{ UpdateDispatcher: tg.NewUpdateDispatcher(), diff --git a/pkg/connector/config.go b/pkg/connector/config.go index b5c39dfc..21fffdd6 100644 --- a/pkg/connector/config.go +++ b/pkg/connector/config.go @@ -158,8 +158,9 @@ type UserLoginSession struct { } type UserLoginMetadata struct { - Phone string `json:"phone"` - Session UserLoginSession `json:"session"` + Phone string `json:"phone"` + Session UserLoginSession `json:"session"` + TakeoutID int64 `json:"takeout_id,omitempty"` } func (s *UserLoginSession) Load(_ context.Context) (*session.Data, error) { diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index 3f19155b..ce18fe69 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -32,11 +32,11 @@ type TelegramConnector struct { Store *store.Container useDirectMedia bool + maxFileSize int64 } var _ bridgev2.NetworkConnector = (*TelegramConnector)(nil) - -// var _ bridgev2.MaxFileSizeingNetwork = (*TelegramConnector)(nil) +var _ bridgev2.MaxFileSizeingNetwork = (*TelegramConnector)(nil) func (tg *TelegramConnector) Init(bridge *bridgev2.Bridge) { tg.Store = store.NewStore(bridge.DB.Database, dbutil.ZeroLogger(bridge.Log.With().Str("db_section", "telegram").Logger())) @@ -53,9 +53,9 @@ func (tc *TelegramConnector) LoadUserLogin(ctx context.Context, login *bridgev2. return } -// TODO -// func (tg *TelegramConnector) SetMaxFileSize(maxSize int64) { -// } +func (tg *TelegramConnector) SetMaxFileSize(maxSize int64) { + tg.maxFileSize = maxSize +} func (tg *TelegramConnector) GetName() bridgev2.BridgeName { return bridgev2.BridgeName{ diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index f768adcd..a378c793 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -386,6 +386,16 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) sender := t.getEventSender(msg) + // Check if this edit was a data export request acceptance message + if sender.Sender == networkid.UserID("777000") { + if strings.Contains(msg.Message, "Data export request") && strings.Contains(msg.Message, "Accepted") { + zerolog.Ctx(ctx).Info(). + Int("message_id", msg.ID). + Msg("Received an edit to message that looks like the data export was accepted, marking takeout as retriable") + t.takeoutAccepted.Set() + } + } + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[*tg.Message]{ EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventEdit,