diff --git a/mautrix_telegram/formatter/from_matrix/__init__.py b/mautrix_telegram/formatter/from_matrix/__init__.py
index b98058a0..e6d033ef 100644
--- a/mautrix_telegram/formatter/from_matrix/__init__.py
+++ b/mautrix_telegram/formatter/from_matrix/__init__.py
@@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Optional, List, Tuple, Callable, Pattern, Match, TYPE_CHECKING, Dict, Any
+from typing import Optional, List, Tuple, Callable, Pattern, Match, TYPE_CHECKING
import re
import logging
@@ -21,7 +21,7 @@ from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, M
TypeMessageEntity)
from telethon.helpers import add_surrogate, del_surrogate
-from mautrix.types import RoomID
+from mautrix.types import RoomID, MessageEventContent
from ... import puppet as pu
from ...types import TelegramID
@@ -90,26 +90,12 @@ def matrix_to_telegram(html: str) -> ParsedMessage:
raise FormatError(f"Failed to convert Matrix format: {html}") from e
-def matrix_reply_to_telegram(content: Dict[str, Any], tg_space: TelegramID,
+def matrix_reply_to_telegram(content: MessageEventContent, tg_space: TelegramID,
room_id: Optional[RoomID] = None) -> Optional[TelegramID]:
- relates_to = content.get("m.relates_to", None) or {}
- if not relates_to:
- return None
- reply = (relates_to if relates_to.get("rel_type", None) == "m.reference"
- else relates_to.get("m.in_reply_to", None) or {})
- if not reply:
- return None
- room_id = room_id or reply.get("room_id", None)
- event_id = reply.get("event_id", None)
+ event_id = content.get_reply_to()
if not event_id:
return
-
- try:
- if content["format"] == "org.matrix.custom.html":
- content["formatted_body"] = trim_reply_fallback_html(content["formatted_body"])
- except KeyError:
- pass
- content["body"] = trim_reply_fallback_text(content["body"])
+ content.trim_reply_fallback()
message = DBMessage.get_by_mxid(event_id, room_id, tg_space)
if message:
diff --git a/mautrix_telegram/portal/base.py b/mautrix_telegram/portal/base.py
index 5561e0d6..ec93b44e 100644
--- a/mautrix_telegram/portal/base.py
+++ b/mautrix_telegram/portal/base.py
@@ -456,8 +456,8 @@ class BasePortal(ABC):
pass
@abstractmethod
- def handle_matrix_power_levels(self, sender: 'u.User', new_levels: PowerLevelStateEventContent,
- old_levels: PowerLevelStateEventContent) -> Awaitable[None]:
+ def handle_matrix_power_levels(self, sender: 'u.User', new_levels: Dict[UserID, int],
+ old_levels: Dict[UserID, int]) -> Awaitable[None]:
pass
# endregion
diff --git a/mautrix_telegram/portal/portal_matrix.py b/mautrix_telegram/portal/portal_matrix.py
index a0139619..3e1225b0 100644
--- a/mautrix_telegram/portal/portal_matrix.py
+++ b/mautrix_telegram/portal/portal_matrix.py
@@ -37,8 +37,9 @@ from telethon.tl.types import (
SendMessageCancelAction, SendMessageTypingAction, TypeInputPeer, TypeMessageEntity,
UpdateNewMessage, InputMediaUploadedDocument)
-from mautrix.types import (EventID, RoomID, UserID, ContentURI, MessageType,
- TextMessageEventContent, Format)
+from mautrix.types import (EventID, RoomID, UserID, ContentURI, MessageType, MessageEventContent,
+ TextMessageEventContent, MediaMessageEventContent, Format,
+ LocationMessageEventContent)
from mautrix.bridge import BasePortal as MautrixBasePortal
from ..types import TelegramID
@@ -181,38 +182,31 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
# We'll just assume the user is already in the chat.
pass
- async def _apply_msg_format(self, sender: 'u.User', msgtype: str, message: Dict[str, Any]
+ async def _apply_msg_format(self, sender: 'u.User', content: MessageEventContent
) -> None:
- if "formatted_body" not in message:
- message["format"] = "org.matrix.custom.html"
- message["formatted_body"] = escape_html(message.get("body", "")).replace("\n", "
")
- body = message["formatted_body"]
+ if not content.formatted_body or content.format != Format.HTML:
+ content.format = Format.HTML
+ content.formatted_body = escape_html(content.body).replace("\n", "
")
- tpl = (self.get_config(f"message_formats.[{msgtype}]")
+ tpl = (self.get_config(f"message_formats.[{content.msgtype.value}]")
or "$sender_displayname: $message")
displayname = await self.get_displayname(sender)
tpl_args = dict(sender_mxid=sender.mxid,
sender_username=sender.mxid_localpart,
sender_displayname=escape_html(displayname),
- message=body)
- message["formatted_body"] = Template(tpl).safe_substitute(tpl_args)
+ message=content.formatted_body)
+ content.formatted_body = Template(tpl).safe_substitute(tpl_args)
async def _pre_process_matrix_message(self, sender: 'u.User', use_relaybot: bool,
- message: Dict[str, Any]) -> None:
- msgtype = message.get("msgtype", "m.text")
- if msgtype == "m.emote":
- await self._apply_msg_format(sender, msgtype, message)
- if "m.new_content" in message:
- await self._apply_msg_format(sender, msgtype, message["m.new_content"])
- message["m.new_content"]["msgtype"] = "m.text"
- message["msgtype"] = "m.text"
+ content: MessageEventContent) -> None:
+ if content.msgtype == MessageType.EMOTE:
+ await self._apply_msg_format(sender, content)
+ content.msgtype = MessageType.TEXT
elif use_relaybot:
- await self._apply_msg_format(sender, msgtype, message)
- if "m.new_content" in message:
- await self._apply_msg_format(sender, msgtype, message["m.new_content"])
+ await self._apply_msg_format(sender, content)
@staticmethod
- def _matrix_event_to_entities(event: Union[str, TextMessageEventContent]
+ def _matrix_event_to_entities(event: Union[str, MessageEventContent]
) -> Tuple[str, Optional[List[TypeMessageEntity]]]:
try:
if isinstance(event, str):
@@ -227,57 +221,51 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
async def _handle_matrix_text(self, sender_id: TelegramID, event_id: EventID,
space: TelegramID, client: 'MautrixTelegramClient',
- message: Dict, reply_to: TelegramID) -> None:
+ content: TextMessageEventContent, reply_to: TelegramID) -> None:
async with self.send_lock(sender_id):
lp = self.get_config("telegram_link_preview")
- relates_to = message.get("m.relates_to", None) or {}
- if relates_to.get("rel_type", None) == "m.replace":
- orig_msg = DBMessage.get_by_mxid(relates_to.get("event_id", ""), self.mxid, space)
- if orig_msg and "m.new_content" in message:
- message = message["m.new_content"]
- response = await client.edit_message(self.peer, orig_msg.tgid, message,
+ if content.get_edit():
+ orig_msg = DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
+ if orig_msg:
+ response = await client.edit_message(self.peer, orig_msg.tgid, content,
parse_mode=self._matrix_event_to_entities,
link_preview=lp)
self._add_telegram_message_to_db(event_id, space, -1, response)
return
- response = await client.send_message(self.peer, message, reply_to=reply_to,
+ response = await client.send_message(self.peer, content, reply_to=reply_to,
parse_mode=self._matrix_event_to_entities,
link_preview=lp)
self._add_telegram_message_to_db(event_id, space, 0, response)
- async def _handle_matrix_file(self, msgtype: MessageType, sender_id: TelegramID,
- event_id: EventID, space: TelegramID,
- client: 'MautrixTelegramClient', message: dict,
- reply_to: TelegramID) -> None:
- file = await self.main_intent.download_media(message["url"])
+ async def _handle_matrix_file(self, sender_id: TelegramID, event_id: EventID,
+ space: TelegramID, client: 'MautrixTelegramClient',
+ content: MediaMessageEventContent, reply_to: TelegramID) -> None:
+ file = await self.main_intent.download_media(content.url)
- info = message.get("info", {})
- mime = info.get("mimetype", None)
+ mime = content.info.mimetype
- w, h = None, None
+ w, h = content.info.width, content.info.height
- if msgtype == MessageType.STICKER:
+ if content.msgtype == MessageType.STICKER:
if mime != "image/gif":
mime, file, w, h = util.convert_image(file, source_mime=mime, target_type="webp")
else:
# Remove sticker description
- message["mxtg_filename"] = "sticker.gif"
- message["body"] = ""
- elif "w" in info and "h" in info:
- w, h = info["w"], info["h"]
+ content["net.maunium.telegram.internal.filename"] = "sticker.gif"
+ content.body = ""
- file_name = self._get_file_meta(message["mxtg_filename"], mime)
+ file_name = self._get_file_meta(content["net.maunium.telegram.internal.filename"], mime)
attributes = [DocumentAttributeFilename(file_name=file_name)]
if w and h:
attributes.append(DocumentAttributeImageSize(w, h))
- caption = message["body"] if message["body"].lower() != file_name.lower() else None
+ caption = content.body if content.body.lower() != file_name.lower() else None
media = await client.upload_file_direct(
file, mime, attributes, file_name,
max_image_size=config["bridge.image_as_file_size"] * 1000 ** 2)
async with self.send_lock(sender_id):
- if await self._matrix_document_edit(client, message, space, caption, media, event_id):
+ if await self._matrix_document_edit(client, content, space, caption, media, event_id):
return
try:
response = await client.send_media(self.peer, media, reply_to=reply_to,
@@ -289,12 +277,11 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
caption=caption)
self._add_telegram_message_to_db(event_id, space, 0, response)
- async def _matrix_document_edit(self, client: 'MautrixTelegramClient', message: dict,
- space: TelegramID, caption: str, media: Any, event_id: EventID
- ) -> bool:
- relates_to = message.get("m.relates_to", None) or {}
- if relates_to.get("rel_type", None) == "m.replace":
- orig_msg = DBMessage.get_by_mxid(relates_to.get("event_id", ""), self.mxid, space)
+ async def _matrix_document_edit(self, client: 'MautrixTelegramClient',
+ content: MessageEventContent, space: TelegramID,
+ caption: str, media: Any, event_id: EventID) -> bool:
+ if content.get_edit():
+ orig_msg = DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
if orig_msg:
response = await client.edit_message(self.peer, orig_msg.tgid,
caption, file=media)
@@ -304,18 +291,19 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
async def _handle_matrix_location(self, sender_id: TelegramID, event_id: EventID,
space: TelegramID, client: 'MautrixTelegramClient',
- message: Dict[str, Any], reply_to: TelegramID) -> None:
+ content: LocationMessageEventContent, reply_to: TelegramID
+ ) -> None:
try:
- lat, long = message["geo_uri"][len("geo:"):].split(",")
+ lat, long = content.geo_uri[len("geo:"):].split(",")
lat, long = float(lat), float(long)
except (KeyError, ValueError):
self.log.exception("Failed to parse location")
return None
- caption, entities = self._matrix_event_to_entities(message)
+ caption, entities = self._matrix_event_to_entities(content)
media = MessageMediaGeo(geo=GeoPoint(lat, long, access_hash=0))
async with self.send_lock(sender_id):
- if await self._matrix_document_edit(client, message, space, caption, media, event_id):
+ if await self._matrix_document_edit(client, content, space, caption, media, event_id):
return
response = await client.send_media(self.peer, media, reply_to=reply_to,
caption=caption, entities=entities)
@@ -335,14 +323,14 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
mxid=event_id,
edit_index=edit_index).insert()
- async def handle_matrix_message(self, sender: 'u.User', message: Dict[str, Any],
+ async def handle_matrix_message(self, sender: 'u.User', content: MessageEventContent,
event_id: EventID) -> None:
- if "body" not in message or "msgtype" not in message:
+ if not content.body or not content.msgtype:
self.log.debug(f"Ignoring message {event_id} in {self.mxid} without body or msgtype")
return
puppet = p.Puppet.get_by_custom_mxid(sender.mxid)
- if puppet and message.get("net.maunium.telegram.puppet", False):
+ if puppet and content.get("net.maunium.telegram.puppet", False):
self.log.debug("Ignoring puppet-sent message by confirmed puppet user %s", sender.mxid)
return
@@ -351,28 +339,27 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
sender_id = sender.tgid if logged_in else self.bot.tgid
space = (self.tgid if self.peer_type == "channel" # Channels have their own ID space
else (sender.tgid if logged_in else self.bot.tgid))
- reply_to = formatter.matrix_reply_to_telegram(message, space, room_id=self.mxid)
+ reply_to = formatter.matrix_reply_to_telegram(content, space, room_id=self.mxid)
- message["mxtg_filename"] = message["body"]
- await self._pre_process_matrix_message(sender, not logged_in, message)
- msgtype = message["msgtype"]
+ content["net.maunium.telegram.internal.filename"] = content.body
+ await self._pre_process_matrix_message(sender, not logged_in, content)
- if msgtype == "m.notice":
+ if content.msgtype == MessageType.NOTICE:
bridge_notices = self.get_config("bridge_notices.default")
excepted = sender.mxid in self.get_config("bridge_notices.exceptions")
if not bridge_notices and not excepted:
return
- if msgtype == "m.text" or msgtype == "m.notice":
- await self._handle_matrix_text(sender_id, event_id, space, client, message, reply_to)
- elif msgtype == "m.location":
- await self._handle_matrix_location(sender_id, event_id, space, client, message,
+ if content.msgtype in (MessageType.TEXT, MessageType.NOTICE):
+ await self._handle_matrix_text(sender_id, event_id, space, client, content, reply_to)
+ elif content.msgtype == MessageType.LOCATION:
+ await self._handle_matrix_location(sender_id, event_id, space, client, content,
reply_to)
- elif msgtype in ("m.sticker", "m.image", "m.file", "m.audio", "m.video"):
- await self._handle_matrix_file(msgtype, sender_id, event_id, space, client, message,
- reply_to)
+ elif content.msgtype in (MessageType.STICKER, MessageType.IMAGE, MessageType.FILE,
+ MessageType.AUDIO, MessageType.VIDEO):
+ await self._handle_matrix_file(sender_id, event_id, space, client, content, reply_to)
else:
- self.log.debug(f"Unhandled Matrix event: {message}")
+ self.log.debug(f"Unhandled Matrix event: {content}")
async def handle_matrix_pin(self, sender: 'u.User',
pinned_message: Optional[EventID]) -> None:
@@ -418,9 +405,8 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
EditAdminRequest(channel=await self.get_input_entity(sender),
user_id=user_id, admin_rights=rights))
- async def handle_matrix_power_levels(self, sender: 'u.User',
- new_users: Dict[UserID, int],
- old_users: Dict[str, int]) -> None:
+ async def handle_matrix_power_levels(self, sender: 'u.User', new_users: Dict[UserID, int],
+ old_users: Dict[UserID, int]) -> None:
# TODO handle all power level changes and bridge exact admin rights to supergroups/channels
for user, level in new_users.items():
if not user or user == self.main_intent.mxid or user == sender.mxid:
diff --git a/mautrix_telegram/portal/portal_metadata.py b/mautrix_telegram/portal/portal_metadata.py
index d995c55e..694477a4 100644
--- a/mautrix_telegram/portal/portal_metadata.py
+++ b/mautrix_telegram/portal/portal_metadata.py
@@ -155,7 +155,7 @@ class PortalMetadata(BasePortal, ABC):
if levels.get_user_level(self.main_intent.mxid) == 100:
levels = self._get_base_power_levels(levels, entity)
await self.main_intent.set_power_levels(self.mxid, levels)
- await self.handle_matrix_power_levels(source, levels, PowerLevelStateEventContent())
+ await self.handle_matrix_power_levels(source, levels.users, {})
async def invite_telegram(self, source: 'u.User',
puppet: Union[p.Puppet, 'AbstractUser']) -> None:
diff --git a/mautrix_telegram/portal/portal_telegram.py b/mautrix_telegram/portal/portal_telegram.py
index 1ec977b9..f8b3f0d6 100644
--- a/mautrix_telegram/portal/portal_telegram.py
+++ b/mautrix_telegram/portal/portal_telegram.py
@@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Awaitable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
+from typing import Awaitable, Dict, List, Optional, Tuple, Union, NamedTuple, TYPE_CHECKING
from html import escape as escape_html
from abc import ABC
import random
@@ -37,7 +37,9 @@ from telethon.tl.types import (
UpdateUserTyping, MessageEntityPre, ChatPhotoEmpty)
from mautrix.appservice import IntentAPI
-from mautrix.types import EventID, UserID, ImageInfo, ThumbnailInfo
+from mautrix.types import (EventID, UserID, ImageInfo, ThumbnailInfo, RelatesTo, MessageType,
+ EventType, MediaMessageEventContent, TextMessageEventContent,
+ LocationMessageEventContent, Format)
from ..types import TelegramID
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
@@ -52,6 +54,8 @@ if TYPE_CHECKING:
InviteList = Union[UserID, List[UserID]]
TypeParticipant = Union[TypeChatParticipant, TypeChannelParticipant]
+DocAttrs = NamedTuple("DocAttrs", name=Optional[str], mime_type=Optional[str], is_sticker=bool,
+ sticker_alt=Optional[str], width=int, height=int)
config: Optional['Config'] = None
@@ -71,7 +75,7 @@ class PortalTelegram(BasePortal, ABC):
_: Union[UpdateUserTyping, UpdateChatUserTyping]) -> None:
await user.intent.set_typing(self.mxid, is_typing=True)
- def get_external_url(self, evt: Message) -> Optional[str]:
+ def _get_external_url(self, evt: Message) -> Optional[str]:
if self.peer_type == "channel" and self.username is not None:
return f"https://t.me/{self.username}/{evt.id}"
elif self.peer_type != "user":
@@ -90,7 +94,7 @@ class PortalTelegram(BasePortal, ABC):
evt, source, self.main_intent,
prefix_html=f"
",
prefix_text="Inline image: ")
- content.external_url = self.get_external_url(evt)
+ content.external_url = self._get_external_url(evt)
await intent.set_typing(self.mxid, is_typing=False)
return await intent.send_message(self.mxid, content, timestamp=evt.date)
info = ImageInfo(
@@ -101,42 +105,36 @@ class PortalTelegram(BasePortal, ABC):
await intent.set_typing(self.mxid, is_typing=False)
result = await intent.send_image(self.mxid, file.mxc, info=info, text=name,
relates_to=relates_to, timestamp=evt.date,
- external_url=self.get_external_url(evt))
+ external_url=self._get_external_url(evt))
if evt.message:
text, html, _ = await formatter.telegram_to_matrix(evt, source, self.main_intent,
no_reply_fallback=True)
result = await intent.send_text(self.mxid, text, html=html, timestamp=evt.date,
- external_url=self.get_external_url(evt))
+ external_url=self._get_external_url(evt))
return result
@staticmethod
- def _parse_telegram_document_attributes(attributes: List[TypeDocumentAttribute]) -> Dict:
- attrs = {
- "name": None,
- "mime_type": None,
- "is_sticker": False,
- "sticker_alt": None,
- "width": None,
- "height": None,
- } # type: Dict
+ def _parse_telegram_document_attributes(attributes: List[TypeDocumentAttribute]) -> DocAttrs:
+ attrs = DocAttrs(name=None, mime_type=None, is_sticker=False, sticker_alt=None,
+ width=0, height=0)
for attr in attributes:
if isinstance(attr, DocumentAttributeFilename):
- attrs["name"] = attrs["name"] or attr.file_name
- attrs["mime_type"], _ = mimetypes.guess_type(attr.file_name)
+ attrs.name = attrs.name or attr.file_name
+ attrs.mime_type, _ = mimetypes.guess_type(attr.file_name)
elif isinstance(attr, DocumentAttributeSticker):
- attrs["is_sticker"] = True
- attrs["sticker_alt"] = attr.alt
+ attrs.is_sticker = True
+ attrs.sticker_alt = attr.alt
elif isinstance(attr, DocumentAttributeVideo):
- attrs["width"], attrs["height"] = attr.w, attr.h
+ attrs.width, attrs.height = attr.w, attr.h
return attrs
@staticmethod
- def _parse_telegram_document_meta(evt: Message, file: DBTelegramFile, attrs: Dict,
+ def _parse_telegram_document_meta(evt: Message, file: DBTelegramFile, attrs: DocAttrs,
thumb_size: TypePhotoSize) -> Tuple[ImageInfo, str]:
document = evt.media.document
- name = evt.message or attrs["name"]
- if attrs["is_sticker"]:
- alt = attrs["sticker_alt"]
+ name = evt.message or attrs.name
+ if attrs.is_sticker:
+ alt = attrs.sticker_alt
if len(alt) > 0:
try:
name = f"{alt} ({unicodedata.name(alt[0]).lower()})"
@@ -150,12 +148,12 @@ class PortalTelegram(BasePortal, ABC):
mime_type = file.mime_type or document.mime_type
info = ImageInfo(size=file.size, mimetype=mime_type)
- if attrs["mime_type"] and not file.was_converted:
- file.mime_type = attrs["mime_type"] or file.mime_type
+ if attrs.mime_type and not file.was_converted:
+ file.mime_type = attrs.mime_type or file.mime_type
if file.width and file.height:
info.width, info.height = file.width, file.height
- elif attrs["width"] and attrs["height"]:
- info.width, info.height = attrs["width"], attrs["height"]
+ elif attrs.width and attrs.height:
+ info.width, info.height = attrs.width, attrs.height
if file.thumbnail:
info.thumbnail_url = file.thumbnail.mxc
@@ -167,13 +165,14 @@ class PortalTelegram(BasePortal, ABC):
return info, name
async def handle_telegram_document(self, source: 'AbstractUser', intent: IntentAPI,
- evt: Message, relates_to: dict = None) -> Optional[EventID]:
+ evt: Message, relates_to: RelatesTo = None
+ ) -> Optional[EventID]:
document = evt.media.document
attrs = self._parse_telegram_document_attributes(document.attributes)
if document.size > config["bridge.max_document_size"] * 1000 ** 2:
- name = attrs["name"] or ""
+ name = attrs.name or ""
caption = f"\n{evt.message}" if evt.message else ""
return await intent.send_notice(self.mxid, f"Too large file {name}{caption}")
@@ -183,7 +182,7 @@ class PortalTelegram(BasePortal, ABC):
thumb_loc = None
thumb_size = None
file = await util.transfer_file_to_matrix(source.client, intent, document, thumb_loc,
- is_sticker=attrs["is_sticker"])
+ is_sticker=attrs.is_sticker)
if not file:
return None
@@ -191,88 +190,62 @@ class PortalTelegram(BasePortal, ABC):
await intent.set_typing(self.mxid, is_typing=False)
- kwargs = {
- "room_id": self.mxid,
- "url": file.mxc,
- "info": info,
- "text": name,
- "relates_to": relates_to,
- "timestamp": evt.date,
- "external_url": self.get_external_url(evt)
- }
-
- if attrs["is_sticker"]:
- return await intent.send_sticker(**kwargs)
-
- mime_type = info["mimetype"]
- if mime_type.startswith("video/"):
- kwargs["file_type"] = "m.video"
- elif mime_type.startswith("audio/"):
- kwargs["file_type"] = "m.audio"
- elif mime_type.startswith("image/"):
- kwargs["file_type"] = "m.image"
- else:
- kwargs["file_type"] = "m.file"
- return await intent.send_file(**kwargs)
+ event_type = EventType.STICKER if attrs.is_sticker else EventType.ROOM_MESSAGE
+ content = MediaMessageEventContent(
+ body=name, info=info, url=file.mxc, relates_to=relates_to,
+ external_url=self._get_external_url(evt),
+ msgtype={
+ "video/": MessageType.VIDEO,
+ "audio/": MessageType.AUDIO,
+ "image/": MessageType.IMAGE,
+ }.get(info.mimetype[:6], default=MessageType.FILE))
+ return await intent.send_message_event(self.mxid, event_type, content, timestamp=evt.date)
def handle_telegram_location(self, _: 'AbstractUser', intent: IntentAPI, evt: Message,
relates_to: dict = None) -> Awaitable[EventID]:
- location = evt.media.geo
- long = location.long
- lat = location.lat
+ long = evt.media.geo.long
+ lat = evt.media.geo.lat
long_char = "E" if long > 0 else "W"
lat_char = "N" if lat > 0 else "S"
- rounded_long = round(long, 5)
- rounded_lat = round(lat, 5)
-
- body = f"{rounded_lat}° {lat_char}, {rounded_long}° {long_char}"
+ body = f"{round(lat, 5)}° {lat_char}, {round(long, 5)}° {long_char}"
url = f"https://maps.google.com/?q={lat},{long}"
- formatted_body = f"Location: {body}"
- # At least riot-web ignores formatting in m.location messages,
- # so we'll add a plaintext link.
- body = f"Location: {body}\n{url}"
+ content = LocationMessageEventContent(
+ msgtype=MessageType.LOCATION, geo_uri=f"geo:{lat},{long}",
+ body=f"Location: {body}\n{url}",
+ relates_to=relates_to, external_url=self._get_external_url(evt))
+ content["format"] = Format.HTML
+ content["formatted_body"] = f"Location: {body}"
- return intent.send_message(self.mxid, {
- "msgtype": "m.location",
- "geo_uri": f"geo:{lat},{long}",
- "body": body,
- "format": "org.matrix.custom.html",
- "formatted_body": formatted_body,
- "m.relates_to": relates_to or None,
- }, timestamp=evt.date, external_url=self.get_external_url(evt))
+ return intent.send_message(self.mxid, content, timestamp=evt.date)
async def handle_telegram_text(self, source: 'AbstractUser', intent: IntentAPI, is_bot: bool,
evt: Message) -> EventID:
self.log.debug(f"Sending {evt.message} to {self.mxid} by {intent.mxid}")
- text, html, relates_to = await formatter.telegram_to_matrix(evt, source, self.main_intent)
+ content = await formatter.telegram_to_matrix(evt, source, self.main_intent)
+ content.external_url = self._get_external_url(evt)
+ if is_bot and self.get_config("bot_messages_as_notices"):
+ content.msgtype = MessageType.NOTICE
await intent.set_typing(self.mxid, is_typing=False)
- msgtype = "m.notice" if is_bot and self.get_config("bot_messages_as_notices") else "m.text"
- return await intent.send_text(self.mxid, text, html=html, relates_to=relates_to,
- msgtype=msgtype, timestamp=evt.date,
- external_url=self.get_external_url(evt))
+ return await intent.send_message(self.mxid, content, timestamp=evt.date)
async def handle_telegram_unsupported(self, source: 'AbstractUser', intent: IntentAPI,
evt: Message, relates_to: dict = None) -> EventID:
override_text = ("This message is not supported on your version of Mautrix-Telegram. "
"Please check https://github.com/tulir/mautrix-telegram or ask your "
"bridge administrator about possible updates.")
- text, html, relates_to = await formatter.telegram_to_matrix(
+ content = await formatter.telegram_to_matrix(
evt, source, self.main_intent, override_text=override_text)
+ content.msgtype = MessageType.NOTICE
+ content.external_url = self._get_external_url(evt)
+ content["net.maunium.telegram.unsupported"] = True
await intent.set_typing(self.mxid, is_typing=False)
- return await intent.send_message(self.mxid, {
- "body": text,
- "msgtype": "m.notice",
- "format": "org.matrix.custom.html",
- "formatted_body": html,
- "m.relates_to": relates_to,
- "net.maunium.telegram.unsupported": True,
- }, timestamp=evt.date, external_url=self.get_external_url(evt))
+ return await intent.send_message(self.mxid, content, timestamp=evt.date)
async def handle_telegram_poll(self, source: 'AbstractUser', intent: IntentAPI, evt: Message,
- relates_to: dict) -> EventID:
- poll = evt.media.poll # type: Poll
+ relates_to: RelatesTo) -> EventID:
+ poll: Poll = evt.media.poll
poll_id = self._encode_msgid(source, evt)
_n = 0
@@ -282,21 +255,19 @@ class PortalTelegram(BasePortal, ABC):
_n += 1
return _n
- text = (f"Poll: {poll.question}\n"
- + "\n".join(f"{n()}. {answer.text}" for answer in poll.answers) +
- "\n"
- f"Vote with !tg vote {poll_id} ")
+ text_answers = "\n".join(f"{n()}. {answer.text}" for answer in poll.answers)
+ html_answers = "\n".join(f"{answer.text}" for answer in poll.answers)
+ content = TextMessageEventContent(
+ msgtype=MessageType.TEXT, format=Format.HTML,
+ body=f"Poll: {poll.question}\n{text_answers}\n"
+ f"Vote with !tg vote {poll_id} ",
+ formatted_body=f"Poll: {poll.question}
\n"
+ f"{html_answers}
\n"
+ f"Vote with !tg vote {poll_id} <choice number>",
+ relates_to=relates_to, external_url=self._get_external_url(evt))
- html = (f"Poll: {poll.question}
\n"
- f""
- + "\n".join(f"- {answer.text}
"
- for answer in poll.answers) +
- "
\n"
- f"Vote with !tg vote {poll_id} <choice number>")
await intent.set_typing(self.mxid, is_typing=False)
- return await intent.send_text(self.mxid, text, html=html, relates_to=relates_to,
- msgtype="m.text", timestamp=evt.date,
- external_url=self.get_external_url(evt))
+ return await intent.send_message(self.mxid, content, timestamp=evt.date)
@staticmethod
def _int_to_bytes(i: int) -> bytes:
@@ -322,31 +293,29 @@ class PortalTelegram(BasePortal, ABC):
return base64.b64encode(play_id).decode("utf-8").rstrip("=")
async def handle_telegram_game(self, source: 'AbstractUser', intent: IntentAPI,
- evt: Message, relates_to: dict = None) -> EventID:
+ evt: Message, relates_to: RelatesTo = None) -> EventID:
game = evt.media.game
play_id = self._encode_msgid(source, evt)
command = f"!tg play {play_id}"
override_text = f"Run {command} in your bridge management room to play {game.title}"
override_entities = [
MessageEntityPre(offset=len("Run "), length=len(command), language="")]
- text, html, relates_to = await formatter.telegram_to_matrix(
+
+ content = await formatter.telegram_to_matrix(
evt, source, self.main_intent,
override_text=override_text, override_entities=override_entities)
- await intent.set_typing(self.mxid, is_typing=False)
- return await intent.send_message(self.mxid, {
- "body": text,
- "msgtype": "m.notice",
- "format": "org.matrix.custom.html",
- "formatted_body": html,
- "m.relates_to": relates_to,
- "net.maunium.telegram.game": play_id,
- }, timestamp=evt.date, external_url=self.get_external_url(evt))
+ content.msgtype = MessageType.NOTICE
+ content.external_url = self._get_external_url(evt)
+ content["net.maunium.telegram.game"] = play_id
- async def handle_telegram_edit(self, source: 'AbstractUser', sender: p.Puppet,
- evt: Message) -> None:
+ await intent.set_typing(self.mxid, is_typing=False)
+ return await intent.send_message(self.mxid, content, timestamp=evt.date)
+
+ async def handle_telegram_edit(self, source: 'AbstractUser', sender: p.Puppet, evt: Message
+ ) -> None:
if not self.mxid:
return
- elif hasattr(evt, "media") and isinstance(evt.media, (MessageMediaGame,)):
+ elif hasattr(evt, "media") and isinstance(evt.media, MessageMediaGame):
self.log.debug("Ignoring game message edit event")
return
@@ -368,45 +337,38 @@ class PortalTelegram(BasePortal, ABC):
).insert()
return
- text, html, _ = await formatter.telegram_to_matrix(evt, source, self.main_intent)
+ content = await formatter.telegram_to_matrix(evt, source, self.main_intent,
+ no_reply_fallback=True)
editing_msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if not editing_msg:
self.log.info(f"Didn't find edited message {evt.id}@{tg_space} (src {source.tgid}) "
"in database.")
return
- msgtype = ("m.notice"
- if sender and sender.is_bot and self.get_config("bot_messages_as_notices")
- else "m.text")
- content = {
- "body": f"Edit: {text}",
- "msgtype": msgtype,
- "format": "org.matrix.custom.html",
- "formatted_body": (f"Edit: "
- f"{html or escape_html(text)}"),
- "external_url": self.get_external_url(evt),
- "m.new_content": {
- "body": text,
- "msgtype": "m.text",
- **({"format": "org.matrix.custom.html",
- "formatted_body": html} if html else {}),
- },
- "m.relates_to": {
- "rel_type": "m.replace",
- "event_id": editing_msg.mxid,
- },
- }
+ content.msgtype = (MessageType.NOTICE if (sender and sender.is_bot
+ and self.get_config("bot_messages_as_notices"))
+ else MessageType.TEXT)
+ content.external_url = self._get_external_url(evt)
+ content.set_edit(editing_msg.mxid)
+
+ # TODO remove this stuff once mautrix-python generates m.new_content
+ new_content = content.serialize()
+ del new_content["m.relates_to"]
+ content["m.new_content"] = new_content
+ content.body = f"Edit: {content.body}"
+ content.format = Format.HTML
+ content.formatted_body = (f"Edit: "
+ f"{content.formatted_body or escape_html(content.body)}")
intent = sender.intent if sender else self.main_intent
await intent.set_typing(self.mxid, is_typing=False)
- response = await intent.send_message(self.mxid, content)
- mxid = response["event_id"]
+ event_id = await intent.send_message(self.mxid, content)
prev_edit_msg = DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg
- DBMessage(mxid=mxid, mx_room=self.mxid, tg_space=tg_space, tgid=evt.id,
+ DBMessage(mxid=event_id, mx_room=self.mxid, tg_space=tg_space, tgid=TelegramID(evt.id),
edit_index=prev_edit_msg.edit_index + 1).insert()
- DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=mxid)
+ DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id)
async def handle_telegram_message(self, source: 'AbstractUser', sender: p.Puppet,
evt: Message) -> None:
@@ -450,9 +412,9 @@ class PortalTelegram(BasePortal, ABC):
intent = sender.intent if sender else self.main_intent
if not media and evt.message:
is_bot = sender.is_bot if sender else False
- response = await self.handle_telegram_text(source, intent, is_bot, evt)
+ event_id = await self.handle_telegram_text(source, intent, is_bot, evt)
elif media:
- response = await {
+ event_id = await {
MessageMediaPhoto: self.handle_telegram_photo,
MessageMediaDocument: self.handle_telegram_document,
MessageMediaGeo: self.handle_telegram_location,
@@ -465,33 +427,31 @@ class PortalTelegram(BasePortal, ABC):
self.log.debug("Unhandled Telegram message: %s", evt)
return
- if not response:
+ if not event_id:
return
- mxid = response["event_id"]
-
- prev_id = self.dedup.update(evt, (mxid, tg_space), (temporary_identifier, tg_space))
+ prev_id = self.dedup.update(evt, (event_id, tg_space), (temporary_identifier, tg_space))
if prev_id:
- self.log.debug(f"Sent message {evt.id}@{tg_space} to Matrix as {mxid}. "
+ self.log.debug(f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. "
f"Temporary dedup identifier was {temporary_identifier}, "
f"but dedup map contained {prev_id[1]} instead! -- "
"This was probably a race condition caused by Telegram sending updates"
"to other clients before responding to the sender. I'll just redact "
"the likely duplicate message now.")
- await intent.redact(self.mxid, mxid)
+ await intent.redact(self.mxid, event_id)
return
self.log.debug("Handled Telegram message: %s", evt)
try:
- DBMessage(tgid=TelegramID(evt.id), mx_room=self.mxid, mxid=mxid,
+ DBMessage(tgid=TelegramID(evt.id), mx_room=self.mxid, mxid=event_id,
tg_space=tg_space, edit_index=0).insert()
- DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=mxid)
+ DBMessage.update_by_mxid(temporary_identifier, self.mxid, mxid=event_id)
except IntegrityError as e:
self.log.exception(f"{e.__class__.__name__} while saving message mapping. "
"This might mean that an update was handled after it left the "
"dedup cache queue. You can try enabling bridge.deduplication."
"pre_db_check in the config.")
- await intent.redact(self.mxid, mxid)
+ await intent.redact(self.mxid, event_id)
async def _create_room_on_action(self, source: 'AbstractUser',
action: TypeMessageAction) -> bool:
@@ -544,9 +504,9 @@ class PortalTelegram(BasePortal, ABC):
levels = await self.main_intent.get_power_levels(self.mxid)
if user:
- levels["users"][user.mxid] = 50
+ levels.users[user.mxid] = 50
if puppet:
- levels["users"][puppet.mxid] = 50
+ levels.users[puppet.mxid] = 50
await self.main_intent.set_power_levels(self.mxid, levels)
async def receive_telegram_pin_sender(self, sender: p.Puppet) -> None:
@@ -578,9 +538,9 @@ class PortalTelegram(BasePortal, ABC):
async def set_telegram_admins_enabled(self, enabled: bool) -> None:
level = 50 if enabled else 10
levels = await self.main_intent.get_power_levels(self.mxid)
- levels["invite"] = level
- levels["events"]["m.room.name"] = level
- levels["events"]["m.room.avatar"] = level
+ levels.invite = level
+ levels.events[EventType.ROOM_NAME] = level
+ levels.events[EventType.ROOM_AVATAR] = level
await self.main_intent.set_power_levels(self.mxid, levels)