takeout: use takeout to list dialogs once permission granted

Signed-off-by: Sumner Evans <sumner.evans@automattic.com>
This commit is contained in:
Sumner Evans
2024-09-10 00:06:02 -06:00
parent 4692d46305
commit fab98cfdea
4 changed files with 101 additions and 12 deletions
+85 -6
View File
@@ -10,6 +10,7 @@ import (
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
@@ -19,11 +20,25 @@ import (
// getTakeoutID blocks until the takeout ID is available.
func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err error) {
log := zerolog.Ctx(ctx).With().Str("function", "getTakeoutID").Logger()
// Always stop the takeout timeout timer
if t.stopTakeoutTimer != nil {
t.stopTakeoutTimer.Stop()
}
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID != 0 {
// Resume fetching dialogs using takeout and enqueueing them for
// backfill.
go t.takeoutDialogsOnce.Do(func() {
if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
log.Err(err).Msg("Failed to takeout dialogs")
}
})
return t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID, nil
}
t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) }))
log := zerolog.Ctx(ctx).With().Str("function", "getTakeoutID").Logger()
for {
t.takeoutAccepted.Clear()
@@ -46,16 +61,81 @@ func (t *TelegramClient) getTakeoutID(ctx context.Context) (takeoutID int64, err
return 0, err
}
if t.stopTakeoutTimer != nil {
t.stopTakeoutTimer.Stop()
}
t.stopTakeoutTimer = time.AfterFunc(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)), sync.OnceFunc(func() { t.stopTakeout(ctx) }))
// Fetch all dialogs using takeout and enqueue them for backfill.
go t.takeoutDialogsOnce.Do(func() {
if err = t.takeoutDialogs(ctx, takeoutID); err != nil {
log.Err(err).Msg("Failed to takeout dialogs")
}
})
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutID = accountTakeout.ID
return accountTakeout.ID, t.userLogin.Save(ctx)
}
}
func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) error {
log := zerolog.Ctx(ctx).With().Str("loop", "chat_fetch").Logger()
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone {
log.Debug().Msg("Dialogs already crawled")
return nil
}
req := tg.MessagesGetDialogsRequest{
Limit: 100,
OffsetPeer: &tg.InputPeerEmpty{},
}
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor != "" {
var err error
req.OffsetPeer, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor)
if err != nil {
return fmt.Errorf("failed to get input peer for pagination: %w", err)
}
}
for {
log.Info().Stringer("cursor", req.OffsetPeer).Msg("Fetching dialogs")
dialogs, err := APICallWithUpdates(ctx, t, func() (*tg.MessagesDialogs, error) {
var dialogs tg.MessagesDialogs
return &dialogs, t.client.Invoke(ctx,
&tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
&dialogs)
})
if err != nil {
return fmt.Errorf("failed to get dialogs: %w", err)
} else if len(dialogs.GetDialogs()) == 0 {
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true
if err = t.userLogin.Save(ctx); err != nil {
return fmt.Errorf("failed to save user login: %w", err)
}
log.Debug().Msg("No more dialogs found")
return nil
}
err = t.handleDialogs(ctx, dialogs, -1)
if err != nil {
return fmt.Errorf("failed to handle dialogs: %w", err)
}
portalKey := ids.MakePortalKey(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), t.userLogin.ID)
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor == portalKey.ID {
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor = ""
log.Debug().Msg("No more dialogs found")
return nil
} else {
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor = portalKey.ID
}
if err = t.userLogin.Save(ctx); err != nil {
return fmt.Errorf("failed to save user login: %w", err)
}
req.OffsetPeer, err = t.inputPeerForPortalID(ctx, portalKey.ID)
if err != nil {
return fmt.Errorf("failed to get input peer for pagination: %w", err)
}
}
}
func (t *TelegramClient) stopTakeout(ctx context.Context) error {
t.takeoutLock.Lock()
defer t.takeoutLock.Unlock()
@@ -82,7 +162,6 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
return nil, err
}
t.stopTakeoutTimer.Stop()
defer t.stopTakeoutTimer.Reset(max(time.Hour, time.Duration(t.main.Bridge.Config.Backfill.Queue.BatchDelay*2)))
}
+5 -3
View File
@@ -53,9 +53,10 @@ type TelegramClient struct {
cachedContacts *tg.ContactsContacts
cachedContactsHash int64
takeoutLock sync.Mutex
takeoutAccepted *exsync.Event
stopTakeoutTimer *time.Timer
takeoutLock sync.Mutex
takeoutAccepted *exsync.Event
stopTakeoutTimer *time.Timer
takeoutDialogsOnce sync.Once
}
var (
@@ -418,6 +419,7 @@ func (t *TelegramClient) getSingleUser(ctx context.Context, id int64) (tg.UserCl
} else if users, err := t.client.API().UsersGetUsers(ctx, []tg.InputUserClass{inputUser}); err != nil {
return nil, err
} else if len(users) == 0 {
// TODO does this mean the user is deleted? Need to handle this a bit better
return nil, fmt.Errorf("failed to get user info for user %d", id)
} else {
return users[0], nil
+4
View File
@@ -12,6 +12,7 @@ import (
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/bridgeconfig"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/id"
"go.mau.fi/mautrix-telegram/pkg/connector/media"
@@ -161,6 +162,9 @@ type UserLoginMetadata struct {
Phone string `json:"phone"`
Session UserLoginSession `json:"session"`
TakeoutID int64 `json:"takeout_id,omitempty"`
TakeoutDialogCrawlDone bool `json:"takeout_portal_crawl_done,omitempty"`
TakeoutDialogCrawlCursor networkid.PortalID `json:"takeout_portal_crawl_cursor,omitempty"`
}
func (s *UserLoginSession) Load(_ context.Context) (*session.Data, error) {
+7 -3
View File
@@ -16,8 +16,6 @@ import (
)
func (t *TelegramClient) SyncChats(ctx context.Context) error {
log := zerolog.Ctx(ctx)
limit := t.main.Config.Sync.UpdateLimit
if limit <= 0 {
limit = math.MaxInt32
@@ -40,6 +38,12 @@ func (t *TelegramClient) SyncChats(ctx context.Context) error {
return err
}
return t.handleDialogs(ctx, dialogs, t.main.Config.Sync.CreateLimit)
}
func (t *TelegramClient) handleDialogs(ctx context.Context, dialogs tg.ModifiedMessagesDialogs, createLimit int) error {
log := zerolog.Ctx(ctx)
users := map[networkid.UserID]tg.UserClass{}
for _, user := range dialogs.GetUsers() {
users[ids.MakeUserID(user.GetID())] = user
@@ -89,7 +93,7 @@ func (t *TelegramClient) SyncChats(ctx context.Context) error {
}
created++ // The portal will have to be created
if created > t.main.Config.Sync.CreateLimit {
if createLimit >= 0 && created > createLimit {
break
}
}