Infinite backfill with MSC2716 (#817)

Disabled by default, with non-infinite fallback mode as the default behavior
This commit is contained in:
Tulir Asokan
2022-10-11 16:03:52 +03:00
committed by GitHub
parent 02f21a30a8
commit 9b9a3b452d
12 changed files with 767 additions and 205 deletions
+15 -13
View File
@@ -18,6 +18,7 @@ from __future__ import annotations
from typing import cast
import base64
import codecs
import math
import re
from aiohttp import ClientSession, InvalidURL
@@ -427,6 +428,9 @@ async def backfill(evt: CommandEvent) -> None:
if not evt.is_portal:
await evt.reply("You can only use backfill in portal rooms")
return
elif not evt.config["bridge.backfill.enable"]:
await evt.reply("Backfilling is disabled in the bridge config")
return
try:
limit = int(evt.args[0])
except (ValueError, IndexError):
@@ -435,16 +439,14 @@ async def backfill(evt: CommandEvent) -> None:
if not evt.config["bridge.backfill.normal_groups"] and portal.peer_type == "chat":
await evt.reply("Backfilling normal groups is disabled in the bridge config")
return
try:
await portal.backfill(evt.sender, limit=limit)
except TakeoutInitDelayError:
msg = (
"Please accept the data export request from a mobile device, "
"then re-run the backfill command."
)
if portal.peer_type == "user":
from mautrix.appservice import IntentAPI
await portal.main_intent.send_notice(evt.room_id, msg)
else:
await evt.reply(msg)
if portal.backfill_msc2716:
messages_per_batch = evt.config["bridge.backfill.incremental.messages_per_batch"]
batches = math.ceil(limit / messages_per_batch)
rounded = ""
if batches * messages_per_batch != limit:
rounded = f" (rounded message limit to {batches}*{messages_per_batch})"
await portal.enqueue_backfill(evt.sender, priority=0, max_batches=batches)
await evt.reply(f"Backfill queued{rounded}")
else:
output = await portal.forward_backfill(evt.sender, initial=False, override_limit=limit)
await evt.reply(output)
+11 -5
View File
@@ -158,12 +158,18 @@ class Config(BaseBridgeConfig):
copy("bridge.bridge_matrix_leave")
copy("bridge.kick_on_logout")
copy("bridge.always_read_joined_telegram_notice")
copy("bridge.backfill.invite_own_puppet")
copy("bridge.backfill.takeout_limit")
copy("bridge.backfill.initial_limit")
copy("bridge.backfill.missed_limit")
copy("bridge.backfill.disable_notifications")
copy("bridge.backfill.enable")
copy("bridge.backfill.msc2716")
copy("bridge.backfill.double_puppet_backfill")
copy("bridge.backfill.normal_groups")
copy("bridge.backfill.forward.initial_limit")
copy("bridge.backfill.forward.sync_limit")
copy("bridge.backfill.incremental.messages_per_batch")
copy("bridge.backfill.incremental.post_batch_delay")
copy("bridge.backfill.incremental.max_batches.user")
copy("bridge.backfill.incremental.max_batches.normal_group")
copy("bridge.backfill.incremental.max_batches.supergroup")
copy("bridge.backfill.incremental.max_batches.channel")
copy("bridge.initial_power_level_overrides.group")
copy("bridge.initial_power_level_overrides.user")
+3
View File
@@ -15,6 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from mautrix.util.async_db import Database
from .backfill_queue import Backfill
from .bot_chat import BotChat
from .disappearing_message import DisappearingMessage
from .message import Message
@@ -38,6 +39,7 @@ def init(db: Database) -> None:
BotChat,
PgSession,
DisappearingMessage,
Backfill,
):
table.db = db
@@ -54,4 +56,5 @@ __all__ = [
"BotChat",
"PgSession",
"DisappearingMessage",
"Backfill",
]
+175
View File
@@ -0,0 +1,175 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2022 Tulir Asokan, Sumner Evans
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING, ClassVar
from datetime import datetime, timedelta
from asyncpg import Record
from attr import dataclass
from mautrix.types import RoomID, UserID
from mautrix.util.async_db import Database
fake_db = Database.create("") if TYPE_CHECKING else None
@dataclass
class Backfill:
    """A queued backfill request, stored as one row of the ``backfill_queue`` table.

    Requests are handed out by :meth:`get_next` in ascending ``priority`` order,
    with ties broken by ascending ``queue_id``.
    """

    db: ClassVar[Database] = fake_db

    # Assigned by the database; stays ``None`` until ``insert()`` has run.
    queue_id: int | None
    user_mxid: UserID
    priority: int
    portal_tgid: int
    portal_tg_receiver: int
    messages_per_batch: int
    post_batch_delay: int
    max_batches: int
    # Written by mark_dispatched(), mark_done() and set_cooldown_timeout() respectively.
    dispatch_time: datetime | None
    completed_at: datetime | None
    cooldown_timeout: datetime | None

    # Every column except the database-generated ``queue_id``, in insertion order.
    columns = [
        "user_mxid",
        "priority",
        "portal_tgid",
        "portal_tg_receiver",
        "messages_per_batch",
        "post_batch_delay",
        "max_batches",
        "dispatch_time",
        "completed_at",
        "cooldown_timeout",
    ]
    columns_str = ",".join(columns)

    @staticmethod
    def new(
        user_mxid: UserID,
        priority: int,
        portal_tgid: int,
        portal_tg_receiver: int,
        messages_per_batch: int,
        post_batch_delay: int = 0,
        max_batches: int = -1,
    ) -> Backfill:
        """Build a not-yet-stored request with no queue ID and no timestamps set."""
        return Backfill(
            queue_id=None,
            user_mxid=user_mxid,
            priority=priority,
            portal_tgid=portal_tgid,
            portal_tg_receiver=portal_tg_receiver,
            messages_per_batch=messages_per_batch,
            post_batch_delay=post_batch_delay,
            max_batches=max_batches,
            dispatch_time=None,
            completed_at=None,
            cooldown_timeout=None,
        )

    @classmethod
    def _from_row(cls, row: Record | None) -> Backfill | None:
        """Convert a database row into a ``Backfill``; ``None`` stays ``None``."""
        if row is None:
            return None
        return cls(**row)

    @classmethod
    async def get_next(cls, user_mxid: UserID) -> Backfill | None:
        """Return the next request that should be processed for ``user_mxid``.

        A request is eligible when it was never dispatched, or when it was
        dispatched more than 15 minutes ago and is still not marked as completed.
        Requests whose cooldown timeout has not passed yet are skipped.

        Returns:
            The eligible request with the lowest priority value (then lowest
            queue ID), or ``None`` if there is none.
        """
        q = f"""
        SELECT queue_id, {cls.columns_str}
        FROM backfill_queue
        WHERE user_mxid=$1
            AND (
                dispatch_time IS NULL
                OR (
                    dispatch_time < $2
                    AND completed_at IS NULL
                )
            )
            AND (
                cooldown_timeout IS NULL
                OR cooldown_timeout < $3
            )
        ORDER BY priority, queue_id
        LIMIT 1
        """
        # Every timestamp in this table is written with datetime.now() by the
        # methods below, so compare against the same Python-side clock instead of
        # the database's current_timestamp, which may be in a different time zone.
        now = datetime.now()
        stale_dispatch_cutoff = now - timedelta(minutes=15)
        return cls._from_row(await cls.db.fetchrow(q, user_mxid, stale_dispatch_cutoff, now))

    @classmethod
    async def get(
        cls,
        user_mxid: UserID,
        portal_tgid: int,
        portal_tg_receiver: int,
    ) -> Backfill | None:
        """Return the first queued request of a user for a specific portal, if any.

        Unlike :meth:`get_next`, this does not filter on dispatch, completion or
        cooldown state.
        """
        q = f"""
        SELECT queue_id, {cls.columns_str}
        FROM backfill_queue
        WHERE user_mxid=$1
            AND portal_tgid=$2
            AND portal_tg_receiver=$3
        ORDER BY priority, queue_id
        LIMIT 1
        """
        return cls._from_row(await cls.db.fetchrow(q, user_mxid, portal_tgid, portal_tg_receiver))

    @classmethod
    async def delete_all(cls, user_mxid: UserID) -> None:
        """Delete every queued request belonging to ``user_mxid``."""
        await cls.db.execute("DELETE FROM backfill_queue WHERE user_mxid=$1", user_mxid)

    @classmethod
    async def delete_for_portal(cls, tgid: int, tg_receiver: int) -> None:
        """Delete every queued request targeting the given portal, for all users."""
        q = "DELETE FROM backfill_queue WHERE portal_tgid=$1 AND portal_tg_receiver=$2"
        await cls.db.execute(q, tgid, tg_receiver)

    async def insert(self) -> None:
        """Store this request and remember the queue ID the database assigned to it."""
        placeholders = ",".join(f"${index}" for index in range(1, len(self.columns) + 1))
        q = f"""
        INSERT INTO backfill_queue ({self.columns_str})
        VALUES ({placeholders})
        RETURNING queue_id
        """
        # Derive the values from ``columns`` so they can never get out of sync
        # with the column list used in the query.
        values = [getattr(self, column) for column in self.columns]
        row = await self.db.fetchrow(q, *values)
        self.queue_id = row["queue_id"]

    async def mark_dispatched(self) -> None:
        """Record the current time as the dispatch time of this request."""
        q = "UPDATE backfill_queue SET dispatch_time=$1 WHERE queue_id=$2"
        await self.db.execute(q, datetime.now(), self.queue_id)

    async def mark_done(self) -> None:
        """Record the current time as the completion time of this request."""
        q = "UPDATE backfill_queue SET completed_at=$1 WHERE queue_id=$2"
        await self.db.execute(q, datetime.now(), self.queue_id)

    async def set_cooldown_timeout(self, timeout: float) -> None:
        """
        Set the backfill request to cooldown for ``timeout`` seconds.
        """
        q = "UPDATE backfill_queue SET cooldown_timeout=$1 WHERE queue_id=$2"
        await self.db.execute(q, datetime.now() + timedelta(seconds=timeout), self.queue_id)
+27 -7
View File
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, ClassVar
from asyncpg import Record
from attr import dataclass
import attr
from mautrix.types import EventID, RoomID, UserID
from mautrix.util.async_db import Database, Scheme
@@ -122,6 +123,14 @@ class Message:
)
return cls._from_row(await cls.db.fetchrow(q, mx_room, tg_space))
@classmethod
async def find_first(cls, mx_room: RoomID, tg_space: TelegramID) -> Message | None:
    """Look up the bridged message with the lowest Telegram ID in a room.

    Args:
        mx_room: The Matrix room to search in.
        tg_space: The Telegram ID namespace the message IDs belong to.

    Returns:
        Whatever ``_from_row`` produces for the first row ordered by ascending
        ``tgid`` (annotated as ``None`` when there is no such message).
    """
    query = (
        f"SELECT {cls.columns} FROM message WHERE mx_room=$1 AND tg_space=$2 "
        "ORDER BY tgid ASC LIMIT 1"
    )
    row = await cls.db.fetchrow(query, mx_room, tg_space)
    return cls._from_row(row)
@classmethod
async def delete_all(cls, mx_room: RoomID) -> None:
await cls.db.execute("DELETE FROM message WHERE mx_room=$1", mx_room)
@@ -173,6 +182,23 @@ class Message:
q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2"
await cls.db.execute(q, temp_mxid, mx_room)
@classmethod
async def bulk_insert(cls, messages: list[Message]) -> None:
    """Insert many messages inside a single transaction.

    On PostgreSQL the rows are sent with ``copy_records_to_table``; on any other
    database scheme they are inserted with ``executemany`` and ``_insert_query``.

    Args:
        messages: The message rows to store.
    """
    # NOTE(review): relies on the attrs field order matching cls.columns and the
    # placeholder order of _insert_query - confirm against the class definition.
    rows = [attr.astuple(entry) for entry in messages]
    column_names = cls.columns.split(", ")
    use_copy = cls.db.scheme == Scheme.POSTGRES
    async with cls.db.acquire() as conn, conn.transaction():
        if not use_copy:
            await conn.executemany(cls._insert_query, rows)
        else:
            await conn.copy_records_to_table("message", records=rows, columns=column_names)
_insert_query: ClassVar[
str
] = """
INSERT INTO message (mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash, sender_mxid, sender)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"""
@property
def _values(self):
return (
@@ -188,13 +214,7 @@ class Message:
)
async def insert(self) -> None:
q = """
INSERT INTO message (
mxid, mx_room, tgid, tg_space, edit_index, redacted, content_hash,
sender_mxid, sender
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
"""
await self.db.execute(q, *self._values)
await self.db.execute(self._insert_query, *self._values)
async def delete(self) -> None:
q = "DELETE FROM message WHERE mxid=$1 AND mx_room=$2 AND tg_space=$3"
+37 -22
View File
@@ -337,34 +337,49 @@ bridge:
create_group_on_invite: true
# Settings for backfilling messages from Telegram.
backfill:
# Whether or not the Telegram ghosts of logged in Matrix users should be
# invited to private chats when backfilling history from Telegram. This is
# usually needed to prevent rate limits and to allow timestamp massaging.
invite_own_puppet: true
# Maximum number of messages to backfill without using a takeout.
# The first time a takeout is used, the user has to manually approve it from a different
# device. If initial_limit or missed_limit are higher than this value, the bridge will ask
# the user to accept the takeout after logging in before syncing any chats.
takeout_limit: 100
# Maximum number of messages to backfill initially.
# Set to 0 to disable backfilling when creating portal, or -1 to disable the limit.
# Allow backfilling at all?
enable: true
# Use MSC2716 for backfilling?
#
# N.B. Initial backfill will only start after member sync. Make sure your
# max_initial_member_sync is set to a low enough value so it doesn't take forever.
initial_limit: 0
# Maximum number of messages to backfill if messages were missed while the bridge was
# disconnected. Note that this only works for logged in users and only if the chat isn't
# older than sync_update_limit
# Set to 0 to disable backfilling missed messages.
missed_limit: 50
# If using double puppeting, should notifications be disabled
# while the initial backfill is in progress?
disable_notifications: false
# This requires a server with MSC2716 support, which is currently an experimental feature in Synapse.
# It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml.
msc2716: false
# Use double puppets for backfilling?
#
# If using MSC2716, the double puppets must be in the appservice's user ID namespace
# (because the bridge can't use the double puppet access token with batch sending).
double_puppet_backfill: false
# Whether or not to enable backfilling in normal groups.
# Normal groups have numerous technical problems in Telegram, and backfilling normal groups
# will likely cause problems if there are multiple Matrix users in the group.
normal_groups: false
# Forward backfilling limits. These apply to both MSC2716 and legacy backfill.
forward:
# Number of messages to backfill immediately after creating a portal.
initial_limit: 10
# Number of messages to backfill when syncing chats.
sync_limit: 100
# Settings for incremental backfill of history. These only apply when using MSC2716.
incremental:
# Maximum number of messages to backfill per batch.
messages_per_batch: 100
# The number of seconds to wait after backfilling the batch of messages.
post_batch_delay: 20
# The maximum number of batches to backfill per portal, split by the chat type.
# If set to -1, all messages in the chat will eventually be backfilled.
max_batches:
# Direct chats
user: -1
# Normal groups. Note that the normal_groups option above must be enabled
# for these to be backfilled.
normal_group: -1
# Supergroups
supergroup: 10
# Broadcast channels
channel: -1
# Overrides for base power levels.
initial_power_level_overrides:
user: {}
+16 -3
View File
@@ -16,15 +16,14 @@
from __future__ import annotations
from typing import TYPE_CHECKING
import sys
from mautrix.bridge import BaseMatrixHandler
from mautrix.errors import MatrixError
from mautrix.types import (
Event,
EventID,
EventType,
MemberStateEventContent,
MessageType,
PresenceEvent,
PresenceState,
ReactionEvent,
@@ -36,7 +35,6 @@ from mautrix.types import (
RoomTopicStateEventContent as TopicContent,
SingleReceiptEventContent,
StateEvent,
TextMessageEventContent,
TypingEvent,
UserID,
)
@@ -63,6 +61,21 @@ class MatrixHandler(BaseMatrixHandler):
self._previously_typing = {}
async def check_versions(self) -> None:
    """Run the base version checks and verify MSC2716 support if it is configured.

    When ``bridge.backfill.msc2716`` is enabled but the homeserver does not
    report support for ``org.matrix.msc2716``, a fatal error is logged and the
    process exits with status 18.
    """
    await super().check_versions()
    if not self.config["bridge.backfill.msc2716"]:
        return
    support = self.versions.supports("org.matrix.msc2716")
    if support:
        return
    # An explicit False is reported differently from any other falsy answer.
    if support is False:
        reason = "batch sending is not enabled on homeserver"
    else:
        reason = "homeserver does not support batch sending"
    self.log.fatal("Backfilling with MSC2716 is enabled in bridge config, but " + reason)
    sys.exit(18)
async def handle_puppet_group_invite(
self,
room_id: RoomID,
+354 -148
View File
@@ -132,12 +132,15 @@ from telethon.tl.types import (
UserProfilePhotoEmpty,
)
from telethon.utils import encode_waveform
import attr
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
from mautrix.bridge import BasePortal, NotificationDisabler, RejectMatrixInvite, async_getter_lock
from mautrix.errors import IntentError, MatrixRequestError, MForbidden
from mautrix.types import (
BatchID,
BatchSendEvent,
BatchSendStateEvent,
BeeperMessageStatusEventContent,
ContentURI,
EventID,
@@ -148,6 +151,7 @@ from mautrix.types import (
LocationMessageEventContent,
MediaMessageEventContent,
Membership,
MemberStateEventContent,
MessageEventContent,
MessageStatus,
MessageStatusReason,
@@ -182,6 +186,7 @@ from . import (
)
from .config import Config
from .db import (
Backfill,
DisappearingMessage,
Message as DBMessage,
Portal as DBPortal,
@@ -204,6 +209,7 @@ if TYPE_CHECKING:
StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
DummyPortalCreated = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE)
StateMarker = EventType.find("org.matrix.msc2716.marker", EventType.Class.STATE)
InviteList = Union[UserID, List[UserID]]
UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping]
@@ -250,6 +256,7 @@ class Portal(DBPortal, BasePortal):
backfill_lock: SimpleLock
backfill_method_lock: asyncio.Lock
backfill_leave: set[IntentAPI] | None
backfill_msc2716: bool
alias: RoomAlias | None
@@ -269,6 +276,7 @@ class Portal(DBPortal, BasePortal):
_sponsored_seen: dict[UserID, bool]
_new_messages_after_sponsored: bool
_member_list_cache: dict[EventID, set[UserID]]
_prev_reaction_poll: dict[UserID, float]
_msg_conv: putil.TelegramMessageConverter
@@ -322,6 +330,7 @@ class Portal(DBPortal, BasePortal):
self.log = self.log.getChild(self.tgid_log if self.tgid else self.mxid)
self._main_intent = None
self.deleted = False
self.backfill_lock = SimpleLock(
"Waiting for backfilling to finish before handling %s", log=self.log
)
@@ -341,6 +350,7 @@ class Portal(DBPortal, BasePortal):
self._new_messages_after_sponsored = True
self._bridging_blocked_at_runtime = False
self._member_list_cache = {}
self._prev_reaction_poll = defaultdict(lambda: 0.0)
self._msg_conv = putil.TelegramMessageConverter(self)
@@ -428,6 +438,7 @@ class Portal(DBPortal, BasePortal):
cls.filter_mode = cls.config["bridge.filter.mode"]
cls.filter_list = cls.config["bridge.filter.list"]
cls.hs_domain = cls.config["homeserver.domain"]
cls.backfill_msc2716 = cls.config["bridge.backfill.msc2716"]
cls.alias_template = SimpleTemplate(
cls.config["bridge.alias_template"],
"groupname",
@@ -905,32 +916,30 @@ class Portal(DBPortal, BasePortal):
self.mxid = room_id
self.by_mxid[self.mxid] = self
self.first_event_id = await self.main_intent.send_message_event(
self.mxid, DummyPortalCreated, {}
)
await self.save()
self.log.debug(f"Matrix room created: {self.mxid}")
await self.az.state_store.set_power_levels(self.mxid, power_levels)
await user.register_portal(self)
await self.invite_to_matrix(invites)
await self.update_matrix_room(user, entity, puppet, levels=power_levels, users=users)
update_room = asyncio.create_task(
self.update_matrix_room(user, entity, puppet, levels=power_levels, users=users)
self.first_event_id = await self.main_intent.send_message_event(
self.mxid, DummyPortalCreated, {}
)
if self.config["bridge.backfill.initial_limit"] > 0:
self.log.debug(
"Initial backfill is enabled. Waiting for room members to sync "
"and then starting backfill"
if not self.bridge.homeserver_software.is_hungry:
self._member_list_cache[self.first_event_id] = set(
(await self.main_intent.get_joined_members(self.mxid)).keys()
)
await update_room
await self.save()
if isinstance(user, u.User) or not self.backfill_msc2716:
try:
if isinstance(user, u.User):
await self.backfill(user, is_initial=True)
await self.forward_backfill(user, initial=True)
except Exception:
self.log.exception("Failed to backfill new portal")
self.log.exception("Error in initial backfill")
if self.backfill_msc2716:
await self.enqueue_backfill(user, priority=50)
return self.mxid
@@ -2595,142 +2604,188 @@ class Portal(DBPortal, BasePortal):
await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
@property
def _takeout_options(self) -> dict[str, bool | int]:
return {
"files": True,
"megagroups": self.megagroup,
"chats": self.peer_type == "chat",
"users": self.peer_type == "user",
"channels": (self.peer_type == "channel" and not self.megagroup),
"max_file_size": min(self.matrix.media_config.upload_size, 2000 * 1024 * 1024),
}
def _default_max_batches(self) -> int:
    """Return the configured incremental batch limit for this portal's chat type.

    The chat type is ``user`` or ``normal_group`` for the ``user`` and ``chat``
    peer types; every other peer type is a ``supergroup`` when ``megagroup`` is
    set and a ``channel`` otherwise.
    """
    chat_kind = {"user": "user", "chat": "normal_group"}.get(self.peer_type)
    if chat_kind is None:
        chat_kind = "supergroup" if self.megagroup else "channel"
    return self.config[f"bridge.backfill.incremental.max_batches.{chat_kind}"]
async def enqueue_backfill(
    self,
    source: u.User,
    priority: int,
    max_batches: int | None = None,
    messages_per_batch: int | None = None,
) -> None:
    """Queue a backfill request for this portal and wake the user's backfill task.

    Args:
        source: The user through whom the backfill will be performed.
        priority: Queue priority stored with the request.
        max_batches: Batch limit for the request; a falsy value selects the
            configured default for this portal's chat type.
        messages_per_batch: Batch size; a falsy value selects
            ``bridge.backfill.incremental.messages_per_batch``.
    """
    # TODO check that there are no queued backfills
    # if not await Backfill.get(source.mxid, self.tgid, self.tg_receiver):
    batch_size = (
        messages_per_batch or self.config["bridge.backfill.incremental.messages_per_batch"]
    )
    delay = self.config["bridge.backfill.incremental.post_batch_delay"]
    batch_cap = max_batches or self._default_max_batches
    request = Backfill.new(
        user_mxid=source.mxid,
        priority=priority,
        portal_tgid=self.tgid,
        portal_tg_receiver=self.tg_receiver,
        messages_per_batch=batch_size,
        post_batch_delay=delay,
        max_batches=batch_cap,
    )
    await request.insert()
    source.wakeup_backfill_task.set()
async def forward_backfill(
    self,
    source: u.User,
    initial: bool,
    last_tgid: int | None = None,
    override_limit: int | None = None,
) -> str:
    """Run a forward backfill through ``source`` while holding the backfill lock.

    Args:
        source: The user whose client is used for fetching messages.
        initial: Selects the ``initial_limit`` config value instead of
            ``sync_limit`` when no override is given.
        last_tgid: Passed through to ``backfill`` unchanged.
        override_limit: Message limit to use instead of the configured one;
            ignored when falsy.

    Returns:
        The status string returned by ``backfill``.
    """
    limit_kind = "initial" if initial else "sync"
    limit = override_limit or self.config[f"bridge.backfill.forward.{limit_kind}_limit"]
    with self.backfill_lock:
        status = await self.backfill(
            source,
            source.client,
            forward=True,
            forward_limit=limit,
            last_tgid=last_tgid,
        )
        self.log.debug(f"Forward backfill complete, status: {status}")
        return status
async def backfill(
self,
source: u.User,
is_initial: bool = False,
limit: int | None = None,
last_id: int | None = None,
) -> None:
client: MautrixTelegramClient,
req: Backfill | None = None,
forward: bool = False,
forward_limit: int | None = None,
last_tgid: int | None = None,
) -> str:
async with self.backfill_method_lock:
await self._locked_backfill(source, is_initial, limit, last_id)
return await self._locked_backfill(
source, client, req, forward, forward_limit, last_tgid
)
async def _locked_backfill(
self,
source: u.User,
is_initial: bool = False,
limit: int | None = None,
client: MautrixTelegramClient,
req: Backfill | None = None,
forward: bool = False,
forward_limit: int | None = None,
last_tgid: int | None = None,
) -> None:
limit = limit or (
self.config["bridge.backfill.initial_limit"]
if is_initial
else self.config["bridge.backfill.missed_limit"]
)
if limit == 0:
return
) -> str:
assert forward != bool(req)
if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat":
return
last_in_room = await DBMessage.find_last(
self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid)
)
min_id = last_in_room.tgid if last_in_room else 0
if last_tgid is None:
messages = await source.client.get_messages(self.peer, limit=1)
if not messages:
# The chat seems empty
return
last_tgid = messages[0].id
if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"):
# Nothing to backfill
return
if limit < 0:
limit = last_tgid - min_id
limit_type = "unlimited"
elif self.peer_type == "channel":
min_id = max(last_tgid - limit, min_id)
# This is now just an approximate message count, not the actual limit.
limit = last_tgid - min_id
limit_type = "channel"
else:
# This limit will be higher than the actual message count if there are any messages
# in other DMs or normal groups, but that's not too bad.
limit = min(last_tgid - min_id, limit)
limit_type = "dm/minigroup"
self.log.debug(
f"Backfilling up to {limit} messages after ID {min_id} through {source.mxid} "
f"(last message: {last_tgid}, limit type: {limit_type})"
)
with self.backfill_lock:
await self._backfill(source, min_id, limit)
async def _backfill(self, source: u.User, min_id: int, limit: int) -> None:
self.backfill_leave = set()
if (
self.peer_type == "user"
and self.tgid != source.tgid
and self.config["bridge.backfill.invite_own_puppet"]
):
self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid)
sender = await p.Puppet.get_by_tgid(source.tgid)
await self.main_intent.invite_user(self.mxid, sender.default_mxid)
await sender.default_mxid_intent.join_room_by_id(self.mxid)
self.backfill_leave.add(sender.default_mxid_intent)
client = source.client
async with NotificationDisabler(self.mxid, source):
if limit > self.config["bridge.backfill.takeout_limit"]:
self.log.debug(f"Opening takeout client for {source.tgid}")
async with client.takeout(**self._takeout_options) as takeout:
count, handled = await self._backfill_messages(source, min_id, limit, takeout)
return "Backfilling normal groups is disabled in the bridge config"
tg_space = source.tgid if self.peer_type != "channel" else self.tgid
prev_event_id = self.first_event_id
if forward:
last_in_room = await DBMessage.find_last(self.mxid, tg_space)
if last_in_room:
prev_event_id = last_in_room.mxid
min_id = last_in_room.tgid
else:
count, handled = await self._backfill_messages(source, min_id, limit, client)
for intent in self.backfill_leave:
self.log.trace("Leaving room with %s post-backfill", intent.mxid)
await intent.leave_room(self.mxid)
self.backfill_leave = None
self.log.info(
"Backfilled %d (of %d fetched) messages through %s", handled, count, source.mxid
)
async def _backfill_messages(
self, source: u.User, min_id: int, limit: int, client: MautrixTelegramClient
) -> tuple[int, int]:
count = handled_count = 0
entity = await self.get_input_entity(source)
if self.peer_type == "channel":
# This is a channel or supergroup, so we'll backfill messages based on the ID.
# There are some cases, such as deleted messages, where this may backfill less
# messages than the limit.
self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})")
messages = client.iter_messages(entity, reverse=True, min_id=min_id)
async for message in messages:
count += 1
was_handled = await self._handle_telegram_backfill_message(source, message)
handled_count += 1 if was_handled else 0
else:
# Private chats and normal groups don't have their own message ID namespace,
# which means we'll have to fetch messages a different way.
min_id = 0
if last_tgid is None:
messages = await source.client.get_messages(self.peer, limit=1)
if not messages:
return "Chat is empty, nothing to backfill"
last_tgid = messages[0].id
if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"):
return (
f"Last bridged message {min_id} is equal to or greater than last message "
f"in Telegram chat {last_tgid}, nothing to backfill"
)
limit = last_tgid - min_id
if (forward_limit or 0) > 0:
limit = min(limit, forward_limit)
self.log.debug(
f"Fetching up to {limit} most recent messages, ignoring anything before {min_id}"
f"Backfilling up to {limit} messages after ID {min_id} through {source.mxid} "
f"(last message: {last_tgid})"
)
messages = await client.get_messages(entity, min_id=min_id, limit=limit)
for message in reversed(messages):
count += 1
if message.id <= min_id:
self.log.trace(
f"Skipping {message.id} in backfill response as it's lower than "
f"the last bridged message ({min_id})"
)
continue
was_handled = await self._handle_telegram_backfill_message(source, message)
handled_count += 1 if was_handled else 0
return count, handled_count
anchor_id = min_id
else:
limit = req.messages_per_batch
first_in_room = await DBMessage.find_first(self.mxid, tg_space)
anchor_id = first_in_room.tgid if first_in_room else None
self.log.debug(
f"Backfilling up to {req.messages_per_batch} historical messages "
f"before {anchor_id} through {source.mxid}"
)
insertion_id, message_count, first_id = await self._backfill_messages(
source, client, forward, anchor_id, limit, prev_event_id
)
if prev_event_id == self.first_event_id:
if insertion_id and not self.base_insertion_id:
self.base_insertion_id = insertion_id
elif not insertion_id:
insertion_id = self.base_insertion_id
await self.save()
# TODO this should probably check actual event count instead of message count
if message_count > 0 and self.backfill_msc2716:
await self.main_intent.send_state_event(
self.mxid,
StateMarker,
{
"org.matrix.msc2716.marker.insertion": insertion_id,
"com.beeper.timestamp": int(time.time() * 1000),
},
state_key=insertion_id,
)
if forward:
self.log.debug(f"Forward backfill finished with {message_count} messages")
elif message_count > 0 and first_id and first_id > 1:
if req.max_batches in (0, 1):
return "Already backfilled enough batches, not enqueuing more"
self.log.debug(f"Enqueuing more backfill through {source.mxid}")
await self.enqueue_backfill(
source,
priority=100,
messages_per_batch=req.messages_per_batch,
max_batches=-1 if req.max_batches < 0 else (req.max_batches - 1),
)
else:
self.log.debug("No more messages to backfill")
return f"Backfilled {message_count} messages"
async def _handle_telegram_backfill_message(
self, source: au.AbstractUser, msg: Message | MessageService
) -> bool:
def _can_double_puppet_backfill(self, custom_mxid: UserID) -> bool:
    """Decide whether the double puppet ``custom_mxid`` may be used for backfilling.

    Without MSC2716 the answer is always yes. With MSC2716 it requires
    ``bridge.backfill.double_puppet_backfill`` to be enabled, and then either a
    "hungry" homeserver or a user ID on the bridge's own homeserver domain.
    """
    if not self.backfill_msc2716:
        return True
    if not self.config["bridge.backfill.double_puppet_backfill"]:
        return False
    # Batch sending can only use local users, so don't allow double puppets on other servers.
    return (
        self.bridge.homeserver_software.is_hungry
        or custom_mxid[custom_mxid.index(":") + 1 :] == self.config["homeserver.domain"]
    )
async def _get_members_at(self, event_id: EventID) -> set[UserID]:
    """Return the user IDs that were joined to the room at ``event_id``.

    Results are cached in ``_member_list_cache``; on a cache miss the room state
    is fetched through the event context of ``event_id`` and the joined member
    state keys are collected and stored in the cache.
    """
    if event_id in self._member_list_cache:
        return self._member_list_cache[event_id]
    # TODO cache the list in db?
    self.log.debug(f"Fetching member list at {event_id}")
    context = await self.main_intent.get_event_context(self.mxid, event_id, limit=0)
    joined = set()
    for state_event in context.state:
        if state_event.type != EventType.ROOM_MEMBER:
            continue
        if state_event.content.membership == Membership.JOIN:
            joined.add(state_event.state_key)
    self.log.debug(f"Found {len(joined)} members at {event_id}")
    self._member_list_cache[event_id] = joined
    return joined
async def _convert_batch_msg(
self,
source: u.User,
msg: Message,
add_member: Callable[[IntentAPI, str, ContentURI], Awaitable[None]],
) -> tuple[putil.ConvertedMessage, IntentAPI]:
if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)):
sender = await p.Puppet.get_by_peer(msg.from_id)
elif isinstance(msg.peer_id, PeerUser):
@@ -2740,20 +2795,171 @@ class Portal(DBPortal, BasePortal):
sender = await p.Puppet.get_by_peer(msg.peer_id)
else:
sender = None
if isinstance(msg, MessageService):
if isinstance(msg.action, MessageActionContactSignUp):
await self.handle_telegram_joined(source, sender, msg, backfill=True)
return True
else:
self.log.debug(
f"Unhandled service message {type(msg.action).__name__} in backfill"
)
elif isinstance(msg, Message):
await self.handle_telegram_message(source, sender, msg)
return True
if sender:
intent = sender.intent_for(self)
if not sender.displayname:
entity = await source.client.get_entity(sender.peer)
await sender.update_info(source, entity)
else:
self.log.debug(f"Unhandled message type {type(msg).__name__} in backfill")
return False
intent = self.main_intent
if intent.api.is_real_user and not self._can_double_puppet_backfill(intent.mxid):
intent = sender.default_mxid_intent
if sender:
await add_member(intent, sender.displayname, sender.avatar_url)
is_bot = sender.is_bot if sender else False
converted = await self._msg_conv.convert(source, intent, is_bot, msg)
return converted, intent
async def _wrap_batch_msg(
    self,
    intent: IntentAPI,
    msg: Message,
    converted: putil.ConvertedMessage,
    caption: bool = False,
) -> BatchSendEvent:
    """Wrap a converted Telegram message into an event for batch sending.

    Args:
        intent: The intent whose Matrix user ID becomes the event sender.
        msg: The Telegram message; its date becomes the event timestamp in
            milliseconds.
        converted: The converted message to take the content from.
        caption: When true, wrap ``converted.caption`` as a room message instead
            of ``converted.content`` with ``converted.type``.

    Returns:
        The batch send event, encrypted first when the portal is encrypted and
        the bridge has end-to-bridge encryption available.
    """
    if caption:
        payload, kind = converted.caption, EventType.ROOM_MESSAGE
    else:
        payload, kind = converted.content, converted.type
    needs_encryption = self.encrypted and self.matrix.e2ee
    if needs_encryption:
        kind, payload = await self.matrix.e2ee.encrypt(self.mxid, kind, payload)
    timestamp_ms = int(msg.date.timestamp() * 1000)
    return BatchSendEvent(
        sender=intent.mxid,
        timestamp=timestamp_ms,
        content=payload,
        type=kind,
    )
async def _backfill_messages(
    self,
    source: u.User,
    client: MautrixTelegramClient,
    forward: bool,
    anchor_id: int,
    limit: int,
    prev_event_id: EventID,
) -> tuple[EventID | None, int, int]:
    """Fetch a batch of Telegram messages and send them into the Matrix room.

    Messages are iterated newest to oldest, converted, and then sent either
    through a single batch send request (when ``self.backfill_msc2716`` is
    set) or one by one as normal timestamped events. Successfully sent
    messages are recorded in the message table.

    Args:
        source: The user whose Telegram session is used to read the chat.
        client: The Telegram client to iterate messages with.
        forward: If true, fetch messages newer than ``anchor_id`` (``min_id``),
            otherwise fetch messages older than it (``max_id``).
        anchor_id: The Telegram message ID to backfill from. A falsy value
            with ``forward=False`` means no upper bound is applied.
        limit: Maximum number of messages to request from Telegram.
        prev_event_id: The Matrix event the batch is attached to. It is also
            used to look up which members are already in the room.

    Returns:
        A tuple of the base insertion event ID (``None`` if nothing was sent
        or batch sending isn't used), the number of messages received from
        Telegram, and the ID of the first non-skipped message received
        (``anchor_id`` if there was none).
    """
    entity = await self.get_input_entity(source)
    events = []
    intents = []
    metas = []
    state_events = []
    do_batch_send = self.backfill_msc2716
    # This has to be a set in both branches: add_member() calls .add() on it, and the
    # fallback is what gets used when batch sending is disabled on a non-hungry homeserver.
    added_members = (
        await self._get_members_at(prev_event_id)
        if not self.bridge.homeserver_software.is_hungry and do_batch_send
        else set()
    )
    before_first_msg_timestamp = 0

    async def add_member(intent: IntentAPI, displayname: str, avatar_url: ContentURI) -> None:
        # Callback handed to _convert_batch_msg to make sure the sender is in the room.
        if self.bridge.homeserver_software.is_hungry or intent.mxid in added_members:
            return
        added_members.add(intent.mxid)
        if not do_batch_send:
            # TODO leave these members?
            await intent.ensure_joined(self.mxid)
            return
        # With batch sending, membership is injected as invite+join state events
        # at the start of the batch instead of actually joining the room.
        invite_event = BatchSendStateEvent(
            type=EventType.ROOM_MEMBER,
            state_key=intent.mxid,
            sender=self.main_intent.mxid,
            timestamp=before_first_msg_timestamp,
            content=MemberStateEventContent(
                membership=Membership.INVITE,
                displayname=displayname,
                avatar_url=avatar_url,
            ),
        )
        join_event = attr.evolve(
            invite_event,
            content=attr.evolve(invite_event.content, membership=Membership.JOIN),
            sender=intent.mxid,
        )
        state_events.append(invite_event)
        state_events.append(join_event)

    first_id = anchor_id
    message_count = 0
    minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id}
    if not forward and not anchor_id:
        # No anchor for a backwards backfill: don't bound the request, and use the
        # maximum 32-bit ID so that the msg.id >= anchor_id filter below never matches.
        anchor_id = 2**31 - 1
        minmax = {}
    self.log.debug(f"Iterating messages through {source.tgid} with {limit=}, {minmax}")
    # Iterate messages newest to oldest and collect the results
    async for msg in client.iter_messages(entity, limit=limit, **minmax):
        message_count += 1
        if (forward and msg.id <= anchor_id) or (not forward and msg.id >= anchor_id):
            continue
        elif isinstance(msg, MessageService):
            # TODO some service messages can be backfilled
            continue
        if not before_first_msg_timestamp:
            first_id = msg.id
            before_first_msg_timestamp = int(msg.date.timestamp() * 1000) - 1
        converted, intent = await self._convert_batch_msg(source, msg, add_member)
        events.append(await self._wrap_batch_msg(intent, msg, converted))
        intents.append(intent)
        metas.append(msg)
        if converted.caption:
            # The caption is a separate Matrix event. It gets a None meta so that
            # no database row is stored for it.
            events.append(await self._wrap_batch_msg(intent, msg, converted, caption=True))
            intents.append(intent)
            metas.append(None)
    if not events:
        self.log.debug(
            f"Didn't get any events to send out of {message_count} messages fetched "
            f"(first received ID: {first_id})"
        )
        return None, message_count, first_id
    self.log.debug(
        f"Got {len(events)} events to send out of {message_count} messages fetched "
        f"(first received ID: {first_id})"
    )
    if do_batch_send:
        resp = await self.main_intent.batch_send(
            self.mxid,
            prev_event_id,
            batch_id=self.next_batch_id if not forward else None,
            # We iterated the events in reverse chronological order,
            # so reverse them before sending
            events=list(reversed(events)),
            state_events_at_start=state_events,
            beeper_new_messages=forward,
        )
        if prev_event_id == self.first_event_id and resp.next_batch_id:
            self.next_batch_id = resp.next_batch_id
        base_insertion_event_id = resp.base_insertion_event_id
        event_ids = resp.event_ids
    else:
        base_insertion_event_id = None
        event_ids = [
            await intent.send_message_event(
                self.mxid, evt.type, evt.content, timestamp=evt.timestamp
            )
            for evt, intent in zip(reversed(events), reversed(intents))
        ]
    tg_space = source.tgid if self.peer_type != "channel" else self.tgid
    await DBMessage.bulk_insert(
        [
            DBMessage(
                mxid=event_id,
                mx_room=self.mxid,
                tgid=msg.id,
                tg_space=tg_space,
                edit_index=0,
                content_hash=self.dedup.hash_event(msg),
                # TODO sender
            )
            # Original arrays are in reverse chronological order, but event IDs are
            # chronological (because we reversed the original messages list before sending)
            for event_id, msg in zip(event_ids, reversed(metas))
            if msg is not None
        ]
    )
    return base_insertion_event_id, message_count, first_id
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
reactions = []
+1 -1
View File
@@ -399,7 +399,7 @@ class Puppet(DBPuppet, BasePuppet):
# Decide whether the default puppet should leave the given Matrix room: looks up the
# portal by room ID and returns a truthy value only for existing non-"user" portals.
async def default_puppet_should_leave_room(self, room_id: RoomID) -> bool:
portal: p.Portal = await p.Portal.get_by_mxid(room_id)
# NOTE(review): the two consecutive returns below look like the removed and the added
# line of this diff hunk (the hunk header says one line was swapped). As written, only
# the first one is reachable, and it additionally requires that the portal's backfill
# lock is not held. TODO confirm which variant is intended before relying on either.
return portal and not portal.backfill_lock.locked and portal.peer_type != "user"
return portal and portal.peer_type != "user"
# endregion
# region Getters
+109 -5
View File
@@ -20,7 +20,12 @@ from datetime import datetime, timezone
import asyncio
import time
from telethon.errors import AuthKeyDuplicatedError, RPCError, UnauthorizedError
from telethon.errors import (
AuthKeyDuplicatedError,
RPCError,
TakeoutInitDelayError,
UnauthorizedError,
)
from telethon.tl.custom import Dialog
from telethon.tl.functions.account import UpdateStatusRequest
from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest
@@ -57,7 +62,8 @@ from mautrix.util.opt_prometheus import Gauge
from . import portal as po, puppet as pu, util
from .abstract_user import AbstractUser
from .db import Message as DBMessage, PgSession, User as DBUser
from .db import Backfill, Message as DBMessage, PgSession, User as DBUser
from .tgclient import MautrixTelegramClient
from .types import TelegramID
if TYPE_CHECKING:
@@ -86,7 +92,11 @@ class User(DBUser, AbstractUser, BaseUser):
_ensure_started_lock: asyncio.Lock
_track_connection_task: asyncio.Task | None
_backfill_task: asyncio.Task | None
wakeup_backfill_task: asyncio.Event
_is_backfilling: bool
takeout_retry_immediate: asyncio.Event
takeout_requested: bool
_available_emoji_reactions: set[str] | None
_available_emoji_reactions_hash: int | None
@@ -119,6 +129,12 @@ class User(DBUser, AbstractUser, BaseUser):
self._track_connection_task = None
self._is_backfilling = False
self._portals_cache = None
self._backfill_task = None
self.wakeup_backfill_task = asyncio.Event()
self.takeout_retry_immediate = asyncio.Event()
self.takeout_requested = False
self._available_emoji_reactions = None
self._available_emoji_reactions_hash = None
self._available_emoji_reactions_fetched = 0
@@ -248,6 +264,14 @@ class User(DBUser, AbstractUser, BaseUser):
self.client and self.client._sender and self.client._sender._transport_connected()
)
@property
def _bridge_state_info(self) -> dict[str, Any]:
    """Extra info to attach to bridge state events.

    Contains ``"takeout_requested": True`` while ``self.takeout_requested``
    is set, and is empty otherwise.
    """
    extra: dict[str, Any] = {}
    if self.takeout_requested:
        extra["takeout_requested"] = True
    return extra
async def _track_connection(self) -> None:
self.log.debug("Starting loop to track connection state")
while True:
@@ -260,6 +284,7 @@ class User(DBUser, AbstractUser, BaseUser):
if self._is_backfilling
else BridgeStateEvent.CONNECTED,
ttl=3600,
info=self._bridge_state_info,
)
else:
await self.push_bridge_state(
@@ -284,7 +309,7 @@ class User(DBUser, AbstractUser, BaseUser):
else:
state_event = BridgeStateEvent.UNKNOWN_ERROR
ttl = 240
return [BridgeState(state_event=state_event, ttl=ttl)]
return [BridgeState(state_event=state_event, ttl=ttl, info=self._bridge_state_info)]
async def get_puppet(self) -> pu.Puppet | None:
if not self.tgid:
@@ -302,6 +327,9 @@ class User(DBUser, AbstractUser, BaseUser):
if self._track_connection_task:
self._track_connection_task.cancel()
self._track_connection_task = None
if self._backfill_task:
self._backfill_task.cancel()
self._backfill_task = None
await super().stop()
self._track_metric(METRIC_CONNECTED, False)
@@ -318,6 +346,8 @@ class User(DBUser, AbstractUser, BaseUser):
return
self._track_metric(METRIC_LOGGED_IN, True)
if not self._backfill_task or self._backfill_task.done():
self._backfill_task = asyncio.create_task(self._handle_backfill_requests_loop())
try:
puppet = await pu.Puppet.get_by_tgid(self.tgid)
@@ -327,7 +357,7 @@ class User(DBUser, AbstractUser, BaseUser):
except Exception:
self.log.exception("Failed to automatically enable custom puppet")
if not self.is_bot and self.config["bridge.startup_sync"]:
if not self.is_bot and (self.config["bridge.startup_sync"] or first_login):
try:
self._is_backfilling = True
await self.sync_dialogs()
@@ -337,6 +367,80 @@ class User(DBUser, AbstractUser, BaseUser):
finally:
self._is_backfilling = False
@property
def _takeout_options(self) -> dict[str, bool | int]:
return {
"users": True,
"chats": self.config["bridge.backfill.normal_groups"],
"megagroups": True,
"channels": True,
"files": True,
"max_file_size": min(self.bridge.matrix.media_config.upload_size, 2000 * 1024 * 1024),
}
async def _handle_backfill_requests_loop(self) -> None:
    """Run forever, handing queued backfill requests for this user to a takeout session.

    When the queue is empty, sleeps until ``self.wakeup_backfill_task`` is set and
    then checks the queue again.
    """
    while True:
        pending = await Backfill.get_next(self.mxid)
        if pending:
            await self._takeout_and_backfill(pending)
            continue
        # Nothing queued: wait to be woken up, then reset the event for the next round.
        await self.wakeup_backfill_task.wait()
        self.wakeup_backfill_task.clear()
async def _takeout_and_backfill(self, first_req: Backfill, first_attempt: bool = True) -> None:
    """Open a takeout session and process backfill requests with it.

    If Telegram responds with a takeout init delay, waits until either
    ``self.takeout_retry_immediate`` is set or the delay has passed, and then
    tries again. Retries happen in a loop rather than through recursion, so
    repeated delays don't pile up nested calls inside exception handlers.

    Args:
        first_req: The backfill request that triggered the takeout. It is handled
            first once the takeout session has been acquired.
        first_attempt: Whether this is the first try. Only affects which log
            message is emitted when a takeout init delay is received.
    """
    while True:
        self.takeout_retry_immediate.clear()
        # Exposed through the bridge state info and checked by the retry endpoint.
        self.takeout_requested = True
        try:
            async with self.client.takeout(**self._takeout_options) as takeout_client:
                self.takeout_requested = False
                self.log.info("Acquired takeout client successfully")
                await self._backfill_loop_with_client(takeout_client, first_req)
                self.log.info("Backfills finished, exiting takeout")
        except TakeoutInitDelayError as e:
            if first_attempt:
                self.log.info(
                    f"Takeout requested, will wait for retry request or {e.seconds} seconds"
                )
            else:
                self.log.warning(
                    f"Got takeout init delay again after retry, waiting for {e.seconds} seconds"
                )
            try:
                await asyncio.wait_for(self.takeout_retry_immediate.wait(), timeout=e.seconds)
                self.log.info("Retrying takeout")
            except asyncio.TimeoutError:
                self.log.info("Takeout timeout expired")
        else:
            # The takeout session ran to completion, nothing to retry.
            return
        first_attempt = False
async def _backfill_loop_with_client(
    self, client: MautrixTelegramClient, first_req: Backfill
) -> None:
    """Process queued backfill requests using the given (takeout) client.

    Starts with ``first_req`` and then keeps pulling requests from the queue. When
    the queue is empty, waits up to 30 seconds for a wakeup before checking again,
    and returns after 20 consecutive empty checks.
    """
    idle_rounds = 0
    while idle_rounds < 20:
        if first_req:
            req = first_req
        else:
            req = await Backfill.get_next(self.mxid)
        # The initial request is only used for the first iteration.
        first_req = None
        if req:
            idle_rounds = 0
            self.log.info("Backfill request %s", req)
            try:
                portal_id = TelegramID(req.portal_tgid)
                receiver_id = TelegramID(req.portal_tg_receiver)
                portal = await po.Portal.get_by_tgid(portal_id, tg_receiver=receiver_id)
                await req.mark_dispatched()
                await portal.backfill(self, client, req=req)
                await req.mark_done()
                await asyncio.sleep(req.post_batch_delay)
            except Exception:
                self.log.exception("Error handling backfill request for %s", req.portal_tgid)
                await req.set_cooldown_timeout(10)
        else:
            idle_rounds += 1
            try:
                await asyncio.wait_for(self.wakeup_backfill_task.wait(), timeout=30)
            except asyncio.TimeoutError:
                pass
            self.wakeup_backfill_task.clear()
async def update(self, update: TypeUpdate) -> bool:
if not self.is_bot:
return False
@@ -573,7 +677,7 @@ class User(DBUser, AbstractUser, BaseUser):
was_created = False
if portal.mxid:
try:
await portal.backfill(self, last_id=dialog.message.id)
await portal.forward_backfill(self, initial=False, last_tgid=dialog.message.id)
except Exception:
self.log.exception(f"Error while backfilling {portal.tgid_log}")
try:
@@ -71,6 +71,8 @@ class ProvisioningAPI(AuthAPI):
)
self.app.router.add_route("POST", f"{user_prefix}/pm/{{identifier}}", self.start_dm)
self.app.router.add_route("POST", f"{user_prefix}/retry_takeout", self.retry_takeout)
self.app.router.add_route("POST", f"{user_prefix}/logout", self.logout)
self.app.router.add_route("POST", f"{user_prefix}/login/bot_token", self.send_bot_token)
self.app.router.add_route("POST", f"{user_prefix}/login/request_code", self.request_code)
@@ -494,6 +496,22 @@ class ProvisioningAPI(AuthAPI):
status=201 if just_created else 200,
)
async def retry_takeout(self, request: web.Request) -> web.Response:
    """Handle a request to retry a pending takeout immediately.

    Responds with HTTP 400 if the user has no takeout request pending. Otherwise
    sets the user's ``takeout_retry_immediate`` event and responds with an empty
    JSON object.
    """
    _, user, err = await self.get_user_request_info(
        request, expect_logged_in=True, want_data=False
    )
    if err is not None:
        return err
    if user.takeout_requested:
        user.takeout_retry_immediate.set()
        return web.json_response({}, status=200)
    return web.json_response({"error": "There was no takeout requested"}, status=400)
async def send_bot_token(self, request: web.Request) -> web.Response:
data, user, err = await self.get_user_request_info(request)
if err is not None:
+1 -1
View File
@@ -3,7 +3,7 @@ python-magic>=0.4,<0.5
commonmark>=0.8,<0.10
aiohttp>=3,<4
yarl>=1,<2
mautrix>=0.18.2,<0.19
mautrix>=0.18.3,<0.19
#telethon>=1.24,<1.25
tulir-telethon==1.26.0a6
asyncpg>=0.20,<0.27