From d72897dfe8f4970c9fdb359d3cdf4ed3dcceea91 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Wed, 1 Nov 2023 01:03:45 +0200 Subject: [PATCH] Remove support for MSC2716 --- mautrix_telegram/commands/telegram/misc.py | 13 +- mautrix_telegram/config.py | 2 - mautrix_telegram/example-config.yaml | 22 +-- mautrix_telegram/matrix.py | 15 --- mautrix_telegram/portal.py | 150 ++++----------------- 5 files changed, 28 insertions(+), 174 deletions(-) diff --git a/mautrix_telegram/commands/telegram/misc.py b/mautrix_telegram/commands/telegram/misc.py index 65e3486b..274a21cf 100644 --- a/mautrix_telegram/commands/telegram/misc.py +++ b/mautrix_telegram/commands/telegram/misc.py @@ -440,14 +440,5 @@ async def backfill(evt: CommandEvent) -> None: if not evt.config["bridge.backfill.normal_groups"] and portal.peer_type == "chat": await evt.reply("Backfilling normal groups is disabled in the bridge config") return - if portal.backfill_msc2716: - messages_per_batch = evt.config["bridge.backfill.incremental.messages_per_batch"] - batches = math.ceil(limit / messages_per_batch) - rounded = "" - if batches * messages_per_batch != limit: - rounded = f" (rounded message limit to {batches}*{messages_per_batch})" - await portal.enqueue_backfill(evt.sender, priority=0, max_batches=batches) - await evt.reply(f"Backfill queued{rounded}") - else: - output = await portal.forward_backfill(evt.sender, initial=False, override_limit=limit) - await evt.reply(output) + output = await portal.forward_backfill(evt.sender, initial=False, override_limit=limit) + await evt.reply(output) diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index 624f7dfe..ca38c520 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -171,8 +171,6 @@ class Config(BaseBridgeConfig): copy("bridge.kick_on_logout") copy("bridge.always_read_joined_telegram_notice") copy("bridge.backfill.enable") - copy("bridge.backfill.msc2716") - copy("bridge.backfill.double_puppet_backfill") copy("bridge.backfill.normal_groups") copy("bridge.backfill.unread_hours_threshold") if "bridge.backfill.forward" in self: diff --git a/mautrix_telegram/example-config.yaml b/mautrix_telegram/example-config.yaml index c3c6e578..27749d03 100644 --- a/mautrix_telegram/example-config.yaml +++ b/mautrix_telegram/example-config.yaml @@ -379,23 +379,6 @@ bridge: backfill: # Allow backfilling at all? enable: true - # Use MSC2716 for backfilling? - # - # This requires a server with MSC2716 support, which is currently an experimental feature in Synapse. - # It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml. - msc2716: false - # Use double puppets for backfilling? - # - # If using MSC2716, the double puppets must be in the appservice's user ID namespace - # (because the bridge can't use the double puppet access token with batch sending). - # - # Even without MSC2716, bridging old messages with correct timestamps requires the double - # puppets to be in an appservice namespace, or the server to be modified to allow - # overriding timestamps anyway. - # - # Also note that adding users to the appservice namespace may have unexpected side effects, - # as described in https://docs.mau.fi/bridges/general/double-puppeting.html#appservice-method - double_puppet_backfill: false # Whether or not to enable backfilling in normal groups. # Normal groups have numerous technical problems in Telegram, and backfilling normal groups # will likely cause problems if there are multiple Matrix users in the group. @@ -405,10 +388,9 @@ bridge: # Set to -1 to let any chat be unread. unread_hours_threshold: 720 - # Forward backfilling limits. These apply to both MSC2716 and legacy backfill. + # Forward backfilling limits. # # Using a negative initial limit is not recommended, as it would try to backfill everything in a single batch. - # MSC2716 and the incremental settings are meant for backfilling everything incrementally rather than at once. forward_limits: # Number of messages to backfill immediately after creating a portal. initial: @@ -425,7 +407,7 @@ bridge: # Timeout for forward backfills in seconds. If you have a high limit, you'll have to increase this too. forward_timeout: 900 - # Settings for incremental backfill of history. These only apply when using MSC2716. + # Settings for incremental backfill of history. These only apply to Beeper, as upstream abandoned MSC2716. incremental: # Maximum number of messages to backfill per batch. messages_per_batch: 100 diff --git a/mautrix_telegram/matrix.py b/mautrix_telegram/matrix.py index 770e832f..55fd3bec 100644 --- a/mautrix_telegram/matrix.py +++ b/mautrix_telegram/matrix.py @@ -61,21 +61,6 @@ class MatrixHandler(BaseMatrixHandler): self._previously_typing = {} - async def check_versions(self) -> None: - await super().check_versions() - if self.config["bridge.backfill.msc2716"] and not ( - support := self.versions.supports("org.matrix.msc2716") - ): - self.log.fatal( - "Backfilling with MSC2716 is enabled in bridge config, but " - + ( - "batch sending is not enabled on homeserver" - if support is False - else "homeserver does not support batch sending" - ) - ) - sys.exit(18) - async def handle_puppet_group_invite( self, room_id: RoomID, diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index 00a79d5b..2ab062db 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -252,7 +252,6 @@ if TYPE_CHECKING: StateBridge = EventType.find("m.bridge", EventType.Class.STATE) StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE) DummyPortalCreated = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE) -StateMarker = EventType.find("org.matrix.msc2716.marker", EventType.Class.STATE) InviteList = Union[UserID, List[UserID]] UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping] @@ -299,8 +298,6 @@ class Portal(DBPortal, BasePortal): backfill_lock: SimpleLock backfill_method_lock: asyncio.Lock - backfill_leave: set[IntentAPI] | None - backfill_msc2716: bool backfill_enable: bool alias: RoomAlias | None @@ -321,7 +318,6 @@ class Portal(DBPortal, BasePortal): _sponsored_seen: dict[UserID, bool] _new_messages_after_sponsored: bool - _member_list_cache: dict[EventID, set[UserID]] _prev_reaction_poll: dict[UserID, float] _msg_conv: putil.TelegramMessageConverter @@ -380,7 +376,6 @@ class Portal(DBPortal, BasePortal): "Waiting for backfilling to finish before handling %s", log=self.log ) self.backfill_method_lock = asyncio.Lock() - self.backfill_leave = None self.dedup = putil.PortalDedup(self) self.send_lock = putil.PortalSendLock() @@ -395,7 +390,6 @@ class Portal(DBPortal, BasePortal): self._new_messages_after_sponsored = True self._bridging_blocked_at_runtime = False - self._member_list_cache = {} self._prev_reaction_poll = defaultdict(lambda: 0.0) self._msg_conv = putil.TelegramMessageConverter(self) @@ -492,7 +486,6 @@ class Portal(DBPortal, BasePortal): cls.filter_list = cls.config["bridge.filter.list"] cls.filter_users = cls.config["bridge.filter.users"] cls.hs_domain = cls.config["homeserver.domain"] - cls.backfill_msc2716 = cls.config["bridge.backfill.msc2716"] cls.backfill_enable = cls.config["bridge.backfill.enable"] cls.alias_template = SimpleTemplate( cls.config["bridge.alias_template"], @@ -1065,18 +1058,14 @@ class Portal(DBPortal, BasePortal): self.first_event_id = await self.main_intent.send_message_event( self.mxid, DummyPortalCreated, {} ) - if not self.bridge.homeserver_software.is_hungry: - self._member_list_cache[self.first_event_id] = set( - (await self.main_intent.get_joined_members(self.mxid)).keys() - ) await self.save() - if self.backfill_enable and (isinstance(user, u.User) or not self.backfill_msc2716): + if self.backfill_enable: try: await self.forward_backfill(user, initial=True, client=client) except Exception: self.log.exception("Error in initial backfill") - if self.backfill_msc2716: + if self._enable_batch_sending: await self.enqueue_backfill(user, priority=50) return self.mxid @@ -2837,6 +2826,10 @@ class Portal(DBPortal, BasePortal): def _default_max_batches(self) -> int: return self.config[f"bridge.backfill.incremental.max_batches.{self._backfill_config_type}"] + @property + def _enable_batch_sending(self) -> bool: + return self.bridge.matrix.versions.supports("com.beeper.batch_sending") + async def enqueue_backfill( self, source: u.User, @@ -2926,14 +2919,9 @@ class Portal(DBPortal, BasePortal): if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat": return "Backfilling normal groups is disabled in the bridge config" tg_space = source.tgid if self.peer_type != "channel" else self.tgid - prev_event_id = self.first_event_id if forward: last_in_room = await DBMessage.find_last(self.mxid, tg_space) - if last_in_room: - prev_event_id = last_in_room.mxid - min_id = last_in_room.tgid - else: - min_id = 0 + min_id = last_in_room.tgid if last_in_room else 0 if last_tgid is None: messages = await source.client.get_messages(self.peer, limit=1) if not messages: @@ -2964,29 +2952,10 @@ class Portal(DBPortal, BasePortal): f"Backfilling up to {req.messages_per_batch} historical messages " f"before {anchor_id} ({anchor_source}) through {source.mxid}" ) - insertion_id, event_count, message_count, lowest_id = await self._backfill_messages( - source, client, forward, anchor_id, limit, prev_event_id + event_count, message_count, lowest_id = await self._backfill_messages( + source, client, forward, anchor_id, limit ) - if prev_event_id == self.first_event_id: - if insertion_id and not self.base_insertion_id: - self.base_insertion_id = insertion_id - elif not insertion_id: - insertion_id = self.base_insertion_id await self.save() - if ( - event_count > 0 - and self.backfill_msc2716 - and (not forward or not self.bridge.homeserver_software.is_hungry) - ): - await self.main_intent.send_state_event( - self.mxid, - StateMarker, - { - "org.matrix.msc2716.marker.insertion": insertion_id, - "com.beeper.timestamp": int(time.time() * 1000), - }, - state_key=insertion_id, - ) if forward: self.log.debug(f"Forward backfill finished with {event_count}/{message_count} events") elif message_count > 0 and lowest_id and lowest_id > 1: @@ -3005,42 +2974,11 @@ class Portal(DBPortal, BasePortal): self.log.debug("No more messages to backfill") return f"Backfilled {event_count} messages" - def _can_double_puppet_backfill(self, custom_mxid: UserID) -> bool: - if not self.backfill_msc2716: - return True - if not self.config["bridge.backfill.double_puppet_backfill"]: - return False - if self.bridge.homeserver_software.is_hungry: - return True - - # Batch sending can only use local users, so don't allow double puppets on other servers. - if custom_mxid[custom_mxid.index(":") + 1 :] != self.config["homeserver.domain"]: - return False - return True - - async def _get_members_at(self, event_id: EventID) -> set[UserID]: - try: - return self._member_list_cache[event_id] - except KeyError: - pass - # TODO cache the list in db? - self.log.debug(f"Fetching member list at {event_id}") - ctx = await self.main_intent.get_event_context(self.mxid, event_id, limit=0) - members = { - evt.state_key - for evt in ctx.state - if evt.type == EventType.ROOM_MEMBER and evt.content.membership == Membership.JOIN - } - self.log.debug(f"Found {len(members)} members at {event_id}") - self._member_list_cache[event_id] = members - return members - async def _convert_batch_msg( self, source: u.User, client: MautrixTelegramClient, msg: Message, - add_member: Callable[[IntentAPI, str, ContentURI], Awaitable[None]], ) -> tuple[putil.ConvertedMessage, IntentAPI]: if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)): sender = await p.Puppet.get_by_peer(msg.from_id) @@ -3058,10 +2996,12 @@ class Portal(DBPortal, BasePortal): await sender.update_info(source, entity, client_override=client) else: intent = self.main_intent - if intent.api.is_real_user and not self._can_double_puppet_backfill(intent.mxid): + if ( + intent.api.is_real_user + and not intent.api.is_real_user_as_token + and not self._enable_batch_sending + ): intent = sender.default_mxid_intent - if sender: - await add_member(intent, sender.displayname, sender.avatar_url) is_bot = sender.is_bot if sender else False converted = await self._msg_conv.convert( source, @@ -3106,50 +3046,15 @@ class Portal(DBPortal, BasePortal): forward: bool, anchor_id: int, limit: int, - prev_event_id: EventID, - ) -> tuple[EventID | None, int, int, TelegramID]: + ) -> tuple[int, int, TelegramID]: entity = await self.get_input_entity(source) events = [] intents = [] metas = [] - state_events = [] - do_batch_send = self.backfill_msc2716 - added_members = ( - await self._get_members_at(prev_event_id) - if not self.bridge.homeserver_software.is_hungry and do_batch_send - else set() - ) - before_first_msg_timestamp = 0 tg_space = self.tgid if self.peer_type == "channel" else source.tgid - async def add_member(intent: IntentAPI, displayname: str, avatar_url: ContentURI) -> None: - if self.bridge.homeserver_software.is_hungry or intent.mxid in added_members: - return - added_members.add(intent.mxid) - if not do_batch_send: - # TODO leave these members? - await intent.ensure_joined(self.mxid) - return - invite_event = BatchSendStateEvent( - type=EventType.ROOM_MEMBER, - state_key=intent.mxid, - sender=self.main_intent.mxid, - timestamp=before_first_msg_timestamp, - content=MemberStateEventContent( - membership=Membership.INVITE, - displayname=displayname, - avatar_url=avatar_url, - ), - ) - join_event = attr.evolve( - invite_event, - content=attr.evolve(invite_event.content, membership=Membership.JOIN), - sender=intent.mxid, - ) - state_events.append(invite_event) - state_events.append(join_event) - lowest_id = 0 + first_id_found = False first_id = anchor_id message_count = 0 minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id} @@ -3174,11 +3079,11 @@ class Portal(DBPortal, BasePortal): continue if not lowest_id or msg.id < lowest_id: lowest_id = msg.id - if not before_first_msg_timestamp: + if not first_id_found: first_id = msg.id - before_first_msg_timestamp = int(msg.date.timestamp() * 1000) - 1 + first_id_found = True - converted, intent = await self._convert_batch_msg(source, client, msg, add_member) + converted, intent = await self._convert_batch_msg(source, client, msg) if converted is None: continue d_event_id = None @@ -3197,28 +3102,21 @@ class Portal(DBPortal, BasePortal): f"Didn't get any events to send out of {message_count} messages fetched " f"(first received ID: {first_id}, lowest: {lowest_id})" ) - return None, 0, message_count, lowest_id + return 0, message_count, lowest_id self.log.debug( f"Got {len(events)} events to send out of {message_count} messages fetched " f"(first received ID: {first_id}, lowest: {lowest_id})" ) - if do_batch_send: - resp = await self.main_intent.batch_send( + if self._enable_batch_sending: + resp = await self.main_intent.beeper_batch_send( self.mxid, - prev_event_id, - batch_id=self.next_batch_id if not forward else None, # We iterated the events in reverse chronological order, # so reverse them before sending events=list(reversed(events)), - state_events_at_start=state_events, - beeper_new_messages=forward, + forward=forward, ) - if prev_event_id == self.first_event_id and resp.next_batch_id: - self.next_batch_id = resp.next_batch_id - base_insertion_event_id = resp.base_insertion_event_id event_ids = resp.event_ids else: - base_insertion_event_id = None event_ids = [ await intent.send_message_event( self.mxid, evt.type, evt.content, timestamp=evt.timestamp @@ -3243,7 +3141,7 @@ class Portal(DBPortal, BasePortal): if msg is not None ] ) - return base_insertion_event_id, len(events), message_count, lowest_id + return len(events), message_count, lowest_id def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]: reactions = []