From d108ac5d949f06575301658218cb35295e9878b9 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 30 Dec 2021 17:43:45 +0200 Subject: [PATCH] Add support for Telegram->Matrix reactions --- ROADMAP.md | 6 +- mautrix_telegram/abstract_user.py | 9 + mautrix_telegram/config.py | 3 - mautrix_telegram/db/__init__.py | 4 +- mautrix_telegram/db/message.py | 27 ++- mautrix_telegram/db/reaction.py | 84 ++++++++ mautrix_telegram/db/upgrade/__init__.py | 2 +- mautrix_telegram/db/upgrade/v03_reactions.py | 37 ++++ mautrix_telegram/example-config.yaml | 10 - mautrix_telegram/portal.py | 201 ++++++++++++++++-- mautrix_telegram/portal_util/__init__.py | 2 +- mautrix_telegram/portal_util/deduplication.py | 122 ++++++----- mautrix_telegram/portal_util/send_lock.py | 13 ++ 13 files changed, 427 insertions(+), 93 deletions(-) create mode 100644 mautrix_telegram/db/reaction.py create mode 100644 mautrix_telegram/db/upgrade/v03_reactions.py diff --git a/ROADMAP.md b/ROADMAP.md index 918fbe7a..336fb935 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -3,6 +3,7 @@ * Matrix → Telegram * [x] Message content (text, formatting, files, etc..) * [x] Message redactions + * [ ] Message reactions * [x] Message edits * [ ] ‡ Message history * [x] Presence @@ -12,8 +13,8 @@ * [x] Power level * [x] Normal chats * [ ] Non-hardcoded PL requirements - * [x] Supergroups/channels - * [ ] Precise bridging (non-hardcoded PL requirements, bridge specific permissions, etc..) + * [x] Supergroups/channels + * [ ] Precise bridging (non-hardcoded PL requirements, bridge specific permissions, etc..) 
* [x] Membership actions (invite/kick/join/leave) * [x] Room metadata changes (name, topic, avatar) * [x] Initial room metadata @@ -27,6 +28,7 @@ * [x] Games * [ ] Buttons * [x] Message deletions + * [x] Message reactions * [x] Message edits * [x] Message history * [x] Manually (`!tg backfill`) diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py index 3799512f..39270496 100644 --- a/mautrix_telegram/abstract_user.py +++ b/mautrix_telegram/abstract_user.py @@ -46,6 +46,7 @@ from telethon.tl.types import ( UpdateEditChannelMessage, UpdateEditMessage, UpdateFolderPeers, + UpdateMessageReactions, UpdateNewChannelMessage, UpdateNewMessage, UpdateNotifySettings, @@ -312,6 +313,8 @@ class AbstractUser(ABC): await self.delete_message(update) elif isinstance(update, UpdateDeleteChannelMessages): await self.delete_channel_message(update) + elif isinstance(update, UpdateMessageReactions): + await self.update_reactions(update) elif isinstance(update, (UpdateChatUserTyping, UpdateChannelUserTyping, UpdateUserTyping)): await self.update_typing(update) elif isinstance(update, UpdateUserStatus): @@ -559,6 +562,12 @@ class AbstractUser(ABC): await message.delete() await self._try_redact(message) + async def update_reactions(self, update: UpdateMessageReactions) -> None: + portal = await po.Portal.get_by_entity(update.peer, tg_receiver=self.tgid) + if not portal or not portal.mxid or not portal.allow_bridging: + return + await portal.handle_telegram_reactions(self, TelegramID(update.msg_id), update.reactions) + async def update_message(self, original_update: UpdateMessage) -> None: update, sender, portal = await self.get_message_details(original_update) if not portal: diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index 08f00f72..1fc93a3d 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -178,9 +178,6 @@ class Config(BaseBridgeConfig): else: copy("bridge.bridge_notices") - 
copy("bridge.deduplication.pre_db_check") - copy("bridge.deduplication.cache_queue_length") - if "bridge.message_formats.m_text" in self: del self["bridge.message_formats"] copy_dict("bridge.message_formats", override_existing_map=False) diff --git a/mautrix_telegram/db/__init__.py b/mautrix_telegram/db/__init__.py index ebb94fcf..119ebdbd 100644 --- a/mautrix_telegram/db/__init__.py +++ b/mautrix_telegram/db/__init__.py @@ -19,6 +19,7 @@ from .bot_chat import BotChat from .message import Message from .portal import Portal from .puppet import Puppet +from .reaction import Reaction from .telegram_file import TelegramFile from .telethon_session import PgSession from .upgrade import upgrade_table @@ -26,7 +27,7 @@ from .user import User def init(db: Database) -> None: - for table in (Portal, Message, User, Puppet, TelegramFile, BotChat, PgSession): + for table in (Portal, Message, Reaction, User, Puppet, TelegramFile, BotChat, PgSession): table.db = db @@ -35,6 +36,7 @@ __all__ = [ "init", "Portal", "Message", + "Reaction", "User", "Puppet", "TelegramFile", diff --git a/mautrix_telegram/db/message.py b/mautrix_telegram/db/message.py index dcbd0a5e..f1625fbc 100644 --- a/mautrix_telegram/db/message.py +++ b/mautrix_telegram/db/message.py @@ -38,6 +38,7 @@ class Message: tg_space: TelegramID edit_index: int redacted: bool = False + content_hash: bytes | None = None @classmethod def _from_row(cls, row: Record | None) -> Message | None: @@ -45,7 +46,7 @@ class Message: return None return cls(**row) - columns: ClassVar[str] = "mxid, mx_room, tgid, tg_space, edit_index, redacted" + columns: ClassVar[str] = "mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash" @classmethod async def get_all_by_tgid(cls, tgid: TelegramID, tg_space: TelegramID) -> list[Message]: @@ -142,14 +143,24 @@ class Message: q = "UPDATE message SET mxid=$1 WHERE mxid=$2 AND mx_room=$3" await cls.db.execute(q, real_mxid, temp_mxid, mx_room) + @property + def _values(self): + return ( + 
self.mxid, + self.mx_room, + self.tgid, + self.tg_space, + self.edit_index, + self.redacted, + self.content_hash, + ) + async def insert(self) -> None: - q = ( - "INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted) " - "VALUES ($1, $2, $3, $4, $5, $6)" - ) - await self.db.execute( - q, self.mxid, self.mx_room, self.tgid, self.tg_space, self.edit_index, self.redacted - ) + q = """ + INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash) + VALUES ($1, $2, $3, $4, $5, $6, $7) + """ + await self.db.execute(q, *self._values) async def delete(self) -> None: q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2 AND tg_space=$3" diff --git a/mautrix_telegram/db/reaction.py b/mautrix_telegram/db/reaction.py new file mode 100644 index 00000000..71c49dd7 --- /dev/null +++ b/mautrix_telegram/db/reaction.py @@ -0,0 +1,84 @@ +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2021 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. 
+from __future__ import annotations + +from typing import TYPE_CHECKING, ClassVar + +from asyncpg import Record +from attr import dataclass + +from mautrix.types import EventID, RoomID +from mautrix.util.async_db import Database + +from ..types import TelegramID + +fake_db = Database.create("") if TYPE_CHECKING else None + + +@dataclass +class Reaction: + db: ClassVar[Database] = fake_db + + mxid: EventID + mx_room: RoomID + msg_mxid: EventID + tg_sender: TelegramID + reaction: str + + @classmethod + def _from_row(cls, row: Record | None) -> Reaction | None: + if row is None: + return None + return cls(**row) + + columns: ClassVar[str] = "mxid, mx_room, msg_mxid, tg_sender, reaction" + + @classmethod + async def delete_all(cls, mx_room: RoomID) -> None: + await cls.db.execute("DELETE FROM reaction WHERE mx_room=$1", mx_room) + + @classmethod + async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Reaction | None: + q = f"SELECT {cls.columns} FROM reaction WHERE mxid=$1 AND mx_room=$2" + return cls._from_row(await cls.db.fetchrow(q, mxid, mx_room)) + + @classmethod + async def get_all_by_message(cls, mxid: EventID, mx_room: RoomID) -> list[Reaction]: + q = f"SELECT {cls.columns} FROM reaction WHERE msg_mxid=$1 AND mx_room=$2" + rows = await cls.db.fetch(q, mxid, mx_room) + return [cls._from_row(row) for row in rows] + + @property + def _values(self): + return ( + self.mxid, + self.mx_room, + self.msg_mxid, + self.tg_sender, + self.reaction, + ) + + async def save(self) -> None: + q = """ + INSERT INTO reaction (mxid, mx_room, msg_mxid, tg_sender, reaction) + VALUES ($1, $2, $3, $4, $5) ON CONFLICT (msg_mxid, mx_room, tg_sender) + DO UPDATE SET mxid=$1, reaction=$5 + """ + await self.db.execute(q, *self._values) + + async def delete(self) -> None: + q = "DELETE FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3" + await self.db.execute(q, self.msg_mxid, self.mx_room, self.tg_sender) diff --git a/mautrix_telegram/db/upgrade/__init__.py 
b/mautrix_telegram/db/upgrade/__init__.py index 2146830c..18875090 100644 --- a/mautrix_telegram/db/upgrade/__init__.py +++ b/mautrix_telegram/db/upgrade/__init__.py @@ -2,4 +2,4 @@ from mautrix.util.async_db import UpgradeTable upgrade_table = UpgradeTable() -from . import v01_initial_revision, v02_sponsored_events +from . import v01_initial_revision, v02_sponsored_events, v03_reactions diff --git a/mautrix_telegram/db/upgrade/v03_reactions.py b/mautrix_telegram/db/upgrade/v03_reactions.py new file mode 100644 index 00000000..e9edd67d --- /dev/null +++ b/mautrix_telegram/db/upgrade/v03_reactions.py @@ -0,0 +1,37 @@ +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2021 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +from asyncpg import Connection + +from . 
import upgrade_table + + +@upgrade_table.register(description="Add support for reactions") +async def upgrade_v3(conn: Connection) -> None: + await conn.execute( + """CREATE TABLE reaction ( + mxid TEXT NOT NULL, + mx_room TEXT NOT NULL, + msg_mxid TEXT NOT NULL, + tg_sender BIGINT, + reaction TEXT NOT NULL, + + PRIMARY KEY (msg_mxid, mx_room, tg_sender), + UNIQUE (mxid, mx_room) + )""" + ) + await conn.execute("ALTER TABLE message ALTER COLUMN mxid SET NOT NULL") + await conn.execute("ALTER TABLE message ALTER COLUMN mx_room SET NOT NULL") + await conn.execute("ALTER TABLE message ADD COLUMN content_hash bytea") diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index cfc59e2f..054156fb 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -331,16 +331,6 @@ bridge: exceptions: - "@importantbot:example.com" - # Some config options related to Telegram message deduplication. - # The default values are usually fine, but some debug messages/warnings might recommend you - # change these. - deduplication: - # Whether or not to check the database if the message about to be sent is a duplicate. - pre_db_check: false - # The number of latest events to keep when checking for duplicates. - # You might need to increase this on high-traffic bridge instances. - cache_queue_length: 20 - # The formats to use when sending messages to Telegram via the relay bot. # Text msgtypes (m.text, m.notice and m.emote) support HTML, media msgtypes don't. 
# diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index de33d942..1b55798c 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -63,6 +63,7 @@ from telethon.tl.functions.messages import ( EditChatPhotoRequest, EditChatTitleRequest, ExportChatInviteRequest, + GetMessageReactionsListRequest, MigrateChatRequest, SetTypingRequest, UnpinAllMessagesRequest, @@ -112,6 +113,8 @@ from telethon.tl.types import ( MessageMediaPhoto, MessageMediaPoll, MessageMediaUnsupported, + MessageReactions, + MessageUserReaction, PeerChannel, PeerChat, PeerUser, @@ -122,6 +125,7 @@ from telethon.tl.types import ( PhotoSizeEmpty, PhotoSizeProgressive, Poll, + ReactionCount, SendMessageCancelAction, SendMessageTypingAction, SponsoredMessage, @@ -177,13 +181,19 @@ from mautrix.types import ( UserID, VideoInfo, ) +from mautrix.util import variation_selector from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus from mautrix.util.simple_lock import SimpleLock from mautrix.util.simple_template import SimpleTemplate from . 
import abstract_user as au, formatter, portal_util as putil, puppet as p, user as u, util from .config import Config -from .db import Message as DBMessage, Portal as DBPortal, TelegramFile as DBTelegramFile +from .db import ( + Message as DBMessage, + Portal as DBPortal, + Reaction as DBReaction, + TelegramFile as DBTelegramFile, +) from .tgclient import MautrixTelegramClient from .types import TelegramID from .util import sane_mimetypes @@ -248,6 +258,7 @@ class Portal(DBPortal, BasePortal): dedup: putil.PortalDedup send_lock: putil.PortalSendLock + reaction_lock: putil.PortalReactionLock _pin_lock: asyncio.Lock _main_intent: IntentAPI | None @@ -307,6 +318,7 @@ class Portal(DBPortal, BasePortal): self.dedup = putil.PortalDedup(self) self.send_lock = putil.PortalSendLock() + self.reaction_lock = putil.PortalReactionLock() self._pin_lock = asyncio.Lock() self._room_create_lock = asyncio.Lock() @@ -405,10 +417,6 @@ class Portal(DBPortal, BasePortal): ) NotificationDisabler.puppet_cls = p.Puppet NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"] - putil.PortalDedup.dedup_pre_db_check = cls.config["bridge.deduplication.pre_db_check"] - putil.PortalDedup.dedup_cache_queue_length = cls.config[ - "bridge.deduplication.cache_queue_length" - ] # endregion # region Matrix -> Telegram metadata @@ -1648,7 +1656,7 @@ class Portal(DBPortal, BasePortal): self, event_id: EventID, space: TelegramID, edit_index: int, response: TypeMessage ) -> None: self.log.trace("Handled Matrix message: %s", response) - self.dedup.check(response, (event_id, space), force_hash=edit_index != 0) + event_hash, _ = self.dedup.check(response, (event_id, space), force_hash=edit_index != 0) if edit_index < 0: prev_edit = await DBMessage.get_one_by_tgid(TelegramID(response.id), space, -1) edit_index = prev_edit.edit_index + 1 @@ -1658,6 +1666,7 @@ class Portal(DBPortal, BasePortal): mx_room=self.mxid, mxid=event_id, edit_index=edit_index, + content_hash=event_hash, 
).insert() async def _send_bridge_error( @@ -2395,13 +2404,18 @@ class Portal(DBPortal, BasePortal): self.log.debug("Ignoring game message edit event") return + if self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None: + asyncio.create_task( + self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions) + ) + async with self.send_lock(sender.tgid if sender else None, required=False): tg_space = self.tgid if self.peer_type == "channel" else source.tgid temporary_identifier = EventID( f"${random.randint(1000000000000, 9999999999999)}TGBRIDGEDITEMP" ) - duplicate_found = self.dedup.check( + event_hash, duplicate_found = self.dedup.check( evt, (temporary_identifier, tg_space), force_hash=True ) if duplicate_found: @@ -2418,6 +2432,7 @@ class Portal(DBPortal, BasePortal): tg_space=tg_space, tgid=TelegramID(evt.id), edit_index=prev_edit_msg.edit_index + 1, + content_hash=event_hash, ).insert() return @@ -2431,6 +2446,15 @@ class Portal(DBPortal, BasePortal): "in database." 
) return + prev_edit_msg = ( + await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg + ) + if prev_edit_msg.content_hash == event_hash: + self.log.debug( + f"Ignoring edit of message {evt.id}@{tg_space} (src {source.tgid}):" + " content hash didn't change" + ) + return content.msgtype = ( MessageType.NOTICE @@ -2444,15 +2468,13 @@ class Portal(DBPortal, BasePortal): await intent.set_typing(self.mxid, is_typing=False) event_id = await self._send_message(intent, content) - prev_edit_msg = ( - await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg - ) await DBMessage( mxid=event_id, mx_room=self.mxid, tg_space=tg_space, tgid=TelegramID(evt.id), edit_index=prev_edit_msg.edit_index + 1, + content_hash=event_hash, ).insert() await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id) @@ -2587,6 +2609,139 @@ class Portal(DBPortal, BasePortal): count += 1 return count + def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessageUserReaction]: + if len(counts) == 1: + item = counts[0] + if item.count == 2: + return [ + MessageUserReaction(reaction=item.reaction, user_id=self.tgid), + MessageUserReaction(reaction=item.reaction, user_id=self.tg_receiver), + ] + elif item.count == 1: + return [ + MessageUserReaction( + reaction=item.reaction, + user_id=self.tg_receiver if item.chosen else self.tgid, + ), + ] + elif len(counts) == 2: + item1, item2 = counts + return [ + MessageUserReaction( + reaction=item1.reaction, + user_id=self.tg_receiver if item1.chosen else self.tgid, + ), + MessageUserReaction( + reaction=item2.reaction, + user_id=self.tg_receiver if item2.chosen else self.tgid, + ), + ] + return [] + + async def try_handle_telegram_reactions( + self, + source: au.AbstractUser, + msg_id: TelegramID, + data: MessageReactions, + dbm: DBMessage | None = None, + ) -> None: + try: + await self.handle_telegram_reactions(source, msg_id, data, dbm) + except Exception: + 
self.log.exception(f"Error handling reactions in message {msg_id}") + + async def handle_telegram_reactions( + self, + source: au.AbstractUser, + msg_id: TelegramID, + data: MessageReactions, + dbm: DBMessage | None = None, + ) -> None: + if self.peer_type == "channel" and not self.megagroup: + # We don't know who reacted in a channel, so we can't bridge it properly either + return + + tg_space = self.tgid if self.peer_type == "channel" else source.tgid + if dbm is None: + dbm = await DBMessage.get_one_by_tgid(msg_id, tg_space) + if dbm is None: + return + + total_count = sum(item.count for item in data.results) + recent_reactions = data.recent_reactons or [] + if not recent_reactions and total_count > 0: + if self.peer_type == "user": + recent_reactions = self._split_dm_reaction_counts(data.results) + elif source.is_bot: + # Can't fetch exact reaction senders as a bot + return + else: + # TODO this doesn't work for some reason + return + # resp = await source.client( + # GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=20) + # ) + # recent_reactions = resp.reactions + + async with self.reaction_lock(dbm.mxid): + await self._handle_telegram_reactions_locked(dbm, recent_reactions, total_count) + + async def _handle_telegram_reactions_locked( + self, msg: DBMessage, reaction_list: list[MessageUserReaction], total_count: int + ) -> None: + reactions = {reaction.user_id: reaction.reaction for reaction in reaction_list} + is_full = len(reactions) == total_count + + existing_reactions = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room) + + removed: list[DBReaction] = [] + changed: list[tuple[DBReaction, str]] = [] + for existing_reaction in existing_reactions: + new_reaction = reactions.get(existing_reaction.tg_sender) + if new_reaction is None: + if is_full: + removed.append(existing_reaction) + # else: assume the reaction is still there, too much effort to fetch it + elif new_reaction == existing_reaction.reaction: + 
reactions.pop(existing_reaction.tg_sender) + else: + changed.append((existing_reaction, new_reaction)) + + for sender, new_emoji in reactions.items(): + self.log.debug(f"Bridging reaction {new_emoji} by {sender} to {msg.tgid}") + puppet: p.Puppet = await p.Puppet.get_by_tgid(sender) + mxid = await puppet.intent_for(self).react( + msg.mx_room, msg.mxid, variation_selector.add(new_emoji) + ) + await DBReaction( + mxid=mxid, + mx_room=msg.mx_room, + msg_mxid=msg.mxid, + tg_sender=sender, + reaction=new_emoji, + ).save() + for removed_reaction in removed: + self.log.debug( + f"Removing reaction {removed_reaction.reaction} by {removed_reaction.tg_sender} " + f"to {msg.tgid}" + ) + puppet = await p.Puppet.get_by_tgid(removed_reaction.tg_sender) + await puppet.intent_for(self).redact(removed_reaction.mx_room, removed_reaction.mxid) + await removed_reaction.delete() + for changed_reaction, new_emoji in changed: + self.log.debug( + f"Updating reaction {changed_reaction.reaction} -> {new_emoji} " + f"by {changed_reaction.tg_sender} to {msg.tgid}" + ) + puppet = await p.Puppet.get_by_tgid(changed_reaction.tg_sender) + intent = puppet.intent_for(self) + await intent.redact(changed_reaction.mx_room, changed_reaction.mxid) + changed_reaction.mxid = await intent.react( + msg.mx_room, msg.mxid, variation_selector.add(new_emoji) + ) + changed_reaction.reaction = new_emoji + await changed_reaction.save() + async def handle_telegram_message( self, source: au.AbstractUser, sender: p.Puppet, evt: Message ) -> None: @@ -2613,7 +2768,7 @@ class Portal(DBPortal, BasePortal): temporary_identifier = EventID( f"${random.randint(1000000000000, 9999999999999)}TGBRIDGETEMP" ) - duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space)) + event_hash, duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space)) if duplicate_found: mxid, other_tg_space = duplicate_found self.log.debug( @@ -2627,17 +2782,16 @@ class Portal(DBPortal, BasePortal): mxid=mxid, 
tg_space=tg_space, edit_index=0, + content_hash=event_hash, ).insert() return - if self.backfill_lock.locked or (self.dedup.pre_db_check and self.peer_type == "channel"): + if self.backfill_lock.locked or self.peer_type == "channel": msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space) if msg: self.log.debug( f"Ignoring message {evt.id} (src {source.tgid}) as it was already " - f"handled into {msg.mxid}. This duplicate was catched in the db " - "check. If you get this message often, consider increasing " - "bridge.deduplication.cache_queue_length in the config." + f"handled into {msg.mxid}." ) return @@ -2702,7 +2856,10 @@ class Portal(DBPortal, BasePortal): self._new_messages_after_sponsored = True - prev_id = self.dedup.update(evt, (event_id, tg_space), (temporary_identifier, tg_space)) + another_event_hash, prev_id = self.dedup.update( + evt, (event_id, tg_space), (temporary_identifier, tg_space) + ) + assert another_event_hash == event_hash if prev_id: self.log.debug( f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. " @@ -2717,13 +2874,15 @@ class Portal(DBPortal, BasePortal): self.log.debug("Handled telegram message %d -> %s", evt.id, event_id) try: - await DBMessage( + dbm = DBMessage( tgid=TelegramID(evt.id), mx_room=self.mxid, mxid=event_id, tg_space=tg_space, edit_index=0, - ).insert() + content_hash=event_hash, + ) + await dbm.insert() await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id) except (IntegrityError, UniqueViolationError) as e: self.log.exception( @@ -2733,6 +2892,11 @@ class Portal(DBPortal, BasePortal): "pre_db_check in the config." 
) await intent.redact(self.mxid, event_id) + return + if isinstance(evt, Message) and evt.reactions: + asyncio.create_task( + self.try_handle_telegram_reactions(source, dbm.tgid, evt.reactions, dbm=dbm) + ) await self._send_delivery_receipt(event_id) async def _create_room_on_action( @@ -2956,6 +3120,7 @@ class Portal(DBPortal, BasePortal): pass await super().delete() await DBMessage.delete_all(self.mxid) + await DBReaction.delete_all(self.mxid) self.deleted = True # endregion diff --git a/mautrix_telegram/portal_util/__init__.py b/mautrix_telegram/portal_util/__init__.py index 20e0ec41..e79ffb3b 100644 --- a/mautrix_telegram/portal_util/__init__.py +++ b/mautrix_telegram/portal_util/__init__.py @@ -2,5 +2,5 @@ from .deduplication import PortalDedup from .media_fallback import make_contact_event_content, make_dice_event_content from .participants import get_users from .power_levels import get_base_power_levels, participants_to_power_levels -from .send_lock import PortalSendLock +from .send_lock import PortalReactionLock, PortalSendLock from .sponsored_message import get_sponsored_message, make_sponsored_message_content diff --git a/mautrix_telegram/portal_util/deduplication.py b/mautrix_telegram/portal_util/deduplication.py index 0c038e9d..bd3702da 100644 --- a/mautrix_telegram/portal_util/deduplication.py +++ b/mautrix_telegram/portal_util/deduplication.py @@ -15,20 +15,30 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. 
from __future__ import annotations -from typing import Tuple +from typing import Any, Generator, Tuple, Union from collections import deque import hashlib from telethon.tl.patched import Message, MessageService from telethon.tl.types import ( + Message, MessageMediaContact, + MessageMediaDice, MessageMediaDocument, + MessageMediaGame, MessageMediaGeo, MessageMediaPhoto, - TypeMessage, + MessageMediaPoll, + MessageMediaUnsupported, + MessageService, + PeerChannel, + PeerChat, + PeerUser, TypeUpdates, UpdateNewChannelMessage, UpdateNewMessage, + UpdateShortChatMessage, + UpdateShortMessage, ) from mautrix.types import EventID @@ -37,60 +47,67 @@ from .. import portal as po from ..types import TelegramID DedupMXID = Tuple[EventID, TelegramID] +TypeMessage = Union[Message, MessageService, UpdateShortMessage, UpdateShortChatMessage] + +media_content_table = { + MessageMediaContact: lambda media: [media.user_id], + MessageMediaDocument: lambda media: [media.document.id], + MessageMediaPhoto: lambda media: [media.photo.id if media.photo else 0], + MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat], + MessageMediaGame: lambda media: [media.id], + MessageMediaPoll: lambda media: [media.id], + MessageMediaDice: lambda media: [media.emoticon], + MessageMediaUnsupported: lambda media: ["unsupported media"], +} class PortalDedup: pre_db_check: bool = False - cache_queue_length: int = 20 + cache_queue_length: int = 256 - _dedup: deque[str] - _dedup_mxid: dict[str, DedupMXID] - _dedup_action: deque[str] + _dedup: deque[bytes | int] + _dedup_mxid: dict[bytes | int, DedupMXID] + _dedup_action: deque[bytes | int] _portal: po.Portal def __init__(self, portal: po.Portal) -> None: self._dedup = deque() self._dedup_mxid = {} - self._dedup_action = deque() + self._dedup_action = deque(maxlen=self.cache_queue_length) self._portal = portal @property def _always_force_hash(self) -> bool: return self._portal.peer_type == "chat" - @staticmethod - def _hash_event(event: 
TypeMessage) -> str: - # Non-channel messages are unique per-user (wtf telegram), so we have no other choice than - # to deduplicate based on a hash of the message content. - - # The timestamp is only accurate to the second, so we can't rely solely on that either. + def _hash_content(self, event: TypeMessage) -> Generator[Any, None, None]: + if not self._always_force_hash: + yield event.id + yield int(event.date.timestamp()) if isinstance(event, MessageService): - hash_content = [event.date.timestamp(), event.from_id, event.action] + yield event.from_id + yield event.action else: - hash_content = [event.date.timestamp(), event.message.strip()] + yield event.message.strip() if event.fwd_from: - hash_content += [event.fwd_from.from_id] - elif isinstance(event, Message) and event.media: - try: - hash_content += { - MessageMediaContact: lambda media: [media.user_id], - MessageMediaDocument: lambda media: [media.document.id], - MessageMediaPhoto: lambda media: [media.photo.id if media.photo else 0], - MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat], - }[type(event.media)](event.media) - except KeyError: - pass - return hashlib.md5("-".join(str(a) for a in hash_content).encode("utf-8")).hexdigest() + yield event.fwd_from.from_id + if isinstance(event, Message) and event.media: + media_hash_func = media_content_table.get(type(event.media)) or ( + lambda media: ["unknown media"] + ) + yield media_hash_func(event.media) + + def _hash_event(self, event: TypeMessage) -> bytes: + return hashlib.sha256( + "-".join(str(a) for a in self._hash_content(event)).encode("utf-8") + ).digest() def check_action(self, event: TypeMessage) -> bool: - evt_hash = self._hash_event(event) if self._always_force_hash else event.id - if evt_hash in self._dedup_action: + dedup_id = self._hash_event(event) if self._always_force_hash else event.id + if dedup_id in self._dedup_action: return True - self._dedup_action.append(evt_hash) - - if len(self._dedup_action) > 
self.cache_queue_length: - self._dedup_action.popleft() + self._dedup_action.appendleft(dedup_id) return False def update( @@ -99,31 +116,38 @@ class PortalDedup: mxid: DedupMXID = None, expected_mxid: DedupMXID | None = None, force_hash: bool = False, - ) -> DedupMXID | None: - evt_hash = self._hash_event(event) if self._always_force_hash or force_hash else event.id + ) -> tuple[bytes, DedupMXID | None]: + evt_hash = self._hash_event(event) + dedup_id = evt_hash if self._always_force_hash or force_hash else event.id try: - found_mxid = self._dedup_mxid[evt_hash] + found_mxid = self._dedup_mxid[dedup_id] except KeyError: - return EventID("None"), TelegramID(0) + return evt_hash, None if found_mxid != expected_mxid: - return found_mxid - self._dedup_mxid[evt_hash] = mxid - return None + return evt_hash, found_mxid + self._dedup_mxid[dedup_id] = mxid + if evt_hash != dedup_id: + self._dedup_mxid[evt_hash] = mxid + return evt_hash, None def check( self, event: TypeMessage, mxid: DedupMXID = None, force_hash: bool = False - ) -> DedupMXID | None: - evt_hash = self._hash_event(event) if self._always_force_hash or force_hash else event.id - if evt_hash in self._dedup: - return self._dedup_mxid[evt_hash] + ) -> tuple[bytes, DedupMXID | None]: + evt_hash = self._hash_event(event) + dedup_id = evt_hash if self._always_force_hash or force_hash else event.id + if dedup_id in self._dedup: + return evt_hash, self._dedup_mxid[dedup_id] - self._dedup_mxid[evt_hash] = mxid - self._dedup.append(evt_hash) + self._dedup_mxid[dedup_id] = mxid + self._dedup.appendleft(dedup_id) + if evt_hash != dedup_id: + self._dedup_mxid[evt_hash] = mxid + self._dedup.appendleft(evt_hash) - if len(self._dedup) > self.cache_queue_length: - del self._dedup_mxid[self._dedup.popleft()] - return None + while len(self._dedup) > self.cache_queue_length: + del self._dedup_mxid[self._dedup.pop()] + return evt_hash, None def register_outgoing_actions(self, response: TypeUpdates) -> None: for update in 
response.updates: diff --git a/mautrix_telegram/portal_util/send_lock.py b/mautrix_telegram/portal_util/send_lock.py index 249df119..0e524717 100644 --- a/mautrix_telegram/portal_util/send_lock.py +++ b/mautrix_telegram/portal_util/send_lock.py @@ -16,6 +16,9 @@ from __future__ import annotations from asyncio import Lock +from collections import defaultdict + +from mautrix.types import EventID from ..types import TelegramID @@ -42,3 +45,13 @@ class PortalSendLock: return self._send_locks[user_id] except KeyError: return self._send_locks.setdefault(user_id, Lock()) if required else self._noop_lock + + +class PortalReactionLock: + _reaction_locks: dict[EventID, Lock] + + def __init__(self) -> None: + self._reaction_locks = defaultdict(lambda: Lock()) + + def __call__(self, mxid: EventID) -> Lock: + return self._reaction_locks[mxid]