Improve handling of reaching the start of a chat in backfill
This commit is contained in:
@@ -21,9 +21,11 @@ from datetime import datetime, timedelta
|
||||
from asyncpg import Record
|
||||
from attr import dataclass
|
||||
|
||||
from mautrix.types import RoomID, UserID
|
||||
from mautrix.types import UserID
|
||||
from mautrix.util.async_db import Database
|
||||
|
||||
from ..types import TelegramID
|
||||
|
||||
fake_db = Database.create("") if TYPE_CHECKING else None
|
||||
|
||||
|
||||
@@ -34,8 +36,9 @@ class Backfill:
|
||||
queue_id: int | None
|
||||
user_mxid: UserID
|
||||
priority: int
|
||||
portal_tgid: int
|
||||
portal_tg_receiver: int
|
||||
portal_tgid: TelegramID
|
||||
portal_tg_receiver: TelegramID
|
||||
anchor_msg_id: TelegramID | None
|
||||
messages_per_batch: int
|
||||
post_batch_delay: int
|
||||
max_batches: int
|
||||
@@ -47,9 +50,10 @@ class Backfill:
|
||||
def new(
|
||||
user_mxid: UserID,
|
||||
priority: int,
|
||||
portal_tgid: int,
|
||||
portal_tg_receiver: int,
|
||||
portal_tgid: TelegramID,
|
||||
portal_tg_receiver: TelegramID,
|
||||
messages_per_batch: int,
|
||||
anchor_msg_id: TelegramID | None = None,
|
||||
post_batch_delay: int = 0,
|
||||
max_batches: int = -1,
|
||||
) -> "Backfill":
|
||||
@@ -59,6 +63,7 @@ class Backfill:
|
||||
priority=priority,
|
||||
portal_tgid=portal_tgid,
|
||||
portal_tg_receiver=portal_tg_receiver,
|
||||
anchor_msg_id=anchor_msg_id,
|
||||
messages_per_batch=messages_per_batch,
|
||||
post_batch_delay=post_batch_delay,
|
||||
max_batches=max_batches,
|
||||
@@ -78,6 +83,7 @@ class Backfill:
|
||||
"priority",
|
||||
"portal_tgid",
|
||||
"portal_tg_receiver",
|
||||
"anchor_msg_id",
|
||||
"messages_per_batch",
|
||||
"post_batch_delay",
|
||||
"max_batches",
|
||||
@@ -150,6 +156,7 @@ class Backfill:
|
||||
self.priority,
|
||||
self.portal_tgid,
|
||||
self.portal_tg_receiver,
|
||||
self.anchor_msg_id,
|
||||
self.messages_per_batch,
|
||||
self.post_batch_delay,
|
||||
self.max_batches,
|
||||
|
||||
@@ -17,4 +17,5 @@ from . import (
|
||||
v12_message_sender,
|
||||
v13_multiple_reactions,
|
||||
v14_puppet_custom_mxid_index,
|
||||
v15_backfill_anchor_id,
|
||||
)
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from mautrix.util.async_db import Connection, Scheme
|
||||
|
||||
latest_version = 14
|
||||
latest_version = 15
|
||||
|
||||
|
||||
async def create_latest_tables(conn: Connection, scheme: Scheme) -> int:
|
||||
@@ -221,6 +221,7 @@ async def create_latest_tables(conn: Connection, scheme: Scheme) -> int:
|
||||
priority INTEGER NOT NULL,
|
||||
portal_tgid BIGINT,
|
||||
portal_tg_receiver BIGINT,
|
||||
anchor_msg_id BIGINT,
|
||||
messages_per_batch INTEGER NOT NULL,
|
||||
post_batch_delay INTEGER NOT NULL,
|
||||
max_batches INTEGER NOT NULL,
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
||||
# Copyright (C) 2022 Tulir Asokan
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from mautrix.util.async_db import Connection
|
||||
|
||||
from . import upgrade_table
|
||||
|
||||
|
||||
@upgrade_table.register(description="Store lowest message ID in backfill queue")
async def upgrade_v15(conn: Connection) -> None:
    """Schema upgrade v15: add the ``anchor_msg_id`` column to ``backfill_queue``.

    Runs a single ``ALTER TABLE`` statement on ``conn``. The new column is a
    ``BIGINT`` declared without a ``NOT NULL`` constraint.
    """
    add_anchor_column = "ALTER TABLE backfill_queue ADD COLUMN anchor_msg_id BIGINT"
    await conn.execute(add_anchor_column)
|
||||
+21
-11
@@ -2621,6 +2621,7 @@ class Portal(DBPortal, BasePortal):
|
||||
priority: int,
|
||||
max_batches: int | None = None,
|
||||
messages_per_batch: int | None = None,
|
||||
anchor_msg_id: int | None = None,
|
||||
) -> None:
|
||||
# TODO check that there are no queued backfills
|
||||
# if not await Backfill.get(source.mxid, self.tgid, self.tg_receiver):
|
||||
@@ -2629,6 +2630,7 @@ class Portal(DBPortal, BasePortal):
|
||||
priority=priority,
|
||||
portal_tgid=self.tgid,
|
||||
portal_tg_receiver=self.tg_receiver,
|
||||
anchor_msg_id=anchor_msg_id,
|
||||
messages_per_batch=(
|
||||
messages_per_batch or self.config["bridge.backfill.incremental.messages_per_batch"]
|
||||
),
|
||||
@@ -2712,11 +2714,15 @@ class Portal(DBPortal, BasePortal):
|
||||
limit = req.messages_per_batch
|
||||
first_in_room = await DBMessage.find_first(self.mxid, tg_space)
|
||||
anchor_id = first_in_room.tgid if first_in_room else None
|
||||
anchor_source = "lowest in chat"
|
||||
if req.anchor_msg_id and req.anchor_msg_id < anchor_id:
|
||||
anchor_source = "backfill queue anchor"
|
||||
anchor_id = req.anchor_msg_id
|
||||
self.log.debug(
|
||||
f"Backfilling up to {req.messages_per_batch} historical messages "
|
||||
f"before {anchor_id} through {source.mxid}"
|
||||
f"before {anchor_id} ({anchor_source}) through {source.mxid}"
|
||||
)
|
||||
insertion_id, message_count, first_id = await self._backfill_messages(
|
||||
insertion_id, event_count, message_count, lowest_id = await self._backfill_messages(
|
||||
source, client, forward, anchor_id, limit, prev_event_id
|
||||
)
|
||||
if prev_event_id == self.first_event_id:
|
||||
@@ -2726,7 +2732,7 @@ class Portal(DBPortal, BasePortal):
|
||||
insertion_id = self.base_insertion_id
|
||||
await self.save()
|
||||
# TODO this should probably check actual event count instead of message count
|
||||
if message_count > 0 and self.backfill_msc2716:
|
||||
if event_count > 0 and self.backfill_msc2716:
|
||||
await self.main_intent.send_state_event(
|
||||
self.mxid,
|
||||
StateMarker,
|
||||
@@ -2737,8 +2743,8 @@ class Portal(DBPortal, BasePortal):
|
||||
state_key=insertion_id,
|
||||
)
|
||||
if forward:
|
||||
self.log.debug(f"Forward backfill finished with {message_count} messages")
|
||||
elif message_count > 0 and first_id and first_id > 1:
|
||||
self.log.debug(f"Forward backfill finished with {event_count}/{message_count} events")
|
||||
elif message_count > 0 and lowest_id and lowest_id > 1:
|
||||
if req.max_batches in (0, 1):
|
||||
return "Already backfilled enough batches, not enqueuing more"
|
||||
self.log.debug(f"Enqueuing more backfill through {source.mxid}")
|
||||
@@ -2747,10 +2753,11 @@ class Portal(DBPortal, BasePortal):
|
||||
priority=100,
|
||||
messages_per_batch=req.messages_per_batch,
|
||||
max_batches=-1 if req.max_batches < 0 else (req.max_batches - 1),
|
||||
anchor_msg_id=lowest_id,
|
||||
)
|
||||
else:
|
||||
self.log.debug("No more messages to backfill")
|
||||
return f"Backfilled {message_count} messages"
|
||||
return f"Backfilled {event_count} messages"
|
||||
|
||||
def _can_double_puppet_backfill(self, custom_mxid: UserID) -> bool:
|
||||
if not self.backfill_msc2716:
|
||||
@@ -2843,7 +2850,7 @@ class Portal(DBPortal, BasePortal):
|
||||
anchor_id: int,
|
||||
limit: int,
|
||||
prev_event_id: EventID,
|
||||
) -> tuple[EventID | None, int, int]:
|
||||
) -> tuple[EventID | None, int, int, TelegramID]:
|
||||
entity = await self.get_input_entity(source)
|
||||
events = []
|
||||
intents = []
|
||||
@@ -2884,6 +2891,7 @@ class Portal(DBPortal, BasePortal):
|
||||
state_events.append(invite_event)
|
||||
state_events.append(join_event)
|
||||
|
||||
lowest_id = 0
|
||||
first_id = anchor_id
|
||||
message_count = 0
|
||||
minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id}
|
||||
@@ -2899,6 +2907,8 @@ class Portal(DBPortal, BasePortal):
|
||||
elif isinstance(msg, MessageService):
|
||||
# TODO some service messages can be backfilled
|
||||
continue
|
||||
if not lowest_id or msg.id < lowest_id:
|
||||
lowest_id = msg.id
|
||||
if not before_first_msg_timestamp:
|
||||
first_id = msg.id
|
||||
before_first_msg_timestamp = int(msg.date.timestamp() * 1000) - 1
|
||||
@@ -2916,12 +2926,12 @@ class Portal(DBPortal, BasePortal):
|
||||
if len(events) == 0:
|
||||
self.log.debug(
|
||||
f"Didn't get any events to send out of {message_count} messages fetched "
|
||||
f"(first received ID: {first_id})"
|
||||
f"(first received ID: {first_id}, lowest: {lowest_id})"
|
||||
)
|
||||
return None, message_count, first_id
|
||||
return None, 0, message_count, lowest_id
|
||||
self.log.debug(
|
||||
f"Got {len(events)} events to send out of {message_count} messages fetched "
|
||||
f"(first received ID: {first_id})"
|
||||
f"(first received ID: {first_id}, lowest: {lowest_id})"
|
||||
)
|
||||
if do_batch_send:
|
||||
resp = await self.main_intent.batch_send(
|
||||
@@ -2964,7 +2974,7 @@ class Portal(DBPortal, BasePortal):
|
||||
if msg is not None
|
||||
]
|
||||
)
|
||||
return base_insertion_event_id, message_count, first_id
|
||||
return base_insertion_event_id, len(events), message_count, lowest_id
|
||||
|
||||
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
|
||||
reactions = []
|
||||
|
||||
Reference in New Issue
Block a user