Compare commits

...

23 Commits

Author SHA1 Message Date
Tulir Asokan 2a6b075ff2 Bump version to 0.11.1 2022-01-10 15:45:30 +02:00
Tulir Asokan e321bc30d0 Update some small things 2022-01-09 00:06:35 +02:00
Tulir Asokan 63fafec1b7 Make telegram blue text more readable on dark themes. Fixes #729 2022-01-08 23:27:57 +02:00
Tulir Asokan 9f48eca5a6 Use min() instead of sorting list 2022-01-05 21:23:58 +02:00
Tulir Asokan 28845b9daf Update dependencies and fix some things in config updater 2022-01-05 21:01:12 +02:00
Tulir Asokan 113f41d1d2 Deduplicate lottieconverter calls in tgs_converter
Also fix finding first frame file

Fixes #690
Closes #728
2022-01-05 21:00:53 +02:00
Tulir Asokan da3180e290 Delete nulls in message table. Fixes #731 2022-01-05 18:53:10 +02:00
Tulir Asokan 1a62463678 Update changelog 2022-01-05 12:30:38 +02:00
Tulir Asokan e584cf534d Merge branch 'sumner/bri-1517-bridge-voice-messages-telegram-matrix' 2022-01-05 12:09:25 +02:00
Tulir Asokan 4c1267cd32 Merge branch 'maybe-fix-corrupted-db-schema'
Closes #719
2022-01-05 12:09:16 +02:00
Tulir Asokan dc8a3d0c2d Don't use parameters for pg_constraint query 2022-01-05 01:53:57 +02:00
Sumner Evans c481ec850d voice messages: bridge from Telegram to native Matrix
Co-authored-by: Tulir Asokan <tulir@maunium.net>
2022-01-04 14:16:57 -07:00
Tulir Asokan a54dd58de7 Send message checkpoints for Matrix edits too 2022-01-04 21:37:41 +02:00
Tulir Asokan b13da92520 Find constraint names dynamically to work around schemas broken by pgloader 2022-01-03 20:12:55 +02:00
Dominik Fuchß 2b6db85e1a Add missing await to get_input_entity in HTML parser (#724) 2021-12-31 11:19:41 +02:00
Tulir Asokan e7a1216ef7 Don't redact reactions in chats with relaybot
There are usually other Matrix users, so redacting reactions only from
logged-in users would be weird.
2021-12-30 23:34:14 +02:00
Tulir Asokan b1da5c7c2c Don't alter columns to not null on sqlite 2021-12-30 19:59:41 +02:00
Tulir Asokan 3b72de34b3 Fix some things in dedup changes 2021-12-30 19:41:45 +02:00
Tulir Asokan af893554cc Add support for Matrix->Telegram reactions 2021-12-30 18:32:10 +02:00
Tulir Asokan d108ac5d94 Add support for Telegram->Matrix reactions 2021-12-30 17:43:45 +02:00
Tulir Asokan e446121192 Fix order of operations when syncing contacts 2021-12-30 12:20:36 +02:00
Tulir Asokan afb73b1d17 Add support for bridging spoilers 2021-12-29 22:11:11 +02:00
Tulir Asokan aae8f78cb4 Try to drop identity in addition to default and id_seq in puppet/bot_chat tables
Closes #720
Closes #721

Co-authored-by: Carl Ambroselli <git@carl-ambroselli.de>
2021-12-29 12:47:32 +02:00
26 changed files with 754 additions and 279 deletions
+3
View File
@@ -8,6 +8,9 @@ charset = utf-8
trim_trailing_whitespace = true trim_trailing_whitespace = true
insert_final_newline = true insert_final_newline = true
[*.md]
trim_trailing_whitespace = false
[*.py] [*.py]
max_line_length = 99 max_line_length = 99
+13 -2
View File
@@ -1,7 +1,18 @@
# (unreleased) # v0.11.1 (2021-01-10)
* Added support for message reactions * Added support for message reactions.
* Added support for spoiler text. * Added support for spoiler text.
* Improved support for voice messages.
* Improved color of blue text from Telegram to be more readable on dark themes.
* Fixed syncing contacts throwing an error for new accounts.
* Fixed migrating pre-v0.11 legacy databases if the database schema had been
corrupted (e.g. by using 3rd party tools for SQLite -> Postgres migration).
* Fixed converting animated stickers to webm with >33 FPS.
* Fixed a bug in v0.11.0 that broke mentioning users in groups
(thanks to [@dfuchss] in [#724]).
[@dfuchss]: https://github.com/dfuchss
[#724]: https://github.com/mautrix/telegram/pull/724
# v0.11.0 (2021-12-28) # v0.11.0 (2021-12-28)
+5 -3
View File
@@ -3,6 +3,7 @@
* Matrix → Telegram * Matrix → Telegram
* [x] Message content (text, formatting, files, etc..) * [x] Message content (text, formatting, files, etc..)
* [x] Message redactions * [x] Message redactions
* [x] Message reactions
* [x] Message edits * [x] Message edits
* [ ] ‡ Message history * [ ] ‡ Message history
* [x] Presence * [x] Presence
@@ -12,8 +13,8 @@
* [x] Power level * [x] Power level
* [x] Normal chats * [x] Normal chats
* [ ] Non-hardcoded PL requirements * [ ] Non-hardcoded PL requirements
* [x] Supergroups/channels * [x] Supergroups/channels
* [ ] Precise bridging (non-hardcoded PL requirements, bridge specific permissions, etc..) * [ ] Precise bridging (non-hardcoded PL requirements, bridge specific permissions, etc..)
* [x] Membership actions (invite/kick/join/leave) * [x] Membership actions (invite/kick/join/leave)
* [x] Room metadata changes (name, topic, avatar) * [x] Room metadata changes (name, topic, avatar)
* [x] Initial room metadata * [x] Initial room metadata
@@ -27,6 +28,7 @@
* [x] Games * [x] Games
* [ ] Buttons * [ ] Buttons
* [x] Message deletions * [x] Message deletions
* [x] Message reactions
* [x] Message edits * [x] Message edits
* [x] Message history * [x] Message history
* [x] Manually (`!tg backfill`) * [x] Manually (`!tg backfill`)
@@ -57,7 +59,7 @@
* [x] Option to use own Matrix account for messages sent from other Telegram clients (double puppeting) * [x] Option to use own Matrix account for messages sent from other Telegram clients (double puppeting)
* [ ] ‡ Calls (hard, not yet supported by Telethon) * [ ] ‡ Calls (hard, not yet supported by Telethon)
* [ ] ‡ Secret chats (i.e. End-to-bridge encryption on Telegram) * [ ] ‡ Secret chats (i.e. End-to-bridge encryption on Telegram)
* [x] End-to-bridge encryption in Matrix rooms (see [wiki](https://github.com/tulir/mautrix-telegram/wiki/End%E2%80%90to%E2%80%90bridge-encryption)) * [x] End-to-bridge encryption in Matrix rooms (see [docs](https://docs.mau.fi/bridges/general/end-to-bridge-encryption.html))
† Information not automatically sent from source, i.e. implementation may not be possible † Information not automatically sent from source, i.e. implementation may not be possible
‡ Maybe, i.e. this feature may or may not be implemented at some point ‡ Maybe, i.e. this feature may or may not be implemented at some point
+1 -1
View File
@@ -1,2 +1,2 @@
__version__ = "0.11.0" __version__ = "0.11.1"
__author__ = "Tulir Asokan <tulir@maunium.net>" __author__ = "Tulir Asokan <tulir@maunium.net>"
+9
View File
@@ -46,6 +46,7 @@ from telethon.tl.types import (
UpdateEditChannelMessage, UpdateEditChannelMessage,
UpdateEditMessage, UpdateEditMessage,
UpdateFolderPeers, UpdateFolderPeers,
UpdateMessageReactions,
UpdateNewChannelMessage, UpdateNewChannelMessage,
UpdateNewMessage, UpdateNewMessage,
UpdateNotifySettings, UpdateNotifySettings,
@@ -312,6 +313,8 @@ class AbstractUser(ABC):
await self.delete_message(update) await self.delete_message(update)
elif isinstance(update, UpdateDeleteChannelMessages): elif isinstance(update, UpdateDeleteChannelMessages):
await self.delete_channel_message(update) await self.delete_channel_message(update)
elif isinstance(update, UpdateMessageReactions):
await self.update_reactions(update)
elif isinstance(update, (UpdateChatUserTyping, UpdateChannelUserTyping, UpdateUserTyping)): elif isinstance(update, (UpdateChatUserTyping, UpdateChannelUserTyping, UpdateUserTyping)):
await self.update_typing(update) await self.update_typing(update)
elif isinstance(update, UpdateUserStatus): elif isinstance(update, UpdateUserStatus):
@@ -559,6 +562,12 @@ class AbstractUser(ABC):
await message.delete() await message.delete()
await self._try_redact(message) await self._try_redact(message)
async def update_reactions(self, update: UpdateMessageReactions) -> None:
portal = await po.Portal.get_by_entity(update.peer, tg_receiver=self.tgid)
if not portal or not portal.mxid or not portal.allow_bridging:
return
await portal.handle_telegram_reactions(self, TelegramID(update.msg_id), update.reactions)
async def update_message(self, original_update: UpdateMessage) -> None: async def update_message(self, original_update: UpdateMessage) -> None:
update, sender, portal = await self.get_message_details(original_update) update, sender, portal = await self.get_message_details(original_update)
if not portal: if not portal:
+4 -9
View File
@@ -171,15 +171,10 @@ class Config(BaseBridgeConfig):
copy("bridge.bot_messages_as_notices") copy("bridge.bot_messages_as_notices")
if isinstance(self["bridge.bridge_notices"], bool): if isinstance(self["bridge.bridge_notices"], bool):
base["bridge.bridge_notices"] = { base["bridge.bridge_notices"]["default"] = self["bridge.bridge_notices"]
"default": self["bridge.bridge_notices"],
"exceptions": ["@importantbot:example.com"],
}
else: else:
copy("bridge.bridge_notices") copy("bridge.bridge_notices.default")
copy("bridge.bridge_notices.exceptions")
copy("bridge.deduplication.pre_db_check")
copy("bridge.deduplication.cache_queue_length")
if "bridge.message_formats.m_text" in self: if "bridge.message_formats.m_text" in self:
del self["bridge.message_formats"] del self["bridge.message_formats"]
@@ -208,7 +203,7 @@ class Config(BaseBridgeConfig):
permissions[entry] = "admin" permissions[entry] = "admin"
base["bridge.permissions"] = permissions base["bridge.permissions"] = permissions
else: else:
copy_dict("bridge.permissions") copy_dict("bridge.permissions", override_existing_map=True)
if "bridge.relaybot" not in self: if "bridge.relaybot" not in self:
copy("bridge.authless_relaybot_portals", "bridge.relaybot.authless_portals") copy("bridge.authless_relaybot_portals", "bridge.relaybot.authless_portals")
+3 -1
View File
@@ -19,6 +19,7 @@ from .bot_chat import BotChat
from .message import Message from .message import Message
from .portal import Portal from .portal import Portal
from .puppet import Puppet from .puppet import Puppet
from .reaction import Reaction
from .telegram_file import TelegramFile from .telegram_file import TelegramFile
from .telethon_session import PgSession from .telethon_session import PgSession
from .upgrade import upgrade_table from .upgrade import upgrade_table
@@ -26,7 +27,7 @@ from .user import User
def init(db: Database) -> None: def init(db: Database) -> None:
for table in (Portal, Message, User, Puppet, TelegramFile, BotChat, PgSession): for table in (Portal, Message, Reaction, User, Puppet, TelegramFile, BotChat, PgSession):
table.db = db table.db = db
@@ -35,6 +36,7 @@ __all__ = [
"init", "init",
"Portal", "Portal",
"Message", "Message",
"Reaction",
"User", "User",
"Puppet", "Puppet",
"TelegramFile", "TelegramFile",
+24 -8
View File
@@ -38,6 +38,7 @@ class Message:
tg_space: TelegramID tg_space: TelegramID
edit_index: int edit_index: int
redacted: bool = False redacted: bool = False
content_hash: bytes | None = None
@classmethod @classmethod
def _from_row(cls, row: Record | None) -> Message | None: def _from_row(cls, row: Record | None) -> Message | None:
@@ -45,7 +46,7 @@ class Message:
return None return None
return cls(**row) return cls(**row)
columns: ClassVar[str] = "mxid, mx_room, tgid, tg_space, edit_index, redacted" columns: ClassVar[str] = "mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash"
@classmethod @classmethod
async def get_all_by_tgid(cls, tgid: TelegramID, tg_space: TelegramID) -> list[Message]: async def get_all_by_tgid(cls, tgid: TelegramID, tg_space: TelegramID) -> list[Message]:
@@ -142,14 +143,29 @@ class Message:
q = "UPDATE message SET mxid=$1 WHERE mxid=$2 AND mx_room=$3" q = "UPDATE message SET mxid=$1 WHERE mxid=$2 AND mx_room=$3"
await cls.db.execute(q, real_mxid, temp_mxid, mx_room) await cls.db.execute(q, real_mxid, temp_mxid, mx_room)
@classmethod
async def delete_temp_mxid(cls, temp_mxid: str, mx_room: RoomID) -> None:
q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2"
await cls.db.execute(q, temp_mxid, mx_room)
@property
def _values(self):
return (
self.mxid,
self.mx_room,
self.tgid,
self.tg_space,
self.edit_index,
self.redacted,
self.content_hash,
)
async def insert(self) -> None: async def insert(self) -> None:
q = ( q = """
"INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted) " INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash)
"VALUES ($1, $2, $3, $4, $5, $6)" VALUES ($1, $2, $3, $4, $5, $6, $7)
) """
await self.db.execute( await self.db.execute(q, *self._values)
q, self.mxid, self.mx_room, self.tgid, self.tg_space, self.edit_index, self.redacted
)
async def delete(self) -> None: async def delete(self) -> None:
q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2 AND tg_space=$3" q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2 AND tg_space=$3"
+91
View File
@@ -0,0 +1,91 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
from asyncpg import Record
from attr import dataclass
from mautrix.types import EventID, RoomID
from mautrix.util.async_db import Database
from ..types import TelegramID
fake_db = Database.create("") if TYPE_CHECKING else None
@dataclass
class Reaction:
db: ClassVar[Database] = fake_db
mxid: EventID
mx_room: RoomID
msg_mxid: EventID
tg_sender: TelegramID
reaction: str
@classmethod
def _from_row(cls, row: Record | None) -> Reaction | None:
if row is None:
return None
return cls(**row)
columns: ClassVar[str] = "mxid, mx_room, msg_mxid, tg_sender, reaction"
@classmethod
async def delete_all(cls, mx_room: RoomID) -> None:
await cls.db.execute("DELETE FROM reaction WHERE mx_room=$1", mx_room)
@classmethod
async def get_by_mxid(cls, mxid: EventID, mx_room: RoomID) -> Reaction | None:
q = f"SELECT {cls.columns} FROM reaction WHERE mxid=$1 AND mx_room=$2"
return cls._from_row(await cls.db.fetchrow(q, mxid, mx_room))
@classmethod
async def get_by_sender(
cls, mxid: EventID, mx_room: RoomID, tg_sender: TelegramID
) -> Reaction | None:
q = f"SELECT {cls.columns} FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3"
return cls._from_row(await cls.db.fetchrow(q, mxid, mx_room, tg_sender))
@classmethod
async def get_all_by_message(cls, mxid: EventID, mx_room: RoomID) -> list[Reaction]:
q = f"SELECT {cls.columns} FROM reaction WHERE msg_mxid=$1 AND mx_room=$2"
rows = await cls.db.fetch(q, mxid, mx_room)
return [cls._from_row(row) for row in rows]
@property
def _values(self):
return (
self.mxid,
self.mx_room,
self.msg_mxid,
self.tg_sender,
self.reaction,
)
async def save(self) -> None:
q = """
INSERT INTO reaction (mxid, mx_room, msg_mxid, tg_sender, reaction)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (msg_mxid, mx_room, tg_sender)
DO UPDATE SET mxid=$1, reaction=$5
"""
await self.db.execute(q, *self._values)
async def delete(self) -> None:
q = "DELETE FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3"
await self.db.execute(q, self.msg_mxid, self.mx_room, self.tg_sender)
+1 -1
View File
@@ -2,4 +2,4 @@ from mautrix.util.async_db import UpgradeTable
upgrade_table = UpgradeTable() upgrade_table = UpgradeTable()
from . import v01_initial_revision, v02_sponsored_events from . import v01_initial_revision, v02_sponsored_events, v03_reactions
@@ -13,6 +13,8 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from asyncpg import Connection from asyncpg import Connection
from . import upgrade_table from . import upgrade_table
@@ -38,6 +40,16 @@ async def upgrade_v1(conn: Connection, scheme: str) -> None:
await create_v1_tables(conn) await create_v1_tables(conn)
async def drop_constraints(conn: Connection, table: str, contype: str) -> None:
q = (
"SELECT conname FROM pg_constraint con INNER JOIN pg_class rel ON rel.oid=con.conrelid "
f"WHERE rel.relname='{table}' AND contype='{contype}'"
)
names = [row["conname"] for row in await conn.fetch(q)]
drops = ", ".join(f"DROP CONSTRAINT {name}" for name in names)
await conn.execute(f"ALTER TABLE {table} {drops}")
async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None: async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None:
legacy_version = await conn.fetchval(legacy_version_query) legacy_version = await conn.fetchval(legacy_version_query)
if legacy_version != last_legacy_version: if legacy_version != last_legacy_version:
@@ -46,37 +58,38 @@ async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None:
"Please upgrade the old database with alembic or drop it completely first." "Please upgrade the old database with alembic or drop it completely first."
) )
if scheme != "sqlite": if scheme != "sqlite":
await drop_constraints(conn, "contact", contype="f")
await conn.execute( await conn.execute(
""" """
ALTER TABLE contact ALTER TABLE contact
DROP CONSTRAINT contact_user_fkey,
DROP CONSTRAINT contact_contact_fkey,
ADD CONSTRAINT contact_user_fkey FOREIGN KEY (contact) REFERENCES puppet(id) ADD CONSTRAINT contact_user_fkey FOREIGN KEY (contact) REFERENCES puppet(id)
ON DELETE CASCADE ON UPDATE CASCADE, ON DELETE CASCADE ON UPDATE CASCADE,
ADD CONSTRAINT contact_contact_fkey FOREIGN KEY ("user") REFERENCES "user"(tgid) ADD CONSTRAINT contact_contact_fkey FOREIGN KEY ("user") REFERENCES "user"(tgid)
ON DELETE CASCADE ON UPDATE CASCADE ON DELETE CASCADE ON UPDATE CASCADE
""" """
) )
await drop_constraints(conn, "telethon_sessions", contype="p")
await conn.execute( await conn.execute(
""" """
ALTER TABLE telethon_sessions ALTER TABLE telethon_sessions
DROP CONSTRAINT telethon_sessions_pkey,
ADD CONSTRAINT telethon_sessions_pkey PRIMARY KEY (session_id) ADD CONSTRAINT telethon_sessions_pkey PRIMARY KEY (session_id)
""" """
) )
await drop_constraints(conn, "telegram_file", contype="f")
await conn.execute( await conn.execute(
""" """
ALTER TABLE telegram_file ALTER TABLE telegram_file
DROP CONSTRAINT fk_file_thumbnail,
ADD CONSTRAINT fk_file_thumbnail ADD CONSTRAINT fk_file_thumbnail
FOREIGN KEY (thumbnail) REFERENCES telegram_file(id) FOREIGN KEY (thumbnail) REFERENCES telegram_file(id)
ON UPDATE CASCADE ON DELETE SET NULL ON UPDATE CASCADE ON DELETE SET NULL
""" """
) )
await conn.execute("ALTER TABLE puppet ALTER COLUMN id DROP IDENTITY IF EXISTS")
await conn.execute("ALTER TABLE puppet ALTER COLUMN id DROP DEFAULT") await conn.execute("ALTER TABLE puppet ALTER COLUMN id DROP DEFAULT")
await conn.execute("DROP SEQUENCE puppet_id_seq") await conn.execute("DROP SEQUENCE IF EXISTS puppet_id_seq")
await conn.execute("ALTER TABLE bot_chat ALTER COLUMN id DROP IDENTITY IF EXISTS")
await conn.execute("ALTER TABLE bot_chat ALTER COLUMN id DROP DEFAULT") await conn.execute("ALTER TABLE bot_chat ALTER COLUMN id DROP DEFAULT")
await conn.execute("DROP SEQUENCE bot_chat_id_seq") await conn.execute("DROP SEQUENCE IF EXISTS bot_chat_id_seq")
await conn.execute("ALTER TABLE portal ALTER COLUMN config TYPE jsonb USING config::jsonb") await conn.execute("ALTER TABLE portal ALTER COLUMN config TYPE jsonb USING config::jsonb")
await conn.execute( await conn.execute(
"ALTER TABLE telegram_file ALTER COLUMN decryption_info TYPE jsonb " "ALTER TABLE telegram_file ALTER COLUMN decryption_info TYPE jsonb "
@@ -0,0 +1,39 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from asyncpg import Connection
from . import upgrade_table
@upgrade_table.register(description="Add support for reactions")
async def upgrade_v3(conn: Connection, scheme: str) -> None:
await conn.execute(
"""CREATE TABLE reaction (
mxid TEXT NOT NULL,
mx_room TEXT NOT NULL,
msg_mxid TEXT NOT NULL,
tg_sender BIGINT,
reaction TEXT NOT NULL,
PRIMARY KEY (msg_mxid, mx_room, tg_sender),
UNIQUE (mxid, mx_room)
)"""
)
if scheme != "sqlite":
await conn.execute("DELETE FROM message WHERE mxid IS NULL OR mx_room IS NULL")
await conn.execute("ALTER TABLE message ALTER COLUMN mxid SET NOT NULL")
await conn.execute("ALTER TABLE message ALTER COLUMN mx_room SET NOT NULL")
await conn.execute("ALTER TABLE message ADD COLUMN content_hash bytea")
-10
View File
@@ -331,16 +331,6 @@ bridge:
exceptions: exceptions:
- "@importantbot:example.com" - "@importantbot:example.com"
# Some config options related to Telegram message deduplication.
# The default values are usually fine, but some debug messages/warnings might recommend you
# change these.
deduplication:
# Whether or not to check the database if the message about to be sent is a duplicate.
pre_db_check: false
# The number of latest events to keep when checking for duplicates.
# You might need to increase this on high-traffic bridge instances.
cache_queue_length: 20
# The formats to use when sending messages to Telegram via the relay bot. # The formats to use when sending messages to Telegram via the relay bot.
# Text msgtypes (m.text, m.notice and m.emote) support HTML, media msgtypes don't. # Text msgtypes (m.text, m.notice and m.emote) support HTML, media msgtypes don't.
# #
@@ -42,8 +42,8 @@ class MatrixParser(BaseMatrixParser[TelegramMessage]):
async def custom_node_to_fstring( async def custom_node_to_fstring(
self, node: HTMLNode, ctx: RecursionContext self, node: HTMLNode, ctx: RecursionContext
) -> TelegramMessage | None: ) -> TelegramMessage | None:
msg = await self.tag_aware_parse_node(node, ctx)
if node.tag == "command": if node.tag == "command":
msg = await self.tag_aware_parse_node(node, ctx)
return msg.prepend("/").format(TelegramEntityType.COMMAND) return msg.prepend("/").format(TelegramEntityType.COMMAND)
return None return None
@@ -59,7 +59,7 @@ class MatrixParser(BaseMatrixParser[TelegramMessage]):
displayname = user.plain_displayname or msg.text displayname = user.plain_displayname or msg.text
msg = TelegramMessage(displayname) msg = TelegramMessage(displayname)
try: try:
input_entity = self.client.get_input_entity(user.tgid) input_entity = await self.client.get_input_entity(user.tgid)
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
log.trace(f"Dropping mention of {user.tgid}: {e}") log.trace(f"Dropping mention of {user.tgid}: {e}")
else: else:
@@ -95,5 +95,8 @@ class MatrixParser(BaseMatrixParser[TelegramMessage]):
async def color_to_fstring(self, msg: TelegramMessage, color: str) -> TelegramMessage: async def color_to_fstring(self, msg: TelegramMessage, color: str) -> TelegramMessage:
return msg return msg
async def spoiler_to_fstring(self, msg: TelegramMessage, spoiler: str) -> TelegramMessage: async def spoiler_to_fstring(self, msg: TelegramMessage, reason: str) -> TelegramMessage:
msg = msg.format(self.e.SPOILER)
if reason:
msg = msg.prepend(f"{reason}: ")
return msg return msg
@@ -29,6 +29,7 @@ from telethon.tl.types import (
MessageEntityMention as Mention, MessageEntityMention as Mention,
MessageEntityMentionName as MentionName, MessageEntityMentionName as MentionName,
MessageEntityPre as Pre, MessageEntityPre as Pre,
MessageEntitySpoiler as Spoiler,
MessageEntityStrike as Strike, MessageEntityStrike as Strike,
MessageEntityTextUrl as TextURL, MessageEntityTextUrl as TextURL,
MessageEntityUnderline as Underline, MessageEntityUnderline as Underline,
@@ -55,6 +56,7 @@ class TelegramEntityType(Enum):
MENTION = Mention MENTION = Mention
MENTION_NAME = InputMentionName MENTION_NAME = InputMentionName
COMMAND = Command COMMAND = Command
SPOILER = Spoiler
USER_MENTION = 1 USER_MENTION = 1
ROOM_MENTION = 2 ROOM_MENTION = 2
+10 -4
View File
@@ -35,6 +35,7 @@ from telethon.tl.types import (
MessageEntityMentionName, MessageEntityMentionName,
MessageEntityPhone, MessageEntityPhone,
MessageEntityPre, MessageEntityPre,
MessageEntitySpoiler,
MessageEntityStrike, MessageEntityStrike,
MessageEntityTextUrl, MessageEntityTextUrl,
MessageEntityUnderline, MessageEntityUnderline,
@@ -289,10 +290,15 @@ async def _telegram_entities_to_matrix(
skip_entity = await _parse_url( skip_entity = await _parse_url(
html, entity_text, entity.url if entity_type == MessageEntityTextUrl else None html, entity_text, entity.url if entity_type == MessageEntityTextUrl else None
) )
elif entity_type == MessageEntityBotCommand: elif entity_type in (
html.append(f"<font color='blue'>{entity_text}</font>") MessageEntityBotCommand,
elif entity_type in (MessageEntityHashtag, MessageEntityCashtag, MessageEntityPhone): MessageEntityHashtag,
html.append(f"<font color='blue'>{entity_text}</font>") MessageEntityCashtag,
MessageEntityPhone,
):
html.append(f"<font color='#3771bb'>{entity_text}</font>")
elif entity_type == MessageEntitySpoiler:
html.append(f"<span data-mx-spoiler>{entity_text}</span>")
else: else:
skip_entity = True skip_entity = True
last_offset = relative_offset + (0 if skip_entity else entity.length) last_offset = relative_offset + (0 if skip_entity else entity.length)
+18 -4
View File
@@ -15,9 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Iterable from typing import TYPE_CHECKING
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY
from mautrix.bridge import BaseMatrixHandler from mautrix.bridge import BaseMatrixHandler
from mautrix.errors import MatrixError from mautrix.errors import MatrixError
from mautrix.types import ( from mautrix.types import (
@@ -28,9 +27,8 @@ from mautrix.types import (
MessageType, MessageType,
PresenceEvent, PresenceEvent,
PresenceState, PresenceState,
ReactionEvent,
ReceiptEvent, ReceiptEvent,
ReceiptEventContent,
ReceiptType,
RedactionEvent, RedactionEvent,
RoomAvatarStateEventContent as AvatarContent, RoomAvatarStateEventContent as AvatarContent,
RoomID, RoomID,
@@ -278,6 +276,20 @@ class MatrixHandler(BaseMatrixHandler):
await portal.handle_matrix_deletion(sender, evt.redacts, evt.event_id) await portal.handle_matrix_deletion(sender, evt.redacts, evt.event_id)
@staticmethod
async def handle_reaction(evt: ReactionEvent) -> None:
sender = await u.User.get_and_start_by_mxid(evt.sender)
if not await sender.has_full_access():
return
portal = await po.Portal.get_by_mxid(evt.room_id)
if not portal or not portal.allow_bridging:
return
await portal.handle_matrix_reaction(
sender, evt.content.relates_to.event_id, evt.content.relates_to.key, evt.event_id
)
@staticmethod @staticmethod
async def handle_power_levels(evt: StateEvent) -> None: async def handle_power_levels(evt: StateEvent) -> None:
portal = await po.Portal.get_by_mxid(evt.room_id) portal = await po.Portal.get_by_mxid(evt.room_id)
@@ -400,6 +412,8 @@ class MatrixHandler(BaseMatrixHandler):
async def handle_event(self, evt: Event) -> None: async def handle_event(self, evt: Event) -> None:
if evt.type == EventType.ROOM_REDACTION: if evt.type == EventType.ROOM_REDACTION:
await self.handle_redaction(evt) await self.handle_redaction(evt)
elif evt.type == EventType.REACTION:
await self.handle_reaction(evt)
async def handle_state_event(self, evt: StateEvent) -> None: async def handle_state_event(self, evt: StateEvent) -> None:
if evt.type == EventType.ROOM_POWER_LEVELS: if evt.type == EventType.ROOM_POWER_LEVELS:
+364 -74
View File
@@ -45,6 +45,7 @@ from telethon.errors import (
PhotoExtInvalidError, PhotoExtInvalidError,
PhotoInvalidDimensionsError, PhotoInvalidDimensionsError,
PhotoSaveFileInvalidError, PhotoSaveFileInvalidError,
ReactionInvalidError,
RPCError, RPCError,
) )
from telethon.tl.functions.channels import ( from telethon.tl.functions.channels import (
@@ -63,7 +64,9 @@ from telethon.tl.functions.messages import (
EditChatPhotoRequest, EditChatPhotoRequest,
EditChatTitleRequest, EditChatTitleRequest,
ExportChatInviteRequest, ExportChatInviteRequest,
GetMessageReactionsListRequest,
MigrateChatRequest, MigrateChatRequest,
SendReactionRequest,
SetTypingRequest, SetTypingRequest,
UnpinAllMessagesRequest, UnpinAllMessagesRequest,
UpdatePinnedMessageRequest, UpdatePinnedMessageRequest,
@@ -78,6 +81,7 @@ from telethon.tl.types import (
ChatPhotoEmpty, ChatPhotoEmpty,
Document, Document,
DocumentAttributeAnimated, DocumentAttributeAnimated,
DocumentAttributeAudio,
DocumentAttributeFilename, DocumentAttributeFilename,
DocumentAttributeImageSize, DocumentAttributeImageSize,
DocumentAttributeSticker, DocumentAttributeSticker,
@@ -112,6 +116,8 @@ from telethon.tl.types import (
MessageMediaPhoto, MessageMediaPhoto,
MessageMediaPoll, MessageMediaPoll,
MessageMediaUnsupported, MessageMediaUnsupported,
MessageReactions,
MessageUserReaction,
PeerChannel, PeerChannel,
PeerChat, PeerChat,
PeerUser, PeerUser,
@@ -122,6 +128,7 @@ from telethon.tl.types import (
PhotoSizeEmpty, PhotoSizeEmpty,
PhotoSizeProgressive, PhotoSizeProgressive,
Poll, Poll,
ReactionCount,
SendMessageCancelAction, SendMessageCancelAction,
SendMessageTypingAction, SendMessageTypingAction,
SponsoredMessage, SponsoredMessage,
@@ -146,6 +153,7 @@ from telethon.tl.types import (
UserProfilePhoto, UserProfilePhoto,
UserProfilePhotoEmpty, UserProfilePhotoEmpty,
) )
from telethon.utils import decode_waveform
import magic import magic
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
@@ -177,13 +185,19 @@ from mautrix.types import (
UserID, UserID,
VideoInfo, VideoInfo,
) )
from mautrix.util import variation_selector
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
from mautrix.util.simple_lock import SimpleLock from mautrix.util.simple_lock import SimpleLock
from mautrix.util.simple_template import SimpleTemplate from mautrix.util.simple_template import SimpleTemplate
from . import abstract_user as au, formatter, portal_util as putil, puppet as p, user as u, util from . import abstract_user as au, formatter, portal_util as putil, puppet as p, user as u, util
from .config import Config from .config import Config
from .db import Message as DBMessage, Portal as DBPortal, TelegramFile as DBTelegramFile from .db import (
Message as DBMessage,
Portal as DBPortal,
Reaction as DBReaction,
TelegramFile as DBTelegramFile,
)
from .tgclient import MautrixTelegramClient from .tgclient import MautrixTelegramClient
from .types import TelegramID from .types import TelegramID
from .util import sane_mimetypes from .util import sane_mimetypes
@@ -206,6 +220,10 @@ TypeChatPhoto = Union[ChatPhoto, ChatPhotoEmpty, Photo, PhotoEmpty]
MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]] MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]]
class BridgingError(Exception):
pass
class DocAttrs(NamedTuple): class DocAttrs(NamedTuple):
name: str | None name: str | None
mime_type: str | None mime_type: str | None
@@ -214,6 +232,10 @@ class DocAttrs(NamedTuple):
width: int width: int
height: int height: int
is_gif: bool is_gif: bool
is_audio: bool
is_voice: bool
duration: int
waveform: bytes
class Portal(DBPortal, BasePortal): class Portal(DBPortal, BasePortal):
@@ -248,6 +270,7 @@ class Portal(DBPortal, BasePortal):
dedup: putil.PortalDedup dedup: putil.PortalDedup
send_lock: putil.PortalSendLock send_lock: putil.PortalSendLock
reaction_lock: putil.PortalReactionLock
_pin_lock: asyncio.Lock _pin_lock: asyncio.Lock
_main_intent: IntentAPI | None _main_intent: IntentAPI | None
@@ -307,6 +330,7 @@ class Portal(DBPortal, BasePortal):
self.dedup = putil.PortalDedup(self) self.dedup = putil.PortalDedup(self)
self.send_lock = putil.PortalSendLock() self.send_lock = putil.PortalSendLock()
self.reaction_lock = putil.PortalReactionLock()
self._pin_lock = asyncio.Lock() self._pin_lock = asyncio.Lock()
self._room_create_lock = asyncio.Lock() self._room_create_lock = asyncio.Lock()
@@ -405,10 +429,6 @@ class Portal(DBPortal, BasePortal):
) )
NotificationDisabler.puppet_cls = p.Puppet NotificationDisabler.puppet_cls = p.Puppet
NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"] NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
putil.PortalDedup.dedup_pre_db_check = cls.config["bridge.deduplication.pre_db_check"]
putil.PortalDedup.dedup_cache_queue_length = cls.config[
"bridge.deduplication.cache_queue_length"
]
# endregion # endregion
# region Matrix -> Telegram metadata # region Matrix -> Telegram metadata
@@ -1451,35 +1471,27 @@ class Portal(DBPortal, BasePortal):
if content.get_edit(): if content.get_edit():
orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space) orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
if orig_msg: if orig_msg:
response = await client.edit_message( resp = await client.edit_message(
self.peer, self.peer,
orig_msg.tgid, orig_msg.tgid,
message, message,
formatting_entities=entities, formatting_entities=entities,
link_preview=lp, link_preview=lp,
) )
await self._add_telegram_message_to_db(event_id, space, -1, response) await self._mark_matrix_handled(
sender, EventType.ROOM_MESSAGE, event_id, space, -1, resp, content.msgtype
)
return return
try: response = await client.send_message(
response = await client.send_message( self.peer,
self.peer, message,
message, reply_to=reply_to,
reply_to=reply_to, formatting_entities=entities,
formatting_entities=entities, link_preview=lp,
link_preview=lp, )
) await self._mark_matrix_handled(
except Exception: sender, EventType.ROOM_MESSAGE, event_id, space, 0, response, content.msgtype
raise )
else:
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
await self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
async def _handle_matrix_file( async def _handle_matrix_file(
self, self,
@@ -1508,7 +1520,7 @@ class Portal(DBPortal, BasePortal):
else: else:
if content.file: if content.file:
if not decrypt_attachment: if not decrypt_attachment:
raise Exception( raise BridgingError(
f"Can't bridge encrypted media event {event_id}: " f"Can't bridge encrypted media event {event_id}: "
"encryption dependencies not installed" "encryption dependencies not installed"
) )
@@ -1555,7 +1567,9 @@ class Portal(DBPortal, BasePortal):
) )
async with self.send_lock(sender_id): async with self.send_lock(sender_id):
if await self._matrix_document_edit(client, content, space, capt, media, event_id): if await self._matrix_document_edit(
sender, client, content, space, capt, media, event_id
):
return return
try: try:
try: try:
@@ -1576,18 +1590,13 @@ class Portal(DBPortal, BasePortal):
except Exception: except Exception:
raise raise
else: else:
sender.send_remote_checkpoint( await self._mark_matrix_handled(
MessageSendCheckpointStatus.SUCCESS, sender, EventType.ROOM_MESSAGE, event_id, space, 0, response, content.msgtype
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
) )
await self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
async def _matrix_document_edit( async def _matrix_document_edit(
self, self,
sender: u.User,
client: MautrixTelegramClient, client: MautrixTelegramClient,
content: MessageEventContent, content: MessageEventContent,
space: TelegramID, space: TelegramID,
@@ -1599,8 +1608,9 @@ class Portal(DBPortal, BasePortal):
orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space) orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
if orig_msg: if orig_msg:
response = await client.edit_message(self.peer, orig_msg.tgid, caption, file=media) response = await client.edit_message(self.peer, orig_msg.tgid, caption, file=media)
await self._add_telegram_message_to_db(event_id, space, -1, response) await self._mark_matrix_handled(
await self._send_delivery_receipt(event_id) sender, EventType.ROOM_MESSAGE, event_id, space, -1, response, content.msgtype
)
return True return True
return False return False
@@ -1625,7 +1635,9 @@ class Portal(DBPortal, BasePortal):
media = MessageMediaGeo(geo=GeoPoint(lat=lat, long=long, access_hash=0)) media = MessageMediaGeo(geo=GeoPoint(lat=lat, long=long, access_hash=0))
async with self.send_lock(sender_id): async with self.send_lock(sender_id):
if await self._matrix_document_edit(client, content, space, caption, media, event_id): if await self._matrix_document_edit(
sender, client, content, space, caption, media, event_id
):
return return
try: try:
response = await client.send_media( response = await client.send_media(
@@ -1634,21 +1646,22 @@ class Portal(DBPortal, BasePortal):
except Exception: except Exception:
raise raise
else: else:
await self._add_telegram_message_to_db(event_id, space, 0, response) await self._mark_matrix_handled(
sender.send_remote_checkpoint( sender, EventType.ROOM_MESSAGE, event_id, space, 0, response, content.msgtype
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
) )
await self._send_delivery_receipt(event_id)
async def _add_telegram_message_to_db( async def _mark_matrix_handled(
self, event_id: EventID, space: TelegramID, edit_index: int, response: TypeMessage self,
sender: u.User,
event_type: EventType,
event_id: EventID,
space: TelegramID,
edit_index: int,
response: TypeMessage,
msgtype: MessageType | None = None,
) -> None: ) -> None:
self.log.trace("Handled Matrix message: %s", response) self.log.trace("Handled Matrix message: %s", response)
self.dedup.check(response, (event_id, space), force_hash=edit_index != 0) event_hash, _ = self.dedup.check(response, (event_id, space), force_hash=edit_index != 0)
if edit_index < 0: if edit_index < 0:
prev_edit = await DBMessage.get_one_by_tgid(TelegramID(response.id), space, -1) prev_edit = await DBMessage.get_one_by_tgid(TelegramID(response.id), space, -1)
edit_index = prev_edit.edit_index + 1 edit_index = prev_edit.edit_index + 1
@@ -1658,7 +1671,16 @@ class Portal(DBPortal, BasePortal):
mx_room=self.mxid, mx_room=self.mxid,
mxid=event_id, mxid=event_id,
edit_index=edit_index, edit_index=edit_index,
content_hash=event_hash,
).insert() ).insert()
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
event_type,
message_type=msgtype,
)
await self._send_delivery_receipt(event_id)
async def _send_bridge_error( async def _send_bridge_error(
self, self,
@@ -1737,7 +1759,7 @@ class Portal(DBPortal, BasePortal):
bridge_notices = self.get_config("bridge_notices.default") bridge_notices = self.get_config("bridge_notices.default")
excepted = sender.mxid in self.get_config("bridge_notices.exceptions") excepted = sender.mxid in self.get_config("bridge_notices.exceptions")
if not bridge_notices and not excepted: if not bridge_notices and not excepted:
raise Exception("Notices are not configured to be bridged.") raise BridgingError("Notices are not configured to be bridged.")
if content.msgtype in (MessageType.TEXT, MessageType.EMOTE, MessageType.NOTICE): if content.msgtype in (MessageType.TEXT, MessageType.EMOTE, MessageType.NOTICE):
await self._pre_process_matrix_message(sender, not logged_in, content) await self._pre_process_matrix_message(sender, not logged_in, content)
@@ -1770,7 +1792,7 @@ class Portal(DBPortal, BasePortal):
f"Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}" f"Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}"
) )
self.log.trace("Unhandled Matrix event content: %s", content) self.log.trace("Unhandled Matrix event content: %s", content)
raise Exception(f"Unhandled msgtype {content.msgtype}") raise BridgingError(f"Unhandled msgtype {content.msgtype}")
async def handle_matrix_unpin_all(self, sender: u.User, pin_event_id: EventID) -> None: async def handle_matrix_unpin_all(self, sender: u.User, pin_event_id: EventID) -> None:
await sender.client(UnpinAllMessagesRequest(peer=self.peer)) await sender.client(UnpinAllMessagesRequest(peer=self.peer))
@@ -1800,9 +1822,12 @@ class Portal(DBPortal, BasePortal):
) -> None: ) -> None:
try: try:
await self._handle_matrix_deletion(deleter, event_id) await self._handle_matrix_deletion(deleter, event_id)
except Exception as e: except BridgingError as e:
self.log.debug(str(e)) self.log.debug(str(e))
await self._send_bridge_error(deleter, e, redaction_event_id, EventType.ROOM_REDACTION) await self._send_bridge_error(deleter, e, redaction_event_id, EventType.ROOM_REDACTION)
except Exception as e:
self.log.exception(f"Failed to bridge redaction by {deleter.mxid}")
await self._send_bridge_error(deleter, e, redaction_event_id, EventType.ROOM_REDACTION)
else: else:
deleter.send_remote_checkpoint( deleter.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS, MessageSendCheckpointStatus.SUCCESS,
@@ -1812,26 +1837,106 @@ class Portal(DBPortal, BasePortal):
) )
await self._send_delivery_receipt(redaction_event_id) await self._send_delivery_receipt(redaction_event_id)
async def _handle_matrix_reaction_deletion(
self, deleter: u.User, event_id: EventID, tg_space: TelegramID
) -> None:
reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
if not reaction:
raise BridgingError(f"Ignoring Matrix redaction of unknown event {event_id}")
elif reaction.tg_sender != deleter.tgid:
raise BridgingError(f"Ignoring Matrix redaction of reaction by another user")
reaction_target = await DBMessage.get_by_mxid(
reaction.msg_mxid, reaction.mx_room, tg_space
)
if not reaction_target or reaction_target.redacted:
raise BridgingError(
f"Ignoring Matrix redaction of reaction to unknown event {reaction.msg_mxid}"
)
async with self.reaction_lock(reaction_target.mxid):
await reaction.delete()
await deleter.client(SendReactionRequest(peer=self.peer, msg_id=reaction_target.tgid))
async def _handle_matrix_deletion(self, deleter: u.User, event_id: EventID) -> None: async def _handle_matrix_deletion(self, deleter: u.User, event_id: EventID) -> None:
real_deleter = deleter if not await deleter.needs_relaybot(self) else self.bot real_deleter = deleter if not await deleter.needs_relaybot(self) else self.bot
space = self.tgid if self.peer_type == "channel" else real_deleter.tgid tg_space = self.tgid if self.peer_type == "channel" else real_deleter.tgid
message = await DBMessage.get_by_mxid(event_id, self.mxid, space) message = await DBMessage.get_by_mxid(event_id, self.mxid, tg_space)
if not message: if not message:
raise Exception(f"Ignoring Matrix redaction of unknown event {event_id}") await self._handle_matrix_reaction_deletion(real_deleter, event_id, tg_space)
elif message.redacted: elif message.redacted:
raise Exception( raise BridgingError(
"Ignoring Matrix redaction of already redacted event " "Ignoring Matrix redaction of already redacted event "
f"{message.mxid} in {message.mx_room}" f"{message.mxid} in {message.mx_room}"
) )
elif message.edit_index != 0: elif message.edit_index != 0:
await message.mark_redacted() await message.mark_redacted()
raise Exception( raise BridgingError(
f"Ignoring Matrix redaction of edit event {message.mxid} in {message.mx_room}" f"Ignoring Matrix redaction of edit event {message.mxid} in {message.mx_room}"
) )
else: else:
await message.mark_redacted() await message.mark_redacted()
await real_deleter.client.delete_messages(self.peer, [message.tgid]) await real_deleter.client.delete_messages(self.peer, [message.tgid])
async def handle_matrix_reaction(
self, user: u.User, target_event_id: EventID, reaction: str, reaction_event_id: EventID
) -> None:
try:
async with self.reaction_lock(target_event_id):
await self._handle_matrix_reaction(
user, target_event_id, reaction, reaction_event_id
)
except BridgingError as e:
self.log.debug(str(e))
await self._send_bridge_error(user, e, reaction_event_id, EventType.REACTION)
except ReactionInvalidError as e:
# Don't redact reactions in relaybot chats, there are usually other Matrix users too.
if not self.has_bot:
await self.main_intent.redact(
self.mxid, reaction_event_id, reason="Emoji not allowed"
)
self.log.debug(f"Failed to bridge reaction by {user.mxid}: emoji not allowed")
await self._send_bridge_error(user, e, reaction_event_id, EventType.REACTION)
except Exception as e:
self.log.exception(f"Failed to bridge reaction by {user.mxid}")
await self._send_bridge_error(user, e, reaction_event_id, EventType.REACTION)
else:
user.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
reaction_event_id,
self.mxid,
EventType.REACTION,
)
await self._send_delivery_receipt(reaction_event_id)
async def _handle_matrix_reaction(
self, user: u.User, target_event_id: EventID, emoji: str, reaction_event_id: EventID
) -> None:
tg_space = self.tgid if self.peer_type == "channel" else user.tgid
msg = await DBMessage.get_by_mxid(target_event_id, self.mxid, tg_space)
if not msg:
raise BridgingError(f"Ignoring Matrix reaction to unknown event {target_event_id}")
elif msg.redacted:
raise BridgingError(f"Ignoring Matrix reaction to redacted event {target_event_id}")
elif msg.edit_index != 0:
raise BridgingError(f"Ignoring Matrix reaction to edit event {target_event_id}")
emoji = variation_selector.remove(emoji)
existing_react = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, user.tgid)
await user.client(SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=emoji))
if existing_react:
puppet = await user.get_puppet()
await puppet.intent_for(self).redact(existing_react.mx_room, existing_react.mxid)
existing_react.mxid = reaction_event_id
existing_react.reaction = emoji
await existing_react.save()
else:
await DBReaction(
mxid=reaction_event_id,
mx_room=self.mxid,
msg_mxid=msg.mxid,
tg_sender=user.tgid,
reaction=emoji,
).save()
async def _update_telegram_power_level( async def _update_telegram_power_level(
self, sender: u.User, user_id: TelegramID, level: int self, sender: u.User, user_id: TelegramID, level: int
) -> None: ) -> None:
@@ -2077,7 +2182,7 @@ class Portal(DBPortal, BasePortal):
@staticmethod @staticmethod
def _parse_telegram_document_attributes(attributes: list[TypeDocumentAttribute]) -> DocAttrs: def _parse_telegram_document_attributes(attributes: list[TypeDocumentAttribute]) -> DocAttrs:
name, mime_type, is_sticker, sticker_alt, width, height = None, None, False, None, 0, 0 name, mime_type, is_sticker, sticker_alt, width, height = None, None, False, None, 0, 0
is_gif = False is_gif, is_audio, is_voice, duration, waveform = False, False, False, 0, bytes()
for attr in attributes: for attr in attributes:
if isinstance(attr, DocumentAttributeFilename): if isinstance(attr, DocumentAttributeFilename):
name = name or attr.file_name name = name or attr.file_name
@@ -2091,7 +2196,25 @@ class Portal(DBPortal, BasePortal):
width, height = attr.w, attr.h width, height = attr.w, attr.h
elif isinstance(attr, DocumentAttributeImageSize): elif isinstance(attr, DocumentAttributeImageSize):
width, height = attr.w, attr.h width, height = attr.w, attr.h
return DocAttrs(name, mime_type, is_sticker, sticker_alt, width, height, is_gif) elif isinstance(attr, DocumentAttributeAudio):
is_audio = True
is_voice = attr.voice or False
duration = attr.duration
waveform = decode_waveform(attr.waveform) if attr.waveform else b""
return DocAttrs(
name,
mime_type,
is_sticker,
sticker_alt,
width,
height,
is_gif,
is_audio,
is_voice,
duration,
waveform,
)
@staticmethod @staticmethod
def _parse_telegram_document_meta( def _parse_telegram_document_meta(
@@ -2220,6 +2343,12 @@ class Portal(DBPortal, BasePortal):
"image/": MessageType.IMAGE, "image/": MessageType.IMAGE,
}.get(info.mimetype[:6], MessageType.FILE), }.get(info.mimetype[:6], MessageType.FILE),
) )
if attrs.is_audio:
content["org.matrix.msc1767.audio"] = {"duration": attrs.duration * 1000}
if attrs.waveform:
content["org.matrix.msc1767.audio"]["waveform"] = [x << 5 for x in attrs.waveform]
if attrs.is_voice:
content["org.matrix.msc3245.voice"] = {}
if file.decryption_info: if file.decryption_info:
content.file = file.decryption_info content.file = file.decryption_info
else: else:
@@ -2395,13 +2524,18 @@ class Portal(DBPortal, BasePortal):
self.log.debug("Ignoring game message edit event") self.log.debug("Ignoring game message edit event")
return return
if self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None:
asyncio.create_task(
self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions)
)
async with self.send_lock(sender.tgid if sender else None, required=False): async with self.send_lock(sender.tgid if sender else None, required=False):
tg_space = self.tgid if self.peer_type == "channel" else source.tgid tg_space = self.tgid if self.peer_type == "channel" else source.tgid
temporary_identifier = EventID( temporary_identifier = EventID(
f"${random.randint(1000000000000, 9999999999999)}TGBRIDGEDITEMP" f"${random.randint(1000000000000, 9999999999999)}TGBRIDGEDITEMP"
) )
duplicate_found = self.dedup.check( event_hash, duplicate_found = self.dedup.check(
evt, (temporary_identifier, tg_space), force_hash=True evt, (temporary_identifier, tg_space), force_hash=True
) )
if duplicate_found: if duplicate_found:
@@ -2410,7 +2544,11 @@ class Portal(DBPortal, BasePortal):
prev_edit_msg = await DBMessage.get_one_by_tgid( prev_edit_msg = await DBMessage.get_one_by_tgid(
TelegramID(evt.id), tg_space, edit_index=-1 TelegramID(evt.id), tg_space, edit_index=-1
) )
if not prev_edit_msg: if (
not prev_edit_msg
or prev_edit_msg.mxid == mxid
or prev_edit_msg.content_hash == event_hash
):
return return
await DBMessage( await DBMessage(
mxid=mxid, mxid=mxid,
@@ -2418,6 +2556,7 @@ class Portal(DBPortal, BasePortal):
tg_space=tg_space, tg_space=tg_space,
tgid=TelegramID(evt.id), tgid=TelegramID(evt.id),
edit_index=prev_edit_msg.edit_index + 1, edit_index=prev_edit_msg.edit_index + 1,
content_hash=event_hash,
).insert() ).insert()
return return
@@ -2431,6 +2570,16 @@ class Portal(DBPortal, BasePortal):
"in database." "in database."
) )
return return
prev_edit_msg = (
await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg
)
if prev_edit_msg.content_hash == event_hash:
self.log.debug(
f"Ignoring edit of message {evt.id}@{tg_space} (src {source.tgid}):"
" content hash didn't change"
)
await DBMessage.delete_temp_mxid(temporary_identifier, self.mxid)
return
content.msgtype = ( content.msgtype = (
MessageType.NOTICE MessageType.NOTICE
@@ -2444,15 +2593,13 @@ class Portal(DBPortal, BasePortal):
await intent.set_typing(self.mxid, is_typing=False) await intent.set_typing(self.mxid, is_typing=False)
event_id = await self._send_message(intent, content) event_id = await self._send_message(intent, content)
prev_edit_msg = (
await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg
)
await DBMessage( await DBMessage(
mxid=event_id, mxid=event_id,
mx_room=self.mxid, mx_room=self.mxid,
tg_space=tg_space, tg_space=tg_space,
tgid=TelegramID(evt.id), tgid=TelegramID(evt.id),
edit_index=prev_edit_msg.edit_index + 1, edit_index=prev_edit_msg.edit_index + 1,
content_hash=event_hash,
).insert() ).insert()
await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id) await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
@@ -2587,6 +2734,139 @@ class Portal(DBPortal, BasePortal):
count += 1 count += 1
return count return count
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessageUserReaction]:
if len(counts) == 1:
item = counts[0]
if item.count == 2:
return [
MessageUserReaction(reaction=item.reaction, user_id=self.tgid),
MessageUserReaction(reaction=item.reaction, user_id=self.tg_receiver),
]
elif item.count == 1:
return [
MessageUserReaction(
reaction=item.reaction,
user_id=self.tg_receiver if item.chosen else self.tgid,
),
]
elif len(counts) == 2:
item1, item2 = counts
return [
MessageUserReaction(
reaction=item1.reaction,
user_id=self.tg_receiver if item1.chosen else self.tgid,
),
MessageUserReaction(
reaction=item2.reaction,
user_id=self.tg_receiver if item2.chosen else self.tgid,
),
]
return []
async def try_handle_telegram_reactions(
self,
source: au.AbstractUser,
msg_id: TelegramID,
data: MessageReactions,
dbm: DBMessage | None = None,
) -> None:
try:
await self.handle_telegram_reactions(source, msg_id, data, dbm)
except Exception:
self.log.exception(f"Error handling reactions in message {msg_id}")
async def handle_telegram_reactions(
self,
source: au.AbstractUser,
msg_id: TelegramID,
data: MessageReactions,
dbm: DBMessage | None = None,
) -> None:
if self.peer_type == "channel" and not self.megagroup:
# We don't know who reacted in a channel, so we can't bridge it properly either
return
tg_space = self.tgid if self.peer_type == "channel" else source.tgid
if dbm is None:
dbm = await DBMessage.get_one_by_tgid(msg_id, tg_space)
if dbm is None:
return
total_count = sum(item.count for item in data.results)
recent_reactions = data.recent_reactons or []
if not recent_reactions and total_count > 0:
if self.peer_type == "user":
recent_reactions = self._split_dm_reaction_counts(data.results)
elif source.is_bot:
# Can't fetch exact reaction senders as a bot
return
else:
# TODO this doesn't work for some reason
return
# resp = await source.client(
# GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=20)
# )
# recent_reactions = resp.reactions
async with self.reaction_lock(dbm.mxid):
await self._handle_telegram_reactions_locked(dbm, recent_reactions, total_count)
async def _handle_telegram_reactions_locked(
self, msg: DBMessage, reaction_list: list[MessageUserReaction], total_count: int
) -> None:
reactions = {reaction.user_id: reaction.reaction for reaction in reaction_list}
is_full = len(reactions) == total_count
existing_reactions = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room)
removed: list[DBReaction] = []
changed: list[tuple[DBReaction, str]] = []
for existing_reaction in existing_reactions:
new_reaction = reactions.get(existing_reaction.tg_sender)
if new_reaction is None:
if is_full:
removed.append(existing_reaction)
# else: assume the reaction is still there, too much effort to fetch it
elif new_reaction == existing_reaction.reaction:
reactions.pop(existing_reaction.tg_sender)
else:
changed.append((existing_reaction, new_reaction))
for sender, new_emoji in reactions.items():
self.log.debug(f"Bridging reaction {new_emoji} by {sender} to {msg.tgid}")
puppet: p.Puppet = await p.Puppet.get_by_tgid(sender)
mxid = await puppet.intent_for(self).react(
msg.mx_room, msg.mxid, variation_selector.add(new_emoji)
)
await DBReaction(
mxid=mxid,
mx_room=msg.mx_room,
msg_mxid=msg.mxid,
tg_sender=sender,
reaction=new_emoji,
).save()
for removed_reaction in removed:
self.log.debug(
f"Removing reaction {removed_reaction.reaction} by {removed_reaction.tg_sender} "
f"to {msg.tgid}"
)
puppet = await p.Puppet.get_by_tgid(removed_reaction.tg_sender)
await puppet.intent_for(self).redact(removed_reaction.mx_room, removed_reaction.mxid)
await removed_reaction.delete()
for changed_reaction, new_emoji in changed:
self.log.debug(
f"Updating reaction {changed_reaction.reaction} -> {new_emoji} "
f"by {changed_reaction.tg_sender} to {msg.tgid}"
)
puppet = await p.Puppet.get_by_tgid(changed_reaction.tg_sender)
intent = puppet.intent_for(self)
await intent.redact(changed_reaction.mx_room, changed_reaction.mxid)
changed_reaction.mxid = await intent.react(
msg.mx_room, msg.mxid, variation_selector.add(new_emoji)
)
changed_reaction.reaction = new_emoji
await changed_reaction.save()
async def handle_telegram_message( async def handle_telegram_message(
self, source: au.AbstractUser, sender: p.Puppet, evt: Message self, source: au.AbstractUser, sender: p.Puppet, evt: Message
) -> None: ) -> None:
@@ -2613,7 +2893,7 @@ class Portal(DBPortal, BasePortal):
temporary_identifier = EventID( temporary_identifier = EventID(
f"${random.randint(1000000000000, 9999999999999)}TGBRIDGETEMP" f"${random.randint(1000000000000, 9999999999999)}TGBRIDGETEMP"
) )
duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space)) event_hash, duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space))
if duplicate_found: if duplicate_found:
mxid, other_tg_space = duplicate_found mxid, other_tg_space = duplicate_found
self.log.debug( self.log.debug(
@@ -2627,17 +2907,16 @@ class Portal(DBPortal, BasePortal):
mxid=mxid, mxid=mxid,
tg_space=tg_space, tg_space=tg_space,
edit_index=0, edit_index=0,
content_hash=event_hash,
).insert() ).insert()
return return
if self.backfill_lock.locked or (self.dedup.pre_db_check and self.peer_type == "channel"): if self.backfill_lock.locked or self.peer_type == "channel":
msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space) msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if msg: if msg:
self.log.debug( self.log.debug(
f"Ignoring message {evt.id} (src {source.tgid}) as it was already " f"Ignoring message {evt.id} (src {source.tgid}) as it was already "
f"handled into {msg.mxid}. This duplicate was catched in the db " f"handled into {msg.mxid}."
"check. If you get this message often, consider increasing "
"bridge.deduplication.cache_queue_length in the config."
) )
return return
@@ -2702,7 +2981,10 @@ class Portal(DBPortal, BasePortal):
self._new_messages_after_sponsored = True self._new_messages_after_sponsored = True
prev_id = self.dedup.update(evt, (event_id, tg_space), (temporary_identifier, tg_space)) another_event_hash, prev_id = self.dedup.update(
evt, (event_id, tg_space), (temporary_identifier, tg_space)
)
assert another_event_hash == event_hash
if prev_id: if prev_id:
self.log.debug( self.log.debug(
f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. " f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. "
@@ -2717,13 +2999,15 @@ class Portal(DBPortal, BasePortal):
self.log.debug("Handled telegram message %d -> %s", evt.id, event_id) self.log.debug("Handled telegram message %d -> %s", evt.id, event_id)
try: try:
await DBMessage( dbm = DBMessage(
tgid=TelegramID(evt.id), tgid=TelegramID(evt.id),
mx_room=self.mxid, mx_room=self.mxid,
mxid=event_id, mxid=event_id,
tg_space=tg_space, tg_space=tg_space,
edit_index=0, edit_index=0,
).insert() content_hash=event_hash,
)
await dbm.insert()
await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id) await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
except (IntegrityError, UniqueViolationError) as e: except (IntegrityError, UniqueViolationError) as e:
self.log.exception( self.log.exception(
@@ -2733,6 +3017,11 @@ class Portal(DBPortal, BasePortal):
"pre_db_check in the config." "pre_db_check in the config."
) )
await intent.redact(self.mxid, event_id) await intent.redact(self.mxid, event_id)
return
if isinstance(evt, Message) and evt.reactions:
asyncio.create_task(
self.try_handle_telegram_reactions(source, dbm.tgid, evt.reactions, dbm=dbm)
)
await self._send_delivery_receipt(event_id) await self._send_delivery_receipt(event_id)
async def _create_room_on_action( async def _create_room_on_action(
@@ -2956,6 +3245,7 @@ class Portal(DBPortal, BasePortal):
pass pass
await super().delete() await super().delete()
await DBMessage.delete_all(self.mxid) await DBMessage.delete_all(self.mxid)
await DBReaction.delete_all(self.mxid)
self.deleted = True self.deleted = True
# endregion # endregion
+1 -1
View File
@@ -2,5 +2,5 @@ from .deduplication import PortalDedup
from .media_fallback import make_contact_event_content, make_dice_event_content from .media_fallback import make_contact_event_content, make_dice_event_content
from .participants import get_users from .participants import get_users
from .power_levels import get_base_power_levels, participants_to_power_levels from .power_levels import get_base_power_levels, participants_to_power_levels
from .send_lock import PortalSendLock from .send_lock import PortalReactionLock, PortalSendLock
from .sponsored_message import get_sponsored_message, make_sponsored_message_content from .sponsored_message import get_sponsored_message, make_sponsored_message_content
+73 -49
View File
@@ -15,20 +15,30 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations from __future__ import annotations
from typing import Tuple from typing import Any, Generator, Tuple, Union
from collections import deque from collections import deque
import hashlib import hashlib
from telethon.tl.patched import Message, MessageService from telethon.tl.patched import Message, MessageService
from telethon.tl.types import ( from telethon.tl.types import (
Message,
MessageMediaContact, MessageMediaContact,
MessageMediaDice,
MessageMediaDocument, MessageMediaDocument,
MessageMediaGame,
MessageMediaGeo, MessageMediaGeo,
MessageMediaPhoto, MessageMediaPhoto,
TypeMessage, MessageMediaPoll,
MessageMediaUnsupported,
MessageService,
PeerChannel,
PeerChat,
PeerUser,
TypeUpdates, TypeUpdates,
UpdateNewChannelMessage, UpdateNewChannelMessage,
UpdateNewMessage, UpdateNewMessage,
UpdateShortChatMessage,
UpdateShortMessage,
) )
from mautrix.types import EventID from mautrix.types import EventID
@@ -37,60 +47,67 @@ from .. import portal as po
from ..types import TelegramID from ..types import TelegramID
DedupMXID = Tuple[EventID, TelegramID] DedupMXID = Tuple[EventID, TelegramID]
TypeMessage = Union[Message, MessageService, UpdateShortMessage, UpdateShortChatMessage]
media_content_table = {
MessageMediaContact: lambda media: [media.user_id],
MessageMediaDocument: lambda media: [media.document.id],
MessageMediaPhoto: lambda media: [media.photo.id if media.photo else 0],
MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat],
MessageMediaGame: lambda media: [media.game.id],
MessageMediaPoll: lambda media: [media.poll.id],
MessageMediaDice: lambda media: [media.value, media.emoticon],
MessageMediaUnsupported: lambda media: ["unsupported media"],
}
class PortalDedup: class PortalDedup:
pre_db_check: bool = False pre_db_check: bool = False
cache_queue_length: int = 20 cache_queue_length: int = 256
_dedup: deque[str] _dedup: deque[bytes | int]
_dedup_mxid: dict[str, DedupMXID] _dedup_mxid: dict[bytes | int, DedupMXID]
_dedup_action: deque[str] _dedup_action: deque[bytes | int]
_portal: po.Portal _portal: po.Portal
def __init__(self, portal: po.Portal) -> None: def __init__(self, portal: po.Portal) -> None:
self._dedup = deque() self._dedup = deque()
self._dedup_mxid = {} self._dedup_mxid = {}
self._dedup_action = deque() self._dedup_action = deque(maxlen=self.cache_queue_length)
self._portal = portal self._portal = portal
@property @property
def _always_force_hash(self) -> bool: def _always_force_hash(self) -> bool:
return self._portal.peer_type == "chat" return self._portal.peer_type == "chat"
@staticmethod def _hash_content(self, event: TypeMessage) -> Generator[Any, None, None]:
def _hash_event(event: TypeMessage) -> str: if not self._always_force_hash:
# Non-channel messages are unique per-user (wtf telegram), so we have no other choice than yield event.id
# to deduplicate based on a hash of the message content. yield int(event.date.timestamp())
# The timestamp is only accurate to the second, so we can't rely solely on that either.
if isinstance(event, MessageService): if isinstance(event, MessageService):
hash_content = [event.date.timestamp(), event.from_id, event.action] yield event.from_id
yield event.action
else: else:
hash_content = [event.date.timestamp(), event.message.strip()] yield event.message.strip()
if event.fwd_from: if event.fwd_from:
hash_content += [event.fwd_from.from_id] yield event.fwd_from.from_id
elif isinstance(event, Message) and event.media: if isinstance(event, Message) and event.media:
try: media_hash_func = media_content_table.get(type(event.media)) or (
hash_content += { lambda media: ["unknown media"]
MessageMediaContact: lambda media: [media.user_id], )
MessageMediaDocument: lambda media: [media.document.id], yield media_hash_func(event.media)
MessageMediaPhoto: lambda media: [media.photo.id if media.photo else 0],
MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat], def _hash_event(self, event: TypeMessage) -> bytes:
}[type(event.media)](event.media) return hashlib.sha256(
except KeyError: "-".join(str(a) for a in self._hash_content(event)).encode("utf-8")
pass ).digest()
return hashlib.md5("-".join(str(a) for a in hash_content).encode("utf-8")).hexdigest()
def check_action(self, event: TypeMessage) -> bool: def check_action(self, event: TypeMessage) -> bool:
evt_hash = self._hash_event(event) if self._always_force_hash else event.id dedup_id = self._hash_event(event) if self._always_force_hash else event.id
if evt_hash in self._dedup_action: if dedup_id in self._dedup_action:
return True return True
self._dedup_action.append(evt_hash) self._dedup_action.appendleft(dedup_id)
if len(self._dedup_action) > self.cache_queue_length:
self._dedup_action.popleft()
return False return False
def update( def update(
@@ -99,31 +116,38 @@ class PortalDedup:
mxid: DedupMXID = None, mxid: DedupMXID = None,
expected_mxid: DedupMXID | None = None, expected_mxid: DedupMXID | None = None,
force_hash: bool = False, force_hash: bool = False,
) -> DedupMXID | None: ) -> tuple[bytes, DedupMXID | None]:
evt_hash = self._hash_event(event) if self._always_force_hash or force_hash else event.id evt_hash = self._hash_event(event)
dedup_id = evt_hash if self._always_force_hash or force_hash else event.id
try: try:
found_mxid = self._dedup_mxid[evt_hash] found_mxid = self._dedup_mxid[dedup_id]
except KeyError: except KeyError:
return EventID("None"), TelegramID(0) return evt_hash, None
if found_mxid != expected_mxid: if found_mxid != expected_mxid:
return found_mxid return evt_hash, found_mxid
self._dedup_mxid[evt_hash] = mxid self._dedup_mxid[dedup_id] = mxid
return None if evt_hash != dedup_id:
self._dedup_mxid[evt_hash] = mxid
return evt_hash, None
def check( def check(
self, event: TypeMessage, mxid: DedupMXID = None, force_hash: bool = False self, event: TypeMessage, mxid: DedupMXID = None, force_hash: bool = False
) -> DedupMXID | None: ) -> tuple[bytes, DedupMXID | None]:
evt_hash = self._hash_event(event) if self._always_force_hash or force_hash else event.id evt_hash = self._hash_event(event)
if evt_hash in self._dedup: dedup_id = evt_hash if self._always_force_hash or force_hash else event.id
return self._dedup_mxid[evt_hash] if dedup_id in self._dedup:
return evt_hash, self._dedup_mxid[dedup_id]
self._dedup_mxid[evt_hash] = mxid self._dedup_mxid[dedup_id] = mxid
self._dedup.append(evt_hash) self._dedup.appendleft(dedup_id)
if evt_hash != dedup_id:
self._dedup_mxid[evt_hash] = mxid
self._dedup.appendleft(evt_hash)
if len(self._dedup) > self.cache_queue_length: while len(self._dedup) > self.cache_queue_length:
del self._dedup_mxid[self._dedup.popleft()] del self._dedup_mxid[self._dedup.pop()]
return None return evt_hash, None
def register_outgoing_actions(self, response: TypeUpdates) -> None: def register_outgoing_actions(self, response: TypeUpdates) -> None:
for update in response.updates: for update in response.updates:
+13
View File
@@ -16,6 +16,9 @@
from __future__ import annotations from __future__ import annotations
from asyncio import Lock from asyncio import Lock
from collections import defaultdict
from mautrix.types import EventID
from ..types import TelegramID from ..types import TelegramID
@@ -42,3 +45,13 @@ class PortalSendLock:
return self._send_locks[user_id] return self._send_locks[user_id]
except KeyError: except KeyError:
return self._send_locks.setdefault(user_id, Lock()) if required else self._noop_lock return self._send_locks.setdefault(user_id, Lock()) if required else self._noop_lock
class PortalReactionLock:
_reaction_locks: dict[EventID, Lock]
def __init__(self) -> None:
self._reaction_locks = defaultdict(lambda: Lock())
def __call__(self, mxid: EventID) -> Lock:
return self._reaction_locks[mxid]
+1 -1
View File
@@ -655,10 +655,10 @@ class User(DBUser, AbstractUser, BaseUser):
if self.saved_contacts != response.saved_count: if self.saved_contacts != response.saved_count:
self.saved_contacts = response.saved_count self.saved_contacts = response.saved_count
await self.save() await self.save()
await self.set_contacts(user.id for user in response.users)
for user in response.users: for user in response.users:
puppet = await pu.Puppet.get_by_tgid(user.id) puppet = await pu.Puppet.get_by_tgid(user.id)
await puppet.update_info(self, user) await puppet.update_info(self, user)
await self.set_contacts(user.id for user in response.users)
# endregion # endregion
# region Class instance lookup # region Class instance lookup
+50 -97
View File
@@ -19,12 +19,15 @@ from __future__ import annotations
from typing import Any, Awaitable, Callable from typing import Any, Awaitable, Callable
import asyncio.subprocess import asyncio.subprocess
import logging import logging
import os
import os.path import os.path
import shutil import shutil
import tempfile import tempfile
from attr import dataclass from attr import dataclass
from mautrix.util import ffmpeg
log: logging.Logger = logging.getLogger("mau.util.tgs") log: logging.Logger = logging.getLogger("mau.util.tgs")
@@ -48,61 +51,49 @@ def abswhich(program: str | None) -> str | None:
lottieconverter = abswhich("lottieconverter") lottieconverter = abswhich("lottieconverter")
ffmpeg = abswhich("ffmpeg")
async def _run_lottieconverter(args: tuple[str, ...], input_data: bytes) -> bytes:
proc = await asyncio.create_subprocess_exec(
lottieconverter,
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate(input_data)
if proc.returncode == 0:
return stdout
else:
err_text = stderr.decode("utf-8") if stderr else f"unknown ({proc.returncode})"
raise ffmpeg.ConverterError(f"lottieconverter error: {err_text}")
if lottieconverter: if lottieconverter:
async def tgs_to_png(file: bytes, width: int, height: int, **_: Any) -> ConvertedSticker: async def tgs_to_png(file: bytes, width: int, height: int, **_: Any) -> ConvertedSticker:
frame = 1 frame = 1
proc = await asyncio.create_subprocess_exec( try:
lottieconverter, converted_png = await _run_lottieconverter(
"-", args=("-", "-", "png", f"{width}x{height}", str(frame)),
"-", input_data=file,
"png",
f"{width}x{height}",
str(frame),
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate(file)
if proc.returncode == 0:
return ConvertedSticker("image/png", stdout)
else:
log.error(
"lottieconverter error: "
+ (
stderr.decode("utf-8")
if stderr is not None
else f"unknown ({proc.returncode})"
)
) )
return ConvertedSticker("image/png", converted_png)
except ffmpeg.ConverterError as e:
log.error(str(e))
return ConvertedSticker("application/gzip", file) return ConvertedSticker("application/gzip", file)
async def tgs_to_gif( async def tgs_to_gif(
file: bytes, width: int, height: int, fps: int = 25, **_: Any file: bytes, width: int, height: int, fps: int = 25, **_: Any
) -> ConvertedSticker: ) -> ConvertedSticker:
proc = await asyncio.create_subprocess_exec( try:
lottieconverter, converted_gif = await _run_lottieconverter(
"-", args=("-", "-", "gif", f"{width}x{height}", str(fps)),
"-", input_data=file,
"gif",
f"{width}x{height}",
str(fps),
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate(file)
if proc.returncode == 0:
return ConvertedSticker("image/gif", stdout)
else:
log.error(
"lottieconverter error: "
+ (
stderr.decode("utf-8")
if stderr is not None
else f"unknown ({proc.returncode})"
)
) )
return ConvertedSticker("image/gif", converted_gif)
except ffmpeg.ConverterError as e:
log.error(str(e))
return ConvertedSticker("application/gzip", file) return ConvertedSticker("application/gzip", file)
converters["png"] = tgs_to_png converters["png"] = tgs_to_png
@@ -115,62 +106,24 @@ if lottieconverter and ffmpeg:
) -> ConvertedSticker: ) -> ConvertedSticker:
with tempfile.TemporaryDirectory(prefix="tgs_") as tmpdir: with tempfile.TemporaryDirectory(prefix="tgs_") as tmpdir:
file_template = tmpdir + "/out_" file_template = tmpdir + "/out_"
proc = await asyncio.create_subprocess_exec( try:
lottieconverter, await _run_lottieconverter(
"-", args=("-", file_template, "pngs", f"{width}x{height}", str(fps)),
file_template, input_data=file,
"pngs", )
f"{width}x{height}", first_frame_name = min(os.listdir(tmpdir))
str(fps), with open(f"{tmpdir}/{first_frame_name}", "rb") as first_frame_file:
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
_, stderr = await proc.communicate(file)
if proc.returncode == 0:
with open(f"{file_template}00.png", "rb") as first_frame_file:
first_frame_data = first_frame_file.read() first_frame_data = first_frame_file.read()
proc = await asyncio.create_subprocess_exec( webm_data = await ffmpeg.convert_path(
ffmpeg, input_args=("-framerate", str(fps), "-pattern_type", "glob"),
"-hide_banner", input_file=f"{file_template}*.png",
"-loglevel", output_args=("-c:v", "libvpx-vp9", "-pix_fmt", "yuva420p", "-f", "webm"),
"error", output_path_override="-",
"-framerate", output_extension=None,
str(fps),
"-pattern_type",
"glob",
"-i",
file_template + "*.png",
"-c:v",
"libvpx-vp9",
"-pix_fmt",
"yuva420p",
"-f",
"webm",
"-",
stdout=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode == 0:
return ConvertedSticker("video/webm", stdout, "image/png", first_frame_data)
else:
log.error(
"ffmpeg error: "
+ (
stderr.decode("utf-8")
if stderr is not None
else f"unknown ({proc.returncode})"
)
)
else:
log.error(
"lottieconverter error: "
+ (
stderr.decode("utf-8")
if stderr is not None
else f"unknown ({proc.returncode})"
)
) )
return ConvertedSticker("video/webm", webm_data, "image/png", first_frame_data)
except ffmpeg.ConverterError as e:
log.error(str(e))
return ConvertedSticker("application/gzip", file) return ConvertedSticker("application/gzip", file)
converters["webm"] = tgs_to_webm converters["webm"] = tgs_to_webm
+1 -1
View File
@@ -8,7 +8,7 @@ aiodns
brotli brotli
#/qr_login #/qr_login
pillow>=4,<9 pillow>=4,<10
qrcode>=6,<8 qrcode>=6,<8
#/hq_thumbnails #/hq_thumbnails
+3 -3
View File
@@ -3,10 +3,10 @@ python-magic>=0.4,<0.5
commonmark>=0.8,<0.10 commonmark>=0.8,<0.10
aiohttp>=3,<4 aiohttp>=3,<4
yarl>=1,<2 yarl>=1,<2
mautrix>=0.14.0,<0.15 mautrix>=0.14.3,<0.15
#telethon>=1.24,<1.25 #telethon>=1.24,<1.25
# Fork to make session storage async # Fork to make session storage async and update to layer 137
tulir-telethon==1.25.0a1 tulir-telethon==1.25.0a3
asyncpg>=0.20,<0.26 asyncpg>=0.20,<0.26
mako>=1,<2 mako>=1,<2
setuptools setuptools
-1
View File
@@ -1,5 +1,4 @@
import setuptools import setuptools
import glob
from mautrix_telegram.get_version import git_tag, git_revision, version, linkified_version from mautrix_telegram.get_version import git_tag, git_revision, version, linkified_version