From 2fbee75453d40fa219a6a67160eb4b1d460f6ee7 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 4 Feb 2020 22:41:49 +0200 Subject: [PATCH] Add command to backfill room history from Telegram Currently supports backfilling one room at a time and backfills everything after the last bridged message. --- mautrix_telegram/commands/telegram/misc.py | 18 ++++++++-- mautrix_telegram/db/message.py | 10 ++++++ mautrix_telegram/portal/__init__.pyi | 6 ++-- mautrix_telegram/portal/base.py | 19 +++++++--- mautrix_telegram/portal/telegram.py | 40 +++++++++++++++++++--- mautrix_telegram/util/file_transfer.py | 12 ++++--- 6 files changed, 85 insertions(+), 20 deletions(-) diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py index 0e58685b..67fd8bf4 100644 --- a/mautrix_telegram/commands/telegram/misc.py +++ b/mautrix_telegram/commands/telegram/misc.py @@ -20,7 +20,8 @@ import base64 import re from telethon.errors import (InviteHashInvalidError, InviteHashExpiredError, OptionsTooMuchError, - UserAlreadyParticipantError, ChatIdInvalidError) + UserAlreadyParticipantError, ChatIdInvalidError, + TakeoutInitDelayError) from telethon.tl.patched import Message from telethon.tl.types import (User as TLUser, TypeUpdates, MessageMediaGame, MessageMediaPoll, TypeInputPeer) @@ -35,7 +36,8 @@ from ... import puppet as pu, portal as po from ...abstract_user import AbstractUser from ...db import Message as DBMessage from ...types import TelegramID -from ...commands import command_handler, CommandEvent, SECTION_MISC, SECTION_CREATING_PORTALS +from ...commands import (command_handler, CommandEvent, SECTION_MISC, SECTION_CREATING_PORTALS, + SECTION_PORTAL_MANAGEMENT) @command_handler(needs_auth=False, @@ -303,3 +305,15 @@ async def vote(evt: CommandEvent) -> EventID: return await evt.reply("You passed too many options.") # TODO use response return await evt.mark_read() + + +@command_handler(help_section=SECTION_PORTAL_MANAGEMENT, + help_args="<_number of messages_> [--takeout]", + help_text="Backfill messages from Telegram history.") +async def backfill(evt: CommandEvent) -> None: + portal = po.Portal.get_by_mxid(evt.room_id) + try: + await portal.backfill(evt.sender) + except TakeoutInitDelayError: + await evt.reply("Please accept the data export request from a mobile device, " + "then re-run the backfill command.") diff --git a/mautrix_telegram/db/message.py b/mautrix_telegram/db/message.py index d6b228a3..8d6f5bd5 100644 --- a/mautrix_telegram/db/message.py +++ b/mautrix_telegram/db/message.py @@ -61,6 +61,16 @@ class Message(Base): except StopIteration: return 0 + @classmethod + def find_last(cls, mx_room: RoomID, tg_space: TelegramID) -> Optional['Message']: + return cls._one_or_none(cls.db.execute( + cls._make_simple_select(cls.c.mx_room == mx_room, cls.c.tg_space == tg_space) + .order_by(desc(cls.c.tgid)).limit(1))) + + @classmethod + def delete_all(cls, mx_room: RoomID) -> None: + cls.db.execute(cls.t.delete().where(cls.c.mx_room == mx_room)) + @classmethod def get_by_mxid(cls, mxid: EventID, mx_room: RoomID, tg_space: TelegramID ) -> Optional['Message']: diff --git a/mautrix_telegram/portal/__init__.pyi b/mautrix_telegram/portal/__init__.pyi index f705bfc7..c1564548 100644 --- a/mautrix_telegram/portal/__init__.pyi +++ b/mautrix_telegram/portal/__init__.pyi @@ -1,8 +1,8 @@ from typing import Union from .base import BasePortal -from .portal_matrix import PortalMatrix -from .portal_metadata import PortalMetadata -from .portal_telegram import PortalTelegram +from .matrix import PortalMatrix +from .metadata import PortalMetadata +from .telegram import PortalTelegram from ..context import Context Portal = Union[BasePortal, PortalMatrix, PortalMetadata, PortalTelegram] diff --git a/mautrix_telegram/portal/base.py b/mautrix_telegram/portal/base.py index 9ab829c5..967083b1 100644 --- a/mautrix_telegram/portal/base.py +++ b/mautrix_telegram/portal/base.py @@ -13,7 +13,7 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Awaitable, Dict, List, Optional, Tuple, Union, Any, TYPE_CHECKING +from typing import Awaitable, Dict, List, Optional, Tuple, Union, Any, Set, TYPE_CHECKING from abc import ABC, abstractmethod import asyncio import logging @@ -35,7 +35,7 @@ from mautrix.util.simple_template import SimpleTemplate from ..types import TelegramID from ..context import Context -from ..db import Portal as DBPortal +from ..db import Portal as DBPortal, Message as DBMessage from .. import puppet as p, user as u, util from .deduplication import PortalDedup from .send_lock import PortalSendLock @@ -86,6 +86,8 @@ class BasePortal(ABC): photo_id: Optional[str] local_config: Dict[str, Any] deleted: bool + backfilling: bool + backfill_leave: Optional[Set[IntentAPI]] log: logging.Logger alias: Optional[RoomAlias] @@ -115,6 +117,8 @@ class BasePortal(ABC): self._main_intent = None self.deleted = False self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid) + self.backfilling = True + self.backfill_leave = None self.dedup = PortalDedup(self) self.send_lock = PortalSendLock() @@ -273,8 +277,8 @@ class BasePortal(ABC): authenticated.append(user) return authenticated - @staticmethod - async def cleanup_room(intent: IntentAPI, room_id: RoomID, message: str, + @classmethod + async def cleanup_room(cls, intent: IntentAPI, room_id: RoomID, message: str, puppets_only: bool = False) -> None: try: members = await intent.get_room_members(room_id) @@ -293,7 +297,7 @@ class BasePortal(ABC): try: await intent.leave_room(room_id) except (MatrixRequestError, IntentError): - self.log.warning("Failed to leave room when cleaning up room", exc_info=True) + cls.log.warning(f"Failed to leave room {room_id} when cleaning up room", exc_info=True) async def cleanup_portal(self, message: str, puppets_only: bool = False) -> None: if self.username: @@ -342,6 +346,7 @@ class BasePortal(ABC): pass if self._db_instance: self._db_instance.delete() + DBMessage.delete_all(self.mxid) self.deleted = True @classmethod @@ -491,6 +496,10 @@ class BasePortal(ABC): old_levels: Dict[UserID, int]) -> Awaitable[None]: pass + @abstractmethod + def backfill(self, source: 'AbstractUser') -> Awaitable[None]: + pass + # endregion diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py index eca22172..73c7238a 100644 --- a/mautrix_telegram/portal/telegram.py +++ b/mautrix_telegram/portal/telegram.py @@ -14,7 +14,6 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from typing import Awaitable, Dict, List, Optional, Tuple, Union, NamedTuple, TYPE_CHECKING -from html import escape as escape_html from abc import ABC import random import mimetypes @@ -30,7 +29,7 @@ from telethon.tl.types import ( MessageMediaPoll, MessageActionChannelCreate, MessageActionChatAddUser, MessageActionChatCreate, MessageActionChatDeletePhoto, MessageActionChatDeleteUser, MessageActionChatEditPhoto, MessageActionChatEditTitle, MessageActionChatJoinedByLink, - MessageActionChatMigrateTo, MessageActionPinMessage, MessageActionGameScore, + MessageActionChatMigrateTo, MessageActionChannelMigrateFrom, MessageActionGameScore, MessageMediaDocument, MessageMediaGeo, MessageMediaPhoto, MessageMediaUnsupported, MessageMediaGame, PeerUser, PhotoCachedSize, TypeChannelParticipant, TypeChatParticipant, TypeDocumentAttribute, TypeMessageAction, TypePhotoSize, PhotoSize, UpdateChatUserTyping, @@ -356,6 +355,31 @@ class PortalTelegram(BasePortal, ABC): edit_index=prev_edit_msg.edit_index + 1).insert() DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id) + async def backfill(self, source: 'AbstractUser') -> None: + last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel" + else self.tgid)) + min_id = last.tgid if last else 0 + self.backfilling = True + self.backfill_leave = set() + max_file_size = min(config["bridge.max_document_size"], 1500) * 1024 * 1024 + async with source.client.takeout(files=True, megagroups=self.megagroup, + chats=self.peer_type == "chat", + users=self.peer_type == "user", + channels=(self.peer_type == "channel" + and not self.megagroup), + max_file_size=max_file_size + ) as takeout_client: + async for message in takeout_client.iter_messages(await self.get_input_entity(source), + reverse=True, min_id=min_id): + sender = p.Puppet.get(message.sender_id) + # if isinstance(message, MessageService): + # await self.handle_telegram_action(source, sender, message) + await self.handle_telegram_message(source, sender, message) + for intent in self.backfill_leave: + await intent.leave_room(self.mxid) + self.backfilling = False + self.backfill_leave = None + async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet, evt: Message) -> None: if not self.mxid: @@ -383,7 +407,7 @@ class PortalTelegram(BasePortal, ABC): tg_space=tg_space, edit_index=0).insert() return - if self.dedup.pre_db_check and self.peer_type == "channel": + if self.backfilling or (self.dedup.pre_db_check and self.peer_type == "channel"): msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space) if msg: self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already" @@ -402,7 +426,13 @@ class PortalTelegram(BasePortal, ABC): MessageMediaGame, MessageMediaPoll, MessageMediaUnsupported) media = evt.media if hasattr(evt, "media") and isinstance(evt.media, allowed_media) else None - intent = sender.intent_for(self) if sender else self.main_intent + if sender: + intent = sender.intent_for(self) + if self.backfilling and intent != sender.default_mxid_intent: + intent = sender.default_mxid_intent + self.backfill_leave.add(intent) + else: + intent = self.main_intent if not media and evt.message: is_bot = sender.is_bot if sender else False event_id = await self.handle_telegram_text(source, intent, is_bot, evt) @@ -502,7 +532,7 @@ class PortalTelegram(BasePortal, ABC): await self.main_intent.set_power_levels(self.mxid, levels) async def receive_telegram_pin_id(self, msg_id: TelegramID, receiver: TelegramID) -> None: - tg_space = receiver if self.peer_type != "channel" else self.tgid + tg_space = receiver if self.peer_type != "channel" else self.tgid message = DBMessage.get_one_by_tgid(msg_id, tg_space) if msg_id != 0 else None if message: await self.main_intent.set_pinned_messages(self.mxid, [message.mxid]) diff --git a/mautrix_telegram/util/file_transfer.py b/mautrix_telegram/util/file_transfer.py index 9028d9ce..f9df31e5 100644 --- a/mautrix_telegram/util/file_transfer.py +++ b/mautrix_telegram/util/file_transfer.py @@ -30,7 +30,6 @@ from telethon.errors import (AuthBytesInvalidError, AuthKeyInvalidError, Locatio from mautrix.appservice import IntentAPI - from ..tgclient import MautrixTelegramClient from ..db import TelegramFile as DBTelegramFile from ..util import sane_mimetypes @@ -214,8 +213,8 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten image_converted = False # A weird bug in alpine/magic makes it return application/octet-stream for gzips... if is_sticker and tgs_convert and (mime_type == "application/gzip" or ( - mime_type == "application/octet-stream" - and magic.from_buffer(file).startswith("gzip"))): + mime_type == "application/octet-stream" + and magic.from_buffer(file).startswith("gzip"))): mime_type, file, width, height = await convert_tgs_to( file, tgs_convert["target"], **tgs_convert["args"]) thumbnail = None @@ -238,8 +237,11 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten if thumbnail and (mime_type.startswith("video/") or mime_type == "image/gif"): if isinstance(thumbnail, (PhotoSize, PhotoCachedSize)): thumbnail = thumbnail.location - db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail, file, - mime_type) + try: + db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail, file, + mime_type) + except FileIdInvalidError: + log.warning(f"Failed to transfer thumbnail for {thumbnail!s}", exc_info=True) try: db_file.insert()