diff --git a/ROADMAP.md b/ROADMAP.md
index 918fbe7a..336fb935 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -3,6 +3,7 @@
* Matrix → Telegram
* [x] Message content (text, formatting, files, etc..)
* [x] Message redactions
+ * [ ] Message reactions
* [x] Message edits
* [ ] ‡ Message history
* [x] Presence
@@ -12,8 +13,8 @@
* [x] Power level
* [x] Normal chats
* [ ] Non-hardcoded PL requirements
- * [x] Supergroups/channels
- * [ ] Precise bridging (non-hardcoded PL requirements, bridge specific permissions, etc..)
+ * [x] Supergroups/channels
+ * [ ] Precise bridging (non-hardcoded PL requirements, bridge specific permissions, etc..)
* [x] Membership actions (invite/kick/join/leave)
* [x] Room metadata changes (name, topic, avatar)
* [x] Initial room metadata
@@ -27,6 +28,7 @@
* [x] Games
* [ ] Buttons
* [x] Message deletions
+ * [x] Message reactions
* [x] Message edits
* [x] Message history
* [x] Manually (`!tg backfill`)
diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py
index 3799512f..39270496 100644
--- a/mautrix_telegram/abstract_user.py
+++ b/mautrix_telegram/abstract_user.py
@@ -46,6 +46,7 @@ from telethon.tl.types import (
UpdateEditChannelMessage,
UpdateEditMessage,
UpdateFolderPeers,
+ UpdateMessageReactions,
UpdateNewChannelMessage,
UpdateNewMessage,
UpdateNotifySettings,
@@ -312,6 +313,8 @@ class AbstractUser(ABC):
await self.delete_message(update)
elif isinstance(update, UpdateDeleteChannelMessages):
await self.delete_channel_message(update)
+ elif isinstance(update, UpdateMessageReactions):
+ await self.update_reactions(update)
elif isinstance(update, (UpdateChatUserTyping, UpdateChannelUserTyping, UpdateUserTyping)):
await self.update_typing(update)
elif isinstance(update, UpdateUserStatus):
@@ -559,6 +562,12 @@ class AbstractUser(ABC):
await message.delete()
await self._try_redact(message)
+ async def update_reactions(self, update: UpdateMessageReactions) -> None:
+ portal = await po.Portal.get_by_entity(update.peer, tg_receiver=self.tgid)
+ if not portal or not portal.mxid or not portal.allow_bridging:
+ return
+ await portal.handle_telegram_reactions(self, TelegramID(update.msg_id), update.reactions)
+
async def update_message(self, original_update: UpdateMessage) -> None:
update, sender, portal = await self.get_message_details(original_update)
if not portal:
diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py
index 08f00f72..1fc93a3d 100644
--- a/mautrix_telegram/config.py
+++ b/mautrix_telegram/config.py
@@ -178,9 +178,6 @@ class Config(BaseBridgeConfig):
else:
copy("bridge.bridge_notices")
- copy("bridge.deduplication.pre_db_check")
- copy("bridge.deduplication.cache_queue_length")
-
if "bridge.message_formats.m_text" in self:
del self["bridge.message_formats"]
copy_dict("bridge.message_formats", override_existing_map=False)
diff --git a/mautrix_telegram/db/__init__.py b/mautrix_telegram/db/__init__.py
index ebb94fcf..119ebdbd 100644
--- a/mautrix_telegram/db/__init__.py
+++ b/mautrix_telegram/db/__init__.py
@@ -19,6 +19,7 @@ from .bot_chat import BotChat
from .message import Message
from .portal import Portal
from .puppet import Puppet
+from .reaction import Reaction
from .telegram_file import TelegramFile
from .telethon_session import PgSession
from .upgrade import upgrade_table
@@ -26,7 +27,7 @@ from .user import User
def init(db: Database) -> None:
- for table in (Portal, Message, User, Puppet, TelegramFile, BotChat, PgSession):
+ for table in (Portal, Message, Reaction, User, Puppet, TelegramFile, BotChat, PgSession):
table.db = db
@@ -35,6 +36,7 @@ __all__ = [
"init",
"Portal",
"Message",
+ "Reaction",
"User",
"Puppet",
"TelegramFile",
diff --git a/mautrix_telegram/db/message.py b/mautrix_telegram/db/message.py
index dcbd0a5e..f1625fbc 100644
--- a/mautrix_telegram/db/message.py
+++ b/mautrix_telegram/db/message.py
@@ -38,6 +38,7 @@ class Message:
tg_space: TelegramID
edit_index: int
redacted: bool = False
+ content_hash: bytes | None = None
@classmethod
def _from_row(cls, row: Record | None) -> Message | None:
@@ -45,7 +46,7 @@ class Message:
return None
return cls(**row)
- columns: ClassVar[str] = "mxid, mx_room, tgid, tg_space, edit_index, redacted"
+ columns: ClassVar[str] = "mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash"
@classmethod
async def get_all_by_tgid(cls, tgid: TelegramID, tg_space: TelegramID) -> list[Message]:
@@ -142,14 +143,24 @@ class Message:
q = "UPDATE message SET mxid=$1 WHERE mxid=$2 AND mx_room=$3"
await cls.db.execute(q, real_mxid, temp_mxid, mx_room)
+ @property
+ def _values(self):
+ return (
+ self.mxid,
+ self.mx_room,
+ self.tgid,
+ self.tg_space,
+ self.edit_index,
+ self.redacted,
+ self.content_hash,
+ )
+
async def insert(self) -> None:
- q = (
- "INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted) "
- "VALUES ($1, $2, $3, $4, $5, $6)"
- )
- await self.db.execute(
- q, self.mxid, self.mx_room, self.tgid, self.tg_space, self.edit_index, self.redacted
- )
+ q = """
+ INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash)
+ VALUES ($1, $2, $3, $4, $5, $6, $7)
+ """
+ await self.db.execute(q, *self._values)
async def delete(self) -> None:
q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2 AND tg_space=$3"
diff --git a/mautrix_telegram/db/reaction.py b/mautrix_telegram/db/reaction.py
new file mode 100644
index 00000000..71c49dd7
--- /dev/null
+++ b/mautrix_telegram/db/reaction.py
@@ -0,0 +1,84 @@
+# mautrix-telegram - A Matrix-Telegram puppeting bridge
+# Copyright (C) 2021 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, ClassVar
+
+from asyncpg import Record
+from attr import dataclass
+
+from mautrix.types import EventID, RoomID
+from mautrix.util.async_db import Database
+
+from ..types import TelegramID
+
+fake_db = Database.create("") if TYPE_CHECKING else None
+
+
+@dataclass
+class Reaction:
+ db: ClassVar[Database] = fake_db
+
+ mxid: EventID
+ mx_room: RoomID
+ msg_mxid: EventID
+ tg_sender: TelegramID
+ reaction: str
+
+ @classmethod
+ def _from_row(cls, row: Record | None) -> Reaction | None:
+ if row is None:
+ return None
+ return cls(**row)
+
+ columns: ClassVar[str] = "mxid, mx_room, msg_mxid, tg_sender, reaction"
+
+ @classmethod
+ async def delete_all(cls, mx_room: RoomID) -> None:
+ await cls.db.execute("DELETE FROM reaction WHERE mx_room=$1", mx_room)
+
+ @classmethod
+ async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Reaction | None:
+ q = f"SELECT {cls.columns} FROM reaction WHERE mxid=$1 AND mx_room=$2"
+ return cls._from_row(await cls.db.fetchrow(q, mxid, mx_room))
+
+ @classmethod
+ async def get_all_by_message(cls, mxid: EventID, mx_room: RoomID) -> list[Reaction]:
+ q = f"SELECT {cls.columns} FROM reaction WHERE msg_mxid=$1 AND mx_room=$2"
+ rows = await cls.db.fetch(q, mxid, mx_room)
+ return [cls._from_row(row) for row in rows]
+
+ @property
+ def _values(self):
+ return (
+ self.mxid,
+ self.mx_room,
+ self.msg_mxid,
+ self.tg_sender,
+ self.reaction,
+ )
+
+ async def save(self) -> None:
+ q = """
+ INSERT INTO reaction (mxid, mx_room, msg_mxid, tg_sender, reaction)
+ VALUES ($1, $2, $3, $4, $5) ON CONFLICT (msg_mxid, mx_room, tg_sender)
+ DO UPDATE SET mxid=$1, reaction=$5
+ """
+ await self.db.execute(q, *self._values)
+
+ async def delete(self) -> None:
+ q = "DELETE FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3"
+ await self.db.execute(q, self.msg_mxid, self.mx_room, self.tg_sender)
diff --git a/mautrix_telegram/db/upgrade/__init__.py b/mautrix_telegram/db/upgrade/__init__.py
index 2146830c..18875090 100644
--- a/mautrix_telegram/db/upgrade/__init__.py
+++ b/mautrix_telegram/db/upgrade/__init__.py
@@ -2,4 +2,4 @@ from mautrix.util.async_db import UpgradeTable
upgrade_table = UpgradeTable()
-from . import v01_initial_revision, v02_sponsored_events
+from . import v01_initial_revision, v02_sponsored_events, v03_reactions
diff --git a/mautrix_telegram/db/upgrade/v03_reactions.py b/mautrix_telegram/db/upgrade/v03_reactions.py
new file mode 100644
index 00000000..e9edd67d
--- /dev/null
+++ b/mautrix_telegram/db/upgrade/v03_reactions.py
@@ -0,0 +1,37 @@
+# mautrix-telegram - A Matrix-Telegram puppeting bridge
+# Copyright (C) 2021 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+from asyncpg import Connection
+
+from . import upgrade_table
+
+
+@upgrade_table.register(description="Add support for reactions")
+async def upgrade_v3(conn: Connection) -> None:
+ await conn.execute(
+ """CREATE TABLE reaction (
+ mxid TEXT NOT NULL,
+ mx_room TEXT NOT NULL,
+ msg_mxid TEXT NOT NULL,
+ tg_sender BIGINT,
+ reaction TEXT NOT NULL,
+
+ PRIMARY KEY (msg_mxid, mx_room, tg_sender),
+ UNIQUE (mxid, mx_room)
+ )"""
+ )
+ await conn.execute("ALTER TABLE message ALTER COLUMN mxid SET NOT NULL")
+ await conn.execute("ALTER TABLE message ALTER COLUMN mx_room SET NOT NULL")
+ await conn.execute("ALTER TABLE message ADD COLUMN content_hash bytea")
diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml
index cfc59e2f..054156fb 100644
--- a/mautrix_telegram/example-config.yaml
+++ b/mautrix_telegram/example-config.yaml
@@ -331,16 +331,6 @@ bridge:
exceptions:
- "@importantbot:example.com"
- # Some config options related to Telegram message deduplication.
- # The default values are usually fine, but some debug messages/warnings might recommend you
- # change these.
- deduplication:
- # Whether or not to check the database if the message about to be sent is a duplicate.
- pre_db_check: false
- # The number of latest events to keep when checking for duplicates.
- # You might need to increase this on high-traffic bridge instances.
- cache_queue_length: 20
-
# The formats to use when sending messages to Telegram via the relay bot.
# Text msgtypes (m.text, m.notice and m.emote) support HTML, media msgtypes don't.
#
diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py
index de33d942..1b55798c 100644
--- a/mautrix_telegram/portal.py
+++ b/mautrix_telegram/portal.py
@@ -63,6 +63,7 @@ from telethon.tl.functions.messages import (
EditChatPhotoRequest,
EditChatTitleRequest,
ExportChatInviteRequest,
+ GetMessageReactionsListRequest,
MigrateChatRequest,
SetTypingRequest,
UnpinAllMessagesRequest,
@@ -112,6 +113,8 @@ from telethon.tl.types import (
MessageMediaPhoto,
MessageMediaPoll,
MessageMediaUnsupported,
+ MessageReactions,
+ MessageUserReaction,
PeerChannel,
PeerChat,
PeerUser,
@@ -122,6 +125,7 @@ from telethon.tl.types import (
PhotoSizeEmpty,
PhotoSizeProgressive,
Poll,
+ ReactionCount,
SendMessageCancelAction,
SendMessageTypingAction,
SponsoredMessage,
@@ -177,13 +181,19 @@ from mautrix.types import (
UserID,
VideoInfo,
)
+from mautrix.util import variation_selector
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
from mautrix.util.simple_lock import SimpleLock
from mautrix.util.simple_template import SimpleTemplate
from . import abstract_user as au, formatter, portal_util as putil, puppet as p, user as u, util
from .config import Config
-from .db import Message as DBMessage, Portal as DBPortal, TelegramFile as DBTelegramFile
+from .db import (
+ Message as DBMessage,
+ Portal as DBPortal,
+ Reaction as DBReaction,
+ TelegramFile as DBTelegramFile,
+)
from .tgclient import MautrixTelegramClient
from .types import TelegramID
from .util import sane_mimetypes
@@ -248,6 +258,7 @@ class Portal(DBPortal, BasePortal):
dedup: putil.PortalDedup
send_lock: putil.PortalSendLock
+ reaction_lock: putil.PortalReactionLock
_pin_lock: asyncio.Lock
_main_intent: IntentAPI | None
@@ -307,6 +318,7 @@ class Portal(DBPortal, BasePortal):
self.dedup = putil.PortalDedup(self)
self.send_lock = putil.PortalSendLock()
+ self.reaction_lock = putil.PortalReactionLock()
self._pin_lock = asyncio.Lock()
self._room_create_lock = asyncio.Lock()
@@ -405,10 +417,6 @@ class Portal(DBPortal, BasePortal):
)
NotificationDisabler.puppet_cls = p.Puppet
NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
- putil.PortalDedup.dedup_pre_db_check = cls.config["bridge.deduplication.pre_db_check"]
- putil.PortalDedup.dedup_cache_queue_length = cls.config[
- "bridge.deduplication.cache_queue_length"
- ]
# endregion
# region Matrix -> Telegram metadata
@@ -1648,7 +1656,7 @@ class Portal(DBPortal, BasePortal):
self, event_id: EventID, space: TelegramID, edit_index: int, response: TypeMessage
) -> None:
self.log.trace("Handled Matrix message: %s", response)
- self.dedup.check(response, (event_id, space), force_hash=edit_index != 0)
+ event_hash, _ = self.dedup.check(response, (event_id, space), force_hash=edit_index != 0)
if edit_index < 0:
prev_edit = await DBMessage.get_one_by_tgid(TelegramID(response.id), space, -1)
edit_index = prev_edit.edit_index + 1
@@ -1658,6 +1666,7 @@ class Portal(DBPortal, BasePortal):
mx_room=self.mxid,
mxid=event_id,
edit_index=edit_index,
+ content_hash=event_hash,
).insert()
async def _send_bridge_error(
@@ -2395,13 +2404,18 @@ class Portal(DBPortal, BasePortal):
self.log.debug("Ignoring game message edit event")
return
+ if self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None:
+ asyncio.create_task(
+ self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions)
+ )
+
async with self.send_lock(sender.tgid if sender else None, required=False):
tg_space = self.tgid if self.peer_type == "channel" else source.tgid
temporary_identifier = EventID(
f"${random.randint(1000000000000, 9999999999999)}TGBRIDGEDITEMP"
)
- duplicate_found = self.dedup.check(
+ event_hash, duplicate_found = self.dedup.check(
evt, (temporary_identifier, tg_space), force_hash=True
)
if duplicate_found:
@@ -2418,6 +2432,7 @@ class Portal(DBPortal, BasePortal):
tg_space=tg_space,
tgid=TelegramID(evt.id),
edit_index=prev_edit_msg.edit_index + 1,
+ content_hash=event_hash,
).insert()
return
@@ -2431,6 +2446,15 @@ class Portal(DBPortal, BasePortal):
"in database."
)
return
+ prev_edit_msg = (
+ await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg
+ )
+ if prev_edit_msg.content_hash == event_hash:
+ self.log.debug(
+ f"Ignoring edit of message {evt.id}@{tg_space} (src {source.tgid}):"
+ " content hash didn't change"
+ )
+ return
content.msgtype = (
MessageType.NOTICE
@@ -2444,15 +2468,13 @@ class Portal(DBPortal, BasePortal):
await intent.set_typing(self.mxid, is_typing=False)
event_id = await self._send_message(intent, content)
- prev_edit_msg = (
- await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg
- )
await DBMessage(
mxid=event_id,
mx_room=self.mxid,
tg_space=tg_space,
tgid=TelegramID(evt.id),
edit_index=prev_edit_msg.edit_index + 1,
+ content_hash=event_hash,
).insert()
await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
@@ -2587,6 +2609,139 @@ class Portal(DBPortal, BasePortal):
count += 1
return count
+ def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessageUserReaction]:
+ if len(counts) == 1:
+ item = counts[0]
+ if item.count == 2:
+ return [
+ MessageUserReaction(reaction=item.reaction, user_id=self.tgid),
+ MessageUserReaction(reaction=item.reaction, user_id=self.tg_receiver),
+ ]
+ elif item.count == 1:
+ return [
+ MessageUserReaction(
+ reaction=item.reaction,
+ user_id=self.tg_receiver if item.chosen else self.tgid,
+ ),
+ ]
+ elif len(counts) == 2:
+ item1, item2 = counts
+ return [
+ MessageUserReaction(
+ reaction=item1.reaction,
+ user_id=self.tg_receiver if item1.chosen else self.tgid,
+ ),
+ MessageUserReaction(
+ reaction=item2.reaction,
+ user_id=self.tg_receiver if item2.chosen else self.tgid,
+ ),
+ ]
+ return []
+
+ async def try_handle_telegram_reactions(
+ self,
+ source: au.AbstractUser,
+ msg_id: TelegramID,
+ data: MessageReactions,
+ dbm: DBMessage | None = None,
+ ) -> None:
+ try:
+ await self.handle_telegram_reactions(source, msg_id, data, dbm)
+ except Exception:
+ self.log.exception(f"Error handling reactions in message {msg_id}")
+
+ async def handle_telegram_reactions(
+ self,
+ source: au.AbstractUser,
+ msg_id: TelegramID,
+ data: MessageReactions,
+ dbm: DBMessage | None = None,
+ ) -> None:
+ if self.peer_type == "channel" and not self.megagroup:
+ # We don't know who reacted in a channel, so we can't bridge it properly either
+ return
+
+ tg_space = self.tgid if self.peer_type == "channel" else source.tgid
+ if dbm is None:
+ dbm = await DBMessage.get_one_by_tgid(msg_id, tg_space)
+ if dbm is None:
+ return
+
+ total_count = sum(item.count for item in data.results)
+ recent_reactions = data.recent_reactons or []
+ if not recent_reactions and total_count > 0:
+ if self.peer_type == "user":
+ recent_reactions = self._split_dm_reaction_counts(data.results)
+ elif source.is_bot:
+ # Can't fetch exact reaction senders as a bot
+ return
+ else:
+ # TODO this doesn't work for some reason
+ return
+ # resp = await source.client(
+ # GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=20)
+ # )
+ # recent_reactions = resp.reactions
+
+ async with self.reaction_lock(dbm.mxid):
+ await self._handle_telegram_reactions_locked(dbm, recent_reactions, total_count)
+
+ async def _handle_telegram_reactions_locked(
+ self, msg: DBMessage, reaction_list: list[MessageUserReaction], total_count: int
+ ) -> None:
+ reactions = {reaction.user_id: reaction.reaction for reaction in reaction_list}
+ is_full = len(reactions) == total_count
+
+ existing_reactions = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room)
+
+ removed: list[DBReaction] = []
+ changed: list[tuple[DBReaction, str]] = []
+ for existing_reaction in existing_reactions:
+ new_reaction = reactions.get(existing_reaction.tg_sender)
+ if new_reaction is None:
+ if is_full:
+ removed.append(existing_reaction)
+ # else: assume the reaction is still there, too much effort to fetch it
+ elif new_reaction == existing_reaction.reaction:
+ reactions.pop(existing_reaction.tg_sender)
+ else:
+ changed.append((existing_reaction, new_reaction))
+
+ for sender, new_emoji in reactions.items():
+ self.log.debug(f"Bridging reaction {new_emoji} by {sender} to {msg.tgid}")
+ puppet: p.Puppet = await p.Puppet.get_by_tgid(sender)
+ mxid = await puppet.intent_for(self).react(
+ msg.mx_room, msg.mxid, variation_selector.add(new_emoji)
+ )
+ await DBReaction(
+ mxid=mxid,
+ mx_room=msg.mx_room,
+ msg_mxid=msg.mxid,
+ tg_sender=sender,
+ reaction=new_emoji,
+ ).save()
+ for removed_reaction in removed:
+ self.log.debug(
+ f"Removing reaction {removed_reaction.reaction} by {removed_reaction.tg_sender} "
+ f"to {msg.tgid}"
+ )
+ puppet = await p.Puppet.get_by_tgid(removed_reaction.tg_sender)
+ await puppet.intent_for(self).redact(removed_reaction.mx_room, removed_reaction.mxid)
+ await removed_reaction.delete()
+ for changed_reaction, new_emoji in changed:
+ self.log.debug(
+ f"Updating reaction {changed_reaction.reaction} -> {new_emoji} "
+ f"by {changed_reaction.tg_sender} to {msg.tgid}"
+ )
+ puppet = await p.Puppet.get_by_tgid(changed_reaction.tg_sender)
+ intent = puppet.intent_for(self)
+ await intent.redact(changed_reaction.mx_room, changed_reaction.mxid)
+ changed_reaction.mxid = await intent.react(
+ msg.mx_room, msg.mxid, variation_selector.add(new_emoji)
+ )
+ changed_reaction.reaction = new_emoji
+ await changed_reaction.save()
+
async def handle_telegram_message(
self, source: au.AbstractUser, sender: p.Puppet, evt: Message
) -> None:
@@ -2613,7 +2768,7 @@ class Portal(DBPortal, BasePortal):
temporary_identifier = EventID(
f"${random.randint(1000000000000, 9999999999999)}TGBRIDGETEMP"
)
- duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space))
+ event_hash, duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space))
if duplicate_found:
mxid, other_tg_space = duplicate_found
self.log.debug(
@@ -2627,17 +2782,16 @@ class Portal(DBPortal, BasePortal):
mxid=mxid,
tg_space=tg_space,
edit_index=0,
+ content_hash=event_hash,
).insert()
return
- if self.backfill_lock.locked or (self.dedup.pre_db_check and self.peer_type == "channel"):
+ if self.backfill_lock.locked or self.peer_type == "channel":
msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if msg:
self.log.debug(
f"Ignoring message {evt.id} (src {source.tgid}) as it was already "
- f"handled into {msg.mxid}. This duplicate was catched in the db "
- "check. If you get this message often, consider increasing "
- "bridge.deduplication.cache_queue_length in the config."
+ f"handled into {msg.mxid}."
)
return
@@ -2702,7 +2856,10 @@ class Portal(DBPortal, BasePortal):
self._new_messages_after_sponsored = True
- prev_id = self.dedup.update(evt, (event_id, tg_space), (temporary_identifier, tg_space))
+ another_event_hash, prev_id = self.dedup.update(
+ evt, (event_id, tg_space), (temporary_identifier, tg_space)
+ )
+ assert another_event_hash == event_hash
if prev_id:
self.log.debug(
f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. "
@@ -2717,13 +2874,15 @@ class Portal(DBPortal, BasePortal):
self.log.debug("Handled telegram message %d -> %s", evt.id, event_id)
try:
- await DBMessage(
+ dbm = DBMessage(
tgid=TelegramID(evt.id),
mx_room=self.mxid,
mxid=event_id,
tg_space=tg_space,
edit_index=0,
- ).insert()
+ content_hash=event_hash,
+ )
+ await dbm.insert()
await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
except (IntegrityError, UniqueViolationError) as e:
self.log.exception(
@@ -2733,6 +2892,11 @@ class Portal(DBPortal, BasePortal):
"pre_db_check in the config."
)
await intent.redact(self.mxid, event_id)
+ return
+ if isinstance(evt, Message) and evt.reactions:
+ asyncio.create_task(
+ self.try_handle_telegram_reactions(source, dbm.tgid, evt.reactions, dbm=dbm)
+ )
await self._send_delivery_receipt(event_id)
async def _create_room_on_action(
@@ -2956,6 +3120,7 @@ class Portal(DBPortal, BasePortal):
pass
await super().delete()
await DBMessage.delete_all(self.mxid)
+ await DBReaction.delete_all(self.mxid)
self.deleted = True
# endregion
diff --git a/mautrix_telegram/portal_util/__init__.py b/mautrix_telegram/portal_util/__init__.py
index 20e0ec41..e79ffb3b 100644
--- a/mautrix_telegram/portal_util/__init__.py
+++ b/mautrix_telegram/portal_util/__init__.py
@@ -2,5 +2,5 @@ from .deduplication import PortalDedup
from .media_fallback import make_contact_event_content, make_dice_event_content
from .participants import get_users
from .power_levels import get_base_power_levels, participants_to_power_levels
-from .send_lock import PortalSendLock
+from .send_lock import PortalReactionLock, PortalSendLock
from .sponsored_message import get_sponsored_message, make_sponsored_message_content
diff --git a/mautrix_telegram/portal_util/deduplication.py b/mautrix_telegram/portal_util/deduplication.py
index 0c038e9d..bd3702da 100644
--- a/mautrix_telegram/portal_util/deduplication.py
+++ b/mautrix_telegram/portal_util/deduplication.py
@@ -15,20 +15,30 @@
# along with this program. If not, see .
from __future__ import annotations
-from typing import Tuple
+from typing import Any, Generator, Tuple, Union
from collections import deque
import hashlib
from telethon.tl.patched import Message, MessageService
from telethon.tl.types import (
+ Message,
MessageMediaContact,
+ MessageMediaDice,
MessageMediaDocument,
+ MessageMediaGame,
MessageMediaGeo,
MessageMediaPhoto,
- TypeMessage,
+ MessageMediaPoll,
+ MessageMediaUnsupported,
+ MessageService,
+ PeerChannel,
+ PeerChat,
+ PeerUser,
TypeUpdates,
UpdateNewChannelMessage,
UpdateNewMessage,
+ UpdateShortChatMessage,
+ UpdateShortMessage,
)
from mautrix.types import EventID
@@ -37,60 +47,67 @@ from .. import portal as po
from ..types import TelegramID
DedupMXID = Tuple[EventID, TelegramID]
+TypeMessage = Union[Message, MessageService, UpdateShortMessage, UpdateShortChatMessage]
+
+media_content_table = {
+ MessageMediaContact: lambda media: [media.user_id],
+ MessageMediaDocument: lambda media: [media.document.id],
+ MessageMediaPhoto: lambda media: [media.photo.id if media.photo else 0],
+ MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat],
+ MessageMediaGame: lambda media: [media.id],
+ MessageMediaPoll: lambda media: [media.id],
+ MessageMediaDice: lambda media: [media.emoticon],
+ MessageMediaUnsupported: lambda media: ["unsupported media"],
+}
class PortalDedup:
pre_db_check: bool = False
- cache_queue_length: int = 20
+ cache_queue_length: int = 256
- _dedup: deque[str]
- _dedup_mxid: dict[str, DedupMXID]
- _dedup_action: deque[str]
+ _dedup: deque[bytes | int]
+ _dedup_mxid: dict[bytes | int, DedupMXID]
+ _dedup_action: deque[bytes | int]
_portal: po.Portal
def __init__(self, portal: po.Portal) -> None:
self._dedup = deque()
self._dedup_mxid = {}
- self._dedup_action = deque()
+ self._dedup_action = deque(maxlen=self.cache_queue_length)
self._portal = portal
@property
def _always_force_hash(self) -> bool:
return self._portal.peer_type == "chat"
- @staticmethod
- def _hash_event(event: TypeMessage) -> str:
- # Non-channel messages are unique per-user (wtf telegram), so we have no other choice than
- # to deduplicate based on a hash of the message content.
-
- # The timestamp is only accurate to the second, so we can't rely solely on that either.
+ def _hash_content(self, event: TypeMessage) -> Generator[Any, None, None]:
+ if not self._always_force_hash:
+ yield event.id
+ yield int(event.date.timestamp())
if isinstance(event, MessageService):
- hash_content = [event.date.timestamp(), event.from_id, event.action]
+ yield event.from_id
+ yield event.action
else:
- hash_content = [event.date.timestamp(), event.message.strip()]
+ yield event.message.strip()
if event.fwd_from:
- hash_content += [event.fwd_from.from_id]
- elif isinstance(event, Message) and event.media:
- try:
- hash_content += {
- MessageMediaContact: lambda media: [media.user_id],
- MessageMediaDocument: lambda media: [media.document.id],
- MessageMediaPhoto: lambda media: [media.photo.id if media.photo else 0],
- MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat],
- }[type(event.media)](event.media)
- except KeyError:
- pass
- return hashlib.md5("-".join(str(a) for a in hash_content).encode("utf-8")).hexdigest()
+ yield event.fwd_from.from_id
+ if isinstance(event, Message) and event.media:
+ media_hash_func = media_content_table.get(type(event.media)) or (
+ lambda media: ["unknown media"]
+ )
+ yield media_hash_func(event.media)
+
+ def _hash_event(self, event: TypeMessage) -> bytes:
+ return hashlib.sha256(
+ "-".join(str(a) for a in self._hash_content(event)).encode("utf-8")
+ ).digest()
def check_action(self, event: TypeMessage) -> bool:
- evt_hash = self._hash_event(event) if self._always_force_hash else event.id
- if evt_hash in self._dedup_action:
+ dedup_id = self._hash_event(event) if self._always_force_hash else event.id
+ if dedup_id in self._dedup_action:
return True
- self._dedup_action.append(evt_hash)
-
- if len(self._dedup_action) > self.cache_queue_length:
- self._dedup_action.popleft()
+ self._dedup_action.appendleft(dedup_id)
return False
def update(
@@ -99,31 +116,38 @@ class PortalDedup:
mxid: DedupMXID = None,
expected_mxid: DedupMXID | None = None,
force_hash: bool = False,
- ) -> DedupMXID | None:
- evt_hash = self._hash_event(event) if self._always_force_hash or force_hash else event.id
+ ) -> tuple[bytes, DedupMXID | None]:
+ evt_hash = self._hash_event(event)
+ dedup_id = evt_hash if self._always_force_hash or force_hash else event.id
try:
- found_mxid = self._dedup_mxid[evt_hash]
+ found_mxid = self._dedup_mxid[dedup_id]
except KeyError:
- return EventID("None"), TelegramID(0)
+ return evt_hash, None
if found_mxid != expected_mxid:
- return found_mxid
- self._dedup_mxid[evt_hash] = mxid
- return None
+ return evt_hash, found_mxid
+ self._dedup_mxid[dedup_id] = mxid
+ if evt_hash != dedup_id:
+ self._dedup_mxid[evt_hash] = mxid
+ return evt_hash, None
def check(
self, event: TypeMessage, mxid: DedupMXID = None, force_hash: bool = False
- ) -> DedupMXID | None:
- evt_hash = self._hash_event(event) if self._always_force_hash or force_hash else event.id
- if evt_hash in self._dedup:
- return self._dedup_mxid[evt_hash]
+ ) -> tuple[bytes, DedupMXID | None]:
+ evt_hash = self._hash_event(event)
+ dedup_id = evt_hash if self._always_force_hash or force_hash else event.id
+ if dedup_id in self._dedup:
+ return evt_hash, self._dedup_mxid[dedup_id]
- self._dedup_mxid[evt_hash] = mxid
- self._dedup.append(evt_hash)
+ self._dedup_mxid[dedup_id] = mxid
+ self._dedup.appendleft(dedup_id)
+ if evt_hash != dedup_id:
+ self._dedup_mxid[evt_hash] = mxid
+ self._dedup.appendleft(evt_hash)
- if len(self._dedup) > self.cache_queue_length:
- del self._dedup_mxid[self._dedup.popleft()]
- return None
+ while len(self._dedup) > self.cache_queue_length:
+ del self._dedup_mxid[self._dedup.pop()]
+ return evt_hash, None
def register_outgoing_actions(self, response: TypeUpdates) -> None:
for update in response.updates:
diff --git a/mautrix_telegram/portal_util/send_lock.py b/mautrix_telegram/portal_util/send_lock.py
index 249df119..0e524717 100644
--- a/mautrix_telegram/portal_util/send_lock.py
+++ b/mautrix_telegram/portal_util/send_lock.py
@@ -16,6 +16,9 @@
from __future__ import annotations
from asyncio import Lock
+from collections import defaultdict
+
+from mautrix.types import EventID
from ..types import TelegramID
@@ -42,3 +45,13 @@ class PortalSendLock:
return self._send_locks[user_id]
except KeyError:
return self._send_locks.setdefault(user_id, Lock()) if required else self._noop_lock
+
+
+class PortalReactionLock:
+ _reaction_locks: dict[EventID, Lock]
+
+ def __init__(self) -> None:
+ self._reaction_locks = defaultdict(lambda: Lock())
+
+ def __call__(self, mxid: EventID) -> Lock:
+ return self._reaction_locks[mxid]