diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py index f21f12ce..74ba53c8 100644 --- a/mautrix_telegram/commands/telegram/misc.py +++ b/mautrix_telegram/commands/telegram/misc.py @@ -18,6 +18,7 @@ from __future__ import annotations from typing import cast import base64 import codecs +import math import re from aiohttp import ClientSession, InvalidURL @@ -427,6 +428,9 @@ async def backfill(evt: CommandEvent) -> None: if not evt.is_portal: await evt.reply("You can only use backfill in portal rooms") return + elif not evt.config["bridge.backfill.enable"]: + await evt.reply("Backfilling is disabled in the bridge config") + return try: limit = int(evt.args[0]) except (ValueError, IndexError): @@ -435,16 +439,14 @@ async def backfill(evt: CommandEvent) -> None: if not evt.config["bridge.backfill.normal_groups"] and portal.peer_type == "chat": await evt.reply("Backfilling normal groups is disabled in the bridge config") return - try: - await portal.backfill(evt.sender, limit=limit) - except TakeoutInitDelayError: - msg = ( - "Please accept the data export request from a mobile device, " - "then re-run the backfill command." - ) - if portal.peer_type == "user": - from mautrix.appservice import IntentAPI - - await portal.main_intent.send_notice(evt.room_id, msg) - else: - await evt.reply(msg) + if portal.backfill_msc2716: + messages_per_batch = evt.config["bridge.backfill.incremental.messages_per_batch"] + batches = math.ceil(limit / messages_per_batch) + rounded = "" + if batches * messages_per_batch != limit: + rounded = f" (rounded message limit to {batches}*{messages_per_batch})" + await portal.enqueue_backfill(evt.sender, priority=0, max_batches=batches) + await evt.reply(f"Backfill queued{rounded}") + else: + output = await portal.forward_backfill(evt.sender, initial=False, override_limit=limit) + await evt.reply(output) diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index ba225d20..0d66c646 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -158,12 +158,18 @@ class Config(BaseBridgeConfig): copy("bridge.bridge_matrix_leave") copy("bridge.kick_on_logout") copy("bridge.always_read_joined_telegram_notice") - copy("bridge.backfill.invite_own_puppet") - copy("bridge.backfill.takeout_limit") - copy("bridge.backfill.initial_limit") - copy("bridge.backfill.missed_limit") - copy("bridge.backfill.disable_notifications") + copy("bridge.backfill.enable") + copy("bridge.backfill.msc2716") + copy("bridge.backfill.double_puppet_backfill") copy("bridge.backfill.normal_groups") + copy("bridge.backfill.forward.initial_limit") + copy("bridge.backfill.forward.sync_limit") + copy("bridge.backfill.incremental.messages_per_batch") + copy("bridge.backfill.incremental.post_batch_delay") + copy("bridge.backfill.incremental.max_batches.user") + copy("bridge.backfill.incremental.max_batches.normal_group") + copy("bridge.backfill.incremental.max_batches.supergroup") + copy("bridge.backfill.incremental.max_batches.channel") copy("bridge.initial_power_level_overrides.group") copy("bridge.initial_power_level_overrides.user") diff --git a/mautrix_telegram/db/__init__.py b/mautrix_telegram/db/__init__.py index 40f0706e..482f7867 100644 --- a/mautrix_telegram/db/__init__.py +++ b/mautrix_telegram/db/__init__.py @@ -15,6 +15,7 @@ # along with this program. If not, see . from mautrix.util.async_db import Database +from .backfill_queue import Backfill from .bot_chat import BotChat from .disappearing_message import DisappearingMessage from .message import Message @@ -38,6 +39,7 @@ def init(db: Database) -> None: BotChat, PgSession, DisappearingMessage, + Backfill, ): table.db = db @@ -54,4 +56,5 @@ __all__ = [ "BotChat", "PgSession", "DisappearingMessage", + "Backfill", ] diff --git a/mautrix_telegram/db/backfill_queue.py b/mautrix_telegram/db/backfill_queue.py new file mode 100644 index 00000000..e86c5def --- /dev/null +++ b/mautrix_telegram/db/backfill_queue.py @@ -0,0 +1,175 @@ +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2022 Tulir Asokan, Sumner Evans +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from __future__ import annotations + +from typing import TYPE_CHECKING, ClassVar +from datetime import datetime, timedelta + +from asyncpg import Record +from attr import dataclass + +from mautrix.types import RoomID, UserID +from mautrix.util.async_db import Database + +fake_db = Database.create("") if TYPE_CHECKING else None + + +@dataclass +class Backfill: + db: ClassVar[Database] = fake_db + + queue_id: int | None + user_mxid: UserID + priority: int + portal_tgid: int + portal_tg_receiver: int + messages_per_batch: int + post_batch_delay: int + max_batches: int + dispatch_time: datetime | None + completed_at: datetime | None + cooldown_timeout: datetime | None + + @staticmethod + def new( + user_mxid: UserID, + priority: int, + portal_tgid: int, + portal_tg_receiver: int, + messages_per_batch: int, + post_batch_delay: int = 0, + max_batches: int = -1, + ) -> "Backfill": + return Backfill( + queue_id=None, + user_mxid=user_mxid, + priority=priority, + portal_tgid=portal_tgid, + portal_tg_receiver=portal_tg_receiver, + messages_per_batch=messages_per_batch, + post_batch_delay=post_batch_delay, + max_batches=max_batches, + dispatch_time=None, + completed_at=None, + cooldown_timeout=None, + ) + + @classmethod + def _from_row(cls, row: Record | None) -> Backfill | None: + if row is None: + return None + return cls(**row) + + columns = [ + "user_mxid", + "priority", + "portal_tgid", + "portal_tg_receiver", + "messages_per_batch", + "post_batch_delay", + "max_batches", + "dispatch_time", + "completed_at", + "cooldown_timeout", + ] + columns_str = ",".join(columns) + + @classmethod + async def get_next(cls, user_mxid: UserID) -> Backfill | None: + q = f""" + SELECT queue_id, {cls.columns_str} + FROM backfill_queue + WHERE user_mxid=$1 + AND ( + dispatch_time IS NULL + OR ( + dispatch_time < $2 + AND completed_at IS NULL + ) + ) + AND ( + cooldown_timeout IS NULL + OR cooldown_timeout < current_timestamp + ) + ORDER BY priority, queue_id + LIMIT 1 + """ + return cls._from_row( + await cls.db.fetchrow(q, user_mxid, datetime.now() - timedelta(minutes=15)) + ) + + @classmethod + async def get( + cls, + user_mxid: UserID, + portal_tgid: int, + portal_tg_receiver: int, + ) -> Backfill | None: + q = f""" + SELECT queue_id, {cls.columns_str} + FROM backfill_queue + WHERE user_mxid=$1 + AND portal_tgid=$2 + AND portal_tg_receiver=$3 + ORDER BY priority, queue_id + LIMIT 1 + """ + return cls._from_row(await cls.db.fetchrow(q, user_mxid, portal_tgid, portal_tg_receiver)) + + @classmethod + async def delete_all(cls, user_mxid: UserID) -> None: + await cls.db.execute("DELETE FROM backfill_queue WHERE user_mxid=$1", user_mxid) + + @classmethod + async def delete_for_portal(cls, tgid: int, tg_receiver: int) -> None: + q = "DELETE FROM backfill_queue WHERE portal_tgid=$1 AND portal_tg_receiver=$2" + await cls.db.execute(q, tgid, tg_receiver) + + async def insert(self) -> None: + q = f""" + INSERT INTO backfill_queue ({self.columns_str}) + VALUES ({','.join(f'${i+1}' for i in range(len(self.columns)))}) + RETURNING queue_id + """ + row = await self.db.fetchrow( + q, + self.user_mxid, + self.priority, + self.portal_tgid, + self.portal_tg_receiver, + self.messages_per_batch, + self.post_batch_delay, + self.max_batches, + self.dispatch_time, + self.completed_at, + self.cooldown_timeout, + ) + self.queue_id = row["queue_id"] + + async def mark_dispatched(self) -> None: + q = "UPDATE backfill_queue SET dispatch_time=$1 WHERE queue_id=$2" + await self.db.execute(q, datetime.now(), self.queue_id) + + async def mark_done(self) -> None: + q = "UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2" + await self.db.execute(q, datetime.now(), self.queue_id) + + async def set_cooldown_timeout(self, timeout) -> None: + """ + Set the backfill request to cooldown for ``timeout`` seconds. + """ + q = "UPDATE backfill_queue SET cooldown_timeout=$1 WHERE queue_id=$2" + await self.db.execute(q, datetime.now() + timedelta(seconds=timeout), self.queue_id) diff --git a/mautrix_telegram/db/message.py b/mautrix_telegram/db/message.py index ae6c3c26..d30376f9 100644 --- a/mautrix_telegram/db/message.py +++ b/mautrix_telegram/db/message.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, ClassVar from asyncpg import Record from attr import dataclass +import attr from mautrix.types import EventID, RoomID, UserID from mautrix.util.async_db import Database, Scheme @@ -122,6 +123,14 @@ class Message: ) return cls._from_row(await cls.db.fetchrow(q, mx_room, tg_space)) + @classmethod + async def find_first(cls, mx_room: RoomID, tg_space: TelegramID) -> Message | None: + q = ( + f"SELECT {cls.columns} FROM message WHERE mx_room=$1 AND tg_space=$2 " + f"ORDER BY tgid ASC LIMIT 1" + ) + return cls._from_row(await cls.db.fetchrow(q, mx_room, tg_space)) + @classmethod async def delete_all(cls, mx_room: RoomID) -> None: await cls.db.execute("DELETE FROM message WHERE mx_room=$1", mx_room) @@ -173,6 +182,23 @@ class Message: q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2" await cls.db.execute(q, temp_mxid, mx_room) + @classmethod + async def bulk_insert(cls, messages: list[Message]) -> None: + columns = cls.columns.split(", ") + records = [attr.astuple(message) for message in messages] + async with cls.db.acquire() as conn, conn.transaction(): + if cls.db.scheme == Scheme.POSTGRES: + await conn.copy_records_to_table("message", records=records, columns=columns) + else: + await conn.executemany(cls._insert_query, records) + + _insert_query: ClassVar[ + str + ] = """ + INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash, sender_mxid, sender) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + """ + @property def _values(self): return ( @@ -188,13 +214,7 @@ class Message: ) async def insert(self) -> None: - q = """ - INSERT INTO message ( - mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash, - sender_mxid, sender - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - """ - await self.db.execute(q, *self._values) + await self.db.execute(self._insert_query, *self._values) async def delete(self) -> None: q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2 AND tg_space=$3" diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index 5ec4c552..6937327c 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -337,34 +337,49 @@ bridge: create_group_on_invite: true # Settings for backfilling messages from Telegram. backfill: - # Whether or not the Telegram ghosts of logged in Matrix users should be - # invited to private chats when backfilling history from Telegram. This is - # usually needed to prevent rate limits and to allow timestamp massaging. - invite_own_puppet: true - # Maximum number of messages to backfill without using a takeout. - # The first time a takeout is used, the user has to manually approve it from a different - # device. If initial_limit or missed_limit are higher than this value, the bridge will ask - # the user to accept the takeout after logging in before syncing any chats. - takeout_limit: 100 - # Maximum number of messages to backfill initially. - # Set to 0 to disable backfilling when creating portal, or -1 to disable the limit. + # Allow backfilling at all? + enable: true + # Use MSC2716 for backfilling? # - # N.B. Initial backfill will only start after member sync. Make sure your - # max_initial_member_sync is set to a low enough value so it doesn't take forever. - initial_limit: 0 - # Maximum number of messages to backfill if messages were missed while the bridge was - # disconnected. Note that this only works for logged in users and only if the chat isn't - # older than sync_update_limit - # Set to 0 to disable backfilling missed messages. - missed_limit: 50 - # If using double puppeting, should notifications be disabled - # while the initial backfill is in progress? - disable_notifications: false + # This requires a server with MSC2716 support, which is currently an experimental feature in Synapse. + # It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml. + msc2716: false + # Use double puppets for backfilling? + # + # If using MSC2716, the double puppets must be in the appservice's user ID namespace + # (because the bridge can't use the double puppet access token with batch sending). + double_puppet_backfill: false # Whether or not to enable backfilling in normal groups. # Normal groups have numerous technical problems in Telegram, and backfilling normal groups # will likely cause problems if there are multiple Matrix users in the group. normal_groups: false + # Forward backfilling limits. These apply to both MSC2716 and legacy backfill. + forward: + # Number of messages to backfill immediately after creating a portal. + initial_limit: 10 + # Number of messages to backfill when syncing chats. + sync_limit: 100 + + # Settings for incremental backfill of history. These only apply when using MSC2716. + incremental: + # Maximum number of messages to backfill per batch. + messages_per_batch: 100 + # The number of seconds to wait after backfilling the batch of messages. + post_batch_delay: 20 + # The maximum number of batches to backfill per portal, split by the chat type. + # If set to -1, all messages in the chat will eventually be backfilled. + max_batches: + # Direct chats + user: -1 + # Normal groups. Note that the normal_groups option above must be enabled + # for these to be backfilled. + normal_group: -1 + # Supergroups + supergroup: 10 + # Broadcast channels + channel: -1 + # Overrides for base power levels. initial_power_level_overrides: user: {} diff --git a/mautrix_telegram/matrix.py b/mautrix_telegram/matrix.py index 08a741f4..3c163d58 100644 --- a/mautrix_telegram/matrix.py +++ b/mautrix_telegram/matrix.py @@ -16,15 +16,14 @@ from __future__ import annotations from typing import TYPE_CHECKING +import sys from mautrix.bridge import BaseMatrixHandler -from mautrix.errors import MatrixError from mautrix.types import ( Event, EventID, EventType, MemberStateEventContent, - MessageType, PresenceEvent, PresenceState, ReactionEvent, @@ -36,7 +35,6 @@ from mautrix.types import ( RoomTopicStateEventContent as TopicContent, SingleReceiptEventContent, StateEvent, - TextMessageEventContent, TypingEvent, UserID, ) @@ -63,6 +61,21 @@ class MatrixHandler(BaseMatrixHandler): self._previously_typing = {} + async def check_versions(self) -> None: + await super().check_versions() + if self.config["bridge.backfill.msc2716"] and not ( + support := self.versions.supports("org.matrix.msc2716") + ): + self.log.fatal( + "Backfilling with MSC2716 is enabled in bridge config, but " + + ( + "batch sending is not enabled on homeserver" + if support is False + else "homeserver does not support batch sending" + ) + ) + sys.exit(18) + async def handle_puppet_group_invite( self, room_id: RoomID, diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index 9e6ab0cf..3d883c1d 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -132,12 +132,15 @@ from telethon.tl.types import ( UserProfilePhotoEmpty, ) from telethon.utils import encode_waveform +import attr from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI from mautrix.bridge import BasePortal, NotificationDisabler, RejectMatrixInvite, async_getter_lock from mautrix.errors import IntentError, MatrixRequestError, MForbidden from mautrix.types import ( BatchID, + BatchSendEvent, + BatchSendStateEvent, BeeperMessageStatusEventContent, ContentURI, EventID, @@ -148,6 +151,7 @@ from mautrix.types import ( LocationMessageEventContent, MediaMessageEventContent, Membership, + MemberStateEventContent, MessageEventContent, MessageStatus, MessageStatusReason, @@ -182,6 +186,7 @@ from . import ( ) from .config import Config from .db import ( + Backfill, DisappearingMessage, Message as DBMessage, Portal as DBPortal, @@ -204,6 +209,7 @@ if TYPE_CHECKING: StateBridge = EventType.find("m.bridge", EventType.Class.STATE) StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE) DummyPortalCreated = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE) +StateMarker = EventType.find("org.matrix.msc2716.marker", EventType.Class.STATE) InviteList = Union[UserID, List[UserID]] UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping] @@ -250,6 +256,7 @@ class Portal(DBPortal, BasePortal): backfill_lock: SimpleLock backfill_method_lock: asyncio.Lock backfill_leave: set[IntentAPI] | None + backfill_msc2716: bool alias: RoomAlias | None @@ -269,6 +276,7 @@ class Portal(DBPortal, BasePortal): _sponsored_seen: dict[UserID, bool] _new_messages_after_sponsored: bool + _member_list_cache: dict[EventID, set[UserID]] _prev_reaction_poll: dict[UserID, float] _msg_conv: putil.TelegramMessageConverter @@ -322,6 +330,7 @@ class Portal(DBPortal, BasePortal): self.log = self.log.getChild(self.tgid_log if self.tgid else self.mxid) self._main_intent = None self.deleted = False + self.backfill_lock = SimpleLock( "Waiting for backfilling to finish before handling %s", log=self.log ) @@ -341,6 +350,7 @@ class Portal(DBPortal, BasePortal): self._new_messages_after_sponsored = True self._bridging_blocked_at_runtime = False + self._member_list_cache = {} self._prev_reaction_poll = defaultdict(lambda: 0.0) self._msg_conv = putil.TelegramMessageConverter(self) @@ -428,6 +438,7 @@ class Portal(DBPortal, BasePortal): cls.filter_mode = cls.config["bridge.filter.mode"] cls.filter_list = cls.config["bridge.filter.list"] cls.hs_domain = cls.config["homeserver.domain"] + cls.backfill_msc2716 = cls.config["bridge.backfill.msc2716"] cls.alias_template = SimpleTemplate( cls.config["bridge.alias_template"], "groupname", @@ -905,32 +916,30 @@ class Portal(DBPortal, BasePortal): self.mxid = room_id self.by_mxid[self.mxid] = self - self.first_event_id = await self.main_intent.send_message_event( - self.mxid, DummyPortalCreated, {} - ) await self.save() self.log.debug(f"Matrix room created: {self.mxid}") await self.az.state_store.set_power_levels(self.mxid, power_levels) await user.register_portal(self) await self.invite_to_matrix(invites) + await self.update_matrix_room(user, entity, puppet, levels=power_levels, users=users) - update_room = asyncio.create_task( - self.update_matrix_room(user, entity, puppet, levels=power_levels, users=users) + self.first_event_id = await self.main_intent.send_message_event( + self.mxid, DummyPortalCreated, {} ) - - if self.config["bridge.backfill.initial_limit"] > 0: - self.log.debug( - "Initial backfill is enabled. Waiting for room members to sync " - "and then starting backfill" + if not self.bridge.homeserver_software.is_hungry: + self._member_list_cache[self.first_event_id] = set( + (await self.main_intent.get_joined_members(self.mxid)).keys() ) - await update_room + await self.save() + if isinstance(user, u.User) or not self.backfill_msc2716: try: - if isinstance(user, u.User): - await self.backfill(user, is_initial=True) + await self.forward_backfill(user, initial=True) except Exception: - self.log.exception("Failed to backfill new portal") + self.log.exception("Error in initial backfill") + if self.backfill_msc2716: + await self.enqueue_backfill(user, priority=50) return self.mxid @@ -2595,142 +2604,188 @@ class Portal(DBPortal, BasePortal): await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id) @property - def _takeout_options(self) -> dict[str, bool | int]: - return { - "files": True, - "megagroups": self.megagroup, - "chats": self.peer_type == "chat", - "users": self.peer_type == "user", - "channels": (self.peer_type == "channel" and not self.megagroup), - "max_file_size": min(self.matrix.media_config.upload_size, 2000 * 1024 * 1024), - } + def _default_max_batches(self) -> int: + if self.peer_type == "user": + own_type = "user" + elif self.peer_type == "chat": + own_type = "normal_group" + elif self.megagroup: + own_type = "supergroup" + else: + own_type = "channel" + return self.config[f"bridge.backfill.incremental.max_batches.{own_type}"] + + async def enqueue_backfill( + self, + source: u.User, + priority: int, + max_batches: int | None = None, + messages_per_batch: int | None = None, + ) -> None: + # TODO check that there are no queued backfills + # if not await Backfill.get(source.mxid, self.tgid, self.tg_receiver): + await Backfill.new( + user_mxid=source.mxid, + priority=priority, + portal_tgid=self.tgid, + portal_tg_receiver=self.tg_receiver, + messages_per_batch=( + messages_per_batch or self.config["bridge.backfill.incremental.messages_per_batch"] + ), + post_batch_delay=self.config["bridge.backfill.incremental.post_batch_delay"], + max_batches=max_batches or self._default_max_batches, + ).insert() + source.wakeup_backfill_task.set() + + async def forward_backfill( + self, + source: u.User, + initial: bool, + last_tgid: int | None = None, + override_limit: int | None = None, + ) -> str: + type = "initial" if initial else "sync" + limit = override_limit or self.config[f"bridge.backfill.forward.{type}_limit"] + with self.backfill_lock: + output = await self.backfill( + source, source.client, forward=True, forward_limit=limit, last_tgid=last_tgid + ) + self.log.debug(f"Forward backfill complete, status: {output}") + return output async def backfill( self, source: u.User, - is_initial: bool = False, - limit: int | None = None, - last_id: int | None = None, - ) -> None: + client: MautrixTelegramClient, + req: Backfill | None = None, + forward: bool = False, + forward_limit: int | None = None, + last_tgid: int | None = None, + ) -> str: async with self.backfill_method_lock: - await self._locked_backfill(source, is_initial, limit, last_id) + return await self._locked_backfill( + source, client, req, forward, forward_limit, last_tgid + ) async def _locked_backfill( self, source: u.User, - is_initial: bool = False, - limit: int | None = None, + client: MautrixTelegramClient, + req: Backfill | None = None, + forward: bool = False, + forward_limit: int | None = None, last_tgid: int | None = None, - ) -> None: - limit = limit or ( - self.config["bridge.backfill.initial_limit"] - if is_initial - else self.config["bridge.backfill.missed_limit"] - ) - if limit == 0: - return + ) -> str: + assert forward != bool(req) if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat": - return - last_in_room = await DBMessage.find_last( - self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid) - ) - min_id = last_in_room.tgid if last_in_room else 0 - if last_tgid is None: - messages = await source.client.get_messages(self.peer, limit=1) - if not messages: - # The chat seems empty - return - last_tgid = messages[0].id - if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"): - # Nothing to backfill - return - if limit < 0: - limit = last_tgid - min_id - limit_type = "unlimited" - elif self.peer_type == "channel": - min_id = max(last_tgid - limit, min_id) - # This is now just an approximate message count, not the actual limit. - limit = last_tgid - min_id - limit_type = "channel" - else: - # This limit will be higher than the actual message count if there are any messages - # in other DMs or normal groups, but that's not too bad. - limit = min(last_tgid - min_id, limit) - limit_type = "dm/minigroup" - self.log.debug( - f"Backfilling up to {limit} messages after ID {min_id} through {source.mxid} " - f"(last message: {last_tgid}, limit type: {limit_type})" - ) - with self.backfill_lock: - await self._backfill(source, min_id, limit) - - async def _backfill(self, source: u.User, min_id: int, limit: int) -> None: - self.backfill_leave = set() - if ( - self.peer_type == "user" - and self.tgid != source.tgid - and self.config["bridge.backfill.invite_own_puppet"] - ): - self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid) - sender = await p.Puppet.get_by_tgid(source.tgid) - await self.main_intent.invite_user(self.mxid, sender.default_mxid) - await sender.default_mxid_intent.join_room_by_id(self.mxid) - self.backfill_leave.add(sender.default_mxid_intent) - - client = source.client - async with NotificationDisabler(self.mxid, source): - if limit > self.config["bridge.backfill.takeout_limit"]: - self.log.debug(f"Opening takeout client for {source.tgid}") - async with client.takeout(**self._takeout_options) as takeout: - count, handled = await self._backfill_messages(source, min_id, limit, takeout) + return "Backfilling normal groups is disabled in the bridge config" + tg_space = source.tgid if self.peer_type != "channel" else self.tgid + prev_event_id = self.first_event_id + if forward: + last_in_room = await DBMessage.find_last(self.mxid, tg_space) + if last_in_room: + prev_event_id = last_in_room.mxid + min_id = last_in_room.tgid else: - count, handled = await self._backfill_messages(source, min_id, limit, client) - - for intent in self.backfill_leave: - self.log.trace("Leaving room with %s post-backfill", intent.mxid) - await intent.leave_room(self.mxid) - self.backfill_leave = None - self.log.info( - "Backfilled %d (of %d fetched) messages through %s", handled, count, source.mxid - ) - - async def _backfill_messages( - self, source: u.User, min_id: int, limit: int, client: MautrixTelegramClient - ) -> tuple[int, int]: - count = handled_count = 0 - entity = await self.get_input_entity(source) - if self.peer_type == "channel": - # This is a channel or supergroup, so we'll backfill messages based on the ID. - # There are some cases, such as deleted messages, where this may backfill less - # messages than the limit. - self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})") - messages = client.iter_messages(entity, reverse=True, min_id=min_id) - async for message in messages: - count += 1 - was_handled = await self._handle_telegram_backfill_message(source, message) - handled_count += 1 if was_handled else 0 - else: - # Private chats and normal groups don't have their own message ID namespace, - # which means we'll have to fetch messages a different way. + min_id = 0 + if last_tgid is None: + messages = await source.client.get_messages(self.peer, limit=1) + if not messages: + return "Chat is empty, nothing to backfill" + last_tgid = messages[0].id + if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"): + return ( + f"Last bridged message {min_id} is equal to or greater than last message " + f"in Telegram chat {last_tgid}, nothing to backfill" + ) + limit = last_tgid - min_id + if (forward_limit or 0) > 0: + limit = min(limit, forward_limit) self.log.debug( - f"Fetching up to {limit} most recent messages, ignoring anything before {min_id}" + f"Backfilling up to {limit} messages after ID {min_id} through {source.mxid} " + f"(last message: {last_tgid})" ) - messages = await client.get_messages(entity, min_id=min_id, limit=limit) - for message in reversed(messages): - count += 1 - if message.id <= min_id: - self.log.trace( - f"Skipping {message.id} in backfill response as it's lower than " - f"the last bridged message ({min_id})" - ) - continue - was_handled = await self._handle_telegram_backfill_message(source, message) - handled_count += 1 if was_handled else 0 - return count, handled_count + anchor_id = min_id + else: + limit = req.messages_per_batch + first_in_room = await DBMessage.find_first(self.mxid, tg_space) + anchor_id = first_in_room.tgid if first_in_room else None + self.log.debug( + f"Backfilling up to {req.messages_per_batch} historical messages " + f"before {anchor_id} through {source.mxid}" + ) + insertion_id, message_count, first_id = await self._backfill_messages( + source, client, forward, anchor_id, limit, prev_event_id + ) + if prev_event_id == self.first_event_id: + if insertion_id and not self.base_insertion_id: + self.base_insertion_id = insertion_id + elif not insertion_id: + insertion_id = self.base_insertion_id + await self.save() + # TODO this should probably check actual event count instead of message count + if message_count > 0 and self.backfill_msc2716: + await self.main_intent.send_state_event( + self.mxid, + StateMarker, + { + "org.matrix.msc2716.marker.insertion": insertion_id, + "com.beeper.timestamp": int(time.time() * 1000), + }, + state_key=insertion_id, + ) + if forward: + self.log.debug(f"Forward backfill finished with {message_count} messages") + elif message_count > 0 and first_id and first_id > 1: + if req.max_batches in (0, 1): + return "Already backfilled enough batches, not enqueuing more" + self.log.debug(f"Enqueuing more backfill through {source.mxid}") + await self.enqueue_backfill( + source, + priority=100, + messages_per_batch=req.messages_per_batch, + max_batches=-1 if req.max_batches < 0 else (req.max_batches - 1), + ) + else: + self.log.debug("No more messages to backfill") + return f"Backfilled {message_count} messages" - async def _handle_telegram_backfill_message( - self, source: au.AbstractUser, msg: Message | MessageService - ) -> bool: + def _can_double_puppet_backfill(self, custom_mxid: UserID) -> bool: + if not self.backfill_msc2716: + return True + if not self.config["bridge.backfill.double_puppet_backfill"]: + return False + if self.bridge.homeserver_software.is_hungry: + return True + + # Batch sending can only use local users, so don't allow double puppets on other servers. + if custom_mxid[custom_mxid.index(":") + 1 :] != self.config["homeserver.domain"]: + return False + return True + + async def _get_members_at(self, event_id: EventID) -> set[UserID]: + try: + return self._member_list_cache[event_id] + except KeyError: + pass + # TODO cache the list in db? + self.log.debug(f"Fetching member list at {event_id}") + ctx = await self.main_intent.get_event_context(self.mxid, event_id, limit=0) + members = { + evt.state_key + for evt in ctx.state + if evt.type == EventType.ROOM_MEMBER and evt.content.membership == Membership.JOIN + } + self.log.debug(f"Found {len(members)} members at {event_id}") + self._member_list_cache[event_id] = members + return members + + async def _convert_batch_msg( + self, + source: u.User, + msg: Message, + add_member: Callable[[IntentAPI, str, ContentURI], Awaitable[None]], + ) -> tuple[putil.ConvertedMessage, IntentAPI]: if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)): sender = await p.Puppet.get_by_peer(msg.from_id) elif isinstance(msg.peer_id, PeerUser): @@ -2740,20 +2795,171 @@ class Portal(DBPortal, BasePortal): sender = await p.Puppet.get_by_peer(msg.peer_id) else: sender = None - if isinstance(msg, MessageService): - if isinstance(msg.action, MessageActionContactSignUp): - await self.handle_telegram_joined(source, sender, msg, backfill=True) - return True - else: - self.log.debug( - f"Unhandled service message {type(msg.action).__name__} in backfill" - ) - elif isinstance(msg, Message): - await self.handle_telegram_message(source, sender, msg) - return True + if sender: + intent = sender.intent_for(self) + if not sender.displayname: + entity = await source.client.get_entity(sender.peer) + await sender.update_info(source, entity) else: - self.log.debug(f"Unhandled message type {type(msg).__name__} in backfill") - return False + intent = self.main_intent + if intent.api.is_real_user and not self._can_double_puppet_backfill(intent.mxid): + intent = sender.default_mxid_intent + if sender: + await add_member(intent, sender.displayname, sender.avatar_url) + is_bot = sender.is_bot if sender else False + converted = await self._msg_conv.convert(source, intent, is_bot, msg) + return converted, intent + + async def _wrap_batch_msg( + self, + intent: IntentAPI, + msg: Message, + converted: putil.ConvertedMessage, + caption: bool = False, + ) -> BatchSendEvent: + if caption: + content = converted.caption + event_type = EventType.ROOM_MESSAGE + else: + content = converted.content + event_type = converted.type + if self.encrypted and self.matrix.e2ee: + event_type, content = await self.matrix.e2ee.encrypt(self.mxid, event_type, content) + return BatchSendEvent( + sender=intent.mxid, + timestamp=int(msg.date.timestamp() * 1000), + content=content, + type=event_type, + ) + + async def _backfill_messages( + self, + source: u.User, + client: MautrixTelegramClient, + forward: bool, + anchor_id: int, + limit: int, + prev_event_id: EventID, + ) -> tuple[EventID | None, int, int]: + entity = await self.get_input_entity(source) + events = [] + intents = [] + metas = [] + state_events = [] + do_batch_send = self.backfill_msc2716 + added_members = ( + await self._get_members_at(prev_event_id) + if not self.bridge.homeserver_software.is_hungry and do_batch_send + else [] + ) + before_first_msg_timestamp = 0 + + async def add_member(intent: IntentAPI, displayname: str, avatar_url: ContentURI) -> None: + if self.bridge.homeserver_software.is_hungry or intent.mxid in added_members: + return + added_members.add(intent.mxid) + if not do_batch_send: + # TODO leave these members? + await intent.ensure_joined(self.mxid) + return + invite_event = BatchSendStateEvent( + type=EventType.ROOM_MEMBER, + state_key=intent.mxid, + sender=self.main_intent.mxid, + timestamp=before_first_msg_timestamp, + content=MemberStateEventContent( + membership=Membership.INVITE, + displayname=displayname, + avatar_url=avatar_url, + ), + ) + join_event = attr.evolve( + invite_event, + content=attr.evolve(invite_event.content, membership=Membership.JOIN), + sender=intent.mxid, + ) + state_events.append(invite_event) + state_events.append(join_event) + + first_id = anchor_id + message_count = 0 + minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id} + if not forward and not anchor_id: + anchor_id = 2**31 - 1 + minmax = {} + self.log.debug(f"Iterating messages through {source.tgid} with {limit=}, {minmax}") + # Iterate messages newest to oldest and collect the results + async for msg in client.iter_messages(entity, limit=limit, **minmax): + message_count += 1 + if (forward and msg.id <= anchor_id) or (not forward and msg.id >= anchor_id): + continue + elif isinstance(msg, MessageService): + # TODO some service messages can be backfilled + continue + if not before_first_msg_timestamp: + first_id = msg.id + before_first_msg_timestamp = int(msg.date.timestamp() * 1000) - 1 + + converted, intent = await self._convert_batch_msg(source, msg, add_member) + events.append(await self._wrap_batch_msg(intent, msg, converted)) + intents.append(intent) + metas.append(msg) + if converted.caption: + events.append(await self._wrap_batch_msg(intent, msg, converted, caption=True)) + intents.append(intent) + metas.append(None) + if len(events) == 0: + self.log.debug( + f"Didn't get any events to send out of {message_count} messages fetched " + f"(first received ID: {first_id})" + ) + return None, message_count, first_id + self.log.debug( + f"Got {len(events)} events to send out of {message_count} messages fetched " + f"(first received ID: {first_id})" + ) + if do_batch_send: + resp = await self.main_intent.batch_send( + self.mxid, + prev_event_id, + batch_id=self.next_batch_id if not forward else None, + # We iterated the events in reverse chronological order, + # so reverse them before sending + events=list(reversed(events)), + state_events_at_start=state_events, + beeper_new_messages=forward, + ) + if prev_event_id == self.first_event_id and resp.next_batch_id: + self.next_batch_id = resp.next_batch_id + base_insertion_event_id = resp.base_insertion_event_id + event_ids = resp.event_ids + else: + base_insertion_event_id = None + event_ids = [ + await intent.send_message_event( + self.mxid, evt.type, evt.content, timestamp=evt.timestamp + ) + for evt, intent in zip(reversed(events), reversed(intents)) + ] + tg_space = source.tgid if self.peer_type != "channel" else self.tgid + await DBMessage.bulk_insert( + [ + DBMessage( + mxid=event_id, + mx_room=self.mxid, + tgid=msg.id, + tg_space=tg_space, + edit_index=0, + content_hash=self.dedup.hash_event(msg), + # TODO sender + ) + # Original arrays are in reverse chronological order, but event IDs are + # chronological (because we reversed the original messages list before sending) + for event_id, msg in zip(event_ids, reversed(metas)) + if msg is not None + ] + ) + return base_insertion_event_id, message_count, first_id def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]: reactions = [] diff --git a/mautrix_telegram/puppet.py b/mautrix_telegram/puppet.py index 5a34d1cf..47883fbb 100644 --- a/mautrix_telegram/puppet.py +++ b/mautrix_telegram/puppet.py @@ -399,7 +399,7 @@ class Puppet(DBPuppet, BasePuppet): async def default_puppet_should_leave_room(self, room_id: RoomID) -> bool: portal: p.Portal = await p.Portal.get_by_mxid(room_id) - return portal and not portal.backfill_lock.locked and portal.peer_type != "user" + return portal and portal.peer_type != "user" # endregion # region Getters diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index c03d8cfd..34537383 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -20,7 +20,12 @@ from datetime import datetime, timezone import asyncio import time -from telethon.errors import AuthKeyDuplicatedError, RPCError, UnauthorizedError +from telethon.errors import ( + AuthKeyDuplicatedError, + RPCError, + TakeoutInitDelayError, + UnauthorizedError, +) from telethon.tl.custom import Dialog from telethon.tl.functions.account import UpdateStatusRequest from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest @@ -57,7 +62,8 @@ from mautrix.util.opt_prometheus import Gauge from . import portal as po, puppet as pu, util from .abstract_user import AbstractUser -from .db import Message as DBMessage, PgSession, User as DBUser +from .db import Backfill, Message as DBMessage, PgSession, User as DBUser +from .tgclient import MautrixTelegramClient from .types import TelegramID if TYPE_CHECKING: @@ -86,7 +92,11 @@ class User(DBUser, AbstractUser, BaseUser): _ensure_started_lock: asyncio.Lock _track_connection_task: asyncio.Task | None + _backfill_task: asyncio.Task | None + wakeup_backfill_task: asyncio.Event _is_backfilling: bool + takeout_retry_immediate: asyncio.Event + takeout_requested: bool _available_emoji_reactions: set[str] | None _available_emoji_reactions_hash: int | None @@ -119,6 +129,12 @@ class User(DBUser, AbstractUser, BaseUser): self._track_connection_task = None self._is_backfilling = False self._portals_cache = None + + self._backfill_task = None + self.wakeup_backfill_task = asyncio.Event() + self.takeout_retry_immediate = asyncio.Event() + self.takeout_requested = False + self._available_emoji_reactions = None self._available_emoji_reactions_hash = None self._available_emoji_reactions_fetched = 0 @@ -248,6 +264,14 @@ class User(DBUser, AbstractUser, BaseUser): self.client and self.client._sender and self.client._sender._transport_connected() ) + @property + def _bridge_state_info(self) -> dict[str, Any]: + if self.takeout_requested: + return { + "takeout_requested": True, + } + return {} + async def _track_connection(self) -> None: self.log.debug("Starting loop to track connection state") while True: @@ -260,6 +284,7 @@ class User(DBUser, AbstractUser, BaseUser): if self._is_backfilling else BridgeStateEvent.CONNECTED, ttl=3600, + info=self._bridge_state_info, ) else: await self.push_bridge_state( @@ -284,7 +309,7 @@ class User(DBUser, AbstractUser, BaseUser): else: state_event = BridgeStateEvent.UNKNOWN_ERROR ttl = 240 - return [BridgeState(state_event=state_event, ttl=ttl)] + return [BridgeState(state_event=state_event, ttl=ttl, info=self._bridge_state_info)] async def get_puppet(self) -> pu.Puppet | None: if not self.tgid: @@ -302,6 +327,9 @@ class User(DBUser, AbstractUser, BaseUser): if self._track_connection_task: self._track_connection_task.cancel() self._track_connection_task = None + if self._backfill_task: + self._backfill_task.cancel() + self._backfill_task = None await super().stop() self._track_metric(METRIC_CONNECTED, False) @@ -318,6 +346,8 @@ class User(DBUser, AbstractUser, BaseUser): return self._track_metric(METRIC_LOGGED_IN, True) + if not self._backfill_task or self._backfill_task.done(): + self._backfill_task = asyncio.create_task(self._handle_backfill_requests_loop()) try: puppet = await pu.Puppet.get_by_tgid(self.tgid) @@ -327,7 +357,7 @@ class User(DBUser, AbstractUser, BaseUser): except Exception: self.log.exception("Failed to automatically enable custom puppet") - if not self.is_bot and self.config["bridge.startup_sync"]: + if not self.is_bot and (self.config["bridge.startup_sync"] or first_login): try: self._is_backfilling = True await self.sync_dialogs() @@ -337,6 +367,80 @@ class User(DBUser, AbstractUser, BaseUser): finally: self._is_backfilling = False + @property + def _takeout_options(self) -> dict[str, bool | int]: + return { + "users": True, + "chats": self.config["bridge.backfill.normal_groups"], + "megagroups": True, + "channels": True, + "files": True, + "max_file_size": min(self.bridge.matrix.media_config.upload_size, 2000 * 1024 * 1024), + } + + async def _handle_backfill_requests_loop(self) -> None: + while True: + req = await Backfill.get_next(self.mxid) + if not req: + await self.wakeup_backfill_task.wait() + self.wakeup_backfill_task.clear() + else: + await self._takeout_and_backfill(req) + + async def _takeout_and_backfill(self, first_req: Backfill, first_attempt: bool = True) -> None: + self.takeout_retry_immediate.clear() + self.takeout_requested = True + try: + async with self.client.takeout(**self._takeout_options) as takeout_client: + self.takeout_requested = False + self.log.info("Acquired takeout client successfully") + await self._backfill_loop_with_client(takeout_client, first_req) + self.log.info("Backfills finished, exiting takeout") + except TakeoutInitDelayError as e: + if first_attempt: + self.log.info( + f"Takeout requested, will wait for retry request or {e.seconds} seconds" + ) + else: + self.log.warning( + f"Got takeout init delay again after retry, waiting for {e.seconds} seconds" + ) + try: + await asyncio.wait_for(self.takeout_retry_immediate.wait(), timeout=e.seconds) + self.log.info("Retrying takeout") + except asyncio.TimeoutError: + self.log.info("Takeout timeout expired") + await self._takeout_and_backfill(first_req, first_attempt=False) + + async def _backfill_loop_with_client( + self, client: MautrixTelegramClient, first_req: Backfill + ) -> None: + missed_reqs = 0 + while missed_reqs < 20: + req = first_req or await Backfill.get_next(self.mxid) + first_req = None + if not req: + missed_reqs += 1 + try: + await asyncio.wait_for(self.wakeup_backfill_task.wait(), timeout=30) + except asyncio.TimeoutError: + pass + self.wakeup_backfill_task.clear() + continue + missed_reqs = 0 + self.log.info("Backfill request %s", req) + try: + portal = await po.Portal.get_by_tgid( + TelegramID(req.portal_tgid), tg_receiver=TelegramID(req.portal_tg_receiver) + ) + await req.mark_dispatched() + await portal.backfill(self, client, req=req) + await req.mark_done() + await asyncio.sleep(req.post_batch_delay) + except Exception: + self.log.exception("Error handling backfill request for %s", req.portal_tgid) + await req.set_cooldown_timeout(10) + async def update(self, update: TypeUpdate) -> bool: if not self.is_bot: return False @@ -573,7 +677,7 @@ class User(DBUser, AbstractUser, BaseUser): was_created = False if portal.mxid: try: - await portal.backfill(self, last_id=dialog.message.id) + await portal.forward_backfill(self, initial=False, last_tgid=dialog.message.id) except Exception: self.log.exception(f"Error while backfilling {portal.tgid_log}") try: diff --git a/mautrix_telegram/web/provisioning/__init__.py b/mautrix_telegram/web/provisioning/__init__.py index 3217529b..006c8911 100644 --- a/mautrix_telegram/web/provisioning/__init__.py +++ b/mautrix_telegram/web/provisioning/__init__.py @@ -71,6 +71,8 @@ class ProvisioningAPI(AuthAPI): ) self.app.router.add_route("POST", f"{user_prefix}/pm/{{identifier}}", self.start_dm) + self.app.router.add_route("POST", f"{user_prefix}/retry_takeout", self.retry_takeout) + self.app.router.add_route("POST", f"{user_prefix}/logout", self.logout) self.app.router.add_route("POST", f"{user_prefix}/login/bot_token", self.send_bot_token) self.app.router.add_route("POST", f"{user_prefix}/login/request_code", self.request_code) @@ -494,6 +496,22 @@ class ProvisioningAPI(AuthAPI): status=201 if just_created else 200, ) + async def retry_takeout(self, request: web.Request) -> web.Response: + data, user, err = await self.get_user_request_info( + request, expect_logged_in=True, want_data=False + ) + if err is not None: + return err + if not user.takeout_requested: + return web.json_response( + { + "error": "There was no takeout requested", + }, + status=400, + ) + user.takeout_retry_immediate.set() + return web.json_response({}, status=200) + async def send_bot_token(self, request: web.Request) -> web.Response: data, user, err = await self.get_user_request_info(request) if err is not None: diff --git a/requirements.txt b/requirements.txt index 86f3281e..1971bf3b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ python-magic>=0.4,<0.5 commonmark>=0.8,<0.10 aiohttp>=3,<4 yarl>=1,<2 -mautrix>=0.18.2,<0.19 +mautrix>=0.18.3,<0.19 #telethon>=1.24,<1.25 tulir-telethon==1.26.0a6 asyncpg>=0.20,<0.27