Add option for automatic backfilling when creating portal

This commit is contained in:
Tulir Asokan
2020-07-28 17:28:07 +03:00
parent 993354bce5
commit 2c0a2e694b
10 changed files with 170 additions and 68 deletions
+2 -2
View File
@@ -28,9 +28,9 @@
* [ ] Buttons
* [x] Message deletions
* [x] Message edits
* [ ] Message history
* [x] Message history
* [x] Manually (`!tg backfill`)
* [ ] Automatically when creating portal
* [x] Automatically when creating portal
* [ ] Automatically for missed messages
* [x] Avatars
* [x] Presence
+3 -1
View File
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@@ -426,6 +426,8 @@ class AbstractUser(ABC):
self.log.debug(f"Ignoring relaybot-sent message %s to %s", update.id, portal.tgid_log)
return
await portal.backfill_lock.wait(update.id)
if isinstance(update, MessageService):
if isinstance(update.action, MessageActionChannelMigrateFrom):
self.log.trace(f"Ignoring action %s to %s by %d", update.action, portal.tgid_log,
+7 -3
View File
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@@ -331,15 +331,19 @@ async def random(evt: CommandEvent) -> EventID:
return await evt.reply("Invalid emoji for randomization")
@command_handler(help_section=SECTION_PORTAL_MANAGEMENT,
@command_handler(help_section=SECTION_PORTAL_MANAGEMENT, help_args="[_limit_]",
help_text="Backfill messages from Telegram history.")
async def backfill(evt: CommandEvent) -> None:
if not evt.is_portal:
await evt.reply("You can only use backfill in portal rooms")
return
try:
limit = int(evt.args[0])
except (ValueError, IndexError):
limit = -1
portal = po.Portal.get_by_mxid(evt.room_id)
try:
await portal.backfill(evt.sender)
await portal.backfill(evt.sender, limit=limit)
except TakeoutInitDelayError:
msg = ("Please accept the data export request from a mobile device, "
"then re-run the backfill command.")
+6 -1
View File
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@@ -113,6 +113,11 @@ class Config(BaseBridgeConfig):
copy("bridge.delivery_receipts")
copy("bridge.delivery_error_reports")
copy("bridge.resend_bridge_info")
copy("bridge.backfill.invite_own_puppet")
copy("bridge.backfill.takeout_limit")
copy("bridge.backfill.initial_limit")
copy("bridge.backfill.missed_limit")
copy("bridge.backfill.disable_notifications")
copy("bridge.initial_power_level_overrides.group")
copy("bridge.initial_power_level_overrides.user")
+26 -2
View File
@@ -129,8 +129,8 @@ bridge:
# Maximum number of members to sync per portal when starting up. Other members will be
# synced when they send messages. The maximum is 10000, after which the Telegram server
# will not send any more members.
# Defaults to no local limit (-> limited to 10000 by server)
max_initial_member_sync: -1
# -1 means no limit (which means it's limited to 10000 by the server)
max_initial_member_sync: 100
# Whether or not to sync the member list in channels.
# If no channel admins have logged into the bridge, the bridge won't be able to sync the member
# list regardless of this setting.
@@ -232,6 +232,30 @@ bridge:
# This field will automatically be changed back to false after it,
# except if the config file is not writable.
resend_bridge_info: false
# Settings for backfilling messages from Telegram.
backfill:
# Whether or not the Telegram ghosts of logged in Matrix users should be
# invited to private chats when backfilling history from Telegram. This is
# usually needed to prevent rate limits and to allow timestamp massaging.
invite_own_puppet: true
# Maximum number of messages to backfill without using a takeout.
# The first time a takeout is used, the user has to manually approve it from a different
# device. If initial_limit or missed_limit are higher than this value, the bridge will ask
# the user to accept the takeout after logging in before syncing any chats.
takeout_limit: 100
# Maximum number of messages to backfill initially.
# Set to 0 to disable backfilling when creating portal, or -1 to disable the limit.
#
# N.B. Initial backfill will only start after member sync. Make sure your
# max_initial_member_sync is set to a low enough value so it doesn't take forever.
initial_limit: 0
# Maximum number of messages to backfill if messages were missed while
# the bridge was disconnected.
# Set to 0 to disable backfilling missed messages.
missed_limit: 100
# If using double puppeting, should notifications be disabled
# while the initial backfill is in progress?
disable_notifications: false
# Overrides for base power levels.
initial_power_level_overrides:
+6 -3
View File
@@ -33,6 +33,7 @@ from mautrix.appservice import AppService, IntentAPI
from mautrix.types import (RoomID, RoomAlias, UserID, EventID, EventType, MessageEventContent,
PowerLevelStateEventContent, ContentURI)
from mautrix.util.simple_template import SimpleTemplate
from mautrix.util.simple_lock import SimpleLock
from mautrix.util.logging import TraceLogger
from ..types import TelegramID
@@ -93,7 +94,7 @@ class BasePortal(ABC):
avatar_url: Optional[ContentURI]
encrypted: bool
deleted: bool
backfilling: bool
backfill_lock: SimpleLock
backfill_leave: Optional[Set[IntentAPI]]
log: TraceLogger
@@ -127,7 +128,8 @@ class BasePortal(ABC):
self._main_intent = None
self.deleted = False
self.log = self.base_log.getChild(self.tgid_log if self.tgid else self.mxid)
self.backfilling = False
self.backfill_lock = SimpleLock("Waiting for backfilling to finish before handling %s",
log=self.log, loop=self.loop)
self.backfill_leave = None
self.dedup = PortalDedup(self)
@@ -531,7 +533,8 @@ class BasePortal(ABC):
pass
@abstractmethod
def backfill(self, source: 'AbstractUser', is_initial: bool = False,
             limit: Optional[int] = None) -> Awaitable[None]:
    """
    Backfill message history from Telegram into this portal.

    Args:
        source: The user whose Telegram client is used to fetch the history.
        is_initial: Whether this is the first backfill of a newly created portal.
        limit: Maximum number of messages to backfill. When omitted, the
            implementation chooses the limit itself.
    """
    pass
@abstractmethod
+39 -26
View File
@@ -219,12 +219,6 @@ class PortalMetadata(BasePortal, ABC):
if changed:
self.save()
await self.update_bridge_info()
puppet = p.Puppet.get_by_custom_mxid(user.mxid)
if puppet:
try:
await puppet.intent.ensure_joined(self.mxid)
except Exception:
self.log.exception("Failed to ensure %s is joined to portal", user.mxid)
if self.sync_matrix_state:
await self.main_intent.get_joined_members(self.mxid)
@@ -392,28 +386,40 @@ class PortalMetadata(BasePortal, ABC):
if not config["bridge.federate_rooms"]:
creation_content["m.federate"] = False
room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset,
is_direct=direct, invitees=invites or [],
name=self.title, topic=self.about,
initial_state=initial_state,
creation_content=creation_content)
if not room_id:
raise Exception(f"Failed to create room")
with self.backfill_lock:
room_id = await self.main_intent.create_room(alias_localpart=alias, preset=preset,
is_direct=direct, invitees=invites or [],
name=self.title, topic=self.about,
initial_state=initial_state,
creation_content=creation_content)
if not room_id:
raise Exception(f"Failed to create room")
if self.encrypted and self.matrix.e2ee and direct:
try:
await self.az.intent.ensure_joined(room_id)
except Exception:
self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")
if self.encrypted and self.matrix.e2ee and direct:
try:
await self.az.intent.ensure_joined(room_id)
except Exception:
self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")
self.mxid = room_id
self.by_mxid[self.mxid] = self
self.save()
await self.az.state_store.set_power_levels(self.mxid, power_levels)
user.register_portal(self)
asyncio.ensure_future(self.update_matrix_room(user, entity, direct, puppet,
levels=power_levels, users=users,
participants=participants), loop=self.loop)
self.mxid = room_id
self.by_mxid[self.mxid] = self
self.save()
await self.az.state_store.set_power_levels(self.mxid, power_levels)
user.register_portal(self)
update_room = self.loop.create_task(self.update_matrix_room(
user, entity, direct, puppet,
levels=power_levels, users=users, participants=participants))
if config["bridge.backfill.initial_limit"] > 0:
self.log.debug("Initial backfill is enabled. Waiting for room members to sync "
"and then starting backfill")
await update_room
try:
await self.backfill(user, is_initial=True)
except Exception:
self.log.exception("Failed to backfill new portal")
return self.mxid
@@ -545,6 +551,13 @@ class PortalMetadata(BasePortal, ABC):
if user:
await self.invite_to_matrix(user.mxid)
puppet = p.Puppet.get_by_custom_mxid(user.mxid)
if puppet:
try:
await puppet.intent.ensure_joined(self.mxid)
except Exception:
self.log.exception("Failed to ensure %s is joined to portal", user.mxid)
# We can't trust the member list if any of the following cases is true:
# * There are close to 10 000 users, because Telegram might not be sending all members.
# * The member sync count is limited, because then we might ignore some members.
+75 -24
View File
@@ -38,12 +38,13 @@ from telethon.tl.types import (
from mautrix.appservice import IntentAPI
from mautrix.types import (EventID, UserID, ImageInfo, ThumbnailInfo, RelatesTo, MessageType,
EventType, MediaMessageEventContent, TextMessageEventContent,
LocationMessageEventContent, Format, MessageEventContent)
LocationMessageEventContent, Format)
from ..types import TelegramID
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
from ..util import sane_mimetypes
from ..context import Context
from ..tgclient import TelegramClient
from .. import puppet as p, user as u, formatter, util
from .base import BasePortal
@@ -407,43 +408,92 @@ class PortalTelegram(BasePortal, ABC):
edit_index=prev_edit_msg.edit_index + 1).insert()
DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id)
async def backfill(self, source: 'AbstractUser') -> None:
self.log.debug("Backfilling history through %s", source.mxid)
@property
def _takeout_options(self) -> Dict[str, Union[bool, int]]:
    """Keyword arguments passed to ``client.takeout()`` when backfilling this portal."""
    kind = self.peer_type
    # The configured document size is capped at 2000 before being scaled by 1024 * 1024.
    size_cap = min(config["bridge.max_document_size"], 2000)
    options: Dict[str, Union[bool, int]] = dict(
        files=True,
        megagroups=self.megagroup,
        chats=(kind == "chat"),
        users=(kind == "user"),
        # Megagroups are requested through their own flag above.
        channels=(kind == "channel" and not self.megagroup),
        max_file_size=size_cap * 1024 * 1024,
    )
    return options
async def backfill(self, source: 'AbstractUser', is_initial: bool = False,
                   limit: Optional[int] = None) -> None:
    """
    Backfill Telegram history into this portal while holding the backfill lock.

    Args:
        source: The user whose Telegram client is used to fetch the history.
        is_initial: Selects which configured default applies when ``limit`` is not
            given: ``bridge.backfill.initial_limit`` if true, otherwise
            ``bridge.backfill.missed_limit``.
        limit: Maximum number of messages to backfill. A negative value disables
            the limit.
    """
    # NOTE: an explicit limit of 0 is falsy, so it also falls back to the configured default.
    limit = limit or (config["bridge.backfill.initial_limit"] if is_initial
                      else config["bridge.backfill.missed_limit"])
    if limit == 0:
        return
    last = DBMessage.find_last(self.mxid, (source.tgid if self.peer_type != "channel"
                                           else self.tgid))
    min_id = last.tgid if last else 0
    messages = await source.client.get_messages(self.peer, limit=1)
    if not messages:
        # Indexing [0] on an empty result would raise IndexError.
        self.log.debug("Not backfilling through %s: the chat has no messages", source.mxid)
        return
    last_id = messages[0].id
    if last_id <= min_id:
        self.log.debug("Not backfilling through %s: no messages after ID %d",
                       source.mxid, min_id)
        return
    if limit < 0:
        # No limit requested. Keep an integer estimate instead of None so that the
        # takeout threshold comparison in _backfill stays valid.
        limit = last_id - min_id
        self.log.debug(f"Backfilling approximately {limit} messages "
                       f"through {source.mxid}")
    elif self.peer_type == "channel":
        # This is a channel or supergroup, so we'll backfill messages based on the ID.
        # There are some cases, such as deleted messages, where this may backfill fewer
        # messages than the limit.
        min_id = max(last_id - limit, min_id)
        # Upper bound on the number of messages in the (min_id, last_id] range.
        limit = last_id - min_id
        self.log.debug(f"Backfilling messages after ID {min_id} (last message: {last_id}) "
                       f"through {source.mxid}")
    else:
        # Private chats and normal groups don't have their own message ID namespace,
        # which means we'll have to fetch messages a different way.
        # The _backfill_messages method will detect min_id=None and not use reverse=True
        min_id = None
        self.log.debug(f"Backfilling up to {limit} messages through {source.mxid}")
    with self.backfill_lock:
        await self._backfill(source, min_id, limit)
async def _backfill(self, source: 'AbstractUser', min_id: Optional[int], limit: Optional[int]
                    ) -> None:
    """
    Run one backfill pass and clean up the temporary puppets afterwards.

    Args:
        source: The user whose Telegram client is used to fetch the history.
        min_id: Only messages after this ID are fetched; ``None`` makes
            ``_backfill_messages`` fetch the latest ``limit`` messages instead.
        limit: Maximum number of messages, or ``None`` for no limit.
    """
    self.backfill_leave = set()
    if (self.peer_type == "user" and self.tg_receiver != source.tgid
            and config["bridge.backfill.invite_own_puppet"]):
        self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid)
        sender = p.Puppet.get(source.tgid)
        await self.main_intent.invite_user(self.mxid, sender.default_mxid)
        await sender.default_mxid_intent.join_room_by_id(self.mxid)
        self.backfill_leave.add(sender.default_mxid_intent)

    client = source.client
    try:
        # An unlimited backfill (limit=None) always goes through a takeout session;
        # comparing None with an int would raise TypeError.
        if limit is None or limit > config["bridge.backfill.takeout_limit"]:
            async with client.takeout(**self._takeout_options) as takeout:
                count = await self._backfill_messages(source, min_id, limit, takeout)
        else:
            count = await self._backfill_messages(source, min_id, limit, client)
    finally:
        # Always remove the temporary puppets, even if the backfill failed part-way
        # (e.g. the takeout was not approved yet). A failed leave is logged rather than
        # raised so it cannot mask the original error.
        for intent in self.backfill_leave:
            self.log.trace("Leaving room with %s post-backfill", intent.mxid)
            try:
                await intent.leave_room(self.mxid)
            except Exception:
                self.log.exception("Failed to leave room with %s post-backfill", intent.mxid)
        self.backfill_leave = None
    self.log.info("Backfilled %d messages through %s", count, source.mxid)
async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int],
                             limit: Optional[int], client: TelegramClient) -> int:
    """
    Fetch messages through ``client`` and bridge each one to Matrix.

    With a ``min_id``, messages after that ID are streamed using ``reverse=True``.
    Without one, up to ``limit`` messages are fetched in one call and handled in the
    reverse of the returned order.

    Returns:
        The number of messages passed to ``handle_telegram_message``.
    """
    peer = await self.get_input_entity(source)
    handled = 0
    if min_id is None:
        batch = await client.get_messages(peer, limit=limit)
        for msg in reversed(batch):
            puppet = p.Puppet.get(msg.sender_id)
            await self.handle_telegram_message(source, puppet, msg)
            handled += 1
        return handled

    async for msg in client.iter_messages(peer, reverse=True, min_id=min_id):
        puppet = p.Puppet.get(msg.sender_id)
        # TODO handle service messages?
        await self.handle_telegram_message(source, puppet, msg)
        handled += 1
    return handled
async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet,
evt: Message) -> None:
if not self.mxid:
@@ -472,7 +522,7 @@ class PortalTelegram(BasePortal, ABC):
tg_space=tg_space, edit_index=0).insert()
return
if self.backfilling or (self.dedup.pre_db_check and self.peer_type == "channel"):
if self.backfill_lock.locked or (self.dedup.pre_db_check and self.peer_type == "channel"):
msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if msg:
self.log.debug(f"Ignoring message {evt.id} (src {source.tgid}) as it was already"
@@ -496,7 +546,8 @@ class PortalTelegram(BasePortal, ABC):
allowed_media) else None
if sender:
intent = sender.intent_for(self)
if self.backfilling and intent != sender.default_mxid_intent:
if ((self.backfill_lock.locked and intent != sender.default_mxid_intent
and config["bridge.backfill.invite_own_puppet"])):
intent = sender.default_mxid_intent
self.backfill_leave.add(intent)
else:
+1 -1
View File
@@ -331,7 +331,7 @@ class Puppet(CustomPuppetMixin):
def default_puppet_should_leave_room(self, room_id: RoomID) -> bool:
    """
    Decide whether the default puppet should leave the given room.

    Returns:
        True only when the room is a portal, the portal's backfill lock is not
        held, and the portal is not a private chat (``peer_type != "user"``).
    """
    portal: p.Portal = p.Portal.get_by_mxid(room_id)
    if not portal:
        # Return a real bool instead of leaking None to the caller.
        return False
    return not portal.backfill_lock.locked and portal.peer_type != "user"
# endregion
# region Getters
+5 -5
View File
@@ -1,5 +1,5 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
# Copyright (C) 2020 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import (Awaitable, Dict, List, Iterable, NewType, Optional, Tuple, Any, cast,
from typing import (Awaitable, Dict, List, Iterable, NamedTuple, Optional, Tuple, Any, cast,
TYPE_CHECKING)
import logging
import asyncio
@@ -42,7 +42,7 @@ if TYPE_CHECKING:
config: Optional['Config'] = None
class SearchResult(NamedTuple):
    """A search hit: the matched puppet together with its similarity score."""
    puppet: 'pu.Puppet'
    similarity: int
class User(AbstractUser, BaseUser):
@@ -306,7 +306,7 @@ class User(AbstractUser, BaseUser):
for contact in self.contacts:
similarity = contact.similarity(query)
if similarity >= min_similarity:
results.append(SearchResult((contact, similarity)))
results.append(SearchResult(contact, similarity))
results.sort(key=lambda tup: tup[1], reverse=True)
return results[0:max_results]
@@ -318,7 +318,7 @@ class User(AbstractUser, BaseUser):
for user in server_results.users:
puppet = pu.Puppet.get(user.id)
await puppet.update_info(self, user)
results.append(SearchResult((puppet, puppet.similarity(query))))
results.append(SearchResult(puppet, puppet.similarity(query)))
results.sort(key=lambda tup: tup[1], reverse=True)
return results[0:max_results]