sync: add on-command sync

Signed-off-by: Sumner Evans <sumner.evans@automattic.com>
This commit is contained in:
Sumner Evans
2024-08-21 22:44:35 -06:00
parent 24d0d4687a
commit 8b8b689187
10 changed files with 263 additions and 32 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ require (
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
golang.org/x/net v0.28.0
maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa
maunium.net/go/mautrix v0.20.1-0.20240821194048-675d176b4662
)
require (
+2 -2
View File
@@ -114,8 +114,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa h1:um7ddCVXb4wvb0pmtgoQc8GClUpmXeVYQE1BrI7gS7g=
maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa/go.mod h1:NhWZ4jpQ2CW+t6TmGrnydAIL0htdoXmGiNTdHb2PzL4=
maunium.net/go/mautrix v0.20.1-0.20240821194048-675d176b4662 h1:f7iw18KChWZtHA6dKJcm4uv3yjxzej4ToDuK8JwS4j0=
maunium.net/go/mautrix v0.20.1-0.20240821194048-675d176b4662/go.mod h1:NhWZ4jpQ2CW+t6TmGrnydAIL0htdoXmGiNTdHb2PzL4=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY=
+19 -15
View File
@@ -9,6 +9,7 @@ import (
"go.mau.fi/util/ptr"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/networkid"
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
"go.mau.fi/mautrix-telegram/pkg/connector/media"
@@ -39,15 +40,16 @@ func (t *TelegramClient) getDMChatInfo(ctx context.Context, userID int64) (*brid
} else if err = t.updateGhostWithUserInfo(ctx, userID, userInfo); err != nil {
return nil, err
} else {
chatInfo.Members.Members = []bridgev2.ChatMember{
{
networkUserID := ids.MakeUserID(userID)
chatInfo.Members.MemberMap = map[networkid.UserID]bridgev2.ChatMember{
networkUserID: {
EventSender: bridgev2.EventSender{
SenderLogin: ids.MakeUserLoginID(userID),
Sender: ids.MakeUserID(userID),
},
UserInfo: userInfo,
},
{EventSender: t.mySender()},
t.userID: {EventSender: t.mySender()},
}
}
return &chatInfo, nil
@@ -74,8 +76,8 @@ func (t *TelegramClient) getGroupChatInfo(fullChat *tg.MessagesChatFull, chatID
Name: name,
Type: ptr.Ptr(database.RoomTypeGroupDM), // TODO Is this correct for channels?
Members: &bridgev2.ChatMemberList{
IsFull: true,
Members: []bridgev2.ChatMember{{EventSender: t.mySender()}},
IsFull: true,
MemberMap: map[networkid.UserID]bridgev2.ChatMember{},
},
CanBackfill: true,
ExtraUpdates: func(ctx context.Context, p *bridgev2.Portal) bool {
@@ -178,15 +180,16 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta
continue
}
chatInfo.Members.Members = append(chatInfo.Members.Members, bridgev2.ChatMember{
sender := ids.MakeUserID(user.GetUserID())
chatInfo.Members.MemberMap[sender] = bridgev2.ChatMember{
EventSender: bridgev2.EventSender{
IsFromMe: user.GetUserID() == t.telegramUserID,
SenderLogin: ids.MakeUserLoginID(user.GetUserID()),
Sender: ids.MakeUserID(user.GetUserID()),
Sender: sender,
},
})
}
if len(chatInfo.Members.Members) >= t.main.Config.MemberList.NormalizedMaxInitialSync() {
if len(chatInfo.Members.MemberMap) >= t.main.Config.MemberList.NormalizedMaxInitialSync() {
break
}
}
@@ -264,7 +267,9 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta
return nil, err
}
chatInfo.Members.IsFull = len(participants.Participants) < limit
chatInfo.Members.Members = append(chatInfo.Members.Members, t.filterChannelParticipants(participants.Participants, limit)...)
for _, participant := range t.filterChannelParticipants(participants.Participants, limit) {
chatInfo.Members.MemberMap[participant.Sender] = participant
}
} else {
remaining := t.main.Config.MemberList.NormalizedMaxInitialSync()
var offset int
@@ -289,15 +294,14 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta
if err != nil {
return nil, err
}
participants, ok := p.(*tg.ChannelsChannelParticipants)
if !ok {
return nil, fmt.Errorf("returned participants is %T not *tg.ChannelsChannelParticipants", p)
}
if len(participants.Participants) == 0 {
chatInfo.Members.IsFull = true
break
}
chatInfo.Members.Members = append(chatInfo.Members.Members, t.filterChannelParticipants(participants.Participants, limit)...)
for _, participant := range t.filterChannelParticipants(participants.Participants, limit) {
chatInfo.Members.MemberMap[participant.Sender] = participant
}
offset += len(participants.Participants)
remaining -= len(participants.Participants)
+72
View File
@@ -0,0 +1,72 @@
package connector
import (
"slices"
"sync"
"github.com/gotd/td/tg"
"maunium.net/go/mautrix/bridgev2/commands"
)
var cmdSync = &commands.FullHandler{
Func: fnSync,
Name: "sync",
Help: commands.HelpMeta{
Section: commands.HelpSectionGeneral,
Description: "Synchronize your chat portals, contacts and/or own info.",
Args: "[`chats`|`contacts`|`me`]",
},
RequiresLogin: true,
}
func fnSync(ce *commands.Event) {
var only string
if len(ce.Args) > 0 {
if !slices.Contains([]string{"chats", "contacts", "me"}, ce.Args[0]) {
ce.Reply("Invalid argument. Use `chats`, `contacts` or `me`.")
return
}
only = ce.Args[0]
}
var wg sync.WaitGroup
for _, login := range ce.User.GetUserLogins() {
client := login.Client.(*TelegramClient)
if only == "" || only == "chats" {
ce.Reply("Synchronizing chats for %s...", login.ID)
wg.Add(1)
go func() {
defer wg.Done()
if err := client.SyncChats(ce.Ctx); err != nil {
ce.Reply("Failed to synchronize chats for %s: %v", login.ID, err)
}
}()
}
if only == "" || only == "contacts" {
ce.Reply("Synchronizing contacts...")
wg.Add(1)
go func() {
// TODO
ce.Reply("Contact sync is not yet implemented!")
defer wg.Done()
}()
}
if only == "" || only == "me" {
ce.Reply("Synchronizing your info...")
wg.Add(1)
go func() {
wg.Done()
if users, err := client.client.API().UsersGetUsers(ce.Ctx, []tg.InputUserClass{&tg.InputUserSelf{}}); err != nil {
ce.Reply("Failed to get your info for %s: %v", login.ID, err)
} else if len(users) == 0 {
ce.Reply("Failed to get your info for %s: no users returned", login.ID)
} else if userInfo, err := client.getUserInfoFromTelegramUser(ce.Ctx, users[0]); err != nil {
ce.Reply("Failed to get your info for %s: %v", login.ID, err)
} else if err = client.updateGhostWithUserInfo(ce.Ctx, client.telegramUserID, userInfo); err != nil {
ce.Reply("Failed to update your info for %s: %v", login.ID, err)
}
}()
}
}
wg.Wait()
}
+9
View File
@@ -42,6 +42,12 @@ type TelegramConfig struct {
IntervalSeconds int `yaml:"interval_seconds"`
TimeoutSeconds int `yaml:"timeout_seconds"`
} `yaml:"ping"`
Sync struct {
UpdateLimit int `yaml:"update_limit"`
CreateLimit int `yaml:"create_limit"`
DirectChats bool `yaml:"direct_chats"`
} `yaml:"sync"`
}
func (c TelegramConfig) ShouldBridge(participantCount int) bool {
@@ -65,6 +71,9 @@ func upgradeConfig(helper up.Helper) {
helper.Copy(up.Int, "max_member_count")
helper.Copy(up.Int, "ping", "interval_seconds")
helper.Copy(up.Int, "ping", "timeout_seconds")
helper.Copy(up.Int, "sync", "update_limit")
helper.Copy(up.Int, "sync", "create_limit")
helper.Copy(up.Bool, "sync", "direct_chats")
}
func (tg *TelegramConnector) GetConfig() (example string, data any, upgrader up.Upgrader) {
+2
View File
@@ -21,6 +21,7 @@ import (
"go.mau.fi/util/dbutil"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/commands"
"go.mau.fi/mautrix-telegram/pkg/connector/store"
)
@@ -46,6 +47,7 @@ func NewConnector() *TelegramConnector {
func (tg *TelegramConnector) Init(bridge *bridgev2.Bridge) {
tg.Store = store.NewStore(bridge.DB.Database, dbutil.ZeroLogger(bridge.Log.With().Str("db_section", "telegram").Logger()))
tg.Bridge = bridge
tg.Bridge.Commands.(*commands.Processor).AddHandlers(cmdSync)
}
func (tg *TelegramConnector) Start(ctx context.Context) error {
-4
View File
@@ -18,10 +18,6 @@ import (
var _ bridgev2.DirectMediableNetwork = (*TelegramConnector)(nil)
type getMessages interface {
GetMessages() []tg.MessageClass
}
func (tc *TelegramConnector) Download(ctx context.Context, mediaID networkid.MediaID) (mediaproxy.GetMediaResponse, error) {
info, err := ids.ParseDirectMediaInfo(mediaID)
if err != nil {
+11
View File
@@ -54,3 +54,14 @@ ping:
interval_seconds: 30
# The timeout (in seconds) for a single ping.
timeout_seconds: 10
sync:
# Number of most recently active dialogs to check when syncing chats.
# Set to 0 to remove limit.
update_limit: 0
# Number of most recently active dialogs to create portals for when syncing
# chats.
# Set to 0 to remove limit.
create_limit: 15
# Whether or not to sync and create portals for direct chats at startup.
direct_chats: false
+134
View File
@@ -0,0 +1,134 @@
package connector
import (
"context"
"fmt"
"math"
"github.com/gotd/td/tg"
"github.com/rs/zerolog"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/simplevent"
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
)
func (t *TelegramClient) SyncChats(ctx context.Context) error {
log := zerolog.Ctx(ctx)
limit := t.main.Config.Sync.UpdateLimit
if limit <= 0 {
limit = math.MaxInt32
}
dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) {
d, err := t.client.API().MessagesGetDialogs(ctx, &tg.MessagesGetDialogsRequest{
Limit: limit,
OffsetPeer: &tg.InputPeerEmpty{},
})
if err != nil {
return nil, err
} else if dialogs, ok := d.(tg.ModifiedMessagesDialogs); !ok {
return nil, fmt.Errorf("unexpected dialogs type %T", d)
} else {
return dialogs, nil
}
})
if err != nil {
return err
}
var created int
for _, d := range dialogs.GetDialogs() {
if d.TypeID() != tg.DialogTypeID {
continue
}
dialog := d.(*tg.Dialog)
log := log.With().
Stringer("peer", dialog.Peer).
Int("top_message", dialog.TopMessage).
Logger()
portalKey := ids.MakePortalKey(dialog.GetPeer(), t.loginID)
portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey)
if err != nil {
log.Err(err).Msg("Failed to get portal")
continue
}
// TODO make sure that the user isn't deleted.
if portal == nil || portal.MXID == "" {
// Check what the latest message is
messages, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) {
inputMessages := []tg.InputMessageClass{
&tg.InputMessageID{ID: dialog.TopMessage},
}
var msgs tg.MessagesMessagesClass
switch v := dialog.Peer.(type) {
case *tg.PeerUser, *tg.PeerChat:
msgs, err = t.client.API().MessagesGetMessages(ctx, inputMessages)
case *tg.PeerChannel:
var accessHash int64
var found bool
accessHash, found, err = t.ScopedStore.GetChannelAccessHash(ctx, t.telegramUserID, v.ChannelID)
if err != nil {
return nil, fmt.Errorf("failed to get channel access hash: %w", err)
} else if !found {
return nil, fmt.Errorf("channel access hash for %d not found", v.ChannelID)
} else {
msgs, err = t.client.API().ChannelsGetMessages(ctx, &tg.ChannelsGetMessagesRequest{
Channel: &tg.InputChannel{ChannelID: v.ChannelID, AccessHash: accessHash},
ID: inputMessages,
})
}
default:
return nil, fmt.Errorf("unknown peer type %T", dialog.Peer)
}
if err != nil {
return nil, err
} else if messages, ok := msgs.(tg.ModifiedMessagesMessages); !ok {
return nil, fmt.Errorf("unsupported messages type %T", messages)
} else {
return messages, nil
}
})
if err != nil {
log.Err(err).Msg("Failed to get latest message for portal")
continue
} else if len(messages.GetMessages()) == 0 {
log.Warn().Msg("No messages found for portal")
continue
}
topMessage := messages.GetMessages()[0]
if topMessage.TypeID() == tg.MessageServiceTypeID {
action := topMessage.(*tg.MessageService).Action
if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID {
log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear")
continue
}
}
created++ // The portal will have to be created
if created > t.main.Config.Sync.CreateLimit {
break
}
}
// TODO use the bundled backfill data?
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.Str("update", "sync")
},
PortalKey: portalKey,
CreatePortal: true,
},
CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { return true, nil },
})
}
return nil
}
+13 -10
View File
@@ -92,30 +92,33 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, update IGetMess
case *tg.MessageActionChatDeletePhoto:
chatInfoChange.ChatInfoChange.ChatInfo = &bridgev2.ChatInfo{Avatar: &bridgev2.Avatar{Remove: true}}
case *tg.MessageActionChatAddUser:
var members []bridgev2.ChatMember
chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{
MemberMap: map[networkid.UserID]bridgev2.ChatMember{},
}
for _, userID := range action.Users {
members = append(members, bridgev2.ChatMember{
sender := ids.MakeUserID(userID)
chatInfoChange.ChatInfoChange.MemberChanges.MemberMap[sender] = bridgev2.ChatMember{
EventSender: bridgev2.EventSender{
SenderLogin: ids.MakeUserLoginID(userID),
Sender: ids.MakeUserID(userID),
Sender: sender,
},
Membership: event.MembershipJoin,
})
}
}
chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{Members: members}
case *tg.MessageActionChatJoinedByLink:
chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{
Members: []bridgev2.ChatMember{
{EventSender: sender, Membership: event.MembershipJoin},
MemberMap: map[networkid.UserID]bridgev2.ChatMember{
sender.Sender: {EventSender: sender, Membership: event.MembershipJoin},
},
}
case *tg.MessageActionChatDeleteUser:
sender := ids.MakeUserID(action.UserID)
chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{
Members: []bridgev2.ChatMember{
{
MemberMap: map[networkid.UserID]bridgev2.ChatMember{
sender: {
EventSender: bridgev2.EventSender{
SenderLogin: ids.MakeUserLoginID(action.UserID),
Sender: ids.MakeUserID(action.UserID),
Sender: sender,
},
Membership: event.MembershipLeave,
},