takeout: use takeout for backwards backfill

Signed-off-by: Sumner Evans <sumner.evans@automattic.com>
This commit is contained in:
Sumner Evans
2024-09-04 15:18:25 -06:00
parent 4d4060f37b
commit 89b1caadbf
5 changed files with 104 additions and 11 deletions
+78 -4
View File
@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"slices"
"sync"
"time"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"github.com/rs/zerolog"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
@@ -15,12 +17,75 @@ import (
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
)
// getTakeoutID blocks until the takeout ID is available.
func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err error) {
log := zerolog.Ctx(ctx).With().Str("function", "getTakeoutID").Logger()
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID != 0 {
return t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID, nil
}
for {
t.takeoutAccepted.Clear()
accountTakeout, err := t.client.API().AccountInitTakeoutSession(ctx, &tg.AccountInitTakeoutSessionRequest{
MessageUsers: true,
MessageChats: true,
MessageMegagroups: true,
MessageChannels: true,
Files: true,
FileMaxSize: min(t.main.maxFileSize, 2000*1024*1024),
})
if rpcErr, ok := tgerr.As(err); ok && rpcErr.IsOneOf(tg.ErrTakeoutInitDelay) {
log.Warn().
Err(err).
Int("delay", rpcErr.Argument).
Msg("Takeout requested, will wait for retry request or delay")
t.takeoutAccepted.WaitTimeout(time.Duration(rpcErr.Argument) * time.Second)
continue
} else if err != nil {
return 0, err
}
if t.stopTakeoutTimer != nil {
t.stopTakeoutTimer.Stop()
}
t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) }))
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = accountTakeout.ID
return accountTakeout.ID, t.userLogin.Save(ctx)
}
}
func (t *TelegramClient) stopTakeout(ctx context.Context) error {
t.takeoutLock.Lock()
defer t.takeoutLock.Unlock()
_, err := t.client.API().AccountFinishTakeoutSession(ctx, &tg.AccountFinishTakeoutSessionRequest{Success: true})
if err != nil {
return err
}
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = 0
return t.userLogin.Save(ctx)
}
func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
log := zerolog.Ctx(ctx).With().
Str("method", "FetchMessages").
Logger()
log := zerolog.Ctx(ctx).With().Str("method", "FetchMessages").Logger()
ctx = log.WithContext(ctx)
var takeoutID int64
var err error
if !fetchParams.Forward { // Backwards
t.takeoutLock.Lock()
defer t.takeoutLock.Unlock()
takeoutID, err = t.getTakeoutID(ctx)
if err != nil {
return nil, err
}
t.stopTakeoutTimer.Stop()
defer t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)))
}
peer, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID)
if err != nil {
return nil, err
@@ -37,7 +102,16 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
}
}
msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) {
rawMsgs, err := t.client.API().MessagesGetHistory(ctx, &req)
var rawMsgs tg.MessagesMessagesClass
if fetchParams.Forward {
rawMsgs, err = t.client.API().MessagesGetHistory(ctx, &req)
} else {
var messages tg.MessagesMessagesBox
err = t.client.Invoke(ctx,
&tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
&messages)
rawMsgs = messages.Messages
}
if err != nil {
return nil, err
}
+8
View File
@@ -8,12 +8,14 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/gotd/td/telegram"
"github.com/gotd/td/telegram/updates"
"github.com/gotd/td/tg"
"github.com/rs/zerolog"
"go.mau.fi/util/exsync"
"go.mau.fi/zerozap"
"go.uber.org/zap"
"maunium.net/go/mautrix/bridge/status"
@@ -50,6 +52,10 @@ type TelegramClient struct {
cachedContacts *tg.ContactsContacts
cachedContactsHash int64
takeoutLock sync.Mutex
takeoutAccepted *exsync.Event
stopTakeoutTimer *time.Timer
}
var (
@@ -114,6 +120,8 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
loginID: login.ID,
userID: networkid.UserID(login.ID),
userLogin: login,
takeoutAccepted: exsync.NewEvent(),
}
dispatcher := UpdateDispatcher{
UpdateDispatcher: tg.NewUpdateDispatcher(),
+3 -2
View File
@@ -158,8 +158,9 @@ type UserLoginSession struct {
}
type UserLoginMetadata struct {
Phone string `json:"phone"`
Session UserLoginSession `json:"session"`
Phone string `json:"phone"`
Session UserLoginSession `json:"session"`
TakeoutID int64 `json:"takeout_id,omitempty"`
}
func (s *UserLoginSession) Load(_ context.Context) (*session.Data, error) {
+5 -5
View File
@@ -32,11 +32,11 @@ type TelegramConnector struct {
Store *store.Container
useDirectMedia bool
maxFileSize int64
}
var _ bridgev2.NetworkConnector = (*TelegramConnector)(nil)
// var _ bridgev2.MaxFileSizeingNetwork = (*TelegramConnector)(nil)
var _ bridgev2.MaxFileSizeingNetwork = (*TelegramConnector)(nil)
func (tg *TelegramConnector) Init(bridge *bridgev2.Bridge) {
tg.Store = store.NewStore(bridge.DB.Database, dbutil.ZeroLogger(bridge.Log.With().Str("db_section", "telegram").Logger()))
@@ -53,9 +53,9 @@ func (tc *TelegramConnector) LoadUserLogin(ctx context.Context, login *bridgev2.
return
}
// TODO
// func (tg *TelegramConnector) SetMaxFileSize(maxSize int64) {
// }
func (tg *TelegramConnector) SetMaxFileSize(maxSize int64) {
tg.maxFileSize = maxSize
}
func (tg *TelegramConnector) GetName() bridgev2.BridgeName {
return bridgev2.BridgeName{
+10
View File
@@ -386,6 +386,16 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage)
sender := t.getEventSender(msg)
// Check if this edit was a data export request acceptance message
if sender.Sender == networkid.UserID("777000") {
if strings.Contains(msg.Message, "Data export request") && strings.Contains(msg.Message, "Accepted") {
zerolog.Ctx(ctx).Info().
Int("message_id", msg.ID).
Msg("Received an edit to message that looks like the data export was accepted, marking takeout as retriable")
t.takeoutAccepted.Set()
}
}
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Message[*tg.Message]{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventEdit,