From 2c0a2e694b98fa551ad5d1eb4f655532a864db11 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 28 Jul 2020 17:28:07 +0300 Subject: [PATCH 1/4] Add option for automatic backfilling when creating portal --- ROADMAP.md | 4 +- mautrix_telegram/abstract_user.py | 4 +- mautrix_telegram/commands/telegram/misc.py | 10 ++- mautrix_telegram/config.py | 7 +- mautrix_telegram/example-config.yaml | 28 +++++- mautrix_telegram/portal/base.py | 9 +- mautrix_telegram/portal/metadata.py | 65 ++++++++------ mautrix_telegram/portal/telegram.py | 99 ++++++++++++++++------ mautrix_telegram/puppet.py | 2 +- mautrix_telegram/user.py | 10 +-- 10 files changed, 170 insertions(+), 68 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 56308642..6654c677 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -28,9 +28,9 @@ * [ ] Buttons * [x] Message deletions * [x] Message edits - * [ ] Message history + * [x] Message history * [x] Manually (`!tg backfill`) - * [ ] Automatically when creating portal + * [x] Automatically when creating portal * [ ] Automatically for missed messages * [x] Avatars * [x] Presence diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py index 7f6b2677..77744e8e 100644 --- a/mautrix_telegram/abstract_user.py +++ b/mautrix_telegram/abstract_user.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -426,6 +426,8 @@ class AbstractUser(ABC): self.log.debug(f"Ignoring relaybot-sent message %s to %s", update.id, portal.tgid_log) return + await portal.backfill_lock.wait(update.id) + if isinstance(update, MessageService): if isinstance(update.action, MessageActionChannelMigrateFrom): self.log.trace(f"Ignoring action %s to %s by %d", update.action, portal.tgid_log, diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py index c044725a..0c1b0bd1 100644 --- a/mautrix_telegram/commands/telegram/misc.py +++ b/mautrix_telegram/commands/telegram/misc.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -331,15 +331,19 @@ async def random(evt: CommandEvent) -> EventID: return await evt.reply("Invalid emoji for randomization") -@command_handler(help_section=SECTION_PORTAL_MANAGEMENT, +@command_handler(help_section=SECTION_PORTAL_MANAGEMENT, help_args="[_limit_]", help_text="Backfill messages from Telegram history.") async def backfill(evt: CommandEvent) -> None: if not evt.is_portal: await evt.reply("You can only use backfill in portal rooms") return + try: + limit = int(evt.args[0]) + except (ValueError, IndexError): + limit = -1 portal = po.Portal.get_by_mxid(evt.room_id) try: - await portal.backfill(evt.sender) + await portal.backfill(evt.sender, limit=limit) except TakeoutInitDelayError: msg = ("Please accept the data export request from a mobile device, " "then re-run the backfill command.") diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index 4fa36582..c0d1f292 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -113,6 +113,11 @@ class Config(BaseBridgeConfig): copy("bridge.delivery_receipts") copy("bridge.delivery_error_reports") copy("bridge.resend_bridge_info") + copy("bridge.backfill.invite_own_puppet") + copy("bridge.backfill.takeout_limit") + copy("bridge.backfill.initial_limit") + copy("bridge.backfill.missed_limit") + copy("bridge.backfill.disable_notifications") copy("bridge.initial_power_level_overrides.group") copy("bridge.initial_power_level_overrides.user") diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index 52a4ce0e..22ffdbb6 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -129,8 +129,8 @@ bridge: # Maximum number of members to sync per portal when starting up. Other members will be # synced when they send messages. The maximum is 10000, after which the Telegram server # will not send any more members. - # Defaults to no local limit (-> limited to 10000 by server) - max_initial_member_sync: -1 + # -1 means no limit (which means it's limited to 10000 by the server) + max_initial_member_sync: 100 # Whether or not to sync the member list in channels. # If no channel admins have logged into the bridge, the bridge won't be able to sync the member # list regardless of this setting. @@ -232,6 +232,30 @@ bridge: # This field will automatically be changed back to false after it, # except if the config file is not writable. resend_bridge_info: false + # Settings for backfilling messages from Telegram. + backfill: + # Whether or not the Telegram ghosts of logged in Matrix users should be + # invited to private chats when backfilling history from Telegram. This is + # usually needed to prevent rate limits and to allow timestamp massaging. + invite_own_puppet: true + # Maximum number of messages to backfill without using a takeout. + # The first time a takeout is used, the user has to manually approve it from a different + # device. If initial_limit or missed_limit are higher than this value, the bridge will ask + # the user to accept the takeout after logging in before syncing any chats. + takeout_limit: 100 + # Maximum number of messages to backfill initially. + # Set to 0 to disable backfilling when creating portal, or -1 to disable the limit. + # + # N.B. Initial backfill will only start after member sync. Make sure your + # max_initial_member_sync is set to a low enough value so it doesn't take forever. + initial_limit: 0 + # Maximum number of messages to backfill if messages were missed while + # the bridge was disconnected. + # Set to 0 to disable backfilling missed messages. + missed_limit: 100 + # If using double puppeting, should notifications be disabled + # while the initial backfill is in progress? + disable_notifications: false # Overrides for base power levels. initial_power_level_overrides: diff --git a/mautrix_telegram/portal/base.py b/mautrix_telegram/portal/base.py index 67170d3b..9e5bdaef 100644 --- a/mautrix_telegram/portal/base.py +++ b/mautrix_telegram/portal/base.py @@ -33,6 +33,7 @@ from mautrix.appservice import AppService, IntentAPI from mautrix.types import (RoomID, RoomAlias, UserID, EventID, EventType, MessageEventContent, PowerLevelStateEventContent, ContentURI) from mautrix.util.simple_template import SimpleTemplate +from mautrix.util.simple_lock import SimpleLock from mautrix.util.logging import TraceLogger from ..types import TelegramID @@ -93,7 +94,7 @@ class BasePortal(ABC): avatar_url: Optional[ContentURI] encrypted: bool deleted: bool - backfilling: bool + backfill_lock: SimpleLock backfill_leave: Optional[Set[IntentAPI]] log: TraceLogger @@ -127,7 +128,8 @@ class BasePortal(ABC): self._main_intent = None self.deleted = False self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid) - self.backfilling = False + self.backfill_lock = SimpleLock("Waiting for backfilling to finish before handling %s", + log=self.log, loop=self.loop) self.backfill_leave = None self.dedup = PortalDedup(self) @@ -531,7 +533,8 @@ class BasePortal(ABC): pass @abstractmethod - def backfill(self, source: 'AbstractUser') -> Awaitable[None]: + def backfill(self, source: 'AbstractUser', is_initial: bool = False, + limit: Optional[int] = None) -> Awaitable[None]: pass @abstractmethod diff --git a/mautrix_telegram/portal/metadata.py b/mautrix_telegram/portal/metadata.py index 26651cdd..4589a418 100644 --- a/mautrix_telegram/portal/metadata.py +++ b/mautrix_telegram/portal/metadata.py @@ -219,12 +219,6 @@ class PortalMetadata(BasePortal, ABC): if changed: self.save() await self.update_bridge_info() - puppet = p.Puppet.get_by_custom_mxid(user.mxid) - if puppet: - try: - await puppet.intent.ensure_joined(self.mxid) - except Exception: - self.log.exception("Failed to ensure %s is joined to portal", user.mxid) if self.sync_matrix_state: await self.main_intent.get_joined_members(self.mxid) @@ -392,28 +386,40 @@ class PortalMetadata(BasePortal, ABC): if not config["bridge.federate_rooms"]: creation_content["m.federate"] = False - room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset, - is_direct=direct, invitees=invites or [], - name=self.title, topic=self.about, - initial_state=initial_state, - creation_content=creation_content) - if not room_id: - raise Exception(f"Failed to create room") + with self.backfill_lock: + room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset, + is_direct=direct, invitees=invites or [], + name=self.title, topic=self.about, + initial_state=initial_state, + creation_content=creation_content) + if not room_id: + raise Exception(f"Failed to create room") - if self.encrypted and self.matrix.e2ee and direct: - try: - await self.az.intent.ensure_joined(room_id) - except Exception: - self.log.warning(f"Failed to add bridge bot to new private chat {room_id}") + if self.encrypted and self.matrix.e2ee and direct: + try: + await self.az.intent.ensure_joined(room_id) + except Exception: + self.log.warning(f"Failed to add bridge bot to new private chat {room_id}") - self.mxid = room_id - self.by_mxid[self.mxid] = self - self.save() - await self.az.state_store.set_power_levels(self.mxid, power_levels) - user.register_portal(self) - asyncio.ensure_future(self.update_matrix_room(user, entity, direct, puppet, - levels=power_levels, users=users, - participants=participants), loop=self.loop) + self.mxid = room_id + self.by_mxid[self.mxid] = self + self.save() + await self.az.state_store.set_power_levels(self.mxid, power_levels) + user.register_portal(self) + + update_room = self.loop.create_task(self.update_matrix_room( + user, entity, direct, puppet, + levels=power_levels, users=users, participants=participants)) + + if config["bridge.backfill.initial_limit"] > 0: + self.log.debug("Initial backfill is enabled. Waiting for room members to sync " + "and then starting backfill") + await update_room + + try: + await self.backfill(user, is_initial=True) + except Exception: + self.log.exception("Failed to backfill new portal") return self.mxid @@ -545,6 +551,13 @@ class PortalMetadata(BasePortal, ABC): if user: await self.invite_to_matrix(user.mxid) + puppet = p.Puppet.get_by_custom_mxid(user.mxid) + if puppet: + try: + await puppet.intent.ensure_joined(self.mxid) + except Exception: + self.log.exception("Failed to ensure %s is joined to portal", user.mxid) + # We can't trust the member list if any of the following cases is true: # * There are close to 10 000 users, because Telegram might not be sending all members. # * The member sync count is limited, because then we might ignore some members. diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index bb27d5ca..ad6ff332 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -38,12 +38,13 @@ from telethon.tl.types import ( from mautrix.appservice import IntentAPI from mautrix.types import (EventID, UserID, ImageInfo, ThumbnailInfo, RelatesTo, MessageType, EventType, MediaMessageEventContent, TextMessageEventContent, - LocationMessageEventContent, Format, MessageEventContent) + LocationMessageEventContent, Format) from ..types import TelegramID from ..db import Message as DBMessage, TelegramFile as DBTelegramFile from ..util import sane_mimetypes from ..context import Context +from ..tgclient import TelegramClient from .. import puppet as p, user as u, formatter, util from .base import BasePortal @@ -407,43 +408,92 @@ class PortalTelegram(BasePortal, ABC): edit_index=prev_edit_msg.edit_index + 1).insert() DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id) - async def backfill(self, source: 'AbstractUser') -> None: - self.log.debug("Backfilling history through %s", source.mxid) + @property + def _takeout_options(self) -> Dict[str, Union[bool, int]]: + return { + "files": True, + "megagroups": self.megagroup, + "chats": self.peer_type == "chat", + "users": self.peer_type == "user", + "channels": (self.peer_type == "channel" and not self.megagroup), + "max_file_size": min(config["bridge.max_document_size"], 2000) * 1024 * 1024 + } + + async def backfill(self, source: 'AbstractUser', is_initial: bool = False, + limit: Optional[int] = None) -> None: + limit = limit or (config["bridge.backfill.initial_limit"] if is_initial + else config["bridge.backfill.missed_limit"]) + if limit == 0: + return last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid)) min_id = last.tgid if last else 0 - self.backfilling = True + message = (await source.client.get_messages(self.peer, limit=1))[0] + if limit < 0: + limit = None + self.log.debug(f"Backfilling approximately {message.id - min_id} messages " + f"through {source.mxid}") + elif self.peer_type == "channel": + # This is a channel or supergroup, so we'll backfill messages based on the ID. + # There are some cases, such as deleted messages, where this may backfill less + # messages than the limit. + min_id = max(message.id - limit, min_id) + limit = None + self.log.debug(f"Backfilling messages after ID {min_id} (last message: {message.id}) " + f"through {source.mxid}") + else: + # Private chats and normal groups don't have their own message ID namespace, + # which means we'll have to fetch messages a different way. + # The _backfill_messages method will detect min_id=None and not use reverse=True + min_id = None + self.log.debug(f"Backfilling up to {limit} messages through {source.mxid}") + with self.backfill_lock: + await self._backfill(source, min_id, limit) + + async def _backfill(self, source: 'AbstractUser', min_id: Optional[int], limit: Optional[int] + ) -> None: self.backfill_leave = set() - if self.peer_type == "user": + if ((self.peer_type == "user" and self.tg_receiver != source.tgid + and config["bridge.backfill.invite_own_puppet"])): self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid) sender = p.Puppet.get(source.tgid) await self.main_intent.invite_user(self.mxid, sender.default_mxid) await sender.default_mxid_intent.join_room_by_id(self.mxid) self.backfill_leave.add(sender.default_mxid_intent) - max_file_size = min(config["bridge.max_document_size"], 1500) * 1024 * 1024 self.log.trace("Opening takeout client for %d, message ID %d->", source.tgid, min_id) - count = 0 - async with source.client.takeout(files=True, megagroups=self.megagroup, - chats=self.peer_type == "chat", - users=self.peer_type == "user", - channels=(self.peer_type == "channel" - and not self.megagroup), - max_file_size=max_file_size - ) as takeout_client: - async for message in takeout_client.iter_messages(await self.get_input_entity(source), - reverse=True, min_id=min_id): - sender = p.Puppet.get(message.sender_id) - # if isinstance(message, MessageService): - # await self.handle_telegram_action(source, sender, message) - await self.handle_telegram_message(source, sender, message) - count += 1 + + client = source.client + if limit > config["bridge.backfill.takeout_limit"]: + async with client.takeout(**self._takeout_options) as takeout: + count = await self._backfill_messages(source, min_id, limit, takeout) + else: + count = await self._backfill_messages(source, min_id, limit, client) + for intent in self.backfill_leave: self.log.trace("Leaving room with %s post-backfill", intent.mxid) await intent.leave_room(self.mxid) - self.backfilling = False self.backfill_leave = None self.log.info("Backfilled %d messages through %s", count, source.mxid) + async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int], + limit: Optional[int], client: TelegramClient) -> int: + count = 0 + entity = await self.get_input_entity(source) + if min_id is not None: + messages = client.iter_messages(entity, reverse=True, min_id=min_id) + async for message in messages: + sender = p.Puppet.get(message.sender_id) + # TODO handle service messages? + await self.handle_telegram_message(source, sender, message) + count += 1 + else: + messages = await client.get_messages(entity, limit=limit) + for message in reversed(messages): + sender = p.Puppet.get(message.sender_id) + await self.handle_telegram_message(source, sender, message) + count += 1 + return count + async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet, evt: Message) -> None: if not self.mxid: @@ -472,7 +522,7 @@ class PortalTelegram(BasePortal, ABC): tg_space=tg_space, edit_index=0).insert() return - if self.backfilling or (self.dedup.pre_db_check and self.peer_type == "channel"): + if self.backfill_lock.locked or (self.dedup.pre_db_check and self.peer_type == "channel"): msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space) if msg: self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already" @@ -496,7 +546,8 @@ class PortalTelegram(BasePortal, ABC): allowed_media) else None if sender: intent = sender.intent_for(self) - if self.backfilling and intent != sender.default_mxid_intent: + if ((self.backfill_lock.locked and intent != sender.default_mxid_intent + and config["bridge.backfill.invite_own_puppet"])): intent = sender.default_mxid_intent self.backfill_leave.add(intent) else: diff --git a/mautrix_telegram/puppet.py b/mautrix_telegram/puppet.py index c0507500..7aa8cdf6 100644 --- a/mautrix_telegram/puppet.py +++ b/mautrix_telegram/puppet.py @@ -331,7 +331,7 @@ class Puppet(CustomPuppetMixin): def default_puppet_should_leave_room(self, room_id: RoomID) -> bool: portal: p.Portal = p.Portal.get_by_mxid(room_id) - return portal and not portal.backfilling and portal.peer_type != "user" + return portal and not portal.backfill_lock.locked and portal.peer_type != "user" # endregion # region Getters diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index 4d9f6ead..b616193e 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2019 Tulir Asokan +# Copyright (C) 2020 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import (Awaitable, Dict, List, Iterable, NewType, Optional, Tuple, Any, cast, +from typing import (Awaitable, Dict, List, Iterable, NamedTuple, Optional, Tuple, Any, cast, TYPE_CHECKING) import logging import asyncio @@ -42,7 +42,7 @@ if TYPE_CHECKING: config: Optional['Config'] = None -SearchResult = NewType('SearchResult', Tuple['pu.Puppet', int]) +SearchResult = NamedTuple('SearchResult', puppet='pu.Puppet', similarity=int) class User(AbstractUser, BaseUser): @@ -306,7 +306,7 @@ class User(AbstractUser, BaseUser): for contact in self.contacts: similarity = contact.similarity(query) if similarity >= min_similarity: - results.append(SearchResult((contact, similarity))) + results.append(SearchResult(contact, similarity)) results.sort(key=lambda tup: tup[1], reverse=True) return results[0:max_results] @@ -318,7 +318,7 @@ class User(AbstractUser, BaseUser): for user in server_results.users: puppet = pu.Puppet.get(user.id) await puppet.update_info(self, user) - results.append(SearchResult((puppet, puppet.similarity(query)))) + results.append(SearchResult(puppet, puppet.similarity(query))) results.sort(key=lambda tup: tup[1], reverse=True) return results[0:max_results] From ea017467fd81777cb96ad354c9c5047c90abd58f Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 28 Jul 2020 18:01:44 +0300 Subject: [PATCH 2/4] Add support for football --- mautrix_telegram/commands/telegram/misc.py | 8 +++++--- mautrix_telegram/portal/telegram.py | 3 ++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py index 0c1b0bd1..ced50eaf 100644 --- a/mautrix_telegram/commands/telegram/misc.py +++ b/mautrix_telegram/commands/telegram/misc.py @@ -311,11 +311,11 @@ async def vote(evt: CommandEvent) -> EventID: @command_handler(help_section=SECTION_MISC, help_args="<_emoji_>", - help_text="Roll a dice (\U0001F3B2) or throw a dart (\U0001F3AF) " - "on the Telegram servers.") + help_text="Roll a dice (\U0001F3B2), kick a football (\u26BD\uFE0F) or throw a " + "dart (\U0001F3AF) or basketball (\U0001F3C0) on the Telegram servers.") async def random(evt: CommandEvent) -> EventID: if not evt.is_portal: - return await evt.reply("You can only roll dice in portal rooms") + return await evt.reply("You can only randomize values in portal rooms") portal = po.Portal.get_by_mxid(evt.room_id) arg = evt.args[0] if len(evt.args) > 0 else "dice" emoticon = { @@ -323,6 +323,8 @@ async def random(evt: CommandEvent) -> EventID: "dice": "\U0001F3B2", "ball": "\U0001F3C0", "basketball": "\U0001F3C0", + "football": "\u26BD", + "soccer": "\u26BD", }.get(arg, arg) try: await evt.sender.client.send_media(await portal.get_input_entity(evt.sender), diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index ad6ff332..c32980de 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -305,7 +305,8 @@ class PortalTelegram(BasePortal, ABC): emoji_text = { "\U0001F3AF": " Dart throw", "\U0001F3B2": " Dice roll", - "\U0001F3C0": " Basketball throw" + "\U0001F3C0": " Basketball throw", + "\u26BD": " Football kick" } roll: MessageMediaDice = evt.media text = f"{roll.emoticon}{emoji_text.get(roll.emoticon, '')} result: {roll.value}" From 59eb7376c979bf5a09045a754207c4388e4d19b5 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 28 Jul 2020 18:32:34 +0300 Subject: [PATCH 3/4] Add missed message backfilling --- ROADMAP.md | 2 +- mautrix_telegram/commands/telegram/misc.py | 4 +++- mautrix_telegram/portal/metadata.py | 9 +++------ mautrix_telegram/portal/telegram.py | 15 ++++++++++----- mautrix_telegram/user.py | 19 +++++++++++++------ 5 files changed, 30 insertions(+), 19 deletions(-) diff --git a/ROADMAP.md b/ROADMAP.md index 6654c677..34e430de 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -31,7 +31,7 @@ * [x] Message history * [x] Manually (`!tg backfill`) * [x] Automatically when creating portal - * [ ] Automatically for missed messages + * [x] Automatically for missed messages * [x] Avatars * [x] Presence * [x] Typing notifications diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py index ced50eaf..5e5e4531 100644 --- a/mautrix_telegram/commands/telegram/misc.py +++ b/mautrix_telegram/commands/telegram/misc.py @@ -185,8 +185,10 @@ async def sync(evt: CommandEvent) -> EventID: sync_only = None if not sync_only or sync_only == "chats": - await evt.sender.sync_dialogs(synchronous_create=True) + await evt.reply("Synchronizing chats...") + await evt.sender.sync_dialogs() if not sync_only or sync_only == "contacts": + await evt.reply("Synchronizing contacts...") await evt.sender.sync_contacts() if not sync_only or sync_only == "me": await evt.sender.update_info() diff --git a/mautrix_telegram/portal/metadata.py b/mautrix_telegram/portal/metadata.py index 4589a418..582a38ec 100644 --- a/mautrix_telegram/portal/metadata.py +++ b/mautrix_telegram/portal/metadata.py @@ -223,8 +223,8 @@ class PortalMetadata(BasePortal, ABC): await self.main_intent.get_joined_members(self.mxid) async def create_matrix_room(self, user: 'AbstractUser', entity: Union[TypeChat, User] = None, - invites: InviteList = None, update_if_exists: bool = True, - synchronous: bool = False) -> Optional[str]: + invites: InviteList = None, update_if_exists: bool = True + ) -> Optional[RoomID]: if self.mxid: if update_if_exists: if not entity: @@ -234,10 +234,7 @@ class PortalMetadata(BasePortal, ABC): self.log.exception(f"Failed to get entity through {user.tgid} for update") return self.mxid update = self.update_matrix_room(user, entity, self.peer_type == "user") - if synchronous: - await update - else: - asyncio.ensure_future(update, loop=self.loop) + self.loop.create_task(update) await self.invite_to_matrix(invites or []) return self.mxid async with self._room_create_lock: diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index c32980de..2f7367b4 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -421,7 +421,7 @@ class PortalTelegram(BasePortal, ABC): } async def backfill(self, source: 'AbstractUser', is_initial: bool = False, - limit: Optional[int] = None) -> None: + limit: Optional[int] = None, last_id: Optional[int] = None) -> None: limit = limit or (config["bridge.backfill.initial_limit"] if is_initial else config["bridge.backfill.missed_limit"]) if limit == 0: @@ -429,18 +429,23 @@ class PortalTelegram(BasePortal, ABC): last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid)) min_id = last.tgid if last else 0 - message = (await source.client.get_messages(self.peer, limit=1))[0] + if last_id is None: + message = (await source.client.get_messages(self.peer, limit=1))[0] + last_id = message.id + if last_id <= min_id: + # Nothing to backfill + return if limit < 0: limit = None - self.log.debug(f"Backfilling approximately {message.id - min_id} messages " + self.log.debug(f"Backfilling approximately {last_id - min_id} messages " f"through {source.mxid}") elif self.peer_type == "channel": # This is a channel or supergroup, so we'll backfill messages based on the ID. # There are some cases, such as deleted messages, where this may backfill less # messages than the limit. - min_id = max(message.id - limit, min_id) + min_id = max(last_id - limit, min_id) limit = None - self.log.debug(f"Backfilling messages after ID {min_id} (last message: {message.id}) " + self.log.debug(f"Backfilling messages after ID {min_id} (last message: {last_id}) " f"through {source.mxid}") else: # Private chats and normal groups don't have their own message ID namespace, diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index b616193e..33953ec2 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -21,6 +21,7 @@ import asyncio from telethon.tl.types import (TypeUpdate, UpdateNewMessage, UpdateNewChannelMessage, PeerUser, UpdateShortChatMessage, UpdateShortMessage, User as TLUser, Chat, ChatForbidden) +from telethon.tl.custom import Dialog from telethon.tl.types.contacts import ContactsNotModified from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest from telethon.tl.functions.account import UpdateStatusRequest @@ -333,12 +334,13 @@ class User(AbstractUser, BaseUser): return await self._search_remote(query), True - async def sync_dialogs(self, synchronous_create: bool = False) -> None: + async def sync_dialogs(self) -> None: if self.is_bot: return creators = [] limit = config["bridge.sync_dialog_limit"] or None - self.log.debug(f"Syncing dialogs (limit={limit}, synchronous_create={synchronous_create})") + self.log.debug(f"Syncing dialogs (limit={limit})") + dialog: Dialog async for dialog in self.client.iter_dialogs(limit=limit, ignore_migrated=True, archived=False): entity = dialog.entity @@ -353,11 +355,16 @@ class User(AbstractUser, BaseUser): continue portal = po.Portal.get_by_entity(entity, receiver_id=self.tgid) self.portals[portal.tgid_full] = portal - creators.append( - portal.create_matrix_room(self, entity, invites=[self.mxid], - synchronous=synchronous_create)) + if portal.mxid: + update_task = portal.update_matrix_room(self, entity) + backfill_task = portal.backfill(self, last_known_id=dialog.message.id) + creators.append(self.loop.create_task(update_task)) + creators.append(self.loop.create_task(backfill_task)) + else: + create_task = portal.create_matrix_room(self, entity, invites=[self.mxid]) + creators.append(self.loop.create_task(create_task)) self.save(portals=True) - await asyncio.gather(*creators, loop=self.loop) + await asyncio.gather(*creators) self.log.debug("Dialog syncing complete") def register_portal(self, portal: po.Portal) -> None: From 9848f8b92c02723d16c88a89905fab8465100eb9 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Wed, 29 Jul 2020 21:55:51 +0300 Subject: [PATCH 4/4] Separate dialog syncing and creation limits and fix bugs --- mautrix_telegram/config.py | 7 ++++++- mautrix_telegram/example-config.yaml | 12 ++++++++---- mautrix_telegram/portal/base.py | 2 +- mautrix_telegram/portal/telegram.py | 16 ++++++++-------- mautrix_telegram/user.py | 14 +++++++++----- 5 files changed, 32 insertions(+), 19 deletions(-) diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index c0d1f292..36f77383 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -89,7 +89,12 @@ class Config(BaseBridgeConfig): copy("bridge.sync_channel_members") copy("bridge.skip_deleted_members") copy("bridge.startup_sync") - copy("bridge.sync_dialog_limit") + if "bridge.sync_dialog_limit" in self: + base["bridge.sync_create_limit"] = self["bridge.sync_dialog_limit"] + base["bridge.sync_update_limit"] = self["bridge.sync_dialog_limit"] + else: + copy("bridge.sync_update_limit") + copy("bridge.sync_create_limit") copy("bridge.sync_direct_chats") copy("bridge.max_telegram_delete") copy("bridge.sync_matrix_state") diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index 22ffdbb6..89642f70 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -142,7 +142,10 @@ bridge: startup_sync: true # Number of most recently active dialogs to check when syncing chats. # Set to 0 to remove limit. - sync_dialog_limit: 30 + sync_update_limit: 0 + # Number of most recently active dialogs to create portals for when syncing chats. + # Set to 0 to remove limit. + sync_create_limit: 30 # Whether or not to sync and create portals for direct chats at startup. sync_direct_chats: false # The maximum number of simultaneous Telegram deletions to handle. @@ -249,10 +252,11 @@ bridge: # N.B. Initial backfill will only start after member sync. Make sure your # max_initial_member_sync is set to a low enough value so it doesn't take forever. initial_limit: 0 - # Maximum number of messages to backfill if messages were missed while - # the bridge was disconnected. + # Maximum number of messages to backfill if messages were missed while the bridge was + # disconnected. Note that this only works for logged in users and only if the chat isn't + # older than sync_update_limit # Set to 0 to disable backfilling missed messages. - missed_limit: 100 + missed_limit: 50 # If using double puppeting, should notifications be disabled # while the initial backfill is in progress? disable_notifications: false diff --git a/mautrix_telegram/portal/base.py b/mautrix_telegram/portal/base.py index 9e5bdaef..e94946a5 100644 --- a/mautrix_telegram/portal/base.py +++ b/mautrix_telegram/portal/base.py @@ -534,7 +534,7 @@ class BasePortal(ABC): @abstractmethod def backfill(self, source: 'AbstractUser', is_initial: bool = False, - limit: Optional[int] = None) -> Awaitable[None]: + limit: Optional[int] = None, last_id: Optional[int] = None) -> Awaitable[None]: pass @abstractmethod diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index 2f7367b4..2a806b09 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -436,7 +436,7 @@ class PortalTelegram(BasePortal, ABC): # Nothing to backfill return if limit < 0: - limit = None + limit = last_id - min_id self.log.debug(f"Backfilling approximately {last_id - min_id} messages " f"through {source.mxid}") elif self.peer_type == "channel": @@ -444,7 +444,6 @@ class PortalTelegram(BasePortal, ABC): # There are some cases, such as deleted messages, where this may backfill less # messages than the limit. min_id = max(last_id - limit, min_id) - limit = None self.log.debug(f"Backfilling messages after ID {min_id} (last message: {last_id}) " f"through {source.mxid}") else: @@ -456,20 +455,19 @@ class PortalTelegram(BasePortal, ABC): with self.backfill_lock: await self._backfill(source, min_id, limit) - async def _backfill(self, source: 'AbstractUser', min_id: Optional[int], limit: Optional[int] - ) -> None: + async def _backfill(self, source: 'AbstractUser', min_id: Optional[int], limit: int) -> None: self.backfill_leave = set() - if ((self.peer_type == "user" and self.tg_receiver != source.tgid + if ((self.peer_type == "user" and self.tgid != source.tgid and config["bridge.backfill.invite_own_puppet"])): self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid) sender = p.Puppet.get(source.tgid) await self.main_intent.invite_user(self.mxid, sender.default_mxid) await sender.default_mxid_intent.join_room_by_id(self.mxid) self.backfill_leave.add(sender.default_mxid_intent) - self.log.trace("Opening takeout client for %d, message ID %d->", source.tgid, min_id) client = source.client if limit > config["bridge.backfill.takeout_limit"]: + self.log.debug(f"Opening takeout client for {source.tgid}") async with client.takeout(**self._takeout_options) as takeout: count = await self._backfill_messages(source, min_id, limit, takeout) else: @@ -481,11 +479,12 @@ class PortalTelegram(BasePortal, ABC): self.backfill_leave = None self.log.info("Backfilled %d messages through %s", count, source.mxid) - async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int], - limit: Optional[int], client: TelegramClient) -> int: + async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int], limit: int, + client: TelegramClient) -> int: count = 0 entity = await self.get_input_entity(source) if min_id is not None: + self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})") messages = client.iter_messages(entity, reverse=True, min_id=min_id) async for message in messages: sender = p.Puppet.get(message.sender_id) @@ -493,6 +492,7 @@ class PortalTelegram(BasePortal, ABC): await self.handle_telegram_message(source, sender, message) count += 1 else: + self.log.debug(f"Fetching up to {limit} most recent messages") messages = await client.get_messages(entity, limit=limit) for message in reversed(messages): sender = p.Puppet.get(message.sender_id) diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index 33953ec2..63dc0f14 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -338,10 +338,13 @@ class User(AbstractUser, BaseUser): if self.is_bot: return creators = [] - limit = config["bridge.sync_dialog_limit"] or None - self.log.debug(f"Syncing dialogs (limit={limit})") + update_limit = config["bridge.sync_update_limit"] or None + create_limit = config["bridge.sync_create_limit"] + index = 0 + self.log.debug(f"Syncing dialogs (update_limit={update_limit}, " + f"create_limit={create_limit})") dialog: Dialog - async for dialog in self.client.iter_dialogs(limit=limit, ignore_migrated=True, + async for dialog in self.client.iter_dialogs(limit=update_limit, ignore_migrated=True, archived=False): entity = dialog.entity if isinstance(entity, ChatForbidden): @@ -357,12 +360,13 @@ class User(AbstractUser, BaseUser): self.portals[portal.tgid_full] = portal if portal.mxid: update_task = portal.update_matrix_room(self, entity) - backfill_task = portal.backfill(self, last_known_id=dialog.message.id) + backfill_task = portal.backfill(self, last_id=dialog.message.id) creators.append(self.loop.create_task(update_task)) creators.append(self.loop.create_task(backfill_task)) - else: + elif not create_limit or index < create_limit: create_task = portal.create_matrix_room(self, entity, invites=[self.mxid]) creators.append(self.loop.create_task(create_task)) + index += 1 self.save(portals=True) await asyncio.gather(*creators) self.log.debug("Dialog syncing complete")