diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py index 27fd07b3..512cedf6 100644 --- a/mautrix_telegram/abstract_user.py +++ b/mautrix_telegram/abstract_user.py @@ -711,6 +711,22 @@ class AbstractUser(ABC): self.log.debug("Ignoring relaybot-sent message %s to %s", update.id, portal.tgid_log) return + task = self._call_portal_message_handler(update, original_update, portal, sender) + if portal.backfill_lock.locked: + self.log.debug( + f"{portal.tgid_log} is backfill locked, moving incoming message to async task" + ) + background_task.create(task) + else: + await task + + async def _call_portal_message_handler( + self, + update: UpdateMessageContent, + original_update: UpdateMessage, + portal: po.Portal, + sender: pu.Puppet, + ) -> None: await portal.backfill_lock.wait(f"update {update.id}") if isinstance(update, MessageService): diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index e4996771..5a36e68c 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -2863,8 +2863,11 @@ class Portal(DBPortal, BasePortal): if limit == 0: return "Limit is zero, not backfilling" with self.backfill_lock: - output = await self.backfill( - source, client, forward=True, forward_limit=limit, last_tgid=last_tgid + output = await asyncio.wait_for( + self.backfill( + source, client, forward=True, forward_limit=limit, last_tgid=last_tgid + ), + timeout=15 * 60, ) self.log.debug(f"Forward backfill complete, status: {output}") return output @@ -3129,9 +3132,16 @@ class Portal(DBPortal, BasePortal): anchor_id = 2**31 - 1 minmax = {} self.log.debug(f"Iterating messages through {source.tgid} with {limit=}, {minmax}") + delay_warn_handle = self.loop.call_later( + 5 * 60, lambda: self.log.warning("Iterating messages is taking long") + ) # Iterate messages newest to oldest and collect the results async for msg in client.iter_messages(entity, limit=limit, **minmax): message_count += 1 + if message_count == 1: + self.log.debug(f"Backfill iter: got first message {msg.id}") + elif message_count % 50 == 0: + self.log.debug(f"Backfill iter: got {message_count} messages so far (at {msg.id})") if (forward and msg.id <= anchor_id) or (not forward and msg.id >= anchor_id): continue elif isinstance(msg, MessageService): @@ -3156,6 +3166,7 @@ class Portal(DBPortal, BasePortal): events.append(await self._wrap_batch_msg(intent, msg, converted, caption=True)) intents.append(intent) metas.append(None) + delay_warn_handle.cancel() if len(events) == 0: self.log.debug( f"Didn't get any events to send out of {message_count} messages fetched "