Separate dialog syncing and creation limits and fix bugs
This commit is contained in:
@@ -89,7 +89,12 @@ class Config(BaseBridgeConfig):
|
||||
copy("bridge.sync_channel_members")
|
||||
copy("bridge.skip_deleted_members")
|
||||
copy("bridge.startup_sync")
|
||||
copy("bridge.sync_dialog_limit")
|
||||
if "bridge.sync_dialog_limit" in self:
|
||||
base["bridge.sync_create_limit"] = self["bridge.sync_dialog_limit"]
|
||||
base["bridge.sync_update_limit"] = self["bridge.sync_dialog_limit"]
|
||||
else:
|
||||
copy("bridge.sync_update_limit")
|
||||
copy("bridge.sync_create_limit")
|
||||
copy("bridge.sync_direct_chats")
|
||||
copy("bridge.max_telegram_delete")
|
||||
copy("bridge.sync_matrix_state")
|
||||
|
||||
@@ -142,7 +142,10 @@ bridge:
|
||||
startup_sync: true
|
||||
# Number of most recently active dialogs to check when syncing chats.
|
||||
# Set to 0 to remove limit.
|
||||
sync_dialog_limit: 30
|
||||
sync_update_limit: 0
|
||||
# Number of most recently active dialogs to create portals for when syncing chats.
|
||||
# Set to 0 to remove limit.
|
||||
sync_create_limit: 30
|
||||
# Whether or not to sync and create portals for direct chats at startup.
|
||||
sync_direct_chats: false
|
||||
# The maximum number of simultaneous Telegram deletions to handle.
|
||||
@@ -249,10 +252,11 @@ bridge:
|
||||
# N.B. Initial backfill will only start after member sync. Make sure your
|
||||
# max_initial_member_sync is set to a low enough value so it doesn't take forever.
|
||||
initial_limit: 0
|
||||
# Maximum number of messages to backfill if messages were missed while
|
||||
# the bridge was disconnected.
|
||||
# Maximum number of messages to backfill if messages were missed while the bridge was
|
||||
# disconnected. Note that this only works for logged in users and only if the chat isn't
|
||||
# older than sync_update_limit
|
||||
# Set to 0 to disable backfilling missed messages.
|
||||
missed_limit: 100
|
||||
missed_limit: 50
|
||||
# If using double puppeting, should notifications be disabled
|
||||
# while the initial backfill is in progress?
|
||||
disable_notifications: false
|
||||
|
||||
@@ -534,7 +534,7 @@ class BasePortal(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def backfill(self, source: 'AbstractUser', is_initial: bool = False,
|
||||
limit: Optional[int] = None) -> Awaitable[None]:
|
||||
limit: Optional[int] = None, last_id: Optional[int] = None) -> Awaitable[None]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -436,7 +436,7 @@ class PortalTelegram(BasePortal, ABC):
|
||||
# Nothing to backfill
|
||||
return
|
||||
if limit < 0:
|
||||
limit = None
|
||||
limit = last_id - min_id
|
||||
self.log.debug(f"Backfilling approximately {last_id - min_id} messages "
|
||||
f"through {source.mxid}")
|
||||
elif self.peer_type == "channel":
|
||||
@@ -444,7 +444,6 @@ class PortalTelegram(BasePortal, ABC):
|
||||
# There are some cases, such as deleted messages, where this may backfill less
|
||||
# messages than the limit.
|
||||
min_id = max(last_id - limit, min_id)
|
||||
limit = None
|
||||
self.log.debug(f"Backfilling messages after ID {min_id} (last message: {last_id}) "
|
||||
f"through {source.mxid}")
|
||||
else:
|
||||
@@ -456,20 +455,19 @@ class PortalTelegram(BasePortal, ABC):
|
||||
with self.backfill_lock:
|
||||
await self._backfill(source, min_id, limit)
|
||||
|
||||
async def _backfill(self, source: 'AbstractUser', min_id: Optional[int], limit: Optional[int]
|
||||
) -> None:
|
||||
async def _backfill(self, source: 'AbstractUser', min_id: Optional[int], limit: int) -> None:
|
||||
self.backfill_leave = set()
|
||||
if ((self.peer_type == "user" and self.tg_receiver != source.tgid
|
||||
if ((self.peer_type == "user" and self.tgid != source.tgid
|
||||
and config["bridge.backfill.invite_own_puppet"])):
|
||||
self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid)
|
||||
sender = p.Puppet.get(source.tgid)
|
||||
await self.main_intent.invite_user(self.mxid, sender.default_mxid)
|
||||
await sender.default_mxid_intent.join_room_by_id(self.mxid)
|
||||
self.backfill_leave.add(sender.default_mxid_intent)
|
||||
self.log.trace("Opening takeout client for %d, message ID %d->", source.tgid, min_id)
|
||||
|
||||
client = source.client
|
||||
if limit > config["bridge.backfill.takeout_limit"]:
|
||||
self.log.debug(f"Opening takeout client for {source.tgid}")
|
||||
async with client.takeout(**self._takeout_options) as takeout:
|
||||
count = await self._backfill_messages(source, min_id, limit, takeout)
|
||||
else:
|
||||
@@ -481,11 +479,12 @@ class PortalTelegram(BasePortal, ABC):
|
||||
self.backfill_leave = None
|
||||
self.log.info("Backfilled %d messages through %s", count, source.mxid)
|
||||
|
||||
async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int],
|
||||
limit: Optional[int], client: TelegramClient) -> int:
|
||||
async def _backfill_messages(self, source: 'AbstractUser', min_id: Optional[int], limit: int,
|
||||
client: TelegramClient) -> int:
|
||||
count = 0
|
||||
entity = await self.get_input_entity(source)
|
||||
if min_id is not None:
|
||||
self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})")
|
||||
messages = client.iter_messages(entity, reverse=True, min_id=min_id)
|
||||
async for message in messages:
|
||||
sender = p.Puppet.get(message.sender_id)
|
||||
@@ -493,6 +492,7 @@ class PortalTelegram(BasePortal, ABC):
|
||||
await self.handle_telegram_message(source, sender, message)
|
||||
count += 1
|
||||
else:
|
||||
self.log.debug(f"Fetching up to {limit} most recent messages")
|
||||
messages = await client.get_messages(entity, limit=limit)
|
||||
for message in reversed(messages):
|
||||
sender = p.Puppet.get(message.sender_id)
|
||||
|
||||
@@ -338,10 +338,13 @@ class User(AbstractUser, BaseUser):
|
||||
if self.is_bot:
|
||||
return
|
||||
creators = []
|
||||
limit = config["bridge.sync_dialog_limit"] or None
|
||||
self.log.debug(f"Syncing dialogs (limit={limit})")
|
||||
update_limit = config["bridge.sync_update_limit"] or None
|
||||
create_limit = config["bridge.sync_create_limit"]
|
||||
index = 0
|
||||
self.log.debug(f"Syncing dialogs (update_limit={update_limit}, "
|
||||
f"create_limit={create_limit})")
|
||||
dialog: Dialog
|
||||
async for dialog in self.client.iter_dialogs(limit=limit, ignore_migrated=True,
|
||||
async for dialog in self.client.iter_dialogs(limit=update_limit, ignore_migrated=True,
|
||||
archived=False):
|
||||
entity = dialog.entity
|
||||
if isinstance(entity, ChatForbidden):
|
||||
@@ -357,12 +360,13 @@ class User(AbstractUser, BaseUser):
|
||||
self.portals[portal.tgid_full] = portal
|
||||
if portal.mxid:
|
||||
update_task = portal.update_matrix_room(self, entity)
|
||||
backfill_task = portal.backfill(self, last_known_id=dialog.message.id)
|
||||
backfill_task = portal.backfill(self, last_id=dialog.message.id)
|
||||
creators.append(self.loop.create_task(update_task))
|
||||
creators.append(self.loop.create_task(backfill_task))
|
||||
else:
|
||||
elif not create_limit or index < create_limit:
|
||||
create_task = portal.create_matrix_room(self, entity, invites=[self.mxid])
|
||||
creators.append(self.loop.create_task(create_task))
|
||||
index += 1
|
||||
self.save(portals=True)
|
||||
await asyncio.gather(*creators)
|
||||
self.log.debug("Dialog syncing complete")
|
||||
|
||||
Reference in New Issue
Block a user