Merge pull request #692 from mautrix/sumner/bri-827-add-bridge-and-remote-message-tracking

Use message send checkpoints
This commit is contained in:
Sumner Evans
2021-11-17 15:23:50 -07:00
committed by GitHub
2 changed files with 127 additions and 45 deletions
+2
View File
@@ -14,6 +14,8 @@ homeserver:
# If set, the bridge will make POST requests to this URL whenever a user's Telegram connection state changes.
# The bridge will use the appservice as_token to authorize requests.
status_endpoint: null
# Endpoint for reporting per-message status.
message_send_checkpoint_endpoint: null
# Application service host/registration related details
# Changing these values requires regeneration of the registration.
+125 -45
View File
@@ -33,9 +33,10 @@ from telethon.tl.types import (DocumentAttributeFilename, DocumentAttributeImage
UpdateNewMessage, InputMediaUploadedDocument,
InputMediaUploadedPhoto)
from mautrix.types import (EventID, RoomID, UserID, ContentURI, MessageType, MessageEventContent,
TextMessageEventContent, MediaMessageEventContent, Format,
LocationMessageEventContent, ImageInfo, VideoInfo)
from mautrix.types import (EventID, EventType, RoomID, UserID, ContentURI, MessageType,
MessageEventContent, TextMessageEventContent, MediaMessageEventContent,
Format, LocationMessageEventContent, ImageInfo, VideoInfo)
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
from ..types import TelegramID
from ..db import Message as DBMessage
@@ -225,11 +226,13 @@ class PortalMatrix(BasePortal, ABC):
elif content.msgtype == MessageType.EMOTE:
await self._apply_emote_format(sender, content)
async def _handle_matrix_text(self, sender_id: TelegramID, event_id: EventID,
async def _handle_matrix_text(self, sender: 'u.User', logged_in: bool, event_id: EventID,
space: TelegramID, client: 'MautrixTelegramClient',
content: TextMessageEventContent, reply_to: TelegramID) -> None:
content: TextMessageEventContent, reply_to: Optional[TelegramID]
) -> None:
message, entities = await formatter.matrix_to_telegram(client, text=content.body,
html=content.formatted(Format.HTML))
sender_id = sender.tgid if logged_in else self.bot.tgid
async with self.send_lock(sender_id):
lp = self.get_config("telegram_link_preview")
if content.get_edit():
@@ -240,16 +243,28 @@ class PortalMatrix(BasePortal, ABC):
link_preview=lp)
self._add_telegram_message_to_db(event_id, space, -1, response)
return
response = await client.send_message(self.peer, message, reply_to=reply_to,
formatting_entities=entities,
link_preview=lp)
self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
try:
response = await client.send_message(self.peer, message, reply_to=reply_to,
formatting_entities=entities,
link_preview=lp)
except Exception:
raise
else:
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
async def _handle_matrix_file(self, sender_id: TelegramID, event_id: EventID,
async def _handle_matrix_file(self, sender: 'u.User', logged_in: bool, event_id: EventID,
space: TelegramID, client: 'MautrixTelegramClient',
content: MediaMessageEventContent, reply_to: TelegramID,
caption: TextMessageEventContent = None) -> None:
sender_id = sender.tgid if logged_in else self.bot.tgid
mime = content.info.mimetype
if isinstance(content.info, (ImageInfo, VideoInfo)):
w, h = content.info.width, content.info.height
@@ -264,9 +279,8 @@ class PortalMatrix(BasePortal, ABC):
else:
if content.file:
if not decrypt_attachment:
self.log.warning(f"Can't bridge encrypted media event {event_id}:"
" matrix-nio not installed")
return
raise Exception(f"Can't bridge encrypted media event {event_id}: "
"encryption dependencies not installed")
file = await self.main_intent.download_media(content.file.url)
file = decrypt_attachment(file, content.file.key.key,
content.file.hashes.get("sha256"), content.file.iv)
@@ -304,15 +318,26 @@ class PortalMatrix(BasePortal, ABC):
if await self._matrix_document_edit(client, content, space, capt, media, event_id):
return
try:
response = await client.send_media(self.peer, media, reply_to=reply_to,
caption=capt, entities=entities)
except (PhotoInvalidDimensionsError, PhotoSaveFileInvalidError, PhotoExtInvalidError):
media = InputMediaUploadedDocument(file=media.file, mime_type=mime,
attributes=attributes)
response = await client.send_media(self.peer, media, reply_to=reply_to,
caption=capt, entities=entities)
self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
try:
response = await client.send_media(self.peer, media, reply_to=reply_to,
caption=capt, entities=entities)
except (PhotoInvalidDimensionsError, PhotoSaveFileInvalidError, PhotoExtInvalidError):
media = InputMediaUploadedDocument(file=media.file, mime_type=mime,
attributes=attributes)
response = await client.send_media(self.peer, media, reply_to=reply_to,
caption=capt, entities=entities)
except Exception:
raise
else:
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
async def _matrix_document_edit(self, client: 'MautrixTelegramClient',
content: MessageEventContent, space: TelegramID,
@@ -327,10 +352,11 @@ class PortalMatrix(BasePortal, ABC):
return True
return False
async def _handle_matrix_location(self, sender_id: TelegramID, event_id: EventID,
async def _handle_matrix_location(self, sender: 'u.User', logged_in: bool, event_id: EventID,
space: TelegramID, client: 'MautrixTelegramClient',
content: LocationMessageEventContent, reply_to: TelegramID
) -> None:
sender_id = sender.tgid if logged_in else self.bot.tgid
try:
lat, long = content.geo_uri[len("geo:"):].split(";")[0].split(",")
lat, long = float(lat), float(long)
@@ -343,10 +369,21 @@ class PortalMatrix(BasePortal, ABC):
async with self.send_lock(sender_id):
if await self._matrix_document_edit(client, content, space, caption, media, event_id):
return
response = await client.send_media(self.peer, media, reply_to=reply_to,
caption=caption, entities=entities)
self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
try:
response = await client.send_media(self.peer, media, reply_to=reply_to,
caption=caption, entities=entities)
except Exception:
raise
else:
self._add_telegram_message_to_db(event_id, space, 0, response)
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
await self._send_delivery_receipt(event_id)
def _add_telegram_message_to_db(self, event_id: EventID, space: TelegramID,
edit_index: int, response: TypeMessage) -> None:
@@ -362,7 +399,19 @@ class PortalMatrix(BasePortal, ABC):
mxid=event_id,
edit_index=edit_index).insert()
async def _send_bridge_error(self, msg: str) -> None:
async def _send_bridge_error(self, sender: 'u.User', err: Exception, event_id: EventID,
event_type: EventType,
message_type: Optional[MessageType] = None,
msg: Optional[str] = None, confirmed: bool = False) -> None:
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.PERM_FAILURE,
event_id,
self.mxid,
event_type,
message_type=message_type,
error=err,
)
if config["bridge.delivery_error_reports"]:
await self._send_message(self.main_intent,
TextMessageEventContent(msgtype=MessageType.NOTICE, body=msg))
@@ -372,10 +421,25 @@ class PortalMatrix(BasePortal, ABC):
try:
await self._handle_matrix_message(sender, content, event_id)
except RPCError as e:
if config["bridge.delivery_error_reports"]:
await self._send_bridge_error(
f"\u26a0 Your message may not have been bridged: {e}")
self.log.exception(f"RPCError while bridging {event_id}: {e}")
await self._send_bridge_error(
sender,
e,
event_id,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
msg=f"\u26a0 Your message may not have been bridged: {e}",
)
raise
except Exception as e:
self.log.exception(f"Failed to bridge {event_id}: {e}")
await self._send_bridge_error(
sender,
e,
event_id,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
async def _handle_matrix_message(self, sender: 'u.User', content: MessageEventContent,
event_id: EventID) -> None:
@@ -385,7 +449,6 @@ class PortalMatrix(BasePortal, ABC):
logged_in = not await sender.needs_relaybot(self)
client = sender.client if logged_in else self.bot.client
sender_id = sender.tgid if logged_in else self.bot.tgid
space = (self.tgid if self.peer_type == "channel" # Channels have their own ID space
else (sender.tgid if logged_in else self.bot.tgid))
reply_to = formatter.matrix_reply_to_telegram(content, space, room_id=self.mxid)
@@ -397,14 +460,15 @@ class PortalMatrix(BasePortal, ABC):
bridge_notices = self.get_config("bridge_notices.default")
excepted = sender.mxid in self.get_config("bridge_notices.exceptions")
if not bridge_notices and not excepted:
return
raise Exception("Notices are not configured to be bridged.")
if content.msgtype in (MessageType.TEXT, MessageType.EMOTE, MessageType.NOTICE):
await self._pre_process_matrix_message(sender, not logged_in, content)
await self._handle_matrix_text(sender_id, event_id, space, client, content, reply_to)
await self._handle_matrix_text(sender, logged_in, event_id, space, client, content,
reply_to)
elif content.msgtype == MessageType.LOCATION:
await self._pre_process_matrix_message(sender, not logged_in, content)
await self._handle_matrix_location(sender_id, event_id, space, client, content,
await self._handle_matrix_location(sender, logged_in, event_id, space, client, content,
reply_to)
elif content.msgtype in media:
content["net.maunium.telegram.internal.filename"] = content.body
@@ -418,11 +482,12 @@ class PortalMatrix(BasePortal, ABC):
if caption_content:
caption_content.msgtype = content.msgtype
await self._pre_process_matrix_message(sender, not logged_in, caption_content)
await self._handle_matrix_file(sender_id, event_id, space, client, content, reply_to,
caption_content)
await self._handle_matrix_file(sender, logged_in, event_id, space, client, content,
reply_to, caption_content)
else:
self.log.debug("Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}")
self.log.debug(f"Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}")
self.log.trace("Unhandled Matrix event content: %s", content)
raise Exception(f"Unhandled msgtype {content.msgtype}")
async def handle_matrix_unpin_all(self, sender: 'u.User', pin_event_id: EventID) -> None:
await sender.client(UnpinAllMessagesRequest(peer=self.peer))
@@ -444,22 +509,37 @@ class PortalMatrix(BasePortal, ABC):
async def handle_matrix_deletion(self, deleter: 'u.User', event_id: EventID,
redaction_event_id: EventID) -> None:
try:
await self._handle_matrix_deletion(deleter, event_id, redaction_event_id)
except Exception as e:
self.log.debug(str(e))
await self._send_bridge_error(deleter, e, event_id, EventType.ROOM_REDACTION)
else:
deleter.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_REDACTION,
)
await self._send_delivery_receipt(redaction_event_id)
async def _handle_matrix_deletion(self, deleter: 'u.User', event_id: EventID,
redaction_event_id: EventID) -> None:
real_deleter = deleter if not await deleter.needs_relaybot(self) else self.bot
space = self.tgid if self.peer_type == "channel" else real_deleter.tgid
message = DBMessage.get_by_mxid(event_id, self.mxid, space)
if not message:
self.log.trace(f"Ignoring Matrix redaction of unknown event {event_id}")
raise Exception(f"Ignoring Matrix redaction of unknown event {event_id}")
elif message.redacted:
self.log.debug("Ignoring Matrix redaction of already redacted event "
f"{message.mxid} in {message.mx_room}")
raise Exception("Ignoring Matrix redaction of already redacted event "
f"{message.mxid} in {message.mx_room}")
elif message.edit_index != 0:
message.edit(redacted=True)
self.log.debug("Ignoring Matrix redaction of edit event "
f"{message.mxid} in {message.mx_room}")
raise Exception("Ignoring Matrix redaction of edit event "
f"{message.mxid} in {message.mx_room}")
else:
message.edit(redacted=True)
await real_deleter.client.delete_messages(self.peer, [message.tgid])
await self._send_delivery_receipt(redaction_event_id)
async def _update_telegram_power_level(self, sender: 'u.User', user_id: TelegramID,
level: int) -> None: