From 2c0a2e694b98fa551ad5d1eb4f655532a864db11 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 28 Jul 2020 17:28:07 +0300 Subject: [PATCH] Add option for automatic backfilling when creating portal --- ROADMAP.md | 4 +- mautrix_telegram/abstract_user.py | 4 +- mautrix_telegram/commands/telegram/misc.py | 10 ++- mautrix_telegram/config.py | 7 +- mautrix_telegram/example-config.yaml | 28 +++++- mautrix_telegram/portal/base.py | 9 +- mautrix_telegram/portal/metadata.py | 65 ++++++++------ mautrix_telegram/portal/telegram.py | 99 ++++++++++++++++------ mautrix_telegram/puppet.py | 2 +- mautrix_telegram/user.py | 10 +-- 10 files changed, 170 insertions(+), 68 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 56308642..6654c677 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -28,9 +28,9 @@ * [ ] Buttons * [x] Message deletions * [x] Message edits - * [ ] Message history + * [x] Message history * [x] Manually (`!tg backfill`) - * [ ] Automatically when creating portal + * [x] Automatically when creating portal * [ ] Automatically for missed messages * [x] Avatars * [x] Presence diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py index 7f6b2677..77744e8e 100644 --- a/mautrix_telegram/abstract_user.py +++ b/mautrix_telegram/abstract_user.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -426,6 +426,8 @@ class AbstractUser(ABC): self.log.debug(f"Ignoring relaybot-sent message %s to %s", update.id, portal.tgid_log) return + await portal.backfill_lock.wait(update.id) + if isinstance(update, MessageService): if isinstance(update.action, MessageActionChannelMigrateFrom): self.log.trace(f"Ignoring action %s to %s by %d", update.action, portal.tgid_log, diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py index c044725a..0c1b0bd1 100644 --- a/mautrix_telegram/commands/telegram/misc.py +++ b/mautrix_telegram/commands/telegram/misc.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -331,15 +331,19 @@ async def random(evt: CommandEvent) -> EventID: return await evt.reply("Invalid emoji for randomization") -@command_handler(help_section=SECTION_PORTAL_MANAGEMENT, +@command_handler(help_section=SECTION_PORTAL_MANAGEMENT, help_args="[_limit_]", help_text="Backfill messages from Telegram history.") async def backfill(evt: CommandEvent) -> None: if not evt.is_portal: await evt.reply("You can only use backfill in portal rooms") return + try: + limit = int(evt.args[0]) + except (ValueError, IndexError): + limit = -1 portal = po.Portal.get_by_mxid(evt.room_id) try: - await portal.backfill(evt.sender) + await portal.backfill(evt.sender, limit=limit) except TakeoutInitDelayError: msg = ("Please accept the data export request from a mobile device, " "then re-run the backfill command.") diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index 4fa36582..c0d1f292 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -113,6 +113,11 @@ class Config(BaseBridgeConfig): copy("bridge.delivery_receipts") copy("bridge.delivery_error_reports") copy("bridge.resend_bridge_info") + copy("bridge.backfill.invite_own_puppet") + copy("bridge.backfill.takeout_limit") + copy("bridge.backfill.initial_limit") + copy("bridge.backfill.missed_limit") + copy("bridge.backfill.disable_notifications") copy("bridge.initial_power_level_overrides.group") copy("bridge.initial_power_level_overrides.user") diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index 52a4ce0e..22ffdbb6 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -129,8 +129,8 @@ bridge: # Maximum number of members to sync per portal when starting up. Other members will be # synced when they send messages. The maximum is 10000, after which the Telegram server # will not send any more members. - # Defaults to no local limit (-> limited to 10000 by server) - max_initial_member_sync: -1 + # -1 means no limit (which means it's limited to 10000 by the server) + max_initial_member_sync: 100 # Whether or not to sync the member list in channels. # If no channel admins have logged into the bridge, the bridge won't be able to sync the member # list regardless of this setting. @@ -232,6 +232,30 @@ bridge: # This field will automatically be changed back to false after it, # except if the config file is not writable. resend_bridge_info: false + # Settings for backfilling messages from Telegram. + backfill: + # Whether or not the Telegram ghosts of logged in Matrix users should be + # invited to private chats when backfilling history from Telegram. This is + # usually needed to prevent rate limits and to allow timestamp massaging. + invite_own_puppet: true + # Maximum number of messages to backfill without using a takeout. + # The first time a takeout is used, the user has to manually approve it from a different + # device. If initial_limit or missed_limit are higher than this value, the bridge will ask + # the user to accept the takeout after logging in before syncing any chats. + takeout_limit: 100 + # Maximum number of messages to backfill initially. + # Set to 0 to disable backfilling when creating portal, or -1 to disable the limit. + # + # N.B. Initial backfill will only start after member sync. Make sure your + # max_initial_member_sync is set to a low enough value so it doesn't take forever. + initial_limit: 0 + # Maximum number of messages to backfill if messages were missed while + # the bridge was disconnected. + # Set to 0 to disable backfilling missed messages. + missed_limit: 100 + # If using double puppeting, should notifications be disabled + # while the initial backfill is in progress? + disable_notifications: false # Overrides for base power levels. initial_power_level_overrides: diff --git a/mautrix_telegram/portal/base.py b/mautrix_telegram/portal/base.py index 67170d3b..9e5bdaef 100644 --- a/mautrix_telegram/portal/base.py +++ b/mautrix_telegram/portal/base.py @@ -33,6 +33,7 @@ from mautrix.appservice import AppService, IntentAPI from mautrix.types import (RoomID, RoomAlias, UserID, EventID, EventType, MessageEventContent, PowerLevelStateEventContent, ContentURI) from mautrix.util.simple_template import SimpleTemplate +from mautrix.util.simple_lock import SimpleLock from mautrix.util.logging import TraceLogger from ..types import TelegramID @@ -93,7 +94,7 @@ class BasePortal(ABC): avatar_url: Optional[ContentURI] encrypted: bool deleted: bool - backfilling: bool + backfill_lock: SimpleLock backfill_leave: Optional[Set[IntentAPI]] log: TraceLogger @@ -127,7 +128,8 @@ class BasePortal(ABC): self._main_intent = None self.deleted = False self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid) - self.backfilling = False + self.backfill_lock = SimpleLock("Waiting for backfilling to finish before handling %s", + log=self.log, loop=self.loop) self.backfill_leave = None self.dedup = PortalDedup(self) @@ -531,7 +533,8 @@ class BasePortal(ABC): pass @abstractmethod - def backfill(self, source: 'AbstractUser') -> Awaitable[None]: + def backfill(self, source: 'AbstractUser', is_initial: bool = False, + limit: Optional[int] = None) -> Awaitable[None]: pass @abstractmethod diff --git a/mautrix_telegram/portal/metadata.py b/mautrix_telegram/portal/metadata.py index 26651cdd..4589a418 100644 --- a/mautrix_telegram/portal/metadata.py +++ b/mautrix_telegram/portal/metadata.py @@ -219,12 +219,6 @@ class PortalMetadata(BasePortal, ABC): if changed: self.save() await self.update_bridge_info() - puppet = p.Puppet.get_by_custom_mxid(user.mxid) - if puppet: - try: - await puppet.intent.ensure_joined(self.mxid) - except Exception: - self.log.exception("Failed to ensure %s is joined to portal", user.mxid) if self.sync_matrix_state: await self.main_intent.get_joined_members(self.mxid) @@ -392,28 +386,40 @@ class PortalMetadata(BasePortal, ABC): if not config["bridge.federate_rooms"]: creation_content["m.federate"] = False - room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset, - is_direct=direct, invitees=invites or [], - name=self.title, topic=self.about, - initial_state=initial_state, - creation_content=creation_content) - if not room_id: - raise Exception(f"Failed to create room") + with self.backfill_lock: + room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset, + is_direct=direct, invitees=invites or [], + name=self.title, topic=self.about, + initial_state=initial_state, + creation_content=creation_content) + if not room_id: + raise Exception(f"Failed to create room") - if self.encrypted and self.matrix.e2ee and direct: - try: - await self.az.intent.ensure_joined(room_id) - except Exception: - self.log.warning(f"Failed to add bridge bot to new private chat {room_id}") + if self.encrypted and self.matrix.e2ee and direct: + try: + await self.az.intent.ensure_joined(room_id) + except Exception: + self.log.warning(f"Failed to add bridge bot to new private chat {room_id}") - self.mxid = room_id - self.by_mxid[self.mxid] = self - self.save() - await self.az.state_store.set_power_levels(self.mxid, power_levels) - user.register_portal(self) - asyncio.ensure_future(self.update_matrix_room(user, entity, direct, puppet, - levels=power_levels, users=users, - participants=participants), loop=self.loop) + self.mxid = room_id + self.by_mxid[self.mxid] = self + self.save() + await self.az.state_store.set_power_levels(self.mxid, power_levels) + user.register_portal(self) + + update_room = self.loop.create_task(self.update_matrix_room( + user, entity, direct, puppet, + levels=power_levels, users=users, participants=participants)) + + if config["bridge.backfill.initial_limit"] > 0: + self.log.debug("Initial backfill is enabled. Waiting for room members to sync " + "and then starting backfill") + await update_room + + try: + await self.backfill(user, is_initial=True) + except Exception: + self.log.exception("Failed to backfill new portal") return self.mxid @@ -545,6 +551,13 @@ class PortalMetadata(BasePortal, ABC): if user: await self.invite_to_matrix(user.mxid) + puppet = p.Puppet.get_by_custom_mxid(user.mxid) + if puppet: + try: + await puppet.intent.ensure_joined(self.mxid) + except Exception: + self.log.exception("Failed to ensure %s is joined to portal", user.mxid) + # We can't trust the member list if any of the following cases is true: # * There are close to 10 000 users, because Telegram might not be sending all members. # * The member sync count is limited, because then we might ignore some members. diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index bb27d5ca..ad6ff332 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -38,12 +38,13 @@ from telethon.tl.types import ( from mautrix.appservice import IntentAPI from mautrix.types import (EventID, UserID, ImageInfo, ThumbnailInfo, RelatesTo, MessageType, EventType, MediaMessageEventContent, TextMessageEventContent, - LocationMessageEventContent, Format, MessageEventContent) + LocationMessageEventContent, Format) from ..types import TelegramID from ..db import Message as DBMessage, TelegramFile as DBTelegramFile from ..util import sane_mimetypes from ..context import Context +from ..tgclient import TelegramClient from .. import puppet as p, user as u, formatter, util from .base import BasePortal @@ -407,43 +408,92 @@ class PortalTelegram(BasePortal, ABC): edit_index=prev_edit_msg.edit_index + 1).insert() DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id) - async def backfill(self, source: 'AbstractUser') -> None: - self.log.debug("Backfilling history through %s", source.mxid) + @property + def _takeout_options(self) -> Dict[str, Union[bool, int]]: + return { + "files": True, + "megagroups": self.megagroup, + "chats": self.peer_type == "chat", + "users": self.peer_type == "user", + "channels": (self.peer_type == "channel" and not self.megagroup), + "max_file_size": min(config["bridge.max_document_size"], 2000) * 1024 * 1024 + } + + async def backfill(self, source: 'AbstractUser', is_initial: bool = False, + limit: Optional[int] = None) -> None: + limit = limit or (config["bridge.backfill.initial_limit"] if is_initial + else config["bridge.backfill.missed_limit"]) + if limit == 0: + return last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid)) min_id = last.tgid if last else 0 - self.backfilling = True + message = (await source.client.get_messages(self.peer, limit=1))[0] + if limit < 0: + limit = None + self.log.debug(f"Backfilling approximately {message.id - min_id} messages " + f"through {source.mxid}") + elif self.peer_type == "channel": + # This is a channel or supergroup, so we'll backfill messages based on the ID. + # There are some cases, such as deleted messages, where this may backfill less + # messages than the limit. + min_id = max(message.id - limit, min_id) + limit = None + self.log.debug(f"Backfilling messages after ID {min_id} (last message: {message.id}) " + f"through {source.mxid}") + else: + # Private chats and normal groups don't have their own message ID namespace, + # which means we'll have to fetch messages a different way. + # The _backfill_messages method will detect min_id=None and not use reverse=True + min_id = None + self.log.debug(f"Backfilling up to {limit} messages through {source.mxid}") + with self.backfill_lock: + await self._backfill(source, min_id, limit) + + async def _backfill(self, source: 'AbstractUser', min_id: Optional[int], limit: Optional[int] + ) -> None: self.backfill_leave = set() - if self.peer_type == "user": + if ((self.peer_type == "user" and self.tg_receiver != source.tgid + and config["bridge.backfill.invite_own_puppet"])): self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid) sender = p.Puppet.get(source.tgid) await self.main_intent.invite_user(self.mxid, sender.default_mxid) await sender.default_mxid_intent.join_room_by_id(self.mxid) self.backfill_leave.add(sender.default_mxid_intent) - max_file_size = min(config["bridge.max_document_size"], 1500) * 1024 * 1024 self.log.trace("Opening takeout client for %d, message ID %d->", source.tgid, min_id) - count = 0 - async with source.client.takeout(files=True, megagroups=self.megagroup, - chats=self.peer_type == "chat", - users=self.peer_type == "user", - channels=(self.peer_type == "channel" - and not self.megagroup), - max_file_size=max_file_size - ) as takeout_client: - async for message in takeout_client.iter_messages(await self.get_input_entity(source), - reverse=True, min_id=min_id): - sender = p.Puppet.get(message.sender_id) - # if isinstance(message, MessageService): - # await self.handle_telegram_action(source, sender, message) - await self.handle_telegram_message(source, sender, message) - count += 1 + + client = source.client + if limit > config["bridge.backfill.takeout_limit"]: + async with client.takeout(**self._takeout_options) as takeout: + count = await self._backfill_messages(source, min_id, limit, takeout) + else: + count = await self._backfill_messages(source, min_id, limit, client) + for intent in self.backfill_leave: self.log.trace("Leaving room with %s post-backfill", intent.mxid) await intent.leave_room(self.mxid) - self.backfilling = False self.backfill_leave = None self.log.info("Backfilled %d messages through %s", count, source.mxid) + async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int], + limit: Optional[int], client: TelegramClient) -> int: + count = 0 + entity = await self.get_input_entity(source) + if min_id is not None: + messages = client.iter_messages(entity, reverse=True, min_id=min_id) + async for message in messages: + sender = p.Puppet.get(message.sender_id) + # TODO handle service messages? + await self.handle_telegram_message(source, sender, message) + count += 1 + else: + messages = await client.get_messages(entity, limit=limit) + for message in reversed(messages): + sender = p.Puppet.get(message.sender_id) + await self.handle_telegram_message(source, sender, message) + count += 1 + return count + async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet, evt: Message) -> None: if not self.mxid: @@ -472,7 +522,7 @@ class PortalTelegram(BasePortal, ABC): tg_space=tg_space, edit_index=0).insert() return - if self.backfilling or (self.dedup.pre_db_check and self.peer_type == "channel"): + if self.backfill_lock.locked or (self.dedup.pre_db_check and self.peer_type == "channel"): msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space) if msg: self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already" @@ -496,7 +546,8 @@ class PortalTelegram(BasePortal, ABC): allowed_media) else None if sender: intent = sender.intent_for(self) - if self.backfilling and intent != sender.default_mxid_intent: + if ((self.backfill_lock.locked and intent != sender.default_mxid_intent + and config["bridge.backfill.invite_own_puppet"])): intent = sender.default_mxid_intent self.backfill_leave.add(intent) else: diff --git a/mautrix_telegram/puppet.py b/mautrix_telegram/puppet.py index c0507500..7aa8cdf6 100644 --- a/mautrix_telegram/puppet.py +++ b/mautrix_telegram/puppet.py @@ -331,7 +331,7 @@ class Puppet(CustomPuppetMixin): def default_puppet_should_leave_room(self, room_id: RoomID) -> bool: portal: p.Portal = p.Portal.get_by_mxid(room_id) - return portal and not portal.backfilling and portal.peer_type != "user" + return portal and not portal.backfill_lock.locked and portal.peer_type != "user" # endregion # region Getters diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index 4d9f6ead..b616193e 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import (Awaitable, Dict, List, Iterable, NewType, Optional, Tuple, Any, cast, +from typing import (Awaitable, Dict, List, Iterable, NamedTuple, Optional, Tuple, Any, cast, TYPE_CHECKING) import logging import asyncio @@ -42,7 +42,7 @@ if TYPE_CHECKING: config: Optional['Config'] = None -SearchResult = NewType('SearchResult', Tuple['pu.Puppet', int]) +SearchResult = NamedTuple('SearchResult', puppet='pu.Puppet', similarity=int) class User(AbstractUser, BaseUser): @@ -306,7 +306,7 @@ class User(AbstractUser, BaseUser): for contact in self.contacts: similarity = contact.similarity(query) if similarity >= min_similarity: - results.append(SearchResult((contact, similarity))) + results.append(SearchResult(contact, similarity)) results.sort(key=lambda tup: tup[1], reverse=True) return results[0:max_results] @@ -318,7 +318,7 @@ class User(AbstractUser, BaseUser): for user in server_results.users: puppet = pu.Puppet.get(user.id) await puppet.update_info(self, user) - results.append(SearchResult((puppet, puppet.similarity(query)))) + results.append(SearchResult(puppet, puppet.similarity(query))) results.sort(key=lambda tup: tup[1], reverse=True) return results[0:max_results]