diff --git a/go.mod b/go.mod index b63b10e9..d6a85485 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa golang.org/x/net v0.28.0 - maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa + maunium.net/go/mautrix v0.20.1-0.20240821194048-675d176b4662 ) require ( diff --git a/go.sum b/go.sum index 7bc3c9dc..1862bf2c 100644 --- a/go.sum +++ b/go.sum @@ -114,8 +114,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa h1:um7ddCVXb4wvb0pmtgoQc8GClUpmXeVYQE1BrI7gS7g= -maunium.net/go/mautrix v0.20.1-0.20240820143721-591ac60f0caa/go.mod h1:NhWZ4jpQ2CW+t6TmGrnydAIL0htdoXmGiNTdHb2PzL4= +maunium.net/go/mautrix v0.20.1-0.20240821194048-675d176b4662 h1:f7iw18KChWZtHA6dKJcm4uv3yjxzej4ToDuK8JwS4j0= +maunium.net/go/mautrix v0.20.1-0.20240821194048-675d176b4662/go.mod h1:NhWZ4jpQ2CW+t6TmGrnydAIL0htdoXmGiNTdHb2PzL4= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= diff --git a/pkg/connector/chatinfo.go b/pkg/connector/chatinfo.go index 0509def3..85abd7cb 100644 --- a/pkg/connector/chatinfo.go +++ b/pkg/connector/chatinfo.go @@ -9,6 +9,7 @@ import ( "go.mau.fi/util/ptr" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/bridgev2/networkid" "go.mau.fi/mautrix-telegram/pkg/connector/ids" "go.mau.fi/mautrix-telegram/pkg/connector/media" @@ -39,15 +40,16 @@ func (t *TelegramClient) getDMChatInfo(ctx context.Context, userID int64) (*brid } else if err = t.updateGhostWithUserInfo(ctx, userID, userInfo); err != nil { return nil, err } else { - chatInfo.Members.Members = []bridgev2.ChatMember{ - { + networkUserID := ids.MakeUserID(userID) + chatInfo.Members.MemberMap = map[networkid.UserID]bridgev2.ChatMember{ + networkUserID: { EventSender: bridgev2.EventSender{ SenderLogin: ids.MakeUserLoginID(userID), Sender: ids.MakeUserID(userID), }, UserInfo: userInfo, }, - {EventSender: t.mySender()}, + t.userID: {EventSender: t.mySender()}, } } return &chatInfo, nil @@ -74,8 +76,8 @@ func (t *TelegramClient) getGroupChatInfo(fullChat *tg.MessagesChatFull, chatID Name: name, Type: ptr.Ptr(database.RoomTypeGroupDM), // TODO Is this correct for channels? Members: &bridgev2.ChatMemberList{ - IsFull: true, - Members: []bridgev2.ChatMember{{EventSender: t.mySender()}}, + IsFull: true, + MemberMap: map[networkid.UserID]bridgev2.ChatMember{}, }, CanBackfill: true, ExtraUpdates: func(ctx context.Context, p *bridgev2.Portal) bool { @@ -178,15 +180,16 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta continue } - chatInfo.Members.Members = append(chatInfo.Members.Members, bridgev2.ChatMember{ + sender := ids.MakeUserID(user.GetUserID()) + chatInfo.Members.MemberMap[sender] = bridgev2.ChatMember{ EventSender: bridgev2.EventSender{ IsFromMe: user.GetUserID() == t.telegramUserID, SenderLogin: ids.MakeUserLoginID(user.GetUserID()), - Sender: ids.MakeUserID(user.GetUserID()), + Sender: sender, }, - }) + } - if len(chatInfo.Members.Members) >= t.main.Config.MemberList.NormalizedMaxInitialSync() { + if len(chatInfo.Members.MemberMap) >= t.main.Config.MemberList.NormalizedMaxInitialSync() { break } } @@ -264,7 +267,9 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta return nil, err } chatInfo.Members.IsFull = len(participants.Participants) < limit - chatInfo.Members.Members = append(chatInfo.Members.Members, t.filterChannelParticipants(participants.Participants, limit)...) + for _, participant := range t.filterChannelParticipants(participants.Participants, limit) { + chatInfo.Members.MemberMap[participant.Sender] = participant + } } else { remaining := t.main.Config.MemberList.NormalizedMaxInitialSync() var offset int @@ -289,15 +294,14 @@ func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Porta if err != nil { return nil, err } - participants, ok := p.(*tg.ChannelsChannelParticipants) - if !ok { - return nil, fmt.Errorf("returned participants is %T not *tg.ChannelsChannelParticipants", p) - } if len(participants.Participants) == 0 { chatInfo.Members.IsFull = true break } - chatInfo.Members.Members = append(chatInfo.Members.Members, t.filterChannelParticipants(participants.Participants, limit)...) + + for _, participant := range t.filterChannelParticipants(participants.Participants, limit) { + chatInfo.Members.MemberMap[participant.Sender] = participant + } offset += len(participants.Participants) remaining -= len(participants.Participants) diff --git a/pkg/connector/commands.go b/pkg/connector/commands.go new file mode 100644 index 00000000..823f6a1f --- /dev/null +++ b/pkg/connector/commands.go @@ -0,0 +1,72 @@ +package connector + +import ( + "slices" + "sync" + + "github.com/gotd/td/tg" + "maunium.net/go/mautrix/bridgev2/commands" +) + +var cmdSync = &commands.FullHandler{ + Func: fnSync, + Name: "sync", + Help: commands.HelpMeta{ + Section: commands.HelpSectionGeneral, + Description: "Synchronize your chat portals, contacts and/or own info.", + Args: "[`chats`|`contacts`|`me`]", + }, + RequiresLogin: true, +} + +func fnSync(ce *commands.Event) { + var only string + if len(ce.Args) > 0 { + if !slices.Contains([]string{"chats", "contacts", "me"}, ce.Args[0]) { + ce.Reply("Invalid argument. Use `chats`, `contacts` or `me`.") + return + } + only = ce.Args[0] + } + + var wg sync.WaitGroup + for _, login := range ce.User.GetUserLogins() { + client := login.Client.(*TelegramClient) + if only == "" || only == "chats" { + ce.Reply("Synchronizing chats for %s...", login.ID) + wg.Add(1) + go func() { + defer wg.Done() + if err := client.SyncChats(ce.Ctx); err != nil { + ce.Reply("Failed to synchronize chats for %s: %v", login.ID, err) + } + }() + } + if only == "" || only == "contacts" { + ce.Reply("Synchronizing contacts...") + wg.Add(1) + go func() { + // TODO + ce.Reply("Contact sync is not yet implemented!") + defer wg.Done() + }() + } + if only == "" || only == "me" { + ce.Reply("Synchronizing your info...") + wg.Add(1) + go func() { + wg.Done() + if users, err := client.client.API().UsersGetUsers(ce.Ctx, []tg.InputUserClass{&tg.InputUserSelf{}}); err != nil { + ce.Reply("Failed to get your info for %s: %v", login.ID, err) + } else if len(users) == 0 { + ce.Reply("Failed to get your info for %s: no users returned", login.ID) + } else if userInfo, err := client.getUserInfoFromTelegramUser(ce.Ctx, users[0]); err != nil { + ce.Reply("Failed to get your info for %s: %v", login.ID, err) + } else if err = client.updateGhostWithUserInfo(ce.Ctx, client.telegramUserID, userInfo); err != nil { + ce.Reply("Failed to update your info for %s: %v", login.ID, err) + } + }() + } + } + wg.Wait() +} diff --git a/pkg/connector/config.go b/pkg/connector/config.go index bd759805..00c5f7a9 100644 --- a/pkg/connector/config.go +++ b/pkg/connector/config.go @@ -42,6 +42,12 @@ type TelegramConfig struct { IntervalSeconds int `yaml:"interval_seconds"` TimeoutSeconds int `yaml:"timeout_seconds"` } `yaml:"ping"` + + Sync struct { + UpdateLimit int `yaml:"update_limit"` + CreateLimit int `yaml:"create_limit"` + DirectChats bool `yaml:"direct_chats"` + } `yaml:"sync"` } func (c TelegramConfig) ShouldBridge(participantCount int) bool { @@ -65,6 +71,9 @@ func upgradeConfig(helper up.Helper) { helper.Copy(up.Int, "max_member_count") helper.Copy(up.Int, "ping", "interval_seconds") helper.Copy(up.Int, "ping", "timeout_seconds") + helper.Copy(up.Int, "sync", "update_limit") + helper.Copy(up.Int, "sync", "create_limit") + helper.Copy(up.Bool, "sync", "direct_chats") } func (tg *TelegramConnector) GetConfig() (example string, data any, upgrader up.Upgrader) { diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index 37c0ba0d..1fcda746 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -21,6 +21,7 @@ import ( "go.mau.fi/util/dbutil" "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/commands" "go.mau.fi/mautrix-telegram/pkg/connector/store" ) @@ -46,6 +47,7 @@ func NewConnector() *TelegramConnector { func (tg *TelegramConnector) Init(bridge *bridgev2.Bridge) { tg.Store = store.NewStore(bridge.DB.Database, dbutil.ZeroLogger(bridge.Log.With().Str("db_section", "telegram").Logger())) tg.Bridge = bridge + tg.Bridge.Commands.(*commands.Processor).AddHandlers(cmdSync) } func (tg *TelegramConnector) Start(ctx context.Context) error { diff --git a/pkg/connector/directdownload.go b/pkg/connector/directdownload.go index 9a1fa5c5..49f64b0a 100644 --- a/pkg/connector/directdownload.go +++ b/pkg/connector/directdownload.go @@ -18,10 +18,6 @@ import ( var _ bridgev2.DirectMediableNetwork = (*TelegramConnector)(nil) -type getMessages interface { - GetMessages() []tg.MessageClass -} - func (tc *TelegramConnector) Download(ctx context.Context, mediaID networkid.MediaID) (mediaproxy.GetMediaResponse, error) { info, err := ids.ParseDirectMediaInfo(mediaID) if err != nil { diff --git a/pkg/connector/example-config.yaml b/pkg/connector/example-config.yaml index 6d155992..22798b51 100644 --- a/pkg/connector/example-config.yaml +++ b/pkg/connector/example-config.yaml @@ -54,3 +54,14 @@ ping: interval_seconds: 30 # The timeout (in seconds) for a single ping. timeout_seconds: 10 + +sync: + # Number of most recently active dialogs to check when syncing chats. + # Set to 0 to remove limit. + update_limit: 0 + # Number of most recently active dialogs to create portals for when syncing + # chats. + # Set to 0 to remove limit. + create_limit: 15 + # Whether or not to sync and create portals for direct chats at startup. + direct_chats: false diff --git a/pkg/connector/sync.go b/pkg/connector/sync.go new file mode 100644 index 00000000..a3671fed --- /dev/null +++ b/pkg/connector/sync.go @@ -0,0 +1,134 @@ +package connector + +import ( + "context" + "fmt" + "math" + + "github.com/gotd/td/tg" + "github.com/rs/zerolog" + "maunium.net/go/mautrix/bridgev2" + "maunium.net/go/mautrix/bridgev2/database" + "maunium.net/go/mautrix/bridgev2/simplevent" + + "go.mau.fi/mautrix-telegram/pkg/connector/ids" +) + +func (t *TelegramClient) SyncChats(ctx context.Context) error { + log := zerolog.Ctx(ctx) + + limit := t.main.Config.Sync.UpdateLimit + if limit <= 0 { + limit = math.MaxInt32 + } + + dialogs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesDialogs, error) { + d, err := t.client.API().MessagesGetDialogs(ctx, &tg.MessagesGetDialogsRequest{ + Limit: limit, + OffsetPeer: &tg.InputPeerEmpty{}, + }) + if err != nil { + return nil, err + } else if dialogs, ok := d.(tg.ModifiedMessagesDialogs); !ok { + return nil, fmt.Errorf("unexpected dialogs type %T", d) + } else { + return dialogs, nil + } + }) + if err != nil { + return err + } + + var created int + for _, d := range dialogs.GetDialogs() { + if d.TypeID() != tg.DialogTypeID { + continue + } + dialog := d.(*tg.Dialog) + + log := log.With(). + Stringer("peer", dialog.Peer). + Int("top_message", dialog.TopMessage). + Logger() + + portalKey := ids.MakePortalKey(dialog.GetPeer(), t.loginID) + portal, err := t.main.Bridge.GetPortalByKey(ctx, portalKey) + if err != nil { + log.Err(err).Msg("Failed to get portal") + continue + } + + // TODO make sure that the user isn't deleted. + + if portal == nil || portal.MXID == "" { + // Check what the latest message is + messages, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) { + inputMessages := []tg.InputMessageClass{ + &tg.InputMessageID{ID: dialog.TopMessage}, + } + var msgs tg.MessagesMessagesClass + switch v := dialog.Peer.(type) { + case *tg.PeerUser, *tg.PeerChat: + msgs, err = t.client.API().MessagesGetMessages(ctx, inputMessages) + case *tg.PeerChannel: + var accessHash int64 + var found bool + accessHash, found, err = t.ScopedStore.GetChannelAccessHash(ctx, t.telegramUserID, v.ChannelID) + if err != nil { + return nil, fmt.Errorf("failed to get channel access hash: %w", err) + } else if !found { + return nil, fmt.Errorf("channel access hash for %d not found", v.ChannelID) + } else { + msgs, err = t.client.API().ChannelsGetMessages(ctx, &tg.ChannelsGetMessagesRequest{ + Channel: &tg.InputChannel{ChannelID: v.ChannelID, AccessHash: accessHash}, + ID: inputMessages, + }) + } + default: + return nil, fmt.Errorf("unknown peer type %T", dialog.Peer) + } + if err != nil { + return nil, err + } else if messages, ok := msgs.(tg.ModifiedMessagesMessages); !ok { + return nil, fmt.Errorf("unsupported messages type %T", messages) + } else { + return messages, nil + } + }) + if err != nil { + log.Err(err).Msg("Failed to get latest message for portal") + continue + } else if len(messages.GetMessages()) == 0 { + log.Warn().Msg("No messages found for portal") + continue + } + topMessage := messages.GetMessages()[0] + if topMessage.TypeID() == tg.MessageServiceTypeID { + action := topMessage.(*tg.MessageService).Action + if action.TypeID() == tg.MessageActionContactSignUpTypeID || action.TypeID() == tg.MessageActionHistoryClearTypeID { + log.Debug().Str("action_type", action.TypeName()).Msg("Not syncing portal because it's a contact sign up or history clear") + continue + } + } + + created++ // The portal will have to be created + if created > t.main.Config.Sync.CreateLimit { + break + } + } + + // TODO use the bundled backfill data? + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatResync, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("update", "sync") + }, + PortalKey: portalKey, + CreatePortal: true, + }, + CheckNeedsBackfillFunc: func(ctx context.Context, latestMessage *database.Message) (bool, error) { return true, nil }, + }) + } + return nil +} diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index 04b68d22..19908c06 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -92,30 +92,33 @@ func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, update IGetMess case *tg.MessageActionChatDeletePhoto: chatInfoChange.ChatInfoChange.ChatInfo = &bridgev2.ChatInfo{Avatar: &bridgev2.Avatar{Remove: true}} case *tg.MessageActionChatAddUser: - var members []bridgev2.ChatMember + chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{ + MemberMap: map[networkid.UserID]bridgev2.ChatMember{}, + } for _, userID := range action.Users { - members = append(members, bridgev2.ChatMember{ + sender := ids.MakeUserID(userID) + chatInfoChange.ChatInfoChange.MemberChanges.MemberMap[sender] = bridgev2.ChatMember{ EventSender: bridgev2.EventSender{ SenderLogin: ids.MakeUserLoginID(userID), - Sender: ids.MakeUserID(userID), + Sender: sender, }, Membership: event.MembershipJoin, - }) + } } - chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{Members: members} case *tg.MessageActionChatJoinedByLink: chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{ - Members: []bridgev2.ChatMember{ - {EventSender: sender, Membership: event.MembershipJoin}, + MemberMap: map[networkid.UserID]bridgev2.ChatMember{ + sender.Sender: {EventSender: sender, Membership: event.MembershipJoin}, }, } case *tg.MessageActionChatDeleteUser: + sender := ids.MakeUserID(action.UserID) chatInfoChange.ChatInfoChange.MemberChanges = &bridgev2.ChatMemberList{ - Members: []bridgev2.ChatMember{ - { + MemberMap: map[networkid.UserID]bridgev2.ChatMember{ + sender: { EventSender: bridgev2.EventSender{ SenderLogin: ids.MakeUserLoginID(action.UserID), - Sender: ids.MakeUserID(action.UserID), + Sender: sender, }, Membership: event.MembershipLeave, },