Remove support for MSC2716
This commit is contained in:
@@ -440,14 +440,5 @@ async def backfill(evt: CommandEvent) -> None:
|
||||
if not evt.config["bridge.backfill.normal_groups"] and portal.peer_type == "chat":
|
||||
await evt.reply("Backfilling normal groups is disabled in the bridge config")
|
||||
return
|
||||
if portal.backfill_msc2716:
|
||||
messages_per_batch = evt.config["bridge.backfill.incremental.messages_per_batch"]
|
||||
batches = math.ceil(limit / messages_per_batch)
|
||||
rounded = ""
|
||||
if batches * messages_per_batch != limit:
|
||||
rounded = f" (rounded message limit to {batches}*{messages_per_batch})"
|
||||
await portal.enqueue_backfill(evt.sender, priority=0, max_batches=batches)
|
||||
await evt.reply(f"Backfill queued{rounded}")
|
||||
else:
|
||||
output = await portal.forward_backfill(evt.sender, initial=False, override_limit=limit)
|
||||
await evt.reply(output)
|
||||
output = await portal.forward_backfill(evt.sender, initial=False, override_limit=limit)
|
||||
await evt.reply(output)
|
||||
|
||||
@@ -171,8 +171,6 @@ class Config(BaseBridgeConfig):
|
||||
copy("bridge.kick_on_logout")
|
||||
copy("bridge.always_read_joined_telegram_notice")
|
||||
copy("bridge.backfill.enable")
|
||||
copy("bridge.backfill.msc2716")
|
||||
copy("bridge.backfill.double_puppet_backfill")
|
||||
copy("bridge.backfill.normal_groups")
|
||||
copy("bridge.backfill.unread_hours_threshold")
|
||||
if "bridge.backfill.forward" in self:
|
||||
|
||||
@@ -379,23 +379,6 @@ bridge:
|
||||
backfill:
|
||||
# Allow backfilling at all?
|
||||
enable: true
|
||||
# Use MSC2716 for backfilling?
|
||||
#
|
||||
# This requires a server with MSC2716 support, which is currently an experimental feature in Synapse.
|
||||
# It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml.
|
||||
msc2716: false
|
||||
# Use double puppets for backfilling?
|
||||
#
|
||||
# If using MSC2716, the double puppets must be in the appservice's user ID namespace
|
||||
# (because the bridge can't use the double puppet access token with batch sending).
|
||||
#
|
||||
# Even without MSC2716, bridging old messages with correct timestamps requires the double
|
||||
# puppets to be in an appservice namespace, or the server to be modified to allow
|
||||
# overriding timestamps anyway.
|
||||
#
|
||||
# Also note that adding users to the appservice namespace may have unexpected side effects,
|
||||
# as described in https://docs.mau.fi/bridges/general/double-puppeting.html#appservice-method
|
||||
double_puppet_backfill: false
|
||||
# Whether or not to enable backfilling in normal groups.
|
||||
# Normal groups have numerous technical problems in Telegram, and backfilling normal groups
|
||||
# will likely cause problems if there are multiple Matrix users in the group.
|
||||
@@ -405,10 +388,9 @@ bridge:
|
||||
# Set to -1 to let any chat be unread.
|
||||
unread_hours_threshold: 720
|
||||
|
||||
# Forward backfilling limits. These apply to both MSC2716 and legacy backfill.
|
||||
# Forward backfilling limits.
|
||||
#
|
||||
# Using a negative initial limit is not recommended, as it would try to backfill everything in a single batch.
|
||||
# MSC2716 and the incremental settings are meant for backfilling everything incrementally rather than at once.
|
||||
forward_limits:
|
||||
# Number of messages to backfill immediately after creating a portal.
|
||||
initial:
|
||||
@@ -425,7 +407,7 @@ bridge:
|
||||
# Timeout for forward backfills in seconds. If you have a high limit, you'll have to increase this too.
|
||||
forward_timeout: 900
|
||||
|
||||
# Settings for incremental backfill of history. These only apply when using MSC2716.
|
||||
# Settings for incremental backfill of history. These only apply to Beeper, as upstream abandoned MSC2716.
|
||||
incremental:
|
||||
# Maximum number of messages to backfill per batch.
|
||||
messages_per_batch: 100
|
||||
|
||||
@@ -61,21 +61,6 @@ class MatrixHandler(BaseMatrixHandler):
|
||||
|
||||
self._previously_typing = {}
|
||||
|
||||
async def check_versions(self) -> None:
|
||||
await super().check_versions()
|
||||
if self.config["bridge.backfill.msc2716"] and not (
|
||||
support := self.versions.supports("org.matrix.msc2716")
|
||||
):
|
||||
self.log.fatal(
|
||||
"Backfilling with MSC2716 is enabled in bridge config, but "
|
||||
+ (
|
||||
"batch sending is not enabled on homeserver"
|
||||
if support is False
|
||||
else "homeserver does not support batch sending"
|
||||
)
|
||||
)
|
||||
sys.exit(18)
|
||||
|
||||
async def handle_puppet_group_invite(
|
||||
self,
|
||||
room_id: RoomID,
|
||||
|
||||
+24
-126
@@ -252,7 +252,6 @@ if TYPE_CHECKING:
|
||||
StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
|
||||
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
|
||||
DummyPortalCreated = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE)
|
||||
StateMarker = EventType.find("org.matrix.msc2716.marker", EventType.Class.STATE)
|
||||
|
||||
InviteList = Union[UserID, List[UserID]]
|
||||
UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping]
|
||||
@@ -299,8 +298,6 @@ class Portal(DBPortal, BasePortal):
|
||||
|
||||
backfill_lock: SimpleLock
|
||||
backfill_method_lock: asyncio.Lock
|
||||
backfill_leave: set[IntentAPI] | None
|
||||
backfill_msc2716: bool
|
||||
backfill_enable: bool
|
||||
|
||||
alias: RoomAlias | None
|
||||
@@ -321,7 +318,6 @@ class Portal(DBPortal, BasePortal):
|
||||
_sponsored_seen: dict[UserID, bool]
|
||||
_new_messages_after_sponsored: bool
|
||||
|
||||
_member_list_cache: dict[EventID, set[UserID]]
|
||||
_prev_reaction_poll: dict[UserID, float]
|
||||
|
||||
_msg_conv: putil.TelegramMessageConverter
|
||||
@@ -380,7 +376,6 @@ class Portal(DBPortal, BasePortal):
|
||||
"Waiting for backfilling to finish before handling %s", log=self.log
|
||||
)
|
||||
self.backfill_method_lock = asyncio.Lock()
|
||||
self.backfill_leave = None
|
||||
|
||||
self.dedup = putil.PortalDedup(self)
|
||||
self.send_lock = putil.PortalSendLock()
|
||||
@@ -395,7 +390,6 @@ class Portal(DBPortal, BasePortal):
|
||||
self._new_messages_after_sponsored = True
|
||||
self._bridging_blocked_at_runtime = False
|
||||
|
||||
self._member_list_cache = {}
|
||||
self._prev_reaction_poll = defaultdict(lambda: 0.0)
|
||||
|
||||
self._msg_conv = putil.TelegramMessageConverter(self)
|
||||
@@ -492,7 +486,6 @@ class Portal(DBPortal, BasePortal):
|
||||
cls.filter_list = cls.config["bridge.filter.list"]
|
||||
cls.filter_users = cls.config["bridge.filter.users"]
|
||||
cls.hs_domain = cls.config["homeserver.domain"]
|
||||
cls.backfill_msc2716 = cls.config["bridge.backfill.msc2716"]
|
||||
cls.backfill_enable = cls.config["bridge.backfill.enable"]
|
||||
cls.alias_template = SimpleTemplate(
|
||||
cls.config["bridge.alias_template"],
|
||||
@@ -1065,18 +1058,14 @@ class Portal(DBPortal, BasePortal):
|
||||
self.first_event_id = await self.main_intent.send_message_event(
|
||||
self.mxid, DummyPortalCreated, {}
|
||||
)
|
||||
if not self.bridge.homeserver_software.is_hungry:
|
||||
self._member_list_cache[self.first_event_id] = set(
|
||||
(await self.main_intent.get_joined_members(self.mxid)).keys()
|
||||
)
|
||||
await self.save()
|
||||
|
||||
if self.backfill_enable and (isinstance(user, u.User) or not self.backfill_msc2716):
|
||||
if self.backfill_enable:
|
||||
try:
|
||||
await self.forward_backfill(user, initial=True, client=client)
|
||||
except Exception:
|
||||
self.log.exception("Error in initial backfill")
|
||||
if self.backfill_msc2716:
|
||||
if self._enable_batch_sending:
|
||||
await self.enqueue_backfill(user, priority=50)
|
||||
|
||||
return self.mxid
|
||||
@@ -2837,6 +2826,10 @@ class Portal(DBPortal, BasePortal):
|
||||
def _default_max_batches(self) -> int:
|
||||
return self.config[f"bridge.backfill.incremental.max_batches.{self._backfill_config_type}"]
|
||||
|
||||
@property
|
||||
def _enable_batch_sending(self) -> bool:
|
||||
return self.bridge.matrix.versions.supports("com.beeper.batch_sending")
|
||||
|
||||
async def enqueue_backfill(
|
||||
self,
|
||||
source: u.User,
|
||||
@@ -2926,14 +2919,9 @@ class Portal(DBPortal, BasePortal):
|
||||
if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat":
|
||||
return "Backfilling normal groups is disabled in the bridge config"
|
||||
tg_space = source.tgid if self.peer_type != "channel" else self.tgid
|
||||
prev_event_id = self.first_event_id
|
||||
if forward:
|
||||
last_in_room = await DBMessage.find_last(self.mxid, tg_space)
|
||||
if last_in_room:
|
||||
prev_event_id = last_in_room.mxid
|
||||
min_id = last_in_room.tgid
|
||||
else:
|
||||
min_id = 0
|
||||
min_id = last_in_room.tgid if last_in_room else 0
|
||||
if last_tgid is None:
|
||||
messages = await source.client.get_messages(self.peer, limit=1)
|
||||
if not messages:
|
||||
@@ -2964,29 +2952,10 @@ class Portal(DBPortal, BasePortal):
|
||||
f"Backfilling up to {req.messages_per_batch} historical messages "
|
||||
f"before {anchor_id} ({anchor_source}) through {source.mxid}"
|
||||
)
|
||||
insertion_id, event_count, message_count, lowest_id = await self._backfill_messages(
|
||||
source, client, forward, anchor_id, limit, prev_event_id
|
||||
event_count, message_count, lowest_id = await self._backfill_messages(
|
||||
source, client, forward, anchor_id, limit
|
||||
)
|
||||
if prev_event_id == self.first_event_id:
|
||||
if insertion_id and not self.base_insertion_id:
|
||||
self.base_insertion_id = insertion_id
|
||||
elif not insertion_id:
|
||||
insertion_id = self.base_insertion_id
|
||||
await self.save()
|
||||
if (
|
||||
event_count > 0
|
||||
and self.backfill_msc2716
|
||||
and (not forward or not self.bridge.homeserver_software.is_hungry)
|
||||
):
|
||||
await self.main_intent.send_state_event(
|
||||
self.mxid,
|
||||
StateMarker,
|
||||
{
|
||||
"org.matrix.msc2716.marker.insertion": insertion_id,
|
||||
"com.beeper.timestamp": int(time.time() * 1000),
|
||||
},
|
||||
state_key=insertion_id,
|
||||
)
|
||||
if forward:
|
||||
self.log.debug(f"Forward backfill finished with {event_count}/{message_count} events")
|
||||
elif message_count > 0 and lowest_id and lowest_id > 1:
|
||||
@@ -3005,42 +2974,11 @@ class Portal(DBPortal, BasePortal):
|
||||
self.log.debug("No more messages to backfill")
|
||||
return f"Backfilled {event_count} messages"
|
||||
|
||||
def _can_double_puppet_backfill(self, custom_mxid: UserID) -> bool:
|
||||
if not self.backfill_msc2716:
|
||||
return True
|
||||
if not self.config["bridge.backfill.double_puppet_backfill"]:
|
||||
return False
|
||||
if self.bridge.homeserver_software.is_hungry:
|
||||
return True
|
||||
|
||||
# Batch sending can only use local users, so don't allow double puppets on other servers.
|
||||
if custom_mxid[custom_mxid.index(":") + 1 :] != self.config["homeserver.domain"]:
|
||||
return False
|
||||
return True
|
||||
|
||||
async def _get_members_at(self, event_id: EventID) -> set[UserID]:
|
||||
try:
|
||||
return self._member_list_cache[event_id]
|
||||
except KeyError:
|
||||
pass
|
||||
# TODO cache the list in db?
|
||||
self.log.debug(f"Fetching member list at {event_id}")
|
||||
ctx = await self.main_intent.get_event_context(self.mxid, event_id, limit=0)
|
||||
members = {
|
||||
evt.state_key
|
||||
for evt in ctx.state
|
||||
if evt.type == EventType.ROOM_MEMBER and evt.content.membership == Membership.JOIN
|
||||
}
|
||||
self.log.debug(f"Found {len(members)} members at {event_id}")
|
||||
self._member_list_cache[event_id] = members
|
||||
return members
|
||||
|
||||
async def _convert_batch_msg(
|
||||
self,
|
||||
source: u.User,
|
||||
client: MautrixTelegramClient,
|
||||
msg: Message,
|
||||
add_member: Callable[[IntentAPI, str, ContentURI], Awaitable[None]],
|
||||
) -> tuple[putil.ConvertedMessage, IntentAPI]:
|
||||
if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)):
|
||||
sender = await p.Puppet.get_by_peer(msg.from_id)
|
||||
@@ -3058,10 +2996,12 @@ class Portal(DBPortal, BasePortal):
|
||||
await sender.update_info(source, entity, client_override=client)
|
||||
else:
|
||||
intent = self.main_intent
|
||||
if intent.api.is_real_user and not self._can_double_puppet_backfill(intent.mxid):
|
||||
if (
|
||||
intent.api.is_real_user
|
||||
and not intent.api.is_real_user_as_token
|
||||
and not self._enable_batch_sending
|
||||
):
|
||||
intent = sender.default_mxid_intent
|
||||
if sender:
|
||||
await add_member(intent, sender.displayname, sender.avatar_url)
|
||||
is_bot = sender.is_bot if sender else False
|
||||
converted = await self._msg_conv.convert(
|
||||
source,
|
||||
@@ -3106,50 +3046,15 @@ class Portal(DBPortal, BasePortal):
|
||||
forward: bool,
|
||||
anchor_id: int,
|
||||
limit: int,
|
||||
prev_event_id: EventID,
|
||||
) -> tuple[EventID | None, int, int, TelegramID]:
|
||||
) -> tuple[int, int, TelegramID]:
|
||||
entity = await self.get_input_entity(source)
|
||||
events = []
|
||||
intents = []
|
||||
metas = []
|
||||
state_events = []
|
||||
do_batch_send = self.backfill_msc2716
|
||||
added_members = (
|
||||
await self._get_members_at(prev_event_id)
|
||||
if not self.bridge.homeserver_software.is_hungry and do_batch_send
|
||||
else set()
|
||||
)
|
||||
before_first_msg_timestamp = 0
|
||||
tg_space = self.tgid if self.peer_type == "channel" else source.tgid
|
||||
|
||||
async def add_member(intent: IntentAPI, displayname: str, avatar_url: ContentURI) -> None:
|
||||
if self.bridge.homeserver_software.is_hungry or intent.mxid in added_members:
|
||||
return
|
||||
added_members.add(intent.mxid)
|
||||
if not do_batch_send:
|
||||
# TODO leave these members?
|
||||
await intent.ensure_joined(self.mxid)
|
||||
return
|
||||
invite_event = BatchSendStateEvent(
|
||||
type=EventType.ROOM_MEMBER,
|
||||
state_key=intent.mxid,
|
||||
sender=self.main_intent.mxid,
|
||||
timestamp=before_first_msg_timestamp,
|
||||
content=MemberStateEventContent(
|
||||
membership=Membership.INVITE,
|
||||
displayname=displayname,
|
||||
avatar_url=avatar_url,
|
||||
),
|
||||
)
|
||||
join_event = attr.evolve(
|
||||
invite_event,
|
||||
content=attr.evolve(invite_event.content, membership=Membership.JOIN),
|
||||
sender=intent.mxid,
|
||||
)
|
||||
state_events.append(invite_event)
|
||||
state_events.append(join_event)
|
||||
|
||||
lowest_id = 0
|
||||
first_id_found = False
|
||||
first_id = anchor_id
|
||||
message_count = 0
|
||||
minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id}
|
||||
@@ -3174,11 +3079,11 @@ class Portal(DBPortal, BasePortal):
|
||||
continue
|
||||
if not lowest_id or msg.id < lowest_id:
|
||||
lowest_id = msg.id
|
||||
if not before_first_msg_timestamp:
|
||||
if not first_id_found:
|
||||
first_id = msg.id
|
||||
before_first_msg_timestamp = int(msg.date.timestamp() * 1000) - 1
|
||||
first_id_found = True
|
||||
|
||||
converted, intent = await self._convert_batch_msg(source, client, msg, add_member)
|
||||
converted, intent = await self._convert_batch_msg(source, client, msg)
|
||||
if converted is None:
|
||||
continue
|
||||
d_event_id = None
|
||||
@@ -3197,28 +3102,21 @@ class Portal(DBPortal, BasePortal):
|
||||
f"Didn't get any events to send out of {message_count} messages fetched "
|
||||
f"(first received ID: {first_id}, lowest: {lowest_id})"
|
||||
)
|
||||
return None, 0, message_count, lowest_id
|
||||
return 0, message_count, lowest_id
|
||||
self.log.debug(
|
||||
f"Got {len(events)} events to send out of {message_count} messages fetched "
|
||||
f"(first received ID: {first_id}, lowest: {lowest_id})"
|
||||
)
|
||||
if do_batch_send:
|
||||
resp = await self.main_intent.batch_send(
|
||||
if self._enable_batch_sending:
|
||||
resp = await self.main_intent.beeper_batch_send(
|
||||
self.mxid,
|
||||
prev_event_id,
|
||||
batch_id=self.next_batch_id if not forward else None,
|
||||
# We iterated the events in reverse chronological order,
|
||||
# so reverse them before sending
|
||||
events=list(reversed(events)),
|
||||
state_events_at_start=state_events,
|
||||
beeper_new_messages=forward,
|
||||
forward=forward,
|
||||
)
|
||||
if prev_event_id == self.first_event_id and resp.next_batch_id:
|
||||
self.next_batch_id = resp.next_batch_id
|
||||
base_insertion_event_id = resp.base_insertion_event_id
|
||||
event_ids = resp.event_ids
|
||||
else:
|
||||
base_insertion_event_id = None
|
||||
event_ids = [
|
||||
await intent.send_message_event(
|
||||
self.mxid, evt.type, evt.content, timestamp=evt.timestamp
|
||||
@@ -3243,7 +3141,7 @@ class Portal(DBPortal, BasePortal):
|
||||
if msg is not None
|
||||
]
|
||||
)
|
||||
return base_insertion_event_id, len(events), message_count, lowest_id
|
||||
return len(events), message_count, lowest_id
|
||||
|
||||
def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
|
||||
reactions = []
|
||||
|
||||
Reference in New Issue
Block a user