From a86c2c25443398d2b64e4d8efcef5473b448ecc2 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 7 Aug 2024 12:38:01 -0600 Subject: [PATCH] read receipts: bridges TG <-> Matrix Signed-off-by: Sumner Evans --- pkg/connector/chatinfo.go | 20 ++-------- pkg/connector/client.go | 17 +++++++++ pkg/connector/matrix.go | 80 ++++++++++++++++++++++++++++++++++++++- pkg/connector/telegram.go | 34 ++++++++++++++++- 4 files changed, 131 insertions(+), 20 deletions(-) diff --git a/pkg/connector/chatinfo.go b/pkg/connector/chatinfo.go index 2930513d..6f043b34 100644 --- a/pkg/connector/chatinfo.go +++ b/pkg/connector/chatinfo.go @@ -38,13 +38,7 @@ func (t *TelegramClient) getDMChatInfo(ctx context.Context, userID int64) (*brid }, UserInfo: userInfo, }, - { - EventSender: bridgev2.EventSender{ - IsFromMe: true, - SenderLogin: t.loginID, - Sender: t.userID, - }, - }, + {EventSender: t.mySender()}, } } return &chatInfo, nil @@ -58,16 +52,8 @@ func (t *TelegramClient) getGroupChatInfo(ctx context.Context, fullChat *tg.Mess chatInfo := bridgev2.ChatInfo{ Type: ptr.Ptr(database.RoomTypeGroupDM), // TODO Is this correct for channels? Members: &bridgev2.ChatMemberList{ - IsFull: true, - Members: []bridgev2.ChatMember{ - { - EventSender: bridgev2.EventSender{ - IsFromMe: true, - SenderLogin: t.loginID, - Sender: t.userID, - }, - }, - }, + IsFull: true, + Members: []bridgev2.ChatMember{{EventSender: t.mySender()}}, }, } var isBroadcastChannel bool diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 3fc6a7aa..e6080f23 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -142,6 +142,15 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge dispatcher.OnChannelUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannelUserTyping) error { return client.handleTyping(ids.PeerTypeChannel.AsPortalKey(update.ChannelID, ""), update.FromID.(*tg.PeerUser).UserID, update.Action) }) + dispatcher.OnReadHistoryOutbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryOutbox) error { + return client.updateReadReceipt(update) + }) + dispatcher.OnReadHistoryInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryInbox) error { + return client.onOwnReadReceipt(ids.MakePortalKey(update.Peer, login.ID), update.MaxID) + }) + dispatcher.OnReadChannelInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadChannelInbox) error { + return client.onOwnReadReceipt(ids.PeerTypeChannel.AsPortalKey(update.ChannelID, ""), update.MaxID) + }) client.ScopedStore = tc.Store.GetScopedStore(telegramUserID) @@ -448,3 +457,11 @@ func (t *TelegramClient) GetCapabilities(ctx context.Context, portal *bridgev2.P Reactions: true, } } + +func (t *TelegramClient) mySender() bridgev2.EventSender { + return bridgev2.EventSender{ + IsFromMe: true, + SenderLogin: t.loginID, + Sender: t.userID, + } +} diff --git a/pkg/connector/matrix.go b/pkg/connector/matrix.go index 52dd1405..692cc974 100644 --- a/pkg/connector/matrix.go +++ b/pkg/connector/matrix.go @@ -3,10 +3,12 @@ package connector import ( "context" "crypto/sha256" + "errors" "fmt" "math/rand" "strconv" "strings" + "sync" "time" "github.com/gotd/td/telegram/message" @@ -396,8 +398,82 @@ func (t *TelegramClient) HandleMatrixReactionRemove(ctx context.Context, msg *br } func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridgev2.MatrixReadReceipt) error { - // TODO - return nil + peerType, id, parseErr := ids.ParsePortalID(msg.Portal.ID) + if parseErr != nil { + return parseErr + } + inputPeer, parseErr := t.inputPeerForPortalID(ctx, msg.Portal.ID) + if parseErr != nil { + return parseErr + } + + var readMentionsErr, readReactionsErr, readMessagesErr error + var wg sync.WaitGroup + + // Read mentions + wg.Add(1) + go func() { + defer wg.Done() + _, readMentionsErr = t.client.API().MessagesReadMentions(ctx, &tg.MessagesReadMentionsRequest{ + Peer: inputPeer, + }) + }() + + // Read reactions + wg.Add(1) + go func() { + defer wg.Done() + _, readMentionsErr = t.client.API().MessagesReadReactions(ctx, &tg.MessagesReadReactionsRequest{ + Peer: inputPeer, + }) + }() + + // Read messages + wg.Add(1) + go func() { + defer wg.Done() + + message := msg.ExactMessage + if message == nil { + message, readMessagesErr = t.main.Bridge.DB.Message.GetLastPartAtOrBeforeTime(ctx, msg.Portal.PortalKey, time.Now()) + if readMessagesErr != nil { + return + } + } + var maxID int + maxID, readMessagesErr = ids.ParseMessageID(message.ID) + if readMessagesErr != nil { + return + } + + switch peerType { + case ids.PeerTypeUser, ids.PeerTypeChat: + _, readMessagesErr = t.client.API().MessagesReadHistory(ctx, &tg.MessagesReadHistoryRequest{ + Peer: inputPeer, + MaxID: maxID, + }) + case ids.PeerTypeChannel: + var accessHash int64 + var found bool + accessHash, found, readMessagesErr = t.ScopedStore.GetChannelAccessHash(ctx, t.telegramUserID, id) + if readMessagesErr != nil { + return + } else if !found { + readMessagesErr = fmt.Errorf("channel access hash not found for %d", id) + return + } + _, readMessagesErr = t.client.API().ChannelsReadHistory(ctx, &tg.ChannelsReadHistoryRequest{ + Channel: &tg.InputChannel{ChannelID: id, AccessHash: accessHash}, + }) + default: + readMessagesErr = fmt.Errorf("unknown peer type %s", peerType) + } + }() + + // TODO handle sponsored message read receipts + + wg.Wait() + return errors.Join(readMentionsErr, readReactionsErr, readMessagesErr) } func (t *TelegramClient) HandleMatrixTyping(ctx context.Context, msg *bridgev2.MatrixTyping) error { diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index 7bf2b3d5..c120c2ae 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -174,7 +174,7 @@ func (t *TelegramClient) getEventSender(msg interface { GetPeerID() tg.PeerClass }) bridgev2.EventSender { if msg.GetOut() { - return bridgev2.EventSender{IsFromMe: true, SenderLogin: t.loginID, Sender: t.userID} + return t.mySender() } else if f, ok := msg.GetFromID(); ok && f.TypeID() == tg.PeerUserTypeID { from := f.(*tg.PeerUser) return bridgev2.EventSender{ @@ -305,6 +305,7 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage) return nil } + func (t *TelegramClient) handleTyping(portal networkid.PortalKey, userID int64, action tg.SendMessageActionClass) error { if userID == t.telegramUserID { return nil @@ -327,6 +328,37 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, userID int64, return nil } +func (t *TelegramClient) updateReadReceipt(update *tg.UpdateReadHistoryOutbox) error { + user, ok := update.Peer.(*tg.PeerUser) + if !ok { + return fmt.Errorf("unsupported peer type %T", update.Peer) + } + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventReadReceipt, + PortalKey: ids.MakePortalKey(update.Peer, t.loginID), + Sender: bridgev2.EventSender{ + SenderLogin: ids.MakeUserLoginID(user.UserID), + Sender: ids.MakeUserID(user.UserID), + }, + }, + LastTarget: ids.MakeMessageID(update.MaxID), + }) + return nil +} + +func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID int) error { + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventReadReceipt, + PortalKey: portalKey, + Sender: t.mySender(), + }, + LastTarget: ids.MakeMessageID(maxID), + }) + return nil +} + func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Message) { log := zerolog.Ctx(ctx).With(). Str("handler", "handle_telegram_reactions").