From 8e8360a9920d79450415c5c9a2aac8cfd7ecdd67 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Mon, 15 Nov 2021 14:25:01 -0700 Subject: [PATCH] portal/matrix: report message send checkpoint on all message types --- mautrix_telegram/portal/matrix.py | 169 ++++++++++++++++++++++-------- 1 file changed, 124 insertions(+), 45 deletions(-) diff --git a/mautrix_telegram/portal/matrix.py b/mautrix_telegram/portal/matrix.py index 1b2b68a9..05c0df7b 100644 --- a/mautrix_telegram/portal/matrix.py +++ b/mautrix_telegram/portal/matrix.py @@ -33,9 +33,10 @@ from telethon.tl.types import (DocumentAttributeFilename, DocumentAttributeImage UpdateNewMessage, InputMediaUploadedDocument, InputMediaUploadedPhoto) -from mautrix.types import (EventID, RoomID, UserID, ContentURI, MessageType, MessageEventContent, - TextMessageEventContent, MediaMessageEventContent, Format, - LocationMessageEventContent, ImageInfo, VideoInfo) +from mautrix.types import (EventID, EventType, RoomID, UserID, ContentURI, MessageType, + MessageEventContent, TextMessageEventContent, MediaMessageEventContent, + Format, LocationMessageEventContent, ImageInfo, VideoInfo) +from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus from ..types import TelegramID from ..db import Message as DBMessage @@ -225,11 +226,13 @@ class PortalMatrix(BasePortal, ABC): elif content.msgtype == MessageType.EMOTE: await self._apply_emote_format(sender, content) - async def _handle_matrix_text(self, sender_id: TelegramID, event_id: EventID, + async def _handle_matrix_text(self, sender: 'u.User', logged_in: bool, event_id: EventID, space: TelegramID, client: 'MautrixTelegramClient', - content: TextMessageEventContent, reply_to: TelegramID) -> None: + content: TextMessageEventContent, reply_to: Optional[TelegramID] + ) -> None: message, entities = await formatter.matrix_to_telegram(client, text=content.body, html=content.formatted(Format.HTML)) + sender_id = sender.tgid if logged_in else self.bot.tgid async with self.send_lock(sender_id): lp = self.get_config("telegram_link_preview") if content.get_edit(): @@ -240,16 +243,28 @@ class PortalMatrix(BasePortal, ABC): link_preview=lp) self._add_telegram_message_to_db(event_id, space, -1, response) return - response = await client.send_message(self.peer, message, reply_to=reply_to, - formatting_entities=entities, - link_preview=lp) - self._add_telegram_message_to_db(event_id, space, 0, response) - await self._send_delivery_receipt(event_id) + try: + response = await client.send_message(self.peer, message, reply_to=reply_to, + formatting_entities=entities, + link_preview=lp) + except Exception: + raise + else: + sender.send_remote_checkpoint( + MessageSendCheckpointStatus.SUCCESS, + event_id, + self.mxid, + EventType.ROOM_MESSAGE, + message_type=content.msgtype, + ) + self._add_telegram_message_to_db(event_id, space, 0, response) + await self._send_delivery_receipt(event_id) - async def _handle_matrix_file(self, sender_id: TelegramID, event_id: EventID, + async def _handle_matrix_file(self, sender: 'u.User', logged_in: bool, event_id: EventID, space: TelegramID, client: 'MautrixTelegramClient', content: MediaMessageEventContent, reply_to: TelegramID, caption: TextMessageEventContent = None) -> None: + sender_id = sender.tgid if logged_in else self.bot.tgid mime = content.info.mimetype if isinstance(content.info, (ImageInfo, VideoInfo)): w, h = content.info.width, content.info.height @@ -264,9 +279,8 @@ class PortalMatrix(BasePortal, ABC): else: if content.file: if not decrypt_attachment: - self.log.warning(f"Can't bridge encrypted media event {event_id}:" - " matrix-nio not installed") - return + raise Exception(f"Can't bridge encrypted media event {event_id}: matrix-nio " + "is not installed") file = await self.main_intent.download_media(content.file.url) file = decrypt_attachment(file, content.file.key.key, content.file.hashes.get("sha256"), content.file.iv) @@ -304,15 +318,26 @@ class PortalMatrix(BasePortal, ABC): if await self._matrix_document_edit(client, content, space, capt, media, event_id): return try: - response = await client.send_media(self.peer, media, reply_to=reply_to, - caption=capt, entities=entities) - except (PhotoInvalidDimensionsError, PhotoSaveFileInvalidError, PhotoExtInvalidError): - media = InputMediaUploadedDocument(file=media.file, mime_type=mime, - attributes=attributes) - response = await client.send_media(self.peer, media, reply_to=reply_to, - caption=capt, entities=entities) - self._add_telegram_message_to_db(event_id, space, 0, response) - await self._send_delivery_receipt(event_id) + try: + response = await client.send_media(self.peer, media, reply_to=reply_to, + caption=capt, entities=entities) + except (PhotoInvalidDimensionsError, PhotoSaveFileInvalidError, PhotoExtInvalidError): + media = InputMediaUploadedDocument(file=media.file, mime_type=mime, + attributes=attributes) + response = await client.send_media(self.peer, media, reply_to=reply_to, + caption=capt, entities=entities) + except Exception: + raise + else: + sender.send_remote_checkpoint( + MessageSendCheckpointStatus.SUCCESS, + event_id, + self.mxid, + EventType.ROOM_MESSAGE, + message_type=content.msgtype, + ) + self._add_telegram_message_to_db(event_id, space, 0, response) + await self._send_delivery_receipt(event_id) async def _matrix_document_edit(self, client: 'MautrixTelegramClient', content: MessageEventContent, space: TelegramID, @@ -327,10 +352,11 @@ class PortalMatrix(BasePortal, ABC): return True return False - async def _handle_matrix_location(self, sender_id: TelegramID, event_id: EventID, + async def _handle_matrix_location(self, sender: 'u.User', logged_in: bool, event_id: EventID, space: TelegramID, client: 'MautrixTelegramClient', content: LocationMessageEventContent, reply_to: TelegramID ) -> None: + sender_id = sender.tgid if logged_in else self.bot.tgid try: lat, long = content.geo_uri[len("geo:"):].split(";")[0].split(",") lat, long = float(lat), float(long) @@ -343,10 +369,21 @@ class PortalMatrix(BasePortal, ABC): async with self.send_lock(sender_id): if await self._matrix_document_edit(client, content, space, caption, media, event_id): return - response = await client.send_media(self.peer, media, reply_to=reply_to, - caption=caption, entities=entities) - self._add_telegram_message_to_db(event_id, space, 0, response) - await self._send_delivery_receipt(event_id) + try: + response = await client.send_media(self.peer, media, reply_to=reply_to, + caption=caption, entities=entities) + except Exception: + raise + else: + self._add_telegram_message_to_db(event_id, space, 0, response) + sender.send_remote_checkpoint( + MessageSendCheckpointStatus.SUCCESS, + event_id, + self.mxid, + EventType.ROOM_MESSAGE, + message_type=content.msgtype, + ) + await self._send_delivery_receipt(event_id) def _add_telegram_message_to_db(self, event_id: EventID, space: TelegramID, edit_index: int, response: TypeMessage) -> None: @@ -362,7 +399,19 @@ class PortalMatrix(BasePortal, ABC): mxid=event_id, edit_index=edit_index).insert() - async def _send_bridge_error(self, msg: str) -> None: + async def _send_bridge_error(self, sender: 'u.User', err: Exception, event_id: EventID, + event_type: EventType, + message_type: Optional[MessageType] = None, + msg: Optional[str] = None, confirmed: bool = False) -> None: + sender.send_remote_checkpoint( + MessageSendCheckpointStatus.PERM_FAILURE, + event_id, + self.mxid, + event_type, + message_type=message_type, + error=err, + ) + if config["bridge.delivery_error_reports"]: await self._send_message(self.main_intent, TextMessageEventContent(msgtype=MessageType.NOTICE, body=msg)) @@ -372,10 +421,25 @@ class PortalMatrix(BasePortal, ABC): try: await self._handle_matrix_message(sender, content, event_id) except RPCError as e: - if config["bridge.delivery_error_reports"]: - await self._send_bridge_error( - f"\u26a0 Your message may not have been bridged: {e}") + self.log.exception(f"RPCError while bridging {event_id}: {e}") + await self._send_bridge_error( + sender, + e, + event_id, + EventType.ROOM_MESSAGE, + message_type=content.msgtype, + msg=f"\u26a0 Your message may not have been bridged: {e}", + ) raise + except Exception as e: + self.log.exception(f"Failed to bridge {event_id}: {e}") + await self._send_bridge_error( + sender, + e, + event_id, + EventType.ROOM_MESSAGE, + message_type=content.msgtype, + ) async def _handle_matrix_message(self, sender: 'u.User', content: MessageEventContent, event_id: EventID) -> None: @@ -385,7 +449,6 @@ class PortalMatrix(BasePortal, ABC): logged_in = not await sender.needs_relaybot(self) client = sender.client if logged_in else self.bot.client - sender_id = sender.tgid if logged_in else self.bot.tgid space = (self.tgid if self.peer_type == "channel" # Channels have their own ID space else (sender.tgid if logged_in else self.bot.tgid)) reply_to = formatter.matrix_reply_to_telegram(content, space, room_id=self.mxid) @@ -397,14 +460,15 @@ class PortalMatrix(BasePortal, ABC): bridge_notices = self.get_config("bridge_notices.default") excepted = sender.mxid in self.get_config("bridge_notices.exceptions") if not bridge_notices and not excepted: - return + raise Exception("Notices are not configured to be bridged.") if content.msgtype in (MessageType.TEXT, MessageType.EMOTE, MessageType.NOTICE): await self._pre_process_matrix_message(sender, not logged_in, content) - await self._handle_matrix_text(sender_id, event_id, space, client, content, reply_to) + await self._handle_matrix_text(sender, logged_in, event_id, space, client, content, + reply_to) elif content.msgtype == MessageType.LOCATION: await self._pre_process_matrix_message(sender, not logged_in, content) - await self._handle_matrix_location(sender_id, event_id, space, client, content, + await self._handle_matrix_location(sender, logged_in, event_id, space, client, content, reply_to) elif content.msgtype in media: content["net.maunium.telegram.internal.filename"] = content.body @@ -418,11 +482,12 @@ class PortalMatrix(BasePortal, ABC): if caption_content: caption_content.msgtype = content.msgtype await self._pre_process_matrix_message(sender, not logged_in, caption_content) - await self._handle_matrix_file(sender_id, event_id, space, client, content, reply_to, - caption_content) + await self._handle_matrix_file(sender, logged_in, event_id, space, client, content, + reply_to, caption_content) else: - self.log.debug("Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}") + self.log.debug(f"Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}") self.log.trace("Unhandled Matrix event content: %s", content) + raise Exception(f"Unhandled msgtype {content.msgtype}") async def handle_matrix_unpin_all(self, sender: 'u.User', pin_event_id: EventID) -> None: await sender.client(UnpinAllMessagesRequest(peer=self.peer)) @@ -444,22 +509,36 @@ class PortalMatrix(BasePortal, ABC): async def handle_matrix_deletion(self, deleter: 'u.User', event_id: EventID, redaction_event_id: EventID) -> None: + try: + await self._handle_matrix_deletion(deleter, event_id, redaction_event_id) + except Exception as e: + await self._send_bridge_error(deleter, e, event_id, EventType.ROOM_REDACTION) + else: + deleter.send_remote_checkpoint( + MessageSendCheckpointStatus.SUCCESS, + event_id, + self.mxid, + EventType.ROOM_REDACTION, + ) + await self._send_delivery_receipt(redaction_event_id) + + async def _handle_matrix_deletion(self, deleter: 'u.User', event_id: EventID, + redaction_event_id: EventID) -> None: real_deleter = deleter if not await deleter.needs_relaybot(self) else self.bot space = self.tgid if self.peer_type == "channel" else real_deleter.tgid message = DBMessage.get_by_mxid(event_id, self.mxid, space) if not message: - self.log.trace(f"Ignoring Matrix redaction of unknown event {event_id}") + raise Exception(f"Ignoring Matrix redaction of unknown event {event_id}") elif message.redacted: - self.log.debug("Ignoring Matrix redaction of already redacted event " - f"{message.mxid} in {message.mx_room}") + raise Exception("Ignoring Matrix redaction of already redacted event " + f"{message.mxid} in {message.mx_room}") elif message.edit_index != 0: message.edit(redacted=True) - self.log.debug("Ignoring Matrix redaction of edit event " - f"{message.mxid} in {message.mx_room}") + raise Exception("Ignoring Matrix redaction of edit event " + f"{message.mxid} in {message.mx_room}") else: message.edit(redacted=True) await real_deleter.client.delete_messages(self.peer, [message.tgid]) - await self._send_delivery_receipt(redaction_event_id) async def _update_telegram_power_level(self, sender: 'u.User', user_id: TelegramID, level: int) -> None: