Compare commits

...

20 Commits

Author SHA1 Message Date
Tulir Asokan 58bc6788aa Bump version to 0.12.1 2022-09-26 21:42:51 +03:00
Tulir Asokan 5a767a2d92 Update Telethon 2022-09-25 17:06:17 +03:00
Tulir Asokan 282ad43180 Update changelog and mautrix-python 2022-09-24 13:58:13 +03:00
Tulir Asokan bcb30ce807 Update Telethon 2022-09-21 15:27:41 +03:00
Tulir Asokan 2d865f006e Don't use row.get to be compatible with sqlite3.Row 2022-09-20 18:43:41 +03:00
Tulir Asokan b2daebead6 Catch errors when updating read status or tags. Fixes #812 2022-09-20 11:11:59 +03:00
Tulir Asokan 4210091e9a Fix some bugs 2022-09-20 01:59:47 +03:00
Tulir Asokan 4db09f2240 Update Telethon 2022-09-20 00:32:47 +03:00
Tulir Asokan e0260eb551 Don't recreate update loop on UnauthorizedErrors 2022-09-20 00:26:42 +03:00
Tulir Asokan ed1e5474bf Update latest revision migration 2022-09-19 19:10:16 +03:00
Tulir Asokan 65bd7fcc49 Use mautrix-python magic wrapper. Fixes #594 2022-09-17 15:00:49 +03:00
Tulir Asokan 80834ccec1 Update changelog 2022-09-17 14:29:50 +03:00
Tulir Asokan 026c39a3de Add support for new reaction stuff
* Custom emojis in reactions
* Premium users can react 3 times to a single message
* Reactions to recent messages are now polled on read receipt
2022-09-17 14:25:06 +03:00
Tulir Asokan 95939dfa02 Update mautrix-python to fix encrypting when a single device is out of OTKs 2022-09-15 21:55:01 +03:00
Tulir Asokan 279da9097c Update mautrix-python 2022-09-15 17:18:35 +03:00
Tulir Asokan 97126332da Add option to bypass startup script. Closes #838 2022-09-15 17:18:35 +03:00
Tulir Asokan 6641b9a16c Save own ID as message sender ID for messages without sender 2022-09-15 17:18:35 +03:00
Tulir Asokan 927c9afa84 Move config env overrides to mautrix-python 2022-09-15 17:18:35 +03:00
Tulir Asokan d41d7ca0a6 Handle ChatParticipantsForbidden 2022-09-15 17:18:35 +03:00
Tulir Asokan ad0c6cfc8d Run connection tracking task if status_endpoint is set 2022-09-13 16:36:38 +03:00
23 changed files with 457 additions and 178 deletions
+28
View File
@@ -1,3 +1,31 @@
# v0.12.1 (2022-09-26)
### Added
* Support for custom emojis in reactions.
* Like other bridges with custom emoji reactions, they're bridged as `mxc://`
URIs, so client support is required to render them properly.
### Improved
* The bridge will now poll for reactions to 20 most recent messages when
receiving a read receipt. This works around Telegram's bad protocol that
doesn't notify clients on reactions to other users' messages.
* The docker image now has an option to bypass the startup script by setting
the `MAUTRIX_DIRECT_STARTUP` environment variable. Additionally, it will
refuse to run as a non-root user if that variable is not set (and print an
error message suggesting to either set the variable or use a custom command).
* Moved environment variable overrides for config fields to mautrix-python.
The new system also allows loading JSON values to enable overriding maps like
`login_shared_secret_map`.
### Fixed
* `ChatParticipantsForbidden` is handled properly when syncing non-supergroup
info.
* Fixed some bugs with file transfers when using SQLite.
* Fixed error when attempting to log in again after logging out.
* Fixed QR login not working.
* Fixed error syncing chats if bridging a message had previously been
interrupted.
# v0.12.0 (2022-08-26) # v0.12.0 (2022-08-26)
**N.B.** This release requires a homeserver with Matrix v1.1 support, which **N.B.** This release requires a homeserver with Matrix v1.1 support, which
+14
View File
@@ -1,4 +1,18 @@
#!/bin/sh #!/bin/sh
if [ ! -z "$MAUTRIX_DIRECT_STARTUP" ]; then
if [ $(id -u) == 0 ]; then
echo "|------------------------------------------|"
echo "| Warning: running bridge unsafely as root |"
echo "|------------------------------------------|"
fi
exec python3 -m mautrix_telegram -c /data/config.yaml
elif [ $(id -u) != 0 ]; then
echo "The startup script must run as root. It will use su-exec to drop permissions before running the bridge."
echo "To bypass the startup script, either set the `MAUTRIX_DIRECT_STARTUP` environment variable,"
echo "or just use `python3 -m mautrix_telegram -c /data/config.yaml` as the run command."
echo "Note that the config and registration will not be auto-generated when bypassing the startup script."
exit 1
fi
# Define functions. # Define functions.
function fixperms { function fixperms {
+1 -1
View File
@@ -1,2 +1,2 @@
__version__ = "0.12.0" __version__ = "0.12.1"
__author__ = "Tulir Asokan <tulir@maunium.net>" __author__ = "Tulir Asokan <tulir@maunium.net>"
+10 -5
View File
@@ -22,6 +22,7 @@ import logging
import platform import platform
import time import time
from telethon.errors import UnauthorizedError
from telethon.network import ( from telethon.network import (
Connection, Connection,
ConnectionTcpFull, ConnectionTcpFull,
@@ -238,6 +239,9 @@ class AbstractUser(ABC):
self.log.critical(f"Stopping due to update handling error {type(err).__name__}") self.log.critical(f"Stopping due to update handling error {type(err).__name__}")
self.bridge.manual_stop(50) self.bridge.manual_stop(50)
else: else:
if isinstance(err, UnauthorizedError):
self.log.warning("Not recreating Telethon update loop")
return
self.log.info("Recreating Telethon update loop in 60 seconds") self.log.info("Recreating Telethon update loop in 60 seconds")
await asyncio.sleep(60) await asyncio.sleep(60)
self.log.debug("Now recreating Telethon update loop") self.log.debug("Now recreating Telethon update loop")
@@ -297,17 +301,18 @@ class AbstractUser(ABC):
async def ensure_started(self, even_if_no_session=False) -> AbstractUser: async def ensure_started(self, even_if_no_session=False) -> AbstractUser:
if self.connected: if self.connected:
return self return self
if even_if_no_session or await PgSession.has(self.mxid): session_exists = await PgSession.has(self.mxid)
if even_if_no_session or session_exists:
self.log.debug( self.log.debug(
"Starting client due to ensure_started" f"Starting client due to ensure_started({even_if_no_session=}, {session_exists=})"
f"(even_if_no_session={even_if_no_session})"
) )
await self.start(delete_unless_authenticated=not even_if_no_session) await self.start(delete_unless_authenticated=not even_if_no_session)
return self return self
async def stop(self) -> None: async def stop(self) -> None:
await self.client.disconnect() if self.client:
self.client = None await self.client.disconnect()
self.client = None
# region Telegram update handling # region Telegram update handling
+8
View File
@@ -29,6 +29,7 @@ from telethon.tl.types import (
ChatForbidden, ChatForbidden,
ChatParticipantAdmin, ChatParticipantAdmin,
ChatParticipantCreator, ChatParticipantCreator,
ChatParticipantsForbidden,
InputChannel, InputChannel,
InputUser, InputUser,
MessageActionChatAddUser, MessageActionChatAddUser,
@@ -112,6 +113,7 @@ class Bot(AbstractUser):
) )
self._me_info = None self._me_info = None
self._me_mxid = None self._me_mxid = None
self._login_wait_fut = self.loop.create_future()
async def get_me(self, use_cache: bool = True) -> tuple[User, UserID]: async def get_me(self, use_cache: bool = True) -> tuple[User, UserID]:
if not use_cache or not self._me_mxid: if not use_cache or not self._me_mxid:
@@ -145,6 +147,8 @@ class Bot(AbstractUser):
self.tgid = TelegramID(info.id) self.tgid = TelegramID(info.id)
self.tg_username = info.username self.tg_username = info.username
self.mxid = pu.Puppet.get_mxid_from_id(self.tgid) self.mxid = pu.Puppet.get_mxid_from_id(self.tgid)
self._login_wait_fut.set_result(None)
self._login_wait_fut = None
chat_ids = [chat_id for chat_id, chat_type in self.chats.items() if chat_type == "chat"] chat_ids = [chat_id for chat_id, chat_type in self.chats.items() if chat_type == "chat"]
response = await self.client(GetChatsRequest(chat_ids)) response = await self.client(GetChatsRequest(chat_ids))
@@ -198,6 +202,8 @@ class Bot(AbstractUser):
return pcp return pcp
elif isinstance(chat, PeerChat): elif isinstance(chat, PeerChat):
chat = await self.client(GetFullChatRequest(chat.chat_id)) chat = await self.client(GetFullChatRequest(chat.chat_id))
if isinstance(chat.full_chat.participants, ChatParticipantsForbidden):
return None
participants = chat.full_chat.participants.participants participants = chat.full_chat.participants.participants
for p in participants: for p in participants:
self._admin_cache[chat.channel_id, tgid] = (p, time.time()) self._admin_cache[chat.channel_id, tgid] = (p, time.time())
@@ -415,6 +421,8 @@ class Bot(AbstractUser):
await self.add_chat(TelegramID(action.channel_id), "channel") await self.add_chat(TelegramID(action.channel_id), "channel")
async def update(self, update) -> bool: async def update(self, update) -> bool:
if self._login_wait_fut:
await self._login_wait_fut
if not isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage)): if not isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage)):
return False return False
if isinstance(update.message, MessageService): if isinstance(update.message, MessageService):
-8
View File
@@ -35,12 +35,6 @@ Permissions = NamedTuple(
class Config(BaseBridgeConfig): class Config(BaseBridgeConfig):
def __getitem__(self, key: str) -> Any:
try:
return os.environ[f"MAUTRIX_TELEGRAM_{key.replace('.', '_').upper()}"]
except KeyError:
return super().__getitem__(key)
@property @property
def forbidden_defaults(self) -> List[ForbiddenDefault]: def forbidden_defaults(self) -> List[ForbiddenDefault]:
return [ return [
@@ -63,8 +57,6 @@ class Config(BaseBridgeConfig):
super().do_update(helper) super().do_update(helper)
copy, copy_dict, base = helper copy, copy_dict, base = helper
copy("homeserver.asmux")
if "appservice.protocol" in self and "appservice.address" not in self: if "appservice.protocol" in self and "appservice.address" not in self:
protocol, hostname, port = ( protocol, hostname, port = (
self["appservice.protocol"], self["appservice.protocol"],
+11
View File
@@ -152,6 +152,17 @@ class Message:
rows = await cls.db.fetch(q, mx_room, tg_space, *mxids) rows = await cls.db.fetch(q, mx_room, tg_space, *mxids)
return [cls._from_row(row) for row in rows] return [cls._from_row(row) for row in rows]
@classmethod
async def find_recent(
cls, mx_room: RoomID, not_sender: TelegramID, limit: int = 20
) -> list[Message]:
q = f"""
SELECT {cls.columns} FROM message
WHERE mx_room=$1 AND sender<>$2
ORDER BY tgid DESC LIMIT $3
"""
return [cls._from_row(row) for row in await cls.db.fetch(q, mx_room, not_sender, limit)]
@classmethod @classmethod
async def replace_temp_mxid(cls, temp_mxid: str, mx_room: RoomID, real_mxid: EventID) -> None: async def replace_temp_mxid(cls, temp_mxid: str, mx_room: RoomID, real_mxid: EventID) -> None:
q = "UPDATE message SET mxid=$1 WHERE mxid=$2 AND mx_room=$3" q = "UPDATE message SET mxid=$1 WHERE mxid=$2 AND mx_room=$3"
+8 -4
View File
@@ -50,6 +50,7 @@ class Puppet:
avatar_set: bool avatar_set: bool
is_bot: bool | None is_bot: bool | None
is_channel: bool is_channel: bool
is_premium: bool
custom_mxid: UserID | None custom_mxid: UserID | None
access_token: str | None access_token: str | None
@@ -67,7 +68,8 @@ class Puppet:
columns: ClassVar[str] = ( columns: ClassVar[str] = (
"id, is_registered, displayname, displayname_source, displayname_contact, " "id, is_registered, displayname, displayname_source, displayname_contact, "
"displayname_quality, disable_updates, username, phone, photo_id, avatar_url, " "displayname_quality, disable_updates, username, phone, photo_id, avatar_url, "
"name_set, avatar_set, is_bot, is_channel, custom_mxid, access_token, next_batch, base_url" "name_set, avatar_set, is_bot, is_channel, is_premium, "
"custom_mxid, access_token, next_batch, base_url"
) )
@classmethod @classmethod
@@ -108,6 +110,7 @@ class Puppet:
self.avatar_set, self.avatar_set,
self.is_bot, self.is_bot,
self.is_channel, self.is_channel,
self.is_premium,
self.custom_mxid, self.custom_mxid,
self.access_token, self.access_token,
self.next_batch, self.next_batch,
@@ -120,7 +123,7 @@ class Puppet:
SET is_registered=$2, displayname=$3, displayname_source=$4, displayname_contact=$5, SET is_registered=$2, displayname=$3, displayname_source=$4, displayname_contact=$5,
displayname_quality=$6, disable_updates=$7, username=$8, phone=$9, photo_id=$10, displayname_quality=$6, disable_updates=$7, username=$8, phone=$9, photo_id=$10,
avatar_url=$11, name_set=$12, avatar_set=$13, is_bot=$14, is_channel=$15, avatar_url=$11, name_set=$12, avatar_set=$13, is_bot=$14, is_channel=$15,
custom_mxid=$16, access_token=$17, next_batch=$18, base_url=$19 is_premium=$16, custom_mxid=$17, access_token=$18, next_batch=$19, base_url=$20
WHERE id=$1 WHERE id=$1
""" """
await self.db.execute(q, *self._values) await self.db.execute(q, *self._values)
@@ -130,8 +133,9 @@ class Puppet:
INSERT INTO puppet ( INSERT INTO puppet (
id, is_registered, displayname, displayname_source, displayname_contact, id, is_registered, displayname, displayname_source, displayname_contact,
displayname_quality, disable_updates, username, phone, photo_id, avatar_url, name_set, displayname_quality, disable_updates, username, phone, photo_id, avatar_url, name_set,
avatar_set, is_bot, is_channel, custom_mxid, access_token, next_batch, base_url avatar_set, is_bot, is_channel, is_premium, custom_mxid, access_token, next_batch,
base_url
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18,
$19) $19, $20)
""" """
await self.db.execute(q, *self._values) await self.db.execute(q, *self._values)
+15 -6
View File
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, ClassVar
from asyncpg import Record from asyncpg import Record
from attr import dataclass from attr import dataclass
from telethon.tl.types import ReactionCustomEmoji, ReactionEmoji, TypeReaction
from mautrix.types import EventID, RoomID from mautrix.types import EventID, RoomID
from mautrix.util.async_db import Database from mautrix.util.async_db import Database
@@ -58,9 +59,10 @@ class Reaction:
@classmethod @classmethod
async def get_by_sender( async def get_by_sender(
cls, mxid: EventID, mx_room: RoomID, tg_sender: TelegramID cls, mxid: EventID, mx_room: RoomID, tg_sender: TelegramID
) -> Reaction | None: ) -> list[Reaction]:
q = f"SELECT {cls.columns} FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3" q = f"SELECT {cls.columns} FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3"
return cls._from_row(await cls.db.fetchrow(q, mxid, mx_room, tg_sender)) rows = await cls.db.fetch(q, mxid, mx_room, tg_sender)
return [cls._from_row(row) for row in rows]
@classmethod @classmethod
async def get_all_by_message(cls, mxid: EventID, mx_room: RoomID) -> list[Reaction]: async def get_all_by_message(cls, mxid: EventID, mx_room: RoomID) -> list[Reaction]:
@@ -68,6 +70,13 @@ class Reaction:
rows = await cls.db.fetch(q, mxid, mx_room) rows = await cls.db.fetch(q, mxid, mx_room)
return [cls._from_row(row) for row in rows] return [cls._from_row(row) for row in rows]
@property
def telegram(self) -> TypeReaction:
if self.reaction.isdecimal():
return ReactionCustomEmoji(document_id=int(self.reaction))
else:
return ReactionEmoji(emoticon=self.reaction)
@property @property
def _values(self): def _values(self):
return ( return (
@@ -81,11 +90,11 @@ class Reaction:
async def save(self) -> None: async def save(self) -> None:
q = """ q = """
INSERT INTO reaction (mxid, mx_room, msg_mxid, tg_sender, reaction) INSERT INTO reaction (mxid, mx_room, msg_mxid, tg_sender, reaction)
VALUES ($1, $2, $3, $4, $5) ON CONFLICT (msg_mxid, mx_room, tg_sender) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (msg_mxid, mx_room, tg_sender, reaction)
DO UPDATE SET mxid=$1, reaction=$5 DO UPDATE SET mxid=excluded.mxid
""" """
await self.db.execute(q, *self._values) await self.db.execute(q, *self._values)
async def delete(self) -> None: async def delete(self) -> None:
q = "DELETE FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3" q = "DELETE FROM reaction WHERE msg_mxid=$1 AND mx_room=$2 AND tg_sender=$3 AND reaction=$4"
await self.db.execute(q, self.msg_mxid, self.mx_room, self.tg_sender) await self.db.execute(q, self.msg_mxid, self.mx_room, self.tg_sender, self.reaction)
+9 -1
View File
@@ -77,11 +77,19 @@ class TelegramFile:
file = cls._from_row(row) file = cls._from_row(row)
if file is None: if file is None:
return None return None
thumbnail_id = row.get("thumbnail", None) try:
thumbnail_id = row["thumbnail"]
except KeyError:
thumbnail_id = None
if thumbnail_id and not _thumbnail: if thumbnail_id and not _thumbnail:
file.thumbnail = await cls.get(thumbnail_id, _thumbnail=True) file.thumbnail = await cls.get(thumbnail_id, _thumbnail=True)
return file return file
@classmethod
async def find_by_mxc(cls, mxc: ContentURI) -> TelegramFile | None:
q = f"SELECT {cls.columns} FROM telegram_file WHERE mxc=$1"
return cls._from_row(await cls.db.fetchrow(q, mxc))
async def insert(self) -> None: async def insert(self) -> None:
q = ( q = (
"INSERT INTO telegram_file (id, mxc, mime_type, was_converted, size, width, height, " "INSERT INTO telegram_file (id, mxc, mime_type, was_converted, size, width, height, "
+1
View File
@@ -15,4 +15,5 @@ from . import (
v10_more_backfill_fields, v10_more_backfill_fields,
v11_backfill_queue, v11_backfill_queue,
v12_message_sender, v12_message_sender,
v13_multiple_reactions,
) )
@@ -13,12 +13,12 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from mautrix.util.async_db import Connection from mautrix.util.async_db import Connection, Scheme
latest_version = 10 latest_version = 13
async def create_latest_tables(conn: Connection) -> int: async def create_latest_tables(conn: Connection, scheme: Scheme) -> int:
await conn.execute( await conn.execute(
"""CREATE TABLE "user" ( """CREATE TABLE "user" (
mxid TEXT PRIMARY KEY, mxid TEXT PRIMARY KEY,
@@ -26,6 +26,7 @@ async def create_latest_tables(conn: Connection) -> int:
tg_username TEXT, tg_username TEXT,
tg_phone TEXT, tg_phone TEXT,
is_bot BOOLEAN NOT NULL DEFAULT false, is_bot BOOLEAN NOT NULL DEFAULT false,
is_premium BOOLEAN NOT NULL DEFAULT false,
saved_contacts INTEGER NOT NULL DEFAULT 0 saved_contacts INTEGER NOT NULL DEFAULT 0
)""" )"""
) )
@@ -66,6 +67,8 @@ async def create_latest_tables(conn: Connection) -> int:
edit_index INTEGER, edit_index INTEGER,
redacted BOOLEAN NOT NULL DEFAULT false, redacted BOOLEAN NOT NULL DEFAULT false,
content_hash bytea, content_hash bytea,
sender_mxid TEXT,
sender BIGINT,
PRIMARY KEY (tgid, tg_space, edit_index), PRIMARY KEY (tgid, tg_space, edit_index),
UNIQUE (mxid, mx_room, tg_space) UNIQUE (mxid, mx_room, tg_space)
)""" )"""
@@ -78,7 +81,7 @@ async def create_latest_tables(conn: Connection) -> int:
tg_sender BIGINT, tg_sender BIGINT,
reaction TEXT NOT NULL, reaction TEXT NOT NULL,
PRIMARY KEY (msg_mxid, mx_room, tg_sender), PRIMARY KEY (msg_mxid, mx_room, tg_sender, reaction),
UNIQUE (mxid, mx_room) UNIQUE (mxid, mx_room)
)""" )"""
) )
@@ -111,6 +114,7 @@ async def create_latest_tables(conn: Connection) -> int:
avatar_set BOOLEAN NOT NULL DEFAULT false, avatar_set BOOLEAN NOT NULL DEFAULT false,
is_bot BOOLEAN, is_bot BOOLEAN,
is_channel BOOLEAN NOT NULL DEFAULT false, is_channel BOOLEAN NOT NULL DEFAULT false,
is_premium BOOLEAN NOT NULL DEFAULT false,
access_token TEXT, access_token TEXT,
custom_mxid TEXT, custom_mxid TEXT,
@@ -135,6 +139,7 @@ async def create_latest_tables(conn: Connection) -> int:
ON UPDATE CASCADE ON DELETE SET NULL ON UPDATE CASCADE ON DELETE SET NULL
)""" )"""
) )
await conn.execute("CREATE INDEX telegram_file_mxc_idx ON telegram_file(mxc)")
await conn.execute( await conn.execute(
"""CREATE TABLE bot_chat ( """CREATE TABLE bot_chat (
id BIGINT PRIMARY KEY, id BIGINT PRIMARY KEY,
@@ -204,4 +209,28 @@ async def create_latest_tables(conn: Connection) -> int:
PRIMARY KEY (session_id, entity_id) PRIMARY KEY (session_id, entity_id)
)""" )"""
) )
gen = ""
if scheme in (Scheme.POSTGRES, Scheme.COCKROACH):
gen = "GENERATED ALWAYS AS IDENTITY"
await conn.execute(
f"""
CREATE TABLE backfill_queue (
queue_id INTEGER PRIMARY KEY {gen},
user_mxid TEXT,
priority INTEGER NOT NULL,
portal_tgid BIGINT,
portal_tg_receiver BIGINT,
messages_per_batch INTEGER NOT NULL,
post_batch_delay INTEGER NOT NULL,
max_batches INTEGER NOT NULL,
dispatch_time TIMESTAMP,
completed_at TIMESTAMP,
cooldown_timeout TIMESTAMP,
FOREIGN KEY (user_mxid) REFERENCES "user"(mxid) ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (portal_tgid, portal_tg_receiver)
REFERENCES portal(tgid, tg_receiver) ON DELETE CASCADE
)
"""
)
return latest_version return latest_version
@@ -24,29 +24,21 @@ legacy_version_query = "SELECT version_num FROM alembic_version"
last_legacy_version = "bfc0a39bfe02" last_legacy_version = "bfc0a39bfe02"
def table_exists(scheme: str, name: str) -> str: async def first_upgrade_target(conn: Connection, scheme: Scheme) -> int:
if scheme == Scheme.SQLITE: is_legacy = await conn.table_exists("alembic_version")
return f"SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type='table' AND name='{name}')"
elif scheme in (Scheme.POSTGRES, Scheme.COCKROACH):
return f"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name='{name}')"
raise RuntimeError("unsupported database scheme")
async def first_upgrade_target(conn: Connection, scheme: str) -> int:
is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version"))
# If it's a legacy db, the upgrade process will go to v1 and run each migration up to latest. # If it's a legacy db, the upgrade process will go to v1 and run each migration up to latest.
# If it's a new db, we'll create the latest tables directly (see create_latest_tables call). # If it's a new db, we'll create the latest tables directly (see create_latest_tables call).
return 1 if is_legacy else latest_version return 1 if is_legacy else latest_version
@upgrade_table.register(description="Initial asyncpg revision", upgrades_to=first_upgrade_target) @upgrade_table.register(description="Initial asyncpg revision", upgrades_to=first_upgrade_target)
async def upgrade_v1(conn: Connection, scheme: str) -> int: async def upgrade_v1(conn: Connection, scheme: Scheme) -> int:
is_legacy = await conn.fetchval(table_exists(scheme, "alembic_version")) is_legacy = await conn.table_exists("alembic_version")
if is_legacy: if is_legacy:
await migrate_legacy_to_v1(conn, scheme) await migrate_legacy_to_v1(conn, scheme)
return 1 return 1
else: else:
return await create_latest_tables(conn) return await create_latest_tables(conn, scheme)
async def drop_constraints(conn: Connection, table: str, contype: str) -> None: async def drop_constraints(conn: Connection, table: str, contype: str) -> None:
@@ -59,14 +51,14 @@ async def drop_constraints(conn: Connection, table: str, contype: str) -> None:
await conn.execute(f"ALTER TABLE {table} {drops}") await conn.execute(f"ALTER TABLE {table} {drops}")
async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None: async def migrate_legacy_to_v1(conn: Connection, scheme: Scheme) -> None:
legacy_version = await conn.fetchval(legacy_version_query) legacy_version = await conn.fetchval(legacy_version_query)
if legacy_version != last_legacy_version: if legacy_version != last_legacy_version:
raise RuntimeError( raise RuntimeError(
"Legacy database is not on last version. " "Legacy database is not on last version. "
"Please upgrade the old database with alembic or drop it completely first." "Please upgrade the old database with alembic or drop it completely first."
) )
if scheme != "sqlite": if scheme != Scheme.SQLITE:
await drop_constraints(conn, "contact", contype="f") await drop_constraints(conn, "contact", contype="f")
await conn.execute( await conn.execute(
""" """
@@ -131,12 +123,12 @@ async def migrate_legacy_to_v1(conn: Connection, scheme: str) -> None:
await conn.execute("DROP TABLE alembic_version") await conn.execute("DROP TABLE alembic_version")
async def update_state_store(conn: Connection, scheme: str) -> None: async def update_state_store(conn: Connection, scheme: Scheme) -> None:
# The Matrix state store already has more or less the correct schema, so set the version # The Matrix state store already has more or less the correct schema, so set the version
await conn.execute("CREATE TABLE mx_version (version INTEGER PRIMARY KEY)") await conn.execute("CREATE TABLE mx_version (version INTEGER PRIMARY KEY)")
await conn.execute("INSERT INTO mx_version (version) VALUES (2)") await conn.execute("INSERT INTO mx_version (version) VALUES (2)")
await conn.execute("UPDATE mx_user_profile SET membership='LEAVE' WHERE membership='LEFT'") await conn.execute("UPDATE mx_user_profile SET membership='LEAVE' WHERE membership='LEFT'")
if scheme != "sqlite": if scheme != Scheme.SQLITE:
# Also add the membership type on postgres # Also add the membership type on postgres
await conn.execute( await conn.execute(
"CREATE TYPE membership AS ENUM ('join', 'leave', 'invite', 'ban', 'knock')" "CREATE TYPE membership AS ENUM ('join', 'leave', 'invite', 'ban', 'knock')"
@@ -0,0 +1,54 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2022 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from mautrix.util.async_db import Connection, Scheme
from . import upgrade_table
@upgrade_table.register(description="Allow multiple reactions from the same user")
async def upgrade_v13(conn: Connection, scheme: Scheme) -> None:
await conn.execute("CREATE INDEX telegram_file_mxc_idx ON telegram_file(mxc)")
await conn.execute('ALTER TABLE "user" ADD COLUMN is_premium BOOLEAN NOT NULL DEFAULT false')
await conn.execute("ALTER TABLE puppet ADD COLUMN is_premium BOOLEAN NOT NULL DEFAULT false")
if scheme == Scheme.POSTGRES:
await conn.execute(
"""
ALTER TABLE reaction
DROP CONSTRAINT reaction_pkey,
ADD CONSTRAINT reaction_pkey PRIMARY KEY (msg_mxid, mx_room, tg_sender, reaction)
"""
)
else:
await conn.execute(
"""CREATE TABLE new_reaction (
mxid TEXT NOT NULL,
mx_room TEXT NOT NULL,
msg_mxid TEXT NOT NULL,
tg_sender BIGINT,
reaction TEXT NOT NULL,
PRIMARY KEY (msg_mxid, mx_room, tg_sender, reaction),
UNIQUE (mxid, mx_room)
)"""
)
await conn.execute(
"""
INSERT INTO new_reaction (mxid, mx_room, msg_mxid, tg_sender, reaction)
SELECT mxid, mx_room, msg_mxid, tg_sender, reaction FROM reaction
"""
)
await conn.execute("DROP TABLE reaction")
await conn.execute("ALTER TABLE new_reaction RENAME TO reaction")
+14 -9
View File
@@ -37,6 +37,7 @@ class User:
tg_username: str | None tg_username: str | None
tg_phone: str | None tg_phone: str | None
is_bot: bool is_bot: bool
is_premium: bool
saved_contacts: int saved_contacts: int
@classmethod @classmethod
@@ -45,7 +46,9 @@ class User:
return None return None
return cls(**row) return cls(**row)
columns: ClassVar[str] = "mxid, tgid, tg_username, tg_phone, is_bot, saved_contacts" columns: ClassVar[str] = ", ".join(
("mxid", "tgid", "tg_username", "tg_phone", "is_bot", "is_premium", "saved_contacts")
)
@classmethod @classmethod
async def get_by_tgid(cls, tgid: TelegramID) -> User | None: async def get_by_tgid(cls, tgid: TelegramID) -> User | None:
@@ -78,21 +81,23 @@ class User:
self.tg_username, self.tg_username,
self.tg_phone, self.tg_phone,
self.is_bot, self.is_bot,
self.is_premium,
self.saved_contacts, self.saved_contacts,
) )
async def save(self) -> None: async def save(self) -> None:
q = ( q = """
'UPDATE "user" SET tgid=$2, tg_username=$3, tg_phone=$4, is_bot=$5, saved_contacts=$6 ' UPDATE "user" SET tgid=$2, tg_username=$3, tg_phone=$4, is_bot=$5, is_premium=$6,
"WHERE mxid=$1" saved_contacts=$7
) WHERE mxid=$1
"""
await self.db.execute(q, *self._values) await self.db.execute(q, *self._values)
async def insert(self) -> None: async def insert(self) -> None:
q = ( q = """
'INSERT INTO "user" (mxid, tgid, tg_username, tg_phone, is_bot, saved_contacts) ' INSERT INTO "user" (mxid, tgid, tg_username, tg_phone, is_bot, is_premium, saved_contacts)
"VALUES ($1, $2, $3, $4, $5, $6)" VALUES ($1, $2, $3, $4, $5, $6, $7)
) """
await self.db.execute(q, *self._values) await self.db.execute(q, *self._values)
async def get_contacts(self) -> list[TelegramID]: async def get_contacts(self) -> list[TelegramID]:
+6 -1
View File
@@ -7,7 +7,9 @@ homeserver:
# Whether or not to verify the SSL certificate of the homeserver. # Whether or not to verify the SSL certificate of the homeserver.
# Only applies if address starts with https:// # Only applies if address starts with https://
verify_ssl: true verify_ssl: true
asmux: false # What software is the homeserver running?
# Standard Matrix homeservers like Synapse, Dendrite and Conduit should just use "standard" here.
software: standard
# Number of retries for all HTTP requests if the homeserver isn't reachable. # Number of retries for all HTTP requests if the homeserver isn't reachable.
http_retry_count: 4 http_retry_count: 4
# The URL to push real-time bridge status to. # The URL to push real-time bridge status to.
@@ -45,6 +47,7 @@ appservice:
# https://magicstack.github.io/asyncpg/current/api/index.html#asyncpg.pool.create_pool # https://magicstack.github.io/asyncpg/current/api/index.html#asyncpg.pool.create_pool
# https://docs.python.org/3/library/sqlite3.html#sqlite3.connect # https://docs.python.org/3/library/sqlite3.html#sqlite3.connect
# For sqlite, min_size is used as the connection thread pool size and max_size is ignored. # For sqlite, min_size is used as the connection thread pool size and max_size is ignored.
# Additionally, SQLite supports init_commands as an array of SQL queries to run on connect (e.g. to set PRAGMAs).
database_opts: database_opts:
min_size: 1 min_size: 1
max_size: 10 max_size: 10
@@ -255,6 +258,8 @@ bridge:
# Default to encryption, force-enable encryption in all portals the bridge creates # Default to encryption, force-enable encryption in all portals the bridge creates
# This will cause the bridge bot to be in private chats for the encryption to work properly. # This will cause the bridge bot to be in private chats for the encryption to work properly.
default: false default: false
# Whether to use MSC2409/MSC3202 instead of /sync long polling for receiving encryption-related data.
appservice: false
# Require encryption, drop any unencrypted messages. # Require encryption, drop any unencrypted messages.
require: false require: false
# Enable key sharing? If enabled, key requests for rooms where users are in will be fulfilled. # Enable key sharing? If enabled, key requests for rooms where users are in will be fulfilled.
+194 -98
View File
@@ -16,6 +16,7 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, List, Union, cast from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, List, Union, cast
from collections import defaultdict
from datetime import datetime from datetime import datetime
from html import escape as escape_html from html import escape as escape_html
from sqlite3 import IntegrityError from sqlite3 import IntegrityError
@@ -52,6 +53,7 @@ from telethon.tl.functions.messages import (
EditChatTitleRequest, EditChatTitleRequest,
ExportChatInviteRequest, ExportChatInviteRequest,
GetMessageReactionsListRequest, GetMessageReactionsListRequest,
GetMessagesReactionsRequest,
MigrateChatRequest, MigrateChatRequest,
SendReactionRequest, SendReactionRequest,
SetTypingRequest, SetTypingRequest,
@@ -102,6 +104,8 @@ from telethon.tl.types import (
Photo, Photo,
PhotoEmpty, PhotoEmpty,
ReactionCount, ReactionCount,
ReactionCustomEmoji,
ReactionEmoji,
SendMessageCancelAction, SendMessageCancelAction,
SendMessageTypingAction, SendMessageTypingAction,
SponsoredMessage, SponsoredMessage,
@@ -113,11 +117,13 @@ from telethon.tl.types import (
TypeMessage, TypeMessage,
TypeMessageAction, TypeMessageAction,
TypePeer, TypePeer,
TypeReaction,
TypeUser, TypeUser,
TypeUserFull, TypeUserFull,
TypeUserProfilePhoto, TypeUserProfilePhoto,
UpdateChannelUserTyping, UpdateChannelUserTyping,
UpdateChatUserTyping, UpdateChatUserTyping,
UpdateMessageReactions,
UpdateNewMessage, UpdateNewMessage,
UpdateUserTyping, UpdateUserTyping,
User, User,
@@ -126,7 +132,6 @@ from telethon.tl.types import (
UserProfilePhotoEmpty, UserProfilePhotoEmpty,
) )
from telethon.utils import encode_waveform from telethon.utils import encode_waveform
import magic
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
from mautrix.bridge import BasePortal, NotificationDisabler, RejectMatrixInvite, async_getter_lock from mautrix.bridge import BasePortal, NotificationDisabler, RejectMatrixInvite, async_getter_lock
@@ -161,7 +166,7 @@ from mautrix.types import (
UserID, UserID,
VideoInfo, VideoInfo,
) )
from mautrix.util import variation_selector from mautrix.util import magic, variation_selector
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
from mautrix.util.simple_lock import SimpleLock from mautrix.util.simple_lock import SimpleLock
from mautrix.util.simple_template import SimpleTemplate from mautrix.util.simple_template import SimpleTemplate
@@ -181,6 +186,7 @@ from .db import (
Message as DBMessage, Message as DBMessage,
Portal as DBPortal, Portal as DBPortal,
Reaction as DBReaction, Reaction as DBReaction,
TelegramFile as DBTelegramFile,
) )
from .tgclient import MautrixTelegramClient from .tgclient import MautrixTelegramClient
from .types import TelegramID from .types import TelegramID
@@ -204,6 +210,8 @@ UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTy
TypeChatPhoto = Union[ChatPhoto, ChatPhotoEmpty, Photo, PhotoEmpty] TypeChatPhoto = Union[ChatPhoto, ChatPhotoEmpty, Photo, PhotoEmpty]
MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]] MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]]
REACTION_POLL_MIN_INTERVAL = 20
class BridgingError(Exception): class BridgingError(Exception):
pass pass
@@ -261,6 +269,8 @@ class Portal(DBPortal, BasePortal):
_sponsored_seen: dict[UserID, bool] _sponsored_seen: dict[UserID, bool]
_new_messages_after_sponsored: bool _new_messages_after_sponsored: bool
_prev_reaction_poll: dict[UserID, float]
_msg_conv: putil.TelegramMessageConverter _msg_conv: putil.TelegramMessageConverter
def __init__( def __init__(
@@ -331,6 +341,8 @@ class Portal(DBPortal, BasePortal):
self._new_messages_after_sponsored = True self._new_messages_after_sponsored = True
self._bridging_blocked_at_runtime = False self._bridging_blocked_at_runtime = False
self._prev_reaction_poll = defaultdict(lambda: 0.0)
self._msg_conv = putil.TelegramMessageConverter(self) self._msg_conv = putil.TelegramMessageConverter(self)
# region Properties # region Properties
@@ -1440,8 +1452,13 @@ class Portal(DBPortal, BasePortal):
await user.client.send_read_acknowledge( await user.client.send_read_acknowledge(
self.peer, max_id=message.tgid, clear_mentions=True, clear_reactions=True self.peer, max_id=message.tgid, clear_mentions=True, clear_reactions=True
) )
if self.peer_type == "channel" and not self.megagroup: if self.peer_type == "channel":
asyncio.create_task(self._try_handle_read_for_sponsored_msg(user, event_id, timestamp)) if not self.megagroup:
asyncio.create_task(
self._try_handle_read_for_sponsored_msg(user, event_id, timestamp)
)
else:
asyncio.create_task(self._poll_telegram_reactions(user))
async def _preproc_kick_ban( async def _preproc_kick_ban(
self, user: u.User | p.Puppet, source: u.User self, user: u.User | p.Puppet, source: u.User
@@ -2212,12 +2229,31 @@ class Portal(DBPortal, BasePortal):
await real_deleter.client.delete_messages(self.peer, [message.tgid]) await real_deleter.client.delete_messages(self.peer, [message.tgid])
async def handle_matrix_reaction( async def handle_matrix_reaction(
self, user: u.User, target_event_id: EventID, reaction: str, reaction_event_id: EventID self, user: u.User, target_event_id: EventID, emoji: str, reaction_event_id: EventID
) -> None: ) -> None:
emoji_id = emoji
reaction = ReactionEmoji(emoticon=variation_selector.remove(emoji))
if emoji.startswith("mxc://"):
db_reaction = await DBTelegramFile.find_by_mxc(ContentURI(emoji))
if not db_reaction or not db_reaction.id.isdecimal():
self.log.debug(f"Dropping unknown reaction {emoji} by {user.mxid}")
if not self.has_bot:
await self.main_intent.redact(
self.mxid, reaction_event_id, reason="Unrecognized custom emoji"
)
await self._send_bridge_error(
user,
Exception("Unrecognized custom emoji"),
reaction_event_id,
EventType.REACTION,
)
return
reaction = ReactionCustomEmoji(document_id=int(db_reaction.id))
emoji_id = db_reaction.id
try: try:
async with self.reaction_lock(target_event_id): async with self.reaction_lock(target_event_id):
await self._handle_matrix_reaction( await self._handle_matrix_reaction(
user, target_event_id, reaction, reaction_event_id user, target_event_id, emoji_id, reaction, reaction_event_id
) )
except IgnoredMessageError as e: except IgnoredMessageError as e:
self.log.debug(str(e)) self.log.debug(str(e))
@@ -2244,7 +2280,12 @@ class Portal(DBPortal, BasePortal):
asyncio.create_task(self._send_message_status(reaction_event_id, err=None)) asyncio.create_task(self._send_message_status(reaction_event_id, err=None))
async def _handle_matrix_reaction( async def _handle_matrix_reaction(
self, user: u.User, target_event_id: EventID, emoji: str, reaction_event_id: EventID self,
user: u.User,
target_event_id: EventID,
emoji_id: str,
reaction: TypeReaction,
reaction_event_id: EventID,
) -> None: ) -> None:
tg_space = self.tgid if self.peer_type == "channel" else user.tgid tg_space = self.tgid if self.peer_type == "channel" else user.tgid
msg = await DBMessage.get_by_mxid(target_event_id, self.mxid, tg_space) msg = await DBMessage.get_by_mxid(target_event_id, self.mxid, tg_space)
@@ -2259,23 +2300,34 @@ class Portal(DBPortal, BasePortal):
elif msg.edit_index != 0: elif msg.edit_index != 0:
raise IgnoredMessageError(f"Ignoring Matrix reaction to edit event {target_event_id}") raise IgnoredMessageError(f"Ignoring Matrix reaction to edit event {target_event_id}")
emoji = variation_selector.remove(emoji) existing_reacts = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, user.tgid)
existing_react = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, user.tgid) new_tg_reactions: list[TypeReaction] = []
await user.client(SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=emoji)) reactions_to_remove: list[DBReaction] = []
if existing_react: max_reactions = 3 if user.is_premium else 1
puppet = await user.get_puppet() max_reactions -= 1 # Leave one reaction of space for the new reaction
await puppet.intent_for(self).redact(existing_react.mx_room, existing_react.mxid) for db_reaction in existing_reacts:
existing_react.mxid = reaction_event_id if db_reaction.reaction == emoji_id:
existing_react.reaction = emoji raise IgnoredMessageError("Ignoring duplicate Matrix reaction")
await existing_react.save() if len(new_tg_reactions) < max_reactions:
else: new_tg_reactions.append(db_reaction.telegram)
await DBReaction( else:
mxid=reaction_event_id, reactions_to_remove.append(db_reaction)
mx_room=self.mxid, new_tg_reactions.append(reaction)
msg_mxid=msg.mxid,
tg_sender=user.tgid, await user.client(
reaction=emoji, SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=new_tg_reactions)
).save() )
puppet = await user.get_puppet()
for db_reaction in reactions_to_remove:
await db_reaction.delete()
await puppet.intent_for(self).redact(db_reaction.mx_room, db_reaction.mxid)
await DBReaction(
mxid=reaction_event_id,
mx_room=self.mxid,
msg_mxid=msg.mxid,
tg_sender=user.tgid,
reaction=emoji_id,
).save()
async def _update_telegram_power_level( async def _update_telegram_power_level(
self, sender: u.User, user_id: TelegramID, level: int self, sender: u.User, user_id: TelegramID, level: int
@@ -2352,7 +2404,7 @@ class Portal(DBPortal, BasePortal):
self.avatar_url = url self.avatar_url = url
file = await self.main_intent.download_media(url) file = await self.main_intent.download_media(url)
mime = magic.from_buffer(file, mime=True) mime = magic.mimetype(file)
ext = sane_mimetypes.guess_extension(mime) ext = sane_mimetypes.guess_extension(mime)
uploaded = await sender.client.upload_file(file, file_name=f"avatar{ext}") uploaded = await sender.client.upload_file(file, file_name=f"avatar{ext}")
photo = InputChatUploadedPhoto(file=uploaded) photo = InputChatUploadedPhoto(file=uploaded)
@@ -2444,7 +2496,7 @@ class Portal(DBPortal, BasePortal):
asyncio.create_task( asyncio.create_task(
self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions) self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions)
) )
sender_id = sender.tgid if sender else None sender_id = sender.tgid if sender else self.tgid
async with self.send_lock(sender_id, required=False): async with self.send_lock(sender_id, required=False):
tg_space = self.tgid if self.peer_type == "channel" else source.tgid tg_space = self.tgid if self.peer_type == "channel" else source.tgid
@@ -2681,35 +2733,46 @@ class Portal(DBPortal, BasePortal):
return False return False
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]: def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
if len(counts) == 1: reactions = []
item = counts[0] for item in counts:
if item.count == 2: if item.count == 2:
return [ reactions += [
MessagePeerReaction(reaction=item.reaction, peer_id=PeerUser(self.tgid)), MessagePeerReaction(reaction=item.reaction, peer_id=PeerUser(self.tgid)),
MessagePeerReaction( MessagePeerReaction(
reaction=item.reaction, peer_id=PeerUser(self.tg_receiver) reaction=item.reaction, peer_id=PeerUser(self.tg_receiver)
), ),
] ]
elif item.count == 1: elif item.count == 1:
return [ reactions.append(
MessagePeerReaction( MessagePeerReaction(
reaction=item.reaction, reaction=item.reaction,
peer_id=PeerUser(self.tg_receiver if item.chosen else self.tgid), peer_id=PeerUser(self.tg_receiver if item.chosen_order else self.tgid),
), )
] )
elif len(counts) == 2: return reactions
item1, item2 = counts
return [ async def _poll_telegram_reactions(self, source: au.AbstractUser) -> None:
MessagePeerReaction( now = time.monotonic()
reaction=item1.reaction, if self._prev_reaction_poll[source.mxid] + REACTION_POLL_MIN_INTERVAL > now:
peer_id=PeerUser(self.tg_receiver if item1.chosen else self.tgid), self.log.trace(
), f"Not polling reactions through {source.mxid}, "
MessagePeerReaction( f"last poll was less than {REACTION_POLL_MIN_INTERVAL} seconds ago"
reaction=item2.reaction, )
peer_id=PeerUser(self.tg_receiver if item2.chosen else self.tgid), return
), self._prev_reaction_poll[source.mxid] = now
] self.log.debug(f"Polling reactions for recent messages through {source.mxid}")
return [] messages = await DBMessage.find_recent(self.mxid, source.tgid)
message_ids = [message.tgid for message in messages]
updates = await source.client(GetMessagesReactionsRequest(peer=self.peer, id=message_ids))
for user in updates.users:
user: User
puppet = await p.Puppet.get_by_tgid(TelegramID(user.id))
await puppet.update_info(source, user)
for upd in updates.updates:
if isinstance(upd, UpdateMessageReactions):
await self.handle_telegram_reactions(source, TelegramID(upd.msg_id), upd.reactions)
else:
self.log.warning(f"Unexpected update type {type(upd)} in get reactions response")
async def try_handle_telegram_reactions( async def try_handle_telegram_reactions(
self, self,
@@ -2732,9 +2795,15 @@ class Portal(DBPortal, BasePortal):
dbm: DBMessage | None = None, dbm: DBMessage | None = None,
timestamp: datetime | None = None, timestamp: datetime | None = None,
) -> None: ) -> None:
if self.peer_type == "channel" and not self.megagroup: total_count = sum(item.count for item in data.results)
recent_reactions = data.recent_reactions or []
if total_count > 0 and not recent_reactions and not data.can_see_list:
# We don't know who reacted in a channel, so we can't bridge it properly either # We don't know who reacted in a channel, so we can't bridge it properly either
return return
if self.peer_type == "channel" and not self.megagroup:
# This should never happen with the previous if
self.log.warning(f"Can see reaction list in channel ({data!s})")
# return
tg_space = self.tgid if self.peer_type == "channel" else source.tgid tg_space = self.tgid if self.peer_type == "channel" else source.tgid
if dbm is None: if dbm is None:
@@ -2742,69 +2811,109 @@ class Portal(DBPortal, BasePortal):
if dbm is None: if dbm is None:
return return
total_count = sum(item.count for item in data.results) if not recent_reactions or len(recent_reactions) < total_count:
recent_reactions = data.recent_reactions or []
if not recent_reactions and total_count > 0:
if self.peer_type == "user": if self.peer_type == "user":
recent_reactions = self._split_dm_reaction_counts(data.results) recent_reactions = self._split_dm_reaction_counts(data.results)
elif source.is_bot: elif source.is_bot:
# Can't fetch exact reaction senders as a bot # Can't fetch exact reaction senders as a bot
return return
else: else:
# TODO this doesn't work for some reason # TODO should calls to this be limited?
return resp = await source.client(
# resp = await source.client( GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=100)
# GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=20) )
# ) recent_reactions = resp.reactions
# recent_reactions = resp.reactions
async with self.reaction_lock(dbm.mxid): async with self.reaction_lock(dbm.mxid):
await self._handle_telegram_reactions_locked( await self._handle_telegram_reactions_locked(
dbm, recent_reactions, total_count, timestamp=timestamp source, dbm, recent_reactions, total_count, timestamp=timestamp
) )
@staticmethod
def _reactions_filter(lst: list[TypeReaction], existing: DBReaction) -> bool:
if not lst:
return False
for reaction in lst:
if isinstance(reaction, ReactionCustomEmoji) and existing.reaction == str(
reaction.document_id
):
lst.remove(reaction)
return True
elif isinstance(reaction, ReactionEmoji) and existing.reaction == reaction.emoticon:
lst.remove(reaction)
return True
return False
@staticmethod
async def _get_reaction_limit(sender: TelegramID) -> int:
puppet = await p.Puppet.get_by_tgid(sender, create=False)
if puppet and puppet.is_premium:
return 3
return 1
async def _handle_telegram_reactions_locked( async def _handle_telegram_reactions_locked(
self, self,
source: au.AbstractUser,
msg: DBMessage, msg: DBMessage,
reaction_list: list[MessagePeerReaction], reaction_list: list[MessagePeerReaction],
total_count: int, total_count: int,
timestamp: datetime | None = None, timestamp: datetime | None = None,
) -> None: ) -> None:
reactions = { reactions: dict[TelegramID, list[TypeReaction]] = {}
p.Puppet.get_id_from_peer(reaction.peer_id): reaction.reaction custom_emoji_ids: list[int] = []
for reaction in reaction_list for reaction in reaction_list:
if isinstance(reaction.peer_id, (PeerUser, PeerChannel)) if isinstance(reaction.peer_id, (PeerUser, PeerChannel)) and isinstance(
} reaction.reaction, (ReactionEmoji, ReactionCustomEmoji)
is_full = len(reactions) == total_count ):
reactions.setdefault(p.Puppet.get_id_from_peer(reaction.peer_id), []).append(
reaction.reaction
)
if isinstance(reaction.reaction, ReactionCustomEmoji):
custom_emoji_ids.append(reaction.reaction.document_id)
is_full = len(reaction_list) == total_count
custom_emojis = await util.transfer_custom_emojis_to_matrix(source, custom_emoji_ids)
existing_reactions = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room) existing_reactions = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room)
removed: list[DBReaction] = [] removed: list[DBReaction] = []
changed: list[tuple[DBReaction, str]] = []
for existing_reaction in existing_reactions: for existing_reaction in existing_reactions:
new_reaction = reactions.get(existing_reaction.tg_sender) sender_id = existing_reaction.tg_sender
if new_reaction is None: new_reactions = reactions.get(sender_id)
if is_full: if self._reactions_filter(new_reactions, existing_reaction):
if new_reactions is not None and len(new_reactions) == 0:
reactions.pop(sender_id)
else:
if is_full or (
new_reactions is not None
and len(new_reactions) == await self._get_reaction_limit(sender_id)
):
removed.append(existing_reaction) removed.append(existing_reaction)
# else: assume the reaction is still there, too much effort to fetch it # else: assume the reaction is still there, too much effort to fetch it
elif new_reaction == existing_reaction.reaction:
reactions.pop(existing_reaction.tg_sender)
else:
changed.append((existing_reaction, new_reaction))
for sender, new_emoji in reactions.items(): new_reaction: TypeReaction
self.log.debug(f"Bridging reaction {new_emoji} by {sender} to {msg.tgid}") for sender, new_reactions in reactions.items():
puppet: p.Puppet = await p.Puppet.get_by_tgid(sender) for new_reaction in new_reactions:
mxid = await puppet.intent_for(self).react( if isinstance(new_reaction, ReactionEmoji):
msg.mx_room, msg.mxid, variation_selector.add(new_emoji), timestamp=timestamp emoji_id = new_reaction.emoticon
) matrix_reaction = variation_selector.add(new_reaction.emoticon)
await DBReaction( elif isinstance(new_reaction, ReactionCustomEmoji):
mxid=mxid, emoji_id = str(new_reaction.document_id)
mx_room=msg.mx_room, matrix_reaction = custom_emojis[new_reaction.document_id].mxc
msg_mxid=msg.mxid, else:
tg_sender=sender, self.log.warning("Unknown reaction type %s", type(new_reaction))
reaction=new_emoji, continue
).save() self.log.debug(f"Bridging reaction {emoji_id} by {sender} to {msg.tgid}")
puppet: p.Puppet = await p.Puppet.get_by_tgid(sender)
mxid = await puppet.intent_for(self).react(
msg.mx_room, msg.mxid, matrix_reaction, timestamp=timestamp
)
await DBReaction(
mxid=mxid,
mx_room=msg.mx_room,
msg_mxid=msg.mxid,
tg_sender=sender,
reaction=emoji_id,
).save()
for removed_reaction in removed: for removed_reaction in removed:
self.log.debug( self.log.debug(
f"Removing reaction {removed_reaction.reaction} by {removed_reaction.tg_sender} " f"Removing reaction {removed_reaction.reaction} by {removed_reaction.tg_sender} "
@@ -2813,19 +2922,6 @@ class Portal(DBPortal, BasePortal):
puppet = await p.Puppet.get_by_tgid(removed_reaction.tg_sender) puppet = await p.Puppet.get_by_tgid(removed_reaction.tg_sender)
await puppet.intent_for(self).redact(removed_reaction.mx_room, removed_reaction.mxid) await puppet.intent_for(self).redact(removed_reaction.mx_room, removed_reaction.mxid)
await removed_reaction.delete() await removed_reaction.delete()
for changed_reaction, new_emoji in changed:
self.log.debug(
f"Updating reaction {changed_reaction.reaction} -> {new_emoji} "
f"by {changed_reaction.tg_sender} to {msg.tgid}"
)
puppet = await p.Puppet.get_by_tgid(changed_reaction.tg_sender)
intent = puppet.intent_for(self)
await intent.redact(changed_reaction.mx_room, changed_reaction.mxid)
changed_reaction.mxid = await intent.react(
msg.mx_room, msg.mxid, variation_selector.add(new_emoji), timestamp=timestamp
)
changed_reaction.reaction = new_emoji
await changed_reaction.save()
async def handle_telegram_message( async def handle_telegram_message(
self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
@@ -2850,7 +2946,7 @@ class Portal(DBPortal, BasePortal):
) )
return return
sender_id = sender.tgid if sender else None sender_id = sender.tgid if sender else self.tgid
async with self.send_lock(sender_id, required=False): async with self.send_lock(sender_id, required=False):
tg_space = self.tgid if self.peer_type == "channel" else source.tgid tg_space = self.tgid if self.peer_type == "channel" else source.tgid
@@ -24,6 +24,7 @@ from telethon.tl.types import (
ChannelParticipantBanned, ChannelParticipantBanned,
ChannelParticipantsRecent, ChannelParticipantsRecent,
ChannelParticipantsSearch, ChannelParticipantsSearch,
ChatParticipantsForbidden,
InputChannel, InputChannel,
InputUser, InputUser,
TypeChannelParticipant, TypeChannelParticipant,
@@ -93,6 +94,8 @@ async def get_users(
) -> list[TypeUser]: ) -> list[TypeUser]:
if peer_type == "chat": if peer_type == "chat":
chat = await client(GetFullChatRequest(chat_id=tgid)) chat = await client(GetFullChatRequest(chat_id=tgid))
if isinstance(chat.full_chat.participants, ChatParticipantsForbidden):
return []
users = list(_filter_participants(chat.users, chat.full_chat.participants.participants)) users = list(_filter_participants(chat.users, chat.full_chat.participants.participants))
return users[:limit] if limit > 0 else users return users[:limit] if limit > 0 else users
elif peer_type == "channel": elif peer_type == "channel":
+7 -1
View File
@@ -80,6 +80,7 @@ class Puppet(DBPuppet, BasePuppet):
avatar_set: bool = False, avatar_set: bool = False,
is_bot: bool = False, is_bot: bool = False,
is_channel: bool = False, is_channel: bool = False,
is_premium: bool = False,
custom_mxid: UserID | None = None, custom_mxid: UserID | None = None,
access_token: str | None = None, access_token: str | None = None,
next_batch: SyncToken | None = None, next_batch: SyncToken | None = None,
@@ -101,6 +102,7 @@ class Puppet(DBPuppet, BasePuppet):
avatar_set=avatar_set, avatar_set=avatar_set,
is_bot=is_bot, is_bot=is_bot,
is_channel=is_channel, is_channel=is_channel,
is_premium=is_premium,
custom_mxid=custom_mxid, custom_mxid=custom_mxid,
access_token=access_token, access_token=access_token,
next_batch=next_batch, next_batch=next_batch,
@@ -255,11 +257,15 @@ class Puppet(DBPuppet, BasePuppet):
async def update_info(self, source: au.AbstractUser, info: User | Channel) -> None: async def update_info(self, source: au.AbstractUser, info: User | Channel) -> None:
is_bot = False if isinstance(info, Channel) else info.bot is_bot = False if isinstance(info, Channel) else info.bot
is_premium = False if isinstance(info, Channel) else info.premium
is_channel = isinstance(info, Channel) is_channel = isinstance(info, Channel)
changed = is_bot != self.is_bot or is_channel != self.is_channel changed = (
is_bot != self.is_bot or is_channel != self.is_channel or is_premium != self.is_premium
)
self.is_bot = is_bot self.is_bot = is_bot
self.is_channel = is_channel self.is_channel = is_channel
self.is_premium = is_premium
if self.username != info.username: if self.username != info.username:
self.username = info.username self.username = info.username
+25 -13
View File
@@ -92,6 +92,7 @@ class User(DBUser, AbstractUser, BaseUser):
tg_username: str | None = None, tg_username: str | None = None,
tg_phone: str | None = None, tg_phone: str | None = None,
is_bot: bool = False, is_bot: bool = False,
is_premium: bool = False,
saved_contacts: int = 0, saved_contacts: int = 0,
) -> None: ) -> None:
super().__init__( super().__init__(
@@ -100,6 +101,7 @@ class User(DBUser, AbstractUser, BaseUser):
tg_username=tg_username, tg_username=tg_username,
tg_phone=tg_phone, tg_phone=tg_phone,
is_bot=is_bot, is_bot=is_bot,
is_premium=is_premium,
saved_contacts=saved_contacts, saved_contacts=saved_contacts,
) )
AbstractUser.__init__(self) AbstractUser.__init__(self)
@@ -290,7 +292,9 @@ class User(DBUser, AbstractUser, BaseUser):
self._track_metric(METRIC_CONNECTED, False) self._track_metric(METRIC_CONNECTED, False)
async def post_login(self, info: TLUser = None, first_login: bool = False) -> None: async def post_login(self, info: TLUser = None, first_login: bool = False) -> None:
if self.config["metrics.enabled"] and not self._track_connection_task: if (
self.config["metrics.enabled"] or self.config["homeserver.status_endpoint"]
) and not self._track_connection_task:
self._track_connection_task = asyncio.create_task(self._track_connection()) self._track_connection_task = asyncio.create_task(self._track_connection())
try: try:
@@ -369,6 +373,9 @@ class User(DBUser, AbstractUser, BaseUser):
if self.is_bot != info.bot: if self.is_bot != info.bot:
self.is_bot = info.bot self.is_bot = info.bot
changed = True changed = True
if self.is_premium != info.premium:
self.is_premium = info.premium
changed = True
if self.tg_username != info.username: if self.tg_username != info.username:
self.tg_username = info.username self.tg_username = info.username
changed = True changed = True
@@ -414,10 +421,11 @@ class User(DBUser, AbstractUser, BaseUser):
pass pass
self.tgid = None self.tgid = None
ok = await self.client.log_out() ok = await self.client.log_out()
await self.client.session.delete() sess = self.client.session
await self.stop()
await sess.delete()
await self.delete() await self.delete()
self.by_mxid.pop(self.mxid, None) self.by_mxid.pop(self.mxid, None)
await self.stop()
self._track_metric(METRIC_LOGGED_IN, False) self._track_metric(METRIC_LOGGED_IN, False)
return ok return ok
@@ -574,16 +582,19 @@ class User(DBUser, AbstractUser, BaseUser):
last_read = await DBMessage.get_one_by_tgid( last_read = await DBMessage.get_one_by_tgid(
portal.tgid, tg_space, dialog.dialog.read_inbox_max_id portal.tgid, tg_space, dialog.dialog.read_inbox_max_id
) )
if last_read: try:
await puppet.intent.mark_read(last_read.mx_room, last_read.mxid) if last_read:
if was_created or not self.config["bridge.tag_only_on_create"]: await puppet.intent.mark_read(last_read.mx_room, last_read.mxid)
await self._mute_room(puppet, portal, dialog.dialog.notify_settings.mute_until) if was_created or not self.config["bridge.tag_only_on_create"]:
await self._tag_room( await self._mute_room(puppet, portal, dialog.dialog.notify_settings.mute_until)
puppet, portal, self.config["bridge.pinned_tag"], dialog.pinned await self._tag_room(
) puppet, portal, self.config["bridge.pinned_tag"], dialog.pinned
await self._tag_room( )
puppet, portal, self.config["bridge.archive_tag"], dialog.archived await self._tag_room(
) puppet, portal, self.config["bridge.archive_tag"], dialog.archived
)
except Exception:
self.log.exception(f"Error updating read status and tags for {portal.tgid_log}")
async def get_cached_portals(self) -> dict[tuple[TelegramID, TelegramID], po.Portal]: async def get_cached_portals(self) -> dict[tuple[TelegramID, TelegramID], po.Portal]:
if self._portals_cache is None: if self._portals_cache is None:
@@ -686,6 +697,7 @@ class User(DBUser, AbstractUser, BaseUser):
await puppet.update_info(self, user) await puppet.update_info(self, user)
contacts[user.id] = puppet.contact_info contacts[user.id] = puppet.contact_info
await self.set_contacts(contacts.keys()) await self.set_contacts(contacts.keys())
self.log.debug("Contact syncing complete")
return contacts return contacts
# endregion # endregion
+1 -1
View File
@@ -1,4 +1,4 @@
from .color_log import ColorFormatter from .color_log import ColorFormatter
from .file_transfer import convert_image, transfer_file_to_matrix from .file_transfer import convert_image, transfer_custom_emojis_to_matrix, transfer_file_to_matrix
from .parallel_file_transfer import parallel_transfer_to_telegram from .parallel_file_transfer import parallel_transfer_to_telegram
from .recursive_dict import recursive_del, recursive_get, recursive_set from .recursive_dict import recursive_del, recursive_get, recursive_set
+4 -7
View File
@@ -42,9 +42,9 @@ from telethon.tl.types import (
PhotoSize, PhotoSize,
TypePhotoSize, TypePhotoSize,
) )
import magic
from mautrix.appservice import IntentAPI from mautrix.appservice import IntentAPI
from mautrix.util import magic
from .. import abstract_user as au from .. import abstract_user as au
from ..db import TelegramFile as DBTelegramFile from ..db import TelegramFile as DBTelegramFile
@@ -177,7 +177,7 @@ async def transfer_thumbnail_to_matrix(
else: else:
file = await client.download_file(thumbnail_loc) file = await client.download_file(thumbnail_loc)
width, height = None, None width, height = None, None
mime_type = magic.from_buffer(file, mime=True) mime_type = magic.mimetype(file)
decryption_info = None decryption_info = None
upload_mime_type = mime_type upload_mime_type = mime_type
@@ -335,13 +335,10 @@ async def _unlocked_transfer_file_to_matrix(
return None return None
width, height = None, None width, height = None, None
mime_type = magic.from_buffer(file, mime=True) mime_type = magic.mimetype(file)
image_converted = False image_converted = False
# A weird bug in alpine/magic makes it return application/octet-stream for gzips... is_tgs = mime_type == "application/gzip"
is_tgs = mime_type == "application/gzip" or (
mime_type == "application/octet-stream" and magic.from_buffer(file).startswith("gzip")
)
if is_sticker and tgs_convert and is_tgs: if is_sticker and tgs_convert and is_tgs:
converted_anim = await convert_tgs_to( converted_anim = await convert_tgs_to(
file, tgs_convert["target"], **tgs_convert["args"] file, tgs_convert["target"], **tgs_convert["args"]
+2 -2
View File
@@ -3,9 +3,9 @@ python-magic>=0.4,<0.5
commonmark>=0.8,<0.10 commonmark>=0.8,<0.10
aiohttp>=3,<4 aiohttp>=3,<4
yarl>=1,<2 yarl>=1,<2
mautrix>=0.17.8,<0.18 mautrix>=0.18.2,<0.19
#telethon>=1.24,<1.25 #telethon>=1.24,<1.25
tulir-telethon==1.25.0a20 tulir-telethon==1.26.0a5
asyncpg>=0.20,<0.27 asyncpg>=0.20,<0.27
mako>=1,<2 mako>=1,<2
setuptools setuptools