From 2cf7fc70590044d596e6c7ee7d4cfdccdfe61e6a Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 28 Feb 2022 12:26:24 +0200 Subject: [PATCH] Improve backfilling to fetch less redundant messages --- mautrix_telegram/portal.py | 78 +++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index d6f3713e..bd05eb98 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -2789,7 +2789,7 @@ class Portal(DBPortal, BasePortal): source: u.User, is_initial: bool = False, limit: int | None = None, - last_id: int | None = None, + last_tgid: int | None = None, ) -> None: limit = limit or ( self.config["bridge.backfill.initial_limit"] @@ -2800,43 +2800,39 @@ class Portal(DBPortal, BasePortal): return if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat": return - last = await DBMessage.find_last( + last_in_room = await DBMessage.find_last( self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid) ) - min_id = last.tgid if last else 0 - if last_id is None: + min_id = last_in_room.tgid if last_in_room else 0 + if last_tgid is None: messages = await source.client.get_messages(self.peer, limit=1) if not messages: # The chat seems empty return - last_id = messages[0].id - if last_id <= min_id: + last_tgid = messages[0].id + if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"): # Nothing to backfill return if limit < 0: - limit = last_id - min_id + limit = last_tgid - min_id self.log.debug( - f"Backfilling approximately {last_id - min_id} messages through {source.mxid}" + f"Backfilling approximately {last_tgid - min_id} messages through {source.mxid}" ) elif self.peer_type == "channel": - # This is a channel or supergroup, so we'll backfill messages based on the ID. - # There are some cases, such as deleted messages, where this may backfill less - # messages than the limit. - min_id = max(last_id - limit, min_id) + min_id = max(last_tgid - limit, min_id) self.log.debug( - f"Backfilling messages after ID {min_id} (last message: {last_id}) " + f"Backfilling messages after ID {min_id} (last message: {last_tgid}) " f"through {source.mxid}" ) else: - # Private chats and normal groups don't have their own message ID namespace, - # which means we'll have to fetch messages a different way. - # The _backfill_messages method will detect min_id=None and not use reverse=True - min_id = None + # This limit will be higher than the actual message count if there are any messages + # in other DMs or normal groups, but that's not too bad. + limit = min(last_tgid - min_id, limit) self.log.debug(f"Backfilling up to {limit} messages through {source.mxid}") with self.backfill_lock: await self._backfill(source, min_id, limit) - async def _backfill(self, source: u.User, min_id: int | None, limit: int) -> None: + async def _backfill(self, source: u.User, min_id: int, limit: int) -> None: self.backfill_leave = set() if ( self.peer_type == "user" @@ -2854,38 +2850,55 @@ class Portal(DBPortal, BasePortal): if limit > self.config["bridge.backfill.takeout_limit"]: self.log.debug(f"Opening takeout client for {source.tgid}") async with client.takeout(**self._takeout_options) as takeout: - count = await self._backfill_messages(source, min_id, limit, takeout) + count, handled = await self._backfill_messages(source, min_id, limit, takeout) else: - count = await self._backfill_messages(source, min_id, limit, client) + count, handled = await self._backfill_messages(source, min_id, limit, client) for intent in self.backfill_leave: self.log.trace("Leaving room with %s post-backfill", intent.mxid) await intent.leave_room(self.mxid) self.backfill_leave = None - self.log.info("Backfilled %d messages through %s", count, source.mxid) + self.log.info( + "Backfilled %d (of %d fetched) messages through %s", handled, count, source.mxid + ) async def _backfill_messages( - self, source: u.User, min_id: int | None, limit: int, client: MautrixTelegramClient - ) -> int: - count = 0 + self, source: u.User, min_id: int, limit: int, client: MautrixTelegramClient + ) -> tuple[int, int]: + count = handled_count = 0 entity = await self.get_input_entity(source) - if min_id is not None: + if self.peer_type == "channel": + # This is a channel or supergroup, so we'll backfill messages based on the ID. + # There are some cases, such as deleted messages, where this may backfill less + # messages than the limit. self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})") messages = client.iter_messages(entity, reverse=True, min_id=min_id) async for message in messages: - await self._handle_telegram_backfill_message(source, message) count += 1 + was_handled = await self._handle_telegram_backfill_message(source, message) + handled_count += 1 if was_handled else 0 else: - self.log.debug(f"Fetching up to {limit} most recent messages") - messages = await client.get_messages(entity, limit=limit) + # Private chats and normal groups don't have their own message ID namespace, + # which means we'll have to fetch messages a different way. + self.log.debug( + f"Fetching up to {limit} most recent messages, ignoring anything before {min_id}" + ) + messages = await client.get_messages(entity, min_id=min_id, limit=limit) for message in reversed(messages): - await self._handle_telegram_backfill_message(source, message) count += 1 - return count + if message.id <= min_id: + self.log.trace( + f"Skipping {message.id} in backfill response as it's lower than " + f"the last bridged message ({min_id})" + ) + continue + was_handled = await self._handle_telegram_backfill_message(source, message) + handled_count += 1 if was_handled else 0 + return count, handled_count async def _handle_telegram_backfill_message( self, source: au.AbstractUser, msg: Message | MessageService - ) -> None: + ) -> bool: if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)): sender = await p.Puppet.get_by_peer(msg.from_id) elif isinstance(msg.peer_id, PeerUser): @@ -2898,14 +2911,17 @@ class Portal(DBPortal, BasePortal): if isinstance(msg, MessageService): if isinstance(msg.action, MessageActionContactSignUp): await self.handle_telegram_joined(source, sender, msg, backfill=True) + return True else: self.log.debug( f"Unhandled service message {type(msg.action).__name__} in backfill" ) elif isinstance(msg, Message): await self.handle_telegram_message(source, sender, msg) + return True else: self.log.debug(f"Unhandled message type {type(msg).__name__} in backfill") + return False def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]: if len(counts) == 1: