diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py
index 0e58685b..67fd8bf4 100644
--- a/mautrix_telegram/commands/telegram/misc.py
+++ b/mautrix_telegram/commands/telegram/misc.py
@@ -20,7 +20,8 @@ import base64
import re
from telethon.errors import (InviteHashInvalidError, InviteHashExpiredError, OptionsTooMuchError,
- UserAlreadyParticipantError, ChatIdInvalidError)
+ UserAlreadyParticipantError, ChatIdInvalidError,
+ TakeoutInitDelayError)
from telethon.tl.patched import Message
from telethon.tl.types import (User as TLUser, TypeUpdates, MessageMediaGame, MessageMediaPoll,
TypeInputPeer)
@@ -35,7 +36,8 @@ from ... import puppet as pu, portal as po
from ...abstract_user import AbstractUser
from ...db import Message as DBMessage
from ...types import TelegramID
-from ...commands import command_handler, CommandEvent, SECTION_MISC, SECTION_CREATING_PORTALS
+from ...commands import (command_handler, CommandEvent, SECTION_MISC, SECTION_CREATING_PORTALS,
+ SECTION_PORTAL_MANAGEMENT)
@command_handler(needs_auth=False,
@@ -303,3 +305,15 @@ async def vote(evt: CommandEvent) -> EventID:
return await evt.reply("You passed too many options.")
# TODO use response
return await evt.mark_read()
+
+
+@command_handler(help_section=SECTION_PORTAL_MANAGEMENT,
+ help_args="<_number of messages_> [--takeout]",
+ help_text="Backfill messages from Telegram history.")
+async def backfill(evt: CommandEvent) -> None:
+ portal = po.Portal.get_by_mxid(evt.room_id)
+ try:
+ await portal.backfill(evt.sender)
+ except TakeoutInitDelayError:
+ await evt.reply("Please accept the data export request from a mobile device, "
+ "then re-run the backfill command.")
diff --git a/mautrix_telegram/db/message.py b/mautrix_telegram/db/message.py
index d6b228a3..8d6f5bd5 100644
--- a/mautrix_telegram/db/message.py
+++ b/mautrix_telegram/db/message.py
@@ -61,6 +61,16 @@ class Message(Base):
except StopIteration:
return 0
+ @classmethod
+ def find_last(cls, mx_room: RoomID, tg_space: TelegramID) -> Optional['Message']:
+ return cls._one_or_none(cls.db.execute(
+ cls._make_simple_select(cls.c.mx_room == mx_room, cls.c.tg_space == tg_space)
+ .order_by(desc(cls.c.tgid)).limit(1)))
+
+ @classmethod
+ def delete_all(cls, mx_room: RoomID) -> None:
+ cls.db.execute(cls.t.delete().where(cls.c.mx_room == mx_room))
+
@classmethod
def get_by_mxid(cls, mxid: EventID, mx_room: RoomID, tg_space: TelegramID
) -> Optional['Message']:
diff --git a/mautrix_telegram/portal/__init__.pyi b/mautrix_telegram/portal/__init__.pyi
index f705bfc7..c1564548 100644
--- a/mautrix_telegram/portal/__init__.pyi
+++ b/mautrix_telegram/portal/__init__.pyi
@@ -1,8 +1,8 @@
from typing import Union
from .base import BasePortal
-from .portal_matrix import PortalMatrix
-from .portal_metadata import PortalMetadata
-from .portal_telegram import PortalTelegram
+from .matrix import PortalMatrix
+from .metadata import PortalMetadata
+from .telegram import PortalTelegram
from ..context import Context
Portal = Union[BasePortal, PortalMatrix, PortalMetadata, PortalTelegram]
diff --git a/mautrix_telegram/portal/base.py b/mautrix_telegram/portal/base.py
index 9ab829c5..967083b1 100644
--- a/mautrix_telegram/portal/base.py
+++ b/mautrix_telegram/portal/base.py
@@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-from typing import Awaitable, Dict, List, Optional, Tuple, Union, Any, TYPE_CHECKING
+from typing import Awaitable, Dict, List, Optional, Tuple, Union, Any, Set, TYPE_CHECKING
from abc import ABC, abstractmethod
import asyncio
import logging
@@ -35,7 +35,7 @@ from mautrix.util.simple_template import SimpleTemplate
from ..types import TelegramID
from ..context import Context
-from ..db import Portal as DBPortal
+from ..db import Portal as DBPortal, Message as DBMessage
from .. import puppet as p, user as u, util
from .deduplication import PortalDedup
from .send_lock import PortalSendLock
@@ -86,6 +86,8 @@ class BasePortal(ABC):
photo_id: Optional[str]
local_config: Dict[str, Any]
deleted: bool
+ backfilling: bool
+ backfill_leave: Optional[Set[IntentAPI]]
log: logging.Logger
alias: Optional[RoomAlias]
@@ -115,6 +117,8 @@ class BasePortal(ABC):
self._main_intent = None
self.deleted = False
self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid)
+ self.backfilling = True
+ self.backfill_leave = None
self.dedup = PortalDedup(self)
self.send_lock = PortalSendLock()
@@ -273,8 +277,8 @@ class BasePortal(ABC):
authenticated.append(user)
return authenticated
- @staticmethod
- async def cleanup_room(intent: IntentAPI, room_id: RoomID, message: str,
+ @classmethod
+ async def cleanup_room(cls, intent: IntentAPI, room_id: RoomID, message: str,
puppets_only: bool = False) -> None:
try:
members = await intent.get_room_members(room_id)
@@ -293,7 +297,7 @@ class BasePortal(ABC):
try:
await intent.leave_room(room_id)
except (MatrixRequestError, IntentError):
- self.log.warning("Failed to leave room when cleaning up room", exc_info=True)
+ cls.log.warning(f"Failed to leave room {room_id} when cleaning up room", exc_info=True)
async def cleanup_portal(self, message: str, puppets_only: bool = False) -> None:
if self.username:
@@ -342,6 +346,7 @@ class BasePortal(ABC):
pass
if self._db_instance:
self._db_instance.delete()
+ DBMessage.delete_all(self.mxid)
self.deleted = True
@classmethod
@@ -491,6 +496,10 @@ class BasePortal(ABC):
old_levels: Dict[UserID, int]) -> Awaitable[None]:
pass
+ @abstractmethod
+ def backfill(self, source: 'AbstractUser') -> Awaitable[None]:
+ pass
+
# endregion
diff --git a/mautrix_telegram/portal/telegram.py b/mautrix_telegram/portal/telegram.py
index eca22172..73c7238a 100644
--- a/mautrix_telegram/portal/telegram.py
+++ b/mautrix_telegram/portal/telegram.py
@@ -14,7 +14,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Awaitable, Dict, List, Optional, Tuple, Union, NamedTuple, TYPE_CHECKING
-from html import escape as escape_html
from abc import ABC
import random
import mimetypes
@@ -30,7 +29,7 @@ from telethon.tl.types import (
MessageMediaPoll, MessageActionChannelCreate, MessageActionChatAddUser,
MessageActionChatCreate, MessageActionChatDeletePhoto, MessageActionChatDeleteUser,
MessageActionChatEditPhoto, MessageActionChatEditTitle, MessageActionChatJoinedByLink,
- MessageActionChatMigrateTo, MessageActionPinMessage, MessageActionGameScore,
+ MessageActionChatMigrateTo, MessageActionChannelMigrateFrom, MessageActionGameScore,
MessageMediaDocument, MessageMediaGeo, MessageMediaPhoto, MessageMediaUnsupported,
MessageMediaGame, PeerUser, PhotoCachedSize, TypeChannelParticipant, TypeChatParticipant,
TypeDocumentAttribute, TypeMessageAction, TypePhotoSize, PhotoSize, UpdateChatUserTyping,
@@ -356,6 +355,31 @@ class PortalTelegram(BasePortal, ABC):
edit_index=prev_edit_msg.edit_index + 1).insert()
DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id)
+ async def backfill(self, source: 'AbstractUser') -> None:
+ last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel"
+ else self.tgid))
+ min_id = last.tgid if last else 0
+ self.backfilling = True
+ self.backfill_leave = set()
+ max_file_size = min(config["bridge.max_document_size"], 1500) * 1024 * 1024
+ async with source.client.takeout(files=True, megagroups=self.megagroup,
+ chats=self.peer_type == "chat",
+ users=self.peer_type == "user",
+ channels=(self.peer_type == "channel"
+ and not self.megagroup),
+ max_file_size=max_file_size
+ ) as takeout_client:
+ async for message in takeout_client.iter_messages(await self.get_input_entity(source),
+ reverse=True, min_id=min_id):
+ sender = p.Puppet.get(message.sender_id)
+ # if isinstance(message, MessageService):
+ # await self.handle_telegram_action(source, sender, message)
+ await self.handle_telegram_message(source, sender, message)
+ for intent in self.backfill_leave:
+ await intent.leave_room(self.mxid)
+ self.backfilling = False
+ self.backfill_leave = None
+
async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet,
evt: Message) -> None:
if not self.mxid:
@@ -383,7 +407,7 @@ class PortalTelegram(BasePortal, ABC):
tg_space=tg_space, edit_index=0).insert()
return
- if self.dedup.pre_db_check and self.peer_type == "channel":
+ if self.backfilling or (self.dedup.pre_db_check and self.peer_type == "channel"):
msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if msg:
self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already"
@@ -402,7 +426,13 @@ class PortalTelegram(BasePortal, ABC):
MessageMediaGame, MessageMediaPoll, MessageMediaUnsupported)
media = evt.media if hasattr(evt, "media") and isinstance(evt.media,
allowed_media) else None
- intent = sender.intent_for(self) if sender else self.main_intent
+ if sender:
+ intent = sender.intent_for(self)
+ if self.backfilling and intent != sender.default_mxid_intent:
+ intent = sender.default_mxid_intent
+ self.backfill_leave.add(intent)
+ else:
+ intent = self.main_intent
if not media and evt.message:
is_bot = sender.is_bot if sender else False
event_id = await self.handle_telegram_text(source, intent, is_bot, evt)
@@ -502,7 +532,7 @@ class PortalTelegram(BasePortal, ABC):
await self.main_intent.set_power_levels(self.mxid, levels)
async def receive_telegram_pin_id(self, msg_id: TelegramID, receiver: TelegramID) -> None:
- tg_space = receiver if self.peer_type != "channel" else self.tgid
+ tg_space = receiver if self.peer_type != "channel" else self.tgid
message = DBMessage.get_one_by_tgid(msg_id, tg_space) if msg_id != 0 else None
if message:
await self.main_intent.set_pinned_messages(self.mxid, [message.mxid])
diff --git a/mautrix_telegram/util/file_transfer.py b/mautrix_telegram/util/file_transfer.py
index 9028d9ce..f9df31e5 100644
--- a/mautrix_telegram/util/file_transfer.py
+++ b/mautrix_telegram/util/file_transfer.py
@@ -30,7 +30,6 @@ from telethon.errors import (AuthBytesInvalidError, AuthKeyInvalidError, Locatio
from mautrix.appservice import IntentAPI
-
from ..tgclient import MautrixTelegramClient
from ..db import TelegramFile as DBTelegramFile
from ..util import sane_mimetypes
@@ -214,8 +213,8 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
image_converted = False
# A weird bug in alpine/magic makes it return application/octet-stream for gzips...
if is_sticker and tgs_convert and (mime_type == "application/gzip" or (
- mime_type == "application/octet-stream"
- and magic.from_buffer(file).startswith("gzip"))):
+ mime_type == "application/octet-stream"
+ and magic.from_buffer(file).startswith("gzip"))):
mime_type, file, width, height = await convert_tgs_to(
file, tgs_convert["target"], **tgs_convert["args"])
thumbnail = None
@@ -238,8 +237,11 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
if thumbnail and (mime_type.startswith("video/") or mime_type == "image/gif"):
if isinstance(thumbnail, (PhotoSize, PhotoCachedSize)):
thumbnail = thumbnail.location
- db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail, file,
- mime_type)
+ try:
+ db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail, file,
+ mime_type)
+ except FileIdInvalidError:
+ log.warning(f"Failed to transfer thumbnail for {thumbnail!s}", exc_info=True)
try:
db_file.insert()