Improve backfilling to fetch less redundant messages
This commit is contained in:
+47
-31
@@ -2789,7 +2789,7 @@ class Portal(DBPortal, BasePortal):
|
|||||||
source: u.User,
|
source: u.User,
|
||||||
is_initial: bool = False,
|
is_initial: bool = False,
|
||||||
limit: int | None = None,
|
limit: int | None = None,
|
||||||
last_id: int | None = None,
|
last_tgid: int | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
limit = limit or (
|
limit = limit or (
|
||||||
self.config["bridge.backfill.initial_limit"]
|
self.config["bridge.backfill.initial_limit"]
|
||||||
@@ -2800,43 +2800,39 @@ class Portal(DBPortal, BasePortal):
|
|||||||
return
|
return
|
||||||
if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat":
|
if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat":
|
||||||
return
|
return
|
||||||
last = await DBMessage.find_last(
|
last_in_room = await DBMessage.find_last(
|
||||||
self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid)
|
self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid)
|
||||||
)
|
)
|
||||||
min_id = last.tgid if last else 0
|
min_id = last_in_room.tgid if last_in_room else 0
|
||||||
if last_id is None:
|
if last_tgid is None:
|
||||||
messages = await source.client.get_messages(self.peer, limit=1)
|
messages = await source.client.get_messages(self.peer, limit=1)
|
||||||
if not messages:
|
if not messages:
|
||||||
# The chat seems empty
|
# The chat seems empty
|
||||||
return
|
return
|
||||||
last_id = messages[0].id
|
last_tgid = messages[0].id
|
||||||
if last_id <= min_id:
|
if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"):
|
||||||
# Nothing to backfill
|
# Nothing to backfill
|
||||||
return
|
return
|
||||||
if limit < 0:
|
if limit < 0:
|
||||||
limit = last_id - min_id
|
limit = last_tgid - min_id
|
||||||
self.log.debug(
|
self.log.debug(
|
||||||
f"Backfilling approximately {last_id - min_id} messages through {source.mxid}"
|
f"Backfilling approximately {last_tgid - min_id} messages through {source.mxid}"
|
||||||
)
|
)
|
||||||
elif self.peer_type == "channel":
|
elif self.peer_type == "channel":
|
||||||
# This is a channel or supergroup, so we'll backfill messages based on the ID.
|
min_id = max(last_tgid - limit, min_id)
|
||||||
# There are some cases, such as deleted messages, where this may backfill less
|
|
||||||
# messages than the limit.
|
|
||||||
min_id = max(last_id - limit, min_id)
|
|
||||||
self.log.debug(
|
self.log.debug(
|
||||||
f"Backfilling messages after ID {min_id} (last message: {last_id}) "
|
f"Backfilling messages after ID {min_id} (last message: {last_tgid}) "
|
||||||
f"through {source.mxid}"
|
f"through {source.mxid}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Private chats and normal groups don't have their own message ID namespace,
|
# This limit will be higher than the actual message count if there are any messages
|
||||||
# which means we'll have to fetch messages a different way.
|
# in other DMs or normal groups, but that's not too bad.
|
||||||
# The _backfill_messages method will detect min_id=None and not use reverse=True
|
limit = min(last_tgid - min_id, limit)
|
||||||
min_id = None
|
|
||||||
self.log.debug(f"Backfilling up to {limit} messages through {source.mxid}")
|
self.log.debug(f"Backfilling up to {limit} messages through {source.mxid}")
|
||||||
with self.backfill_lock:
|
with self.backfill_lock:
|
||||||
await self._backfill(source, min_id, limit)
|
await self._backfill(source, min_id, limit)
|
||||||
|
|
||||||
async def _backfill(self, source: u.User, min_id: int | None, limit: int) -> None:
|
async def _backfill(self, source: u.User, min_id: int, limit: int) -> None:
|
||||||
self.backfill_leave = set()
|
self.backfill_leave = set()
|
||||||
if (
|
if (
|
||||||
self.peer_type == "user"
|
self.peer_type == "user"
|
||||||
@@ -2854,38 +2850,55 @@ class Portal(DBPortal, BasePortal):
|
|||||||
if limit > self.config["bridge.backfill.takeout_limit"]:
|
if limit > self.config["bridge.backfill.takeout_limit"]:
|
||||||
self.log.debug(f"Opening takeout client for {source.tgid}")
|
self.log.debug(f"Opening takeout client for {source.tgid}")
|
||||||
async with client.takeout(**self._takeout_options) as takeout:
|
async with client.takeout(**self._takeout_options) as takeout:
|
||||||
count = await self._backfill_messages(source, min_id, limit, takeout)
|
count, handled = await self._backfill_messages(source, min_id, limit, takeout)
|
||||||
else:
|
else:
|
||||||
count = await self._backfill_messages(source, min_id, limit, client)
|
count, handled = await self._backfill_messages(source, min_id, limit, client)
|
||||||
|
|
||||||
for intent in self.backfill_leave:
|
for intent in self.backfill_leave:
|
||||||
self.log.trace("Leaving room with %s post-backfill", intent.mxid)
|
self.log.trace("Leaving room with %s post-backfill", intent.mxid)
|
||||||
await intent.leave_room(self.mxid)
|
await intent.leave_room(self.mxid)
|
||||||
self.backfill_leave = None
|
self.backfill_leave = None
|
||||||
self.log.info("Backfilled %d messages through %s", count, source.mxid)
|
self.log.info(
|
||||||
|
"Backfilled %d (of %d fetched) messages through %s", handled, count, source.mxid
|
||||||
|
)
|
||||||
|
|
||||||
async def _backfill_messages(
|
async def _backfill_messages(
|
||||||
self, source: u.User, min_id: int | None, limit: int, client: MautrixTelegramClient
|
self, source: u.User, min_id: int, limit: int, client: MautrixTelegramClient
|
||||||
) -> int:
|
) -> tuple[int, int]:
|
||||||
count = 0
|
count = handled_count = 0
|
||||||
entity = await self.get_input_entity(source)
|
entity = await self.get_input_entity(source)
|
||||||
if min_id is not None:
|
if self.peer_type == "channel":
|
||||||
|
# This is a channel or supergroup, so we'll backfill messages based on the ID.
|
||||||
|
# There are some cases, such as deleted messages, where this may backfill less
|
||||||
|
# messages than the limit.
|
||||||
self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})")
|
self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})")
|
||||||
messages = client.iter_messages(entity, reverse=True, min_id=min_id)
|
messages = client.iter_messages(entity, reverse=True, min_id=min_id)
|
||||||
async for message in messages:
|
async for message in messages:
|
||||||
await self._handle_telegram_backfill_message(source, message)
|
|
||||||
count += 1
|
count += 1
|
||||||
|
was_handled = await self._handle_telegram_backfill_message(source, message)
|
||||||
|
handled_count += 1 if was_handled else 0
|
||||||
else:
|
else:
|
||||||
self.log.debug(f"Fetching up to {limit} most recent messages")
|
# Private chats and normal groups don't have their own message ID namespace,
|
||||||
messages = await client.get_messages(entity, limit=limit)
|
# which means we'll have to fetch messages a different way.
|
||||||
|
self.log.debug(
|
||||||
|
f"Fetching up to {limit} most recent messages, ignoring anything before {min_id}"
|
||||||
|
)
|
||||||
|
messages = await client.get_messages(entity, min_id=min_id, limit=limit)
|
||||||
for message in reversed(messages):
|
for message in reversed(messages):
|
||||||
await self._handle_telegram_backfill_message(source, message)
|
|
||||||
count += 1
|
count += 1
|
||||||
return count
|
if message.id <= min_id:
|
||||||
|
self.log.trace(
|
||||||
|
f"Skipping {message.id} in backfill response as it's lower than "
|
||||||
|
f"the last bridged message ({min_id})"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
was_handled = await self._handle_telegram_backfill_message(source, message)
|
||||||
|
handled_count += 1 if was_handled else 0
|
||||||
|
return count, handled_count
|
||||||
|
|
||||||
async def _handle_telegram_backfill_message(
|
async def _handle_telegram_backfill_message(
|
||||||
self, source: au.AbstractUser, msg: Message | MessageService
|
self, source: au.AbstractUser, msg: Message | MessageService
|
||||||
) -> None:
|
) -> bool:
|
||||||
if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)):
|
if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)):
|
||||||
sender = await p.Puppet.get_by_peer(msg.from_id)
|
sender = await p.Puppet.get_by_peer(msg.from_id)
|
||||||
elif isinstance(msg.peer_id, PeerUser):
|
elif isinstance(msg.peer_id, PeerUser):
|
||||||
@@ -2898,14 +2911,17 @@ class Portal(DBPortal, BasePortal):
|
|||||||
if isinstance(msg, MessageService):
|
if isinstance(msg, MessageService):
|
||||||
if isinstance(msg.action, MessageActionContactSignUp):
|
if isinstance(msg.action, MessageActionContactSignUp):
|
||||||
await self.handle_telegram_joined(source, sender, msg, backfill=True)
|
await self.handle_telegram_joined(source, sender, msg, backfill=True)
|
||||||
|
return True
|
||||||
else:
|
else:
|
||||||
self.log.debug(
|
self.log.debug(
|
||||||
f"Unhandled service message {type(msg.action).__name__} in backfill"
|
f"Unhandled service message {type(msg.action).__name__} in backfill"
|
||||||
)
|
)
|
||||||
elif isinstance(msg, Message):
|
elif isinstance(msg, Message):
|
||||||
await self.handle_telegram_message(source, sender, msg)
|
await self.handle_telegram_message(source, sender, msg)
|
||||||
|
return True
|
||||||
else:
|
else:
|
||||||
self.log.debug(f"Unhandled message type {type(msg).__name__} in backfill")
|
self.log.debug(f"Unhandled message type {type(msg).__name__} in backfill")
|
||||||
|
return False
|
||||||
|
|
||||||
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
|
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
|
||||||
if len(counts) == 1:
|
if len(counts) == 1:
|
||||||
|
|||||||
Reference in New Issue
Block a user