diff --git a/mautrix_telegram/db/backfill_queue.py b/mautrix_telegram/db/backfill_queue.py index a1ba408e..c856bc74 100644 --- a/mautrix_telegram/db/backfill_queue.py +++ b/mautrix_telegram/db/backfill_queue.py @@ -21,9 +21,11 @@ from datetime import datetime, timedelta from asyncpg import Record from attr import dataclass -from mautrix.types import RoomID, UserID +from mautrix.types import UserID from mautrix.util.async_db import Database +from ..types import TelegramID + fake_db = Database.create("") if TYPE_CHECKING else None @@ -34,8 +36,9 @@ class Backfill: queue_id: int | None user_mxid: UserID priority: int - portal_tgid: int - portal_tg_receiver: int + portal_tgid: TelegramID + portal_tg_receiver: TelegramID + anchor_msg_id: TelegramID | None messages_per_batch: int post_batch_delay: int max_batches: int @@ -47,9 +50,10 @@ class Backfill: def new( user_mxid: UserID, priority: int, - portal_tgid: int, - portal_tg_receiver: int, + portal_tgid: TelegramID, + portal_tg_receiver: TelegramID, messages_per_batch: int, + anchor_msg_id: TelegramID | None = None, post_batch_delay: int = 0, max_batches: int = -1, ) -> "Backfill": @@ -59,6 +63,7 @@ class Backfill: priority=priority, portal_tgid=portal_tgid, portal_tg_receiver=portal_tg_receiver, + anchor_msg_id=anchor_msg_id, messages_per_batch=messages_per_batch, post_batch_delay=post_batch_delay, max_batches=max_batches, @@ -78,6 +83,7 @@ class Backfill: "priority", "portal_tgid", "portal_tg_receiver", + "anchor_msg_id", "messages_per_batch", "post_batch_delay", "max_batches", @@ -150,6 +156,7 @@ class Backfill: self.priority, self.portal_tgid, self.portal_tg_receiver, + self.anchor_msg_id, self.messages_per_batch, self.post_batch_delay, self.max_batches, diff --git a/mautrix_telegram/db/upgrade/__init__.py b/mautrix_telegram/db/upgrade/__init__.py index a358efd3..a436bffb 100644 --- a/mautrix_telegram/db/upgrade/__init__.py +++ b/mautrix_telegram/db/upgrade/__init__.py @@ -17,4 +17,5 @@ from . 
import ( v12_message_sender, v13_multiple_reactions, v14_puppet_custom_mxid_index, + v15_backfill_anchor_id, ) diff --git a/mautrix_telegram/db/upgrade/v00_latest_revision.py b/mautrix_telegram/db/upgrade/v00_latest_revision.py index de862bcf..56e18e65 100644 --- a/mautrix_telegram/db/upgrade/v00_latest_revision.py +++ b/mautrix_telegram/db/upgrade/v00_latest_revision.py @@ -15,7 +15,7 @@ # along with this program. If not, see <https://www.gnu.org/licenses/>. from mautrix.util.async_db import Connection, Scheme -latest_version = 14 +latest_version = 15 async def create_latest_tables(conn: Connection, scheme: Scheme) -> int: @@ -221,6 +221,7 @@ async def create_latest_tables(conn: Connection, scheme: Scheme) -> int: priority INTEGER NOT NULL, portal_tgid BIGINT, portal_tg_receiver BIGINT, + anchor_msg_id BIGINT, messages_per_batch INTEGER NOT NULL, post_batch_delay INTEGER NOT NULL, max_batches INTEGER NOT NULL, diff --git a/mautrix_telegram/db/upgrade/v15_backfill_anchor_id.py b/mautrix_telegram/db/upgrade/v15_backfill_anchor_id.py new file mode 100644 index 00000000..f5e7756e --- /dev/null +++ b/mautrix_telegram/db/upgrade/v15_backfill_anchor_id.py @@ -0,0 +1,23 @@ +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2022 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +from mautrix.util.async_db import Connection + +from . 
import upgrade_table + + +@upgrade_table.register(description="Store lowest message ID in backfill queue") +async def upgrade_v15(conn: Connection) -> None: + await conn.execute("ALTER TABLE backfill_queue ADD COLUMN anchor_msg_id BIGINT") diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index 577fd1bd..bb50c00b 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -2621,6 +2621,7 @@ class Portal(DBPortal, BasePortal): priority: int, max_batches: int | None = None, messages_per_batch: int | None = None, + anchor_msg_id: int | None = None, ) -> None: # TODO check that there are no queued backfills # if not await Backfill.get(source.mxid, self.tgid, self.tg_receiver): @@ -2629,6 +2630,7 @@ class Portal(DBPortal, BasePortal): priority=priority, portal_tgid=self.tgid, portal_tg_receiver=self.tg_receiver, + anchor_msg_id=anchor_msg_id, messages_per_batch=( messages_per_batch or self.config["bridge.backfill.incremental.messages_per_batch"] ), @@ -2712,11 +2714,15 @@ class Portal(DBPortal, BasePortal): limit = req.messages_per_batch first_in_room = await DBMessage.find_first(self.mxid, tg_space) anchor_id = first_in_room.tgid if first_in_room else None + anchor_source = "lowest in chat" + if req.anchor_msg_id and (anchor_id is None or req.anchor_msg_id < anchor_id): + anchor_source = "backfill queue anchor" + anchor_id = req.anchor_msg_id self.log.debug( f"Backfilling up to {req.messages_per_batch} historical messages " - f"before {anchor_id} through {source.mxid}" + f"before {anchor_id} ({anchor_source}) through {source.mxid}" ) - insertion_id, message_count, first_id = await self._backfill_messages( + insertion_id, event_count, message_count, lowest_id = await self._backfill_messages( source, client, forward, anchor_id, limit, prev_event_id ) if prev_event_id == self.first_event_id: @@ -2726,7 +2732,7 @@ class Portal(DBPortal, BasePortal): insertion_id = self.base_insertion_id await self.save() # TODO this should probably check actual event count
instead of message count - if message_count > 0 and self.backfill_msc2716: + if event_count > 0 and self.backfill_msc2716: await self.main_intent.send_state_event( self.mxid, StateMarker, @@ -2737,8 +2743,8 @@ class Portal(DBPortal, BasePortal): state_key=insertion_id, ) if forward: - self.log.debug(f"Forward backfill finished with {message_count} messages") - elif message_count > 0 and first_id and first_id > 1: + self.log.debug(f"Forward backfill finished with {event_count}/{message_count} events") + elif message_count > 0 and lowest_id and lowest_id > 1: if req.max_batches in (0, 1): return "Already backfilled enough batches, not enqueuing more" self.log.debug(f"Enqueuing more backfill through {source.mxid}") @@ -2747,10 +2753,11 @@ class Portal(DBPortal, BasePortal): priority=100, messages_per_batch=req.messages_per_batch, max_batches=-1 if req.max_batches < 0 else (req.max_batches - 1), + anchor_msg_id=lowest_id, ) else: self.log.debug("No more messages to backfill") - return f"Backfilled {message_count} messages" + return f"Backfilled {event_count} messages" def _can_double_puppet_backfill(self, custom_mxid: UserID) -> bool: if not self.backfill_msc2716: @@ -2843,7 +2850,7 @@ class Portal(DBPortal, BasePortal): anchor_id: int, limit: int, prev_event_id: EventID, - ) -> tuple[EventID | None, int, int]: + ) -> tuple[EventID | None, int, int, TelegramID]: entity = await self.get_input_entity(source) events = [] intents = [] @@ -2884,6 +2891,7 @@ class Portal(DBPortal, BasePortal): state_events.append(invite_event) state_events.append(join_event) + lowest_id = 0 first_id = anchor_id message_count = 0 minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id} @@ -2899,6 +2907,8 @@ class Portal(DBPortal, BasePortal): elif isinstance(msg, MessageService): # TODO some service messages can be backfilled continue + if not lowest_id or msg.id < lowest_id: + lowest_id = msg.id if not before_first_msg_timestamp: first_id = msg.id before_first_msg_timestamp = 
int(msg.date.timestamp() * 1000) - 1 @@ -2916,12 +2926,12 @@ class Portal(DBPortal, BasePortal): if len(events) == 0: self.log.debug( f"Didn't get any events to send out of {message_count} messages fetched " - f"(first received ID: {first_id})" + f"(first received ID: {first_id}, lowest: {lowest_id})" ) - return None, message_count, first_id + return None, 0, message_count, lowest_id self.log.debug( f"Got {len(events)} events to send out of {message_count} messages fetched " - f"(first received ID: {first_id})" + f"(first received ID: {first_id}, lowest: {lowest_id})" ) if do_batch_send: resp = await self.main_intent.batch_send( @@ -2964,7 +2974,7 @@ class Portal(DBPortal, BasePortal): if msg is not None ] ) - return base_insertion_event_id, message_count, first_id + return base_insertion_event_id, len(events), message_count, lowest_id def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]: reactions = []