Separate Telegram message conversion code from Matrix sending
This commit is contained in:
@@ -98,7 +98,7 @@ def config_defaults(evt: CommandEvent) -> Awaitable[EventID]:
|
||||
"exceptions": evt.config["bridge.bridge_notices.exceptions"],
|
||||
},
|
||||
"bot_messages_as_notices": evt.config["bridge.bot_messages_as_notices"],
|
||||
"inline_images": evt.config["bridge.inline_images"],
|
||||
"caption_in_message": evt.config["bridge.caption_in_message"],
|
||||
"message_formats": evt.config["bridge.message_formats"],
|
||||
"emote_format": evt.config["bridge.emote_format"],
|
||||
"state_event_formats": evt.config["bridge.state_event_formats"],
|
||||
|
||||
@@ -68,5 +68,5 @@ async def user_has_power_level(
|
||||
await intent.get_power_levels(room_id)
|
||||
except MatrixRequestError:
|
||||
return False
|
||||
event_type = EventType.find(f"net.maunium.telegram.{event}", t_class=EventType.Class.STATE)
|
||||
event_type = EventType.find(f"fi.mau.telegram.{event}", t_class=EventType.Class.STATE)
|
||||
return await intent.state_store.has_power_level(room_id, sender.mxid, event_type)
|
||||
|
||||
@@ -138,7 +138,7 @@ class Config(BaseBridgeConfig):
|
||||
copy("bridge.login_shared_secret_map")
|
||||
copy("bridge.telegram_link_preview")
|
||||
copy("bridge.invite_link_resolve")
|
||||
copy("bridge.inline_images")
|
||||
copy("bridge.caption_in_message")
|
||||
copy("bridge.image_as_file_size")
|
||||
copy("bridge.image_as_file_pixels")
|
||||
copy("bridge.parallel_file_transfer")
|
||||
|
||||
@@ -206,9 +206,9 @@ bridge:
|
||||
# Whether or not the !tg join command should do a HTTP request
|
||||
# to resolve redirects in invite links.
|
||||
invite_link_resolve: false
|
||||
# Use inline images instead of a separate message for the caption.
|
||||
# N.B. Inline images are not supported on all clients (e.g. Element iOS/Android).
|
||||
inline_images: false
|
||||
# Send captions in the same message as images. This will send data compatible with both MSC2530 and MSC3552.
|
||||
# This is currently not supported in most clients.
|
||||
caption_in_message: false
|
||||
# Maximum size of image in megabytes before sending to Telegram as a document.
|
||||
image_as_file_size: 10
|
||||
# Maximum number of pixels in an image before sending to Telegram as a document. Defaults to 1280x1280 = 1638400.
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
from .from_matrix import matrix_reply_to_telegram, matrix_to_telegram
|
||||
from .from_telegram import telegram_reply_to_matrix, telegram_to_matrix
|
||||
from .from_telegram import telegram_to_matrix
|
||||
|
||||
@@ -48,15 +48,7 @@ from telethon.tl.types import (
|
||||
TypeMessageEntity,
|
||||
)
|
||||
|
||||
from mautrix.appservice import IntentAPI
|
||||
from mautrix.types import (
|
||||
EventType,
|
||||
Format,
|
||||
InReplyTo,
|
||||
MessageType,
|
||||
RelatesTo,
|
||||
TextMessageEventContent,
|
||||
)
|
||||
from mautrix.types import Format, MessageType, TextMessageEventContent
|
||||
|
||||
from .. import abstract_user as au, portal as po, puppet as pu, user as u
|
||||
from ..db import Message as DBMessage
|
||||
@@ -65,19 +57,6 @@ from ..types import TelegramID
|
||||
log: logging.Logger = logging.getLogger("mau.fmt.tg")
|
||||
|
||||
|
||||
async def telegram_reply_to_matrix(evt: Message, source: au.AbstractUser) -> RelatesTo | None:
|
||||
if evt.reply_to:
|
||||
space = (
|
||||
evt.peer_id.channel_id
|
||||
if isinstance(evt, Message) and isinstance(evt.peer_id, PeerChannel)
|
||||
else source.tgid
|
||||
)
|
||||
msg = await DBMessage.get_one_by_tgid(TelegramID(evt.reply_to.reply_to_msg_id), space)
|
||||
if msg:
|
||||
return RelatesTo(in_reply_to=InReplyTo(event_id=msg.mxid))
|
||||
return None
|
||||
|
||||
|
||||
async def _add_forward_header(
|
||||
source: au.AbstractUser, content: TextMessageEventContent, fwd_from: MessageFwdHeader
|
||||
) -> None:
|
||||
@@ -145,41 +124,11 @@ async def _add_forward_header(
|
||||
)
|
||||
|
||||
|
||||
async def _add_reply_header(
|
||||
source: au.AbstractUser, content: TextMessageEventContent, evt: Message, main_intent: IntentAPI
|
||||
) -> None:
|
||||
space = (
|
||||
evt.peer_id.channel_id
|
||||
if isinstance(evt, Message) and isinstance(evt.peer_id, PeerChannel)
|
||||
else source.tgid
|
||||
)
|
||||
|
||||
msg = await DBMessage.get_one_by_tgid(TelegramID(evt.reply_to.reply_to_msg_id), space)
|
||||
if not msg:
|
||||
return
|
||||
|
||||
try:
|
||||
event = await main_intent.get_event(msg.mx_room, msg.mxid)
|
||||
if event.type == EventType.ROOM_ENCRYPTED and source.bridge.matrix.e2ee:
|
||||
event = await source.bridge.matrix.e2ee.decrypt(event)
|
||||
if isinstance(event.content, TextMessageEventContent):
|
||||
event.content.trim_reply_fallback()
|
||||
puppet = await pu.Puppet.get_by_mxid(event.sender, create=False)
|
||||
content.set_reply(event, displayname=puppet.displayname if puppet else event.sender)
|
||||
except Exception:
|
||||
log.exception("Failed to get event to add reply fallback")
|
||||
content.set_reply(msg.mxid)
|
||||
|
||||
|
||||
async def telegram_to_matrix(
|
||||
evt: Message | SponsoredMessage,
|
||||
source: au.AbstractUser,
|
||||
main_intent: IntentAPI | None = None,
|
||||
prefix_text: str | None = None,
|
||||
prefix_html: str | None = None,
|
||||
override_text: str = None,
|
||||
override_entities: list[TypeMessageEntity] = None,
|
||||
no_reply_fallback: bool = False,
|
||||
require_html: bool = False,
|
||||
) -> TextMessageEventContent:
|
||||
content = TextMessageEventContent(
|
||||
@@ -195,18 +144,9 @@ async def telegram_to_matrix(
|
||||
if require_html:
|
||||
content.ensure_has_html()
|
||||
|
||||
if prefix_html:
|
||||
content.ensure_has_html()
|
||||
content.formatted_body = prefix_html + content.formatted_body
|
||||
if prefix_text:
|
||||
content.body = prefix_text + content.body
|
||||
|
||||
if getattr(evt, "fwd_from", None):
|
||||
await _add_forward_header(source, content, evt.fwd_from)
|
||||
|
||||
if getattr(evt, "reply_to", None) and not no_reply_fallback:
|
||||
await _add_reply_header(source, content, evt, main_intent)
|
||||
|
||||
if isinstance(evt, Message) and evt.post and evt.post_author:
|
||||
content.ensure_has_html()
|
||||
content.body += f"\n- {evt.post_author}"
|
||||
|
||||
+35
-637
@@ -15,28 +15,15 @@
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
AsyncGenerator,
|
||||
Awaitable,
|
||||
Callable,
|
||||
List,
|
||||
NamedTuple,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, List, Union, cast
|
||||
from datetime import datetime
|
||||
from html import escape as escape_html
|
||||
from sqlite3 import IntegrityError
|
||||
from string import Template
|
||||
import asyncio
|
||||
import base64
|
||||
import codecs
|
||||
import mimetypes
|
||||
import random
|
||||
import time
|
||||
import unicodedata
|
||||
|
||||
from asyncpg import UniqueViolationError
|
||||
from telethon.errors import (
|
||||
@@ -79,12 +66,9 @@ from telethon.tl.types import (
|
||||
ChatFull,
|
||||
ChatPhoto,
|
||||
ChatPhotoEmpty,
|
||||
Document,
|
||||
DocumentAttributeAnimated,
|
||||
DocumentAttributeAudio,
|
||||
DocumentAttributeFilename,
|
||||
DocumentAttributeImageSize,
|
||||
DocumentAttributeSticker,
|
||||
DocumentAttributeVideo,
|
||||
GeoPoint,
|
||||
InputChannel,
|
||||
@@ -95,7 +79,6 @@ from telethon.tl.types import (
|
||||
InputPeerChat,
|
||||
InputPeerPhotoFileLocation,
|
||||
InputPeerUser,
|
||||
InputPhotoFileLocation,
|
||||
InputUser,
|
||||
MessageActionChannelCreate,
|
||||
MessageActionChatAddUser,
|
||||
@@ -109,30 +92,15 @@ from telethon.tl.types import (
|
||||
MessageActionChatMigrateTo,
|
||||
MessageActionContactSignUp,
|
||||
MessageActionGameScore,
|
||||
MessageEntityPre,
|
||||
MessageMediaContact,
|
||||
MessageMediaDice,
|
||||
MessageMediaDocument,
|
||||
MessageMediaGame,
|
||||
MessageMediaGeo,
|
||||
MessageMediaGeoLive,
|
||||
MessageMediaPhoto,
|
||||
MessageMediaPoll,
|
||||
MessageMediaUnsupported,
|
||||
MessageMediaVenue,
|
||||
MessageMediaWebPage,
|
||||
MessagePeerReaction,
|
||||
MessageReactions,
|
||||
PeerChannel,
|
||||
PeerChat,
|
||||
PeerUser,
|
||||
Photo,
|
||||
PhotoCachedSize,
|
||||
PhotoEmpty,
|
||||
PhotoSize,
|
||||
PhotoSizeEmpty,
|
||||
PhotoSizeProgressive,
|
||||
Poll,
|
||||
ReactionCount,
|
||||
SendMessageCancelAction,
|
||||
SendMessageTypingAction,
|
||||
@@ -140,13 +108,11 @@ from telethon.tl.types import (
|
||||
TypeChannelParticipant,
|
||||
TypeChat,
|
||||
TypeChatParticipant,
|
||||
TypeDocumentAttribute,
|
||||
TypeInputChannel,
|
||||
TypeInputPeer,
|
||||
TypeMessage,
|
||||
TypeMessageAction,
|
||||
TypePeer,
|
||||
TypePhotoSize,
|
||||
TypeUser,
|
||||
TypeUserFull,
|
||||
TypeUserProfilePhoto,
|
||||
@@ -158,9 +124,8 @@ from telethon.tl.types import (
|
||||
UserFull,
|
||||
UserProfilePhoto,
|
||||
UserProfilePhotoEmpty,
|
||||
WebPage,
|
||||
)
|
||||
from telethon.utils import decode_waveform, encode_waveform
|
||||
from telethon.utils import encode_waveform
|
||||
import magic
|
||||
|
||||
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
|
||||
@@ -189,7 +154,6 @@ from mautrix.types import (
|
||||
RoomTopicStateEventContent,
|
||||
StateEventContent,
|
||||
TextMessageEventContent,
|
||||
ThumbnailInfo,
|
||||
UserID,
|
||||
VideoInfo,
|
||||
)
|
||||
@@ -198,14 +162,21 @@ from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
|
||||
from mautrix.util.simple_lock import SimpleLock
|
||||
from mautrix.util.simple_template import SimpleTemplate
|
||||
|
||||
from . import abstract_user as au, formatter, portal_util as putil, puppet as p, user as u, util
|
||||
from . import (
|
||||
abstract_user as au,
|
||||
formatter,
|
||||
matrix as m,
|
||||
portal_util as putil,
|
||||
puppet as p,
|
||||
user as u,
|
||||
util,
|
||||
)
|
||||
from .config import Config
|
||||
from .db import (
|
||||
DisappearingMessage,
|
||||
Message as DBMessage,
|
||||
Portal as DBPortal,
|
||||
Reaction as DBReaction,
|
||||
TelegramFile as DBTelegramFile,
|
||||
)
|
||||
from .tgclient import MautrixTelegramClient
|
||||
from .types import TelegramID
|
||||
@@ -223,8 +194,6 @@ if TYPE_CHECKING:
|
||||
StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
|
||||
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
|
||||
DummyPortalCreated = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE)
|
||||
BEEPER_LINK_PREVIEWS_KEY = "com.beeper.linkpreviews"
|
||||
BEEPER_IMAGE_ENCRYPTION_KEY = "beeper:image:encryption"
|
||||
|
||||
InviteList = Union[UserID, List[UserID]]
|
||||
UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping]
|
||||
@@ -236,23 +205,10 @@ class BridgingError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DocAttrs(NamedTuple):
|
||||
name: str | None
|
||||
mime_type: str | None
|
||||
is_sticker: bool
|
||||
sticker_alt: str | None
|
||||
width: int
|
||||
height: int
|
||||
is_gif: bool
|
||||
is_audio: bool
|
||||
is_voice: bool
|
||||
duration: int
|
||||
waveform: bytes
|
||||
|
||||
|
||||
class Portal(DBPortal, BasePortal):
|
||||
bot: "Bot"
|
||||
config: Config
|
||||
matrix: m.MatrixHandler
|
||||
disappearing_msg_class = DisappearingMessage
|
||||
|
||||
# Instance cache
|
||||
@@ -297,6 +253,8 @@ class Portal(DBPortal, BasePortal):
|
||||
_sponsored_seen: dict[UserID, bool]
|
||||
_new_messages_after_sponsored: bool
|
||||
|
||||
_msg_conv: putil.TelegramMessageConverter
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
tgid: TelegramID,
|
||||
@@ -362,6 +320,8 @@ class Portal(DBPortal, BasePortal):
|
||||
self._sponsored_seen = {}
|
||||
self._new_messages_after_sponsored = True
|
||||
|
||||
self._msg_conv = putil.TelegramMessageConverter(self)
|
||||
|
||||
# region Properties
|
||||
|
||||
@property
|
||||
@@ -1205,7 +1165,7 @@ class Portal(DBPortal, BasePortal):
|
||||
)
|
||||
photo_id = str(photo.photo_id)
|
||||
elif isinstance(photo, Photo):
|
||||
loc, _ = self._get_largest_photo_size(photo)
|
||||
loc, _ = self._msg_conv.get_largest_photo_size(photo)
|
||||
photo_id = str(loc.id)
|
||||
elif isinstance(photo, (UserProfilePhotoEmpty, ChatPhotoEmpty, PhotoEmpty, type(None))):
|
||||
photo_id = ""
|
||||
@@ -1626,7 +1586,7 @@ class Portal(DBPortal, BasePortal):
|
||||
w, h = content.info.width, content.info.height
|
||||
else:
|
||||
w = h = None
|
||||
file_name = content["net.maunium.telegram.internal.filename"]
|
||||
file_name = content["fi.mau.telegram.internal.filename"]
|
||||
max_image_size = self.config["bridge.image_as_file_size"] * 1000**2
|
||||
max_image_pixels = self.config["bridge.image_as_file_pixels"]
|
||||
|
||||
@@ -1918,7 +1878,7 @@ class Portal(DBPortal, BasePortal):
|
||||
sender, logged_in, event_id, space, client, content, reply_to
|
||||
)
|
||||
elif content.msgtype in media:
|
||||
content["net.maunium.telegram.internal.filename"] = content.body
|
||||
content["fi.mau.telegram.internal.filename"] = content.body
|
||||
try:
|
||||
caption_content: MessageEventContent = sender.command_status["caption"]
|
||||
reply_to = reply_to or await formatter.matrix_reply_to_telegram(
|
||||
@@ -2236,494 +2196,6 @@ class Portal(DBPortal, BasePortal):
|
||||
is_typing = isinstance(update.action, SendMessageTypingAction)
|
||||
await user.default_mxid_intent.set_typing(self.mxid, is_typing=is_typing)
|
||||
|
||||
def _get_external_url(self, evt: Message) -> str | None:
|
||||
if self.peer_type == "channel" and self.username is not None:
|
||||
return f"https://t.me/{self.username}/{evt.id}"
|
||||
elif self.peer_type != "user":
|
||||
return f"https://t.me/c/{self.tgid}/{evt.id}"
|
||||
return None
|
||||
|
||||
async def _handle_telegram_photo(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> EventID | None:
|
||||
media: MessageMediaPhoto = evt.media
|
||||
if media.photo is None and media.ttl_seconds:
|
||||
return await self._send_message(
|
||||
intent,
|
||||
TextMessageEventContent(msgtype=MessageType.NOTICE, body="Photo has expired"),
|
||||
timestamp=evt.date,
|
||||
)
|
||||
loc, largest_size = self._get_largest_photo_size(media.photo)
|
||||
if loc is None:
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT,
|
||||
body="Failed to bridge image",
|
||||
external_url=self._get_external_url(evt),
|
||||
)
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
file = await util.transfer_file_to_matrix(
|
||||
source.client,
|
||||
intent,
|
||||
loc,
|
||||
encrypt=self.encrypted,
|
||||
async_upload=self.config["homeserver.async_media"],
|
||||
)
|
||||
if not file:
|
||||
return None
|
||||
if self.get_config("inline_images") and (evt.message or evt.fwd_from or evt.reply_to):
|
||||
content = await formatter.telegram_to_matrix(
|
||||
evt,
|
||||
source,
|
||||
self.main_intent,
|
||||
prefix_html=f"<img src='{file.mxc}' alt='Inline Telegram photo'/><br/>",
|
||||
prefix_text="Inline image: ",
|
||||
)
|
||||
content.external_url = self._get_external_url(evt)
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
info = ImageInfo(
|
||||
height=largest_size.h,
|
||||
width=largest_size.w,
|
||||
orientation=0,
|
||||
mimetype=file.mime_type,
|
||||
size=self._photo_size_key(largest_size),
|
||||
)
|
||||
ext = sane_mimetypes.guess_extension(file.mime_type)
|
||||
name = f"disappearing_image{ext}" if media.ttl_seconds else f"image{ext}"
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
content = MediaMessageEventContent(
|
||||
msgtype=MessageType.IMAGE,
|
||||
info=info,
|
||||
body=name,
|
||||
relates_to=relates_to,
|
||||
external_url=self._get_external_url(evt),
|
||||
)
|
||||
if file.decryption_info:
|
||||
content.file = file.decryption_info
|
||||
else:
|
||||
content.url = file.mxc
|
||||
result = await self._send_message(intent, content, timestamp=evt.date)
|
||||
if media.ttl_seconds:
|
||||
await DisappearingMessage(self.mxid, result, media.ttl_seconds).insert()
|
||||
if evt.message:
|
||||
caption_content = await formatter.telegram_to_matrix(
|
||||
evt, source, self.main_intent, no_reply_fallback=True
|
||||
)
|
||||
caption_content.external_url = content.external_url
|
||||
result = await self._send_message(intent, caption_content, timestamp=evt.date)
|
||||
if media.ttl_seconds:
|
||||
await DisappearingMessage(self.mxid, result, media.ttl_seconds).insert()
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _parse_telegram_document_attributes(attributes: list[TypeDocumentAttribute]) -> DocAttrs:
|
||||
name, mime_type, is_sticker, sticker_alt, width, height = None, None, False, None, 0, 0
|
||||
is_gif, is_audio, is_voice, duration, waveform = False, False, False, 0, bytes()
|
||||
for attr in attributes:
|
||||
if isinstance(attr, DocumentAttributeFilename):
|
||||
name = name or attr.file_name
|
||||
mime_type, _ = mimetypes.guess_type(attr.file_name)
|
||||
elif isinstance(attr, DocumentAttributeSticker):
|
||||
is_sticker = True
|
||||
sticker_alt = attr.alt
|
||||
elif isinstance(attr, DocumentAttributeAnimated):
|
||||
is_gif = True
|
||||
elif isinstance(attr, DocumentAttributeVideo):
|
||||
width, height = attr.w, attr.h
|
||||
elif isinstance(attr, DocumentAttributeImageSize):
|
||||
width, height = attr.w, attr.h
|
||||
elif isinstance(attr, DocumentAttributeAudio):
|
||||
is_audio = True
|
||||
is_voice = attr.voice or False
|
||||
duration = attr.duration
|
||||
waveform = decode_waveform(attr.waveform) if attr.waveform else b""
|
||||
|
||||
return DocAttrs(
|
||||
name,
|
||||
mime_type,
|
||||
is_sticker,
|
||||
sticker_alt,
|
||||
width,
|
||||
height,
|
||||
is_gif,
|
||||
is_audio,
|
||||
is_voice,
|
||||
duration,
|
||||
waveform,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _parse_telegram_document_meta(
|
||||
evt: Message, file: DBTelegramFile, attrs: DocAttrs, thumb_size: TypePhotoSize
|
||||
) -> tuple[ImageInfo, str]:
|
||||
document = evt.media.document
|
||||
name = attrs.name
|
||||
if attrs.is_sticker:
|
||||
alt = attrs.sticker_alt
|
||||
if len(alt) > 0:
|
||||
try:
|
||||
name = f"{alt} ({unicodedata.name(alt[0]).lower()})"
|
||||
except ValueError:
|
||||
name = alt
|
||||
|
||||
generic_types = ("text/plain", "application/octet-stream")
|
||||
if file.mime_type in generic_types and document.mime_type not in generic_types:
|
||||
mime_type = document.mime_type or file.mime_type
|
||||
elif file.mime_type == "application/ogg":
|
||||
mime_type = "audio/ogg"
|
||||
else:
|
||||
mime_type = file.mime_type or document.mime_type
|
||||
info = ImageInfo(size=file.size, mimetype=mime_type)
|
||||
|
||||
if attrs.mime_type and not file.was_converted:
|
||||
file.mime_type = attrs.mime_type or file.mime_type
|
||||
if file.width and file.height:
|
||||
info.width, info.height = file.width, file.height
|
||||
elif attrs.width and attrs.height:
|
||||
info.width, info.height = attrs.width, attrs.height
|
||||
|
||||
if file.thumbnail:
|
||||
if file.thumbnail.decryption_info:
|
||||
info.thumbnail_file = file.thumbnail.decryption_info
|
||||
else:
|
||||
info.thumbnail_url = file.thumbnail.mxc
|
||||
info.thumbnail_info = ThumbnailInfo(
|
||||
mimetype=file.thumbnail.mime_type,
|
||||
height=file.thumbnail.height or thumb_size.h,
|
||||
width=file.thumbnail.width or thumb_size.w,
|
||||
size=file.thumbnail.size,
|
||||
)
|
||||
elif attrs.is_sticker:
|
||||
# This is a hack for bad clients like Element iOS that require a thumbnail
|
||||
info.thumbnail_info = ImageInfo.deserialize(info.serialize())
|
||||
if file.decryption_info:
|
||||
info.thumbnail_file = file.decryption_info
|
||||
else:
|
||||
info.thumbnail_url = file.mxc
|
||||
|
||||
return info, name
|
||||
|
||||
async def _handle_telegram_document(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> EventID | None:
|
||||
document = evt.media.document
|
||||
|
||||
attrs = self._parse_telegram_document_attributes(document.attributes)
|
||||
|
||||
if document.size > self.matrix.media_config.upload_size:
|
||||
name = attrs.name or ""
|
||||
caption = f"\n{evt.message}" if evt.message else ""
|
||||
# TODO encrypt
|
||||
return await intent.send_notice(self.mxid, f"Too large file {name}{caption}")
|
||||
|
||||
thumb_loc, thumb_size = self._get_largest_photo_size(document)
|
||||
if thumb_size and not isinstance(thumb_size, (PhotoSize, PhotoCachedSize)):
|
||||
self.log.debug(f"Unsupported thumbnail type {type(thumb_size)}")
|
||||
thumb_loc = None
|
||||
thumb_size = None
|
||||
parallel_id = source.tgid if self.config["bridge.parallel_file_transfer"] else None
|
||||
file = await util.transfer_file_to_matrix(
|
||||
source.client,
|
||||
intent,
|
||||
document,
|
||||
thumb_loc,
|
||||
is_sticker=attrs.is_sticker,
|
||||
tgs_convert=self.config["bridge.animated_sticker"],
|
||||
filename=attrs.name,
|
||||
parallel_id=parallel_id,
|
||||
encrypt=self.encrypted,
|
||||
async_upload=self.config["homeserver.async_media"],
|
||||
)
|
||||
if not file:
|
||||
return None
|
||||
|
||||
info, name = self._parse_telegram_document_meta(evt, file, attrs, thumb_size)
|
||||
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
|
||||
event_type = EventType.ROOM_MESSAGE
|
||||
# Elements only support images as stickers, so send animated webm stickers as m.video
|
||||
if attrs.is_sticker and file.mime_type.startswith("image/"):
|
||||
event_type = EventType.STICKER
|
||||
# Tell clients to render the stickers as 256x256 if they're bigger
|
||||
if info.width > 256 or info.height > 256:
|
||||
if info.width > info.height:
|
||||
info.height = int(info.height / (info.width / 256))
|
||||
info.width = 256
|
||||
else:
|
||||
info.width = int(info.width / (info.height / 256))
|
||||
info.height = 256
|
||||
if info.thumbnail_info:
|
||||
info.thumbnail_info.width = info.width
|
||||
info.thumbnail_info.height = info.height
|
||||
if attrs.is_gif or (attrs.is_sticker and info.mimetype == "video/webm"):
|
||||
if attrs.is_gif:
|
||||
info["fi.mau.telegram.gif"] = True
|
||||
else:
|
||||
info["fi.mau.telegram.animated_sticker"] = True
|
||||
info["fi.mau.loop"] = True
|
||||
info["fi.mau.autoplay"] = True
|
||||
info["fi.mau.hide_controls"] = True
|
||||
info["fi.mau.no_audio"] = True
|
||||
if not name:
|
||||
ext = sane_mimetypes.guess_extension(file.mime_type)
|
||||
name = "unnamed_file" + ext
|
||||
|
||||
content = MediaMessageEventContent(
|
||||
body=name,
|
||||
info=info,
|
||||
relates_to=relates_to,
|
||||
external_url=self._get_external_url(evt),
|
||||
msgtype={
|
||||
"video/": MessageType.VIDEO,
|
||||
"audio/": MessageType.AUDIO,
|
||||
"image/": MessageType.IMAGE,
|
||||
}.get(info.mimetype[:6], MessageType.FILE),
|
||||
)
|
||||
if event_type == EventType.STICKER:
|
||||
content.msgtype = None
|
||||
if attrs.is_audio:
|
||||
content["org.matrix.msc1767.audio"] = {"duration": attrs.duration * 1000}
|
||||
if attrs.waveform:
|
||||
content["org.matrix.msc1767.audio"]["waveform"] = [x << 5 for x in attrs.waveform]
|
||||
if attrs.is_voice:
|
||||
content["org.matrix.msc3245.voice"] = {}
|
||||
if file.decryption_info:
|
||||
content.file = file.decryption_info
|
||||
else:
|
||||
content.url = file.mxc
|
||||
res = await self._send_message(intent, content, event_type=event_type, timestamp=evt.date)
|
||||
if evt.media.ttl_seconds:
|
||||
await DisappearingMessage(self.mxid, res, evt.media.ttl_seconds).insert()
|
||||
if evt.message:
|
||||
caption_content = await formatter.telegram_to_matrix(
|
||||
evt, source, self.main_intent, no_reply_fallback=True
|
||||
)
|
||||
caption_content.external_url = content.external_url
|
||||
res = await self._send_message(intent, caption_content, timestamp=evt.date)
|
||||
if evt.media.ttl_seconds:
|
||||
await DisappearingMessage(self.mxid, res, evt.media.ttl_seconds).insert()
|
||||
return res
|
||||
|
||||
def _location_message_to_content(
|
||||
self, evt: Message, relates_to: RelatesTo, note: str
|
||||
) -> LocationMessageEventContent:
|
||||
long = evt.media.geo.long
|
||||
lat = evt.media.geo.lat
|
||||
long_char = "E" if long > 0 else "W"
|
||||
lat_char = "N" if lat > 0 else "S"
|
||||
geo = f"{round(lat, 6)},{round(long, 6)}"
|
||||
|
||||
body = f"{round(abs(lat), 4)}° {lat_char}, {round(abs(long), 4)}° {long_char}"
|
||||
url = f"https://maps.google.com/?q={geo}"
|
||||
|
||||
content = LocationMessageEventContent(
|
||||
msgtype=MessageType.LOCATION,
|
||||
geo_uri=f"geo:{geo}",
|
||||
body=f"{note}: {body}\n{url}",
|
||||
relates_to=relates_to,
|
||||
external_url=self._get_external_url(evt),
|
||||
)
|
||||
content["format"] = str(Format.HTML)
|
||||
content["formatted_body"] = f"{note}: <a href='{url}'>{body}</a>"
|
||||
content["org.matrix.msc3488.location"] = {
|
||||
"uri": content.geo_uri,
|
||||
"description": note,
|
||||
}
|
||||
return content
|
||||
|
||||
def _handle_telegram_location(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> Awaitable[EventID]:
|
||||
content = self._location_message_to_content(evt, relates_to, "Location")
|
||||
return self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
def _handle_telegram_live_location(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> Awaitable[EventID]:
|
||||
content = self._location_message_to_content(
|
||||
evt, relates_to, "Live Location (see your Telegram client for live updates)"
|
||||
)
|
||||
return self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
def _handle_telegram_venue(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> Awaitable[EventID]:
|
||||
content = self._location_message_to_content(evt, relates_to, evt.media.title)
|
||||
return self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
async def _telegram_webpage_to_beeper_link_preview(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, webpage: WebPage
|
||||
) -> dict[str, Any]:
|
||||
beeper_link_preview: dict[str, Any] = {
|
||||
"matched_url": webpage.url,
|
||||
"og:title": webpage.title,
|
||||
"og:url": webpage.url,
|
||||
"og:description": webpage.description,
|
||||
}
|
||||
|
||||
# Upload an image corresponding to the link preview if it exists.
|
||||
if webpage.photo:
|
||||
loc, largest_size = self._get_largest_photo_size(webpage.photo)
|
||||
if loc is None:
|
||||
return beeper_link_preview
|
||||
beeper_link_preview["og:image:height"] = largest_size.h
|
||||
beeper_link_preview["og:image:width"] = largest_size.w
|
||||
file = await util.transfer_file_to_matrix(
|
||||
source.client,
|
||||
intent,
|
||||
loc,
|
||||
encrypt=self.encrypted,
|
||||
async_upload=self.config["homeserver.async_media"],
|
||||
)
|
||||
|
||||
if file.decryption_info:
|
||||
beeper_link_preview[BEEPER_IMAGE_ENCRYPTION_KEY] = file.decryption_info.serialize()
|
||||
else:
|
||||
beeper_link_preview["og:image"] = file.mxc
|
||||
|
||||
return beeper_link_preview
|
||||
|
||||
async def _handle_telegram_text(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, is_bot: bool, evt: Message
|
||||
) -> EventID:
|
||||
self.log.trace(f"Sending {evt.message} to {self.mxid} by {intent.mxid}")
|
||||
content = await formatter.telegram_to_matrix(evt, source, self.main_intent)
|
||||
content.external_url = self._get_external_url(evt)
|
||||
if is_bot and self.get_config("bot_messages_as_notices"):
|
||||
content.msgtype = MessageType.NOTICE
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
|
||||
if (
|
||||
hasattr(evt, "media")
|
||||
and isinstance(evt.media, MessageMediaWebPage)
|
||||
and isinstance(evt.media.webpage, WebPage)
|
||||
):
|
||||
content[BEEPER_LINK_PREVIEWS_KEY] = [
|
||||
await self._telegram_webpage_to_beeper_link_preview(
|
||||
source, intent, evt.media.webpage
|
||||
)
|
||||
]
|
||||
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
async def _handle_telegram_unsupported(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> EventID:
|
||||
override_text = (
|
||||
"This message is not supported on your version of Mautrix-Telegram. "
|
||||
"Please check https://github.com/mautrix/telegram or ask your "
|
||||
"bridge administrator about possible updates."
|
||||
)
|
||||
content = await formatter.telegram_to_matrix(
|
||||
evt, source, self.main_intent, override_text=override_text
|
||||
)
|
||||
content.msgtype = MessageType.NOTICE
|
||||
content.external_url = self._get_external_url(evt)
|
||||
content["net.maunium.telegram.unsupported"] = True
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
async def _handle_telegram_poll(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> EventID:
|
||||
poll: Poll = evt.media.poll
|
||||
poll_id = self._encode_msgid(source, evt)
|
||||
|
||||
_n = 0
|
||||
|
||||
def n() -> int:
|
||||
nonlocal _n
|
||||
_n += 1
|
||||
return _n
|
||||
|
||||
text_answers = "\n".join(f"{n()}. {answer.text}" for answer in poll.answers)
|
||||
html_answers = "\n".join(f"<li>{answer.text}</li>" for answer in poll.answers)
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT,
|
||||
format=Format.HTML,
|
||||
body=(
|
||||
f"Poll: {poll.question}\n{text_answers}\n"
|
||||
f"Vote with !tg vote {poll_id} <choice number>"
|
||||
),
|
||||
formatted_body=(
|
||||
f"<strong>Poll</strong>: {poll.question}<br/>\n"
|
||||
f"<ol>{html_answers}</ol>\n"
|
||||
f"Vote with <code>!tg vote {poll_id} <choice number></code>"
|
||||
),
|
||||
relates_to=relates_to,
|
||||
external_url=self._get_external_url(evt),
|
||||
)
|
||||
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
async def _handle_telegram_dice(
|
||||
self, _: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> EventID:
|
||||
content = putil.make_dice_event_content(evt.media)
|
||||
content.relates_to = relates_to
|
||||
content.external_url = self._get_external_url(evt)
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
@staticmethod
|
||||
def _int_to_bytes(i: int) -> bytes:
|
||||
hex_value = f"{i:010x}".encode("utf-8")
|
||||
return codecs.decode(hex_value, "hex_codec")
|
||||
|
||||
def _encode_msgid(self, source: au.AbstractUser, evt: Message) -> str:
|
||||
if self.peer_type == "channel":
|
||||
play_id = b"c" + self._int_to_bytes(self.tgid) + self._int_to_bytes(evt.id)
|
||||
elif self.peer_type == "chat":
|
||||
play_id = (
|
||||
b"g"
|
||||
+ self._int_to_bytes(self.tgid)
|
||||
+ self._int_to_bytes(evt.id)
|
||||
+ self._int_to_bytes(source.tgid)
|
||||
)
|
||||
elif self.peer_type == "user":
|
||||
play_id = b"u" + self._int_to_bytes(self.tgid) + self._int_to_bytes(evt.id)
|
||||
else:
|
||||
raise ValueError("Portal has invalid peer type")
|
||||
return base64.b64encode(play_id).decode("utf-8").rstrip("=")
|
||||
|
||||
async def _handle_telegram_game(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> EventID:
|
||||
game = evt.media.game
|
||||
play_id = self._encode_msgid(source, evt)
|
||||
command = f"!tg play {play_id}"
|
||||
override_text = f"Run {command} in your bridge management room to play {game.title}"
|
||||
override_entities = [
|
||||
MessageEntityPre(offset=len("Run "), length=len(command), language="")
|
||||
]
|
||||
|
||||
content = await formatter.telegram_to_matrix(
|
||||
evt,
|
||||
source,
|
||||
self.main_intent,
|
||||
override_text=override_text,
|
||||
override_entities=override_entities,
|
||||
)
|
||||
content.msgtype = MessageType.NOTICE
|
||||
content.external_url = self._get_external_url(evt)
|
||||
content.relates_to = relates_to
|
||||
content["net.maunium.telegram.game"] = play_id
|
||||
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
async def _handle_telegram_contact(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
|
||||
) -> EventID:
|
||||
content = await putil.make_contact_event_content(source, evt.media)
|
||||
content.relates_to = relates_to
|
||||
content.external_url = self._get_external_url(evt)
|
||||
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
return await self._send_message(intent, content, timestamp=evt.date)
|
||||
|
||||
async def handle_telegram_edit(
|
||||
self, source: au.AbstractUser, sender: p.Puppet, evt: Message
|
||||
) -> None:
|
||||
@@ -2770,9 +2242,6 @@ class Portal(DBPortal, BasePortal):
|
||||
).insert()
|
||||
return
|
||||
|
||||
content = await formatter.telegram_to_matrix(
|
||||
evt, source, self.main_intent, no_reply_fallback=True
|
||||
)
|
||||
editing_msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
|
||||
if not editing_msg:
|
||||
self.log.info(
|
||||
@@ -2791,17 +2260,15 @@ class Portal(DBPortal, BasePortal):
|
||||
await DBMessage.delete_temp_mxid(temporary_identifier, self.mxid)
|
||||
return
|
||||
|
||||
content.msgtype = (
|
||||
MessageType.NOTICE
|
||||
if (sender and sender.is_bot and self.get_config("bot_messages_as_notices"))
|
||||
else MessageType.TEXT
|
||||
)
|
||||
content.external_url = self._get_external_url(evt)
|
||||
content.set_edit(editing_msg.mxid)
|
||||
|
||||
intent = sender.intent_for(self) if sender else self.main_intent
|
||||
is_bot = sender.is_bot if sender else False
|
||||
converted = await self._msg_conv.convert(
|
||||
source, intent, is_bot, evt, no_reply_fallback=True
|
||||
)
|
||||
converted.content.set_edit(editing_msg.mxid)
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
event_id = await self._send_message(intent, content)
|
||||
timestamp = evt.edit_date if evt.edit_date != evt.date else None
|
||||
event_id = await self._send_message(intent, converted.content, timestamp=timestamp)
|
||||
|
||||
await DBMessage(
|
||||
mxid=event_id,
|
||||
@@ -3195,53 +2662,19 @@ class Portal(DBPortal, BasePortal):
|
||||
f" updating with data {entity!s}"
|
||||
)
|
||||
|
||||
allowed_media = (
|
||||
MessageMediaPhoto,
|
||||
MessageMediaDocument,
|
||||
MessageMediaGeo,
|
||||
MessageMediaGeoLive,
|
||||
MessageMediaVenue,
|
||||
MessageMediaGame,
|
||||
MessageMediaDice,
|
||||
MessageMediaPoll,
|
||||
MessageMediaContact,
|
||||
MessageMediaUnsupported,
|
||||
)
|
||||
if sender:
|
||||
# TODO don't use double puppet when backfilling
|
||||
intent = sender.intent_for(self)
|
||||
if (
|
||||
self.backfill_lock.locked
|
||||
and intent != sender.default_mxid_intent
|
||||
and self.config["bridge.backfill.invite_own_puppet"]
|
||||
):
|
||||
intent = sender.default_mxid_intent
|
||||
self.backfill_leave.add(intent)
|
||||
else:
|
||||
intent = self.main_intent
|
||||
if hasattr(evt, "media") and isinstance(evt.media, allowed_media):
|
||||
handler: MediaHandler = {
|
||||
MessageMediaPhoto: self._handle_telegram_photo,
|
||||
MessageMediaDocument: self._handle_telegram_document,
|
||||
MessageMediaGeo: self._handle_telegram_location,
|
||||
MessageMediaGeoLive: self._handle_telegram_live_location,
|
||||
MessageMediaVenue: self._handle_telegram_venue,
|
||||
MessageMediaPoll: self._handle_telegram_poll,
|
||||
MessageMediaDice: self._handle_telegram_dice,
|
||||
MessageMediaUnsupported: self._handle_telegram_unsupported,
|
||||
MessageMediaGame: self._handle_telegram_game,
|
||||
MessageMediaContact: self._handle_telegram_contact,
|
||||
}[type(evt.media)]
|
||||
relates_to = await formatter.telegram_reply_to_matrix(evt, source)
|
||||
event_id = await handler(source, intent, evt, relates_to)
|
||||
elif evt.message:
|
||||
is_bot = sender.is_bot if sender else False
|
||||
event_id = await self._handle_telegram_text(source, intent, is_bot, evt)
|
||||
else:
|
||||
self.log.debug("Unhandled Telegram message %d", evt.id)
|
||||
return
|
||||
|
||||
if not event_id:
|
||||
is_bot = sender.is_bot if sender else False
|
||||
converted = await self._msg_conv.convert(source, intent, is_bot, evt)
|
||||
if not converted:
|
||||
return
|
||||
await intent.set_typing(self.mxid, is_typing=False)
|
||||
event_id = await self._send_message(intent, converted.content, timestamp=evt.date)
|
||||
if converted.caption:
|
||||
await self._send_message(intent, converted.caption, timestamp=evt.date)
|
||||
|
||||
self._new_messages_after_sponsored = True
|
||||
|
||||
@@ -3424,41 +2857,6 @@ class Portal(DBPortal, BasePortal):
|
||||
return local
|
||||
return self.config[f"bridge.{key}"]
|
||||
|
||||
@staticmethod
|
||||
def _photo_size_key(photo: TypePhotoSize) -> int:
|
||||
if isinstance(photo, PhotoSize):
|
||||
return photo.size
|
||||
elif isinstance(photo, PhotoSizeProgressive):
|
||||
return max(photo.sizes)
|
||||
elif isinstance(photo, PhotoSizeEmpty):
|
||||
return 0
|
||||
else:
|
||||
return len(photo.bytes)
|
||||
|
||||
@classmethod
|
||||
def _get_largest_photo_size(
|
||||
cls, photo: Photo | Document
|
||||
) -> tuple[InputPhotoFileLocation | None, TypePhotoSize | None]:
|
||||
if (
|
||||
not photo
|
||||
or isinstance(photo, PhotoEmpty)
|
||||
or (isinstance(photo, Document) and not photo.thumbs)
|
||||
):
|
||||
return None, None
|
||||
|
||||
largest = max(
|
||||
photo.thumbs if isinstance(photo, Document) else photo.sizes, key=cls._photo_size_key
|
||||
)
|
||||
return (
|
||||
InputPhotoFileLocation(
|
||||
id=photo.id,
|
||||
access_hash=photo.access_hash,
|
||||
file_reference=photo.file_reference,
|
||||
thumb_size=largest.type,
|
||||
),
|
||||
largest,
|
||||
)
|
||||
|
||||
async def can_user_perform(self, user: u.User, event: str) -> bool:
|
||||
if user.is_admin:
|
||||
return True
|
||||
@@ -3469,7 +2867,7 @@ class Portal(DBPortal, BasePortal):
|
||||
await self.main_intent.get_power_levels(self.mxid)
|
||||
except MatrixRequestError:
|
||||
return False
|
||||
evt_type = EventType.find(f"net.maunium.telegram.{event}", t_class=EventType.Class.STATE)
|
||||
evt_type = EventType.find(f"fi.mau.telegram.{event}", t_class=EventType.Class.STATE)
|
||||
return await self.main_intent.state_store.has_power_level(self.mxid, user.mxid, evt_type)
|
||||
|
||||
def get_input_entity(
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from .deduplication import PortalDedup
|
||||
from .media_fallback import make_contact_event_content, make_dice_event_content
|
||||
from .message_convert import TelegramMessageConverter
|
||||
from .participants import get_users
|
||||
from .power_levels import get_base_power_levels, participants_to_power_levels
|
||||
from .send_lock import PortalReactionLock, PortalSendLock
|
||||
|
||||
@@ -1,133 +0,0 @@
|
||||
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
||||
# Copyright (C) 2021 Tulir Asokan
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from __future__ import annotations
|
||||
|
||||
import html
|
||||
|
||||
from telethon.tl.types import MessageMediaContact, MessageMediaDice, PeerUser
|
||||
|
||||
from mautrix.types import Format, MessageType, TextMessageEventContent
|
||||
|
||||
from .. import abstract_user as au, puppet as pu
|
||||
from ..types import TelegramID
|
||||
|
||||
try:
|
||||
import phonenumbers
|
||||
except ImportError:
|
||||
phonenumbers = None
|
||||
|
||||
|
||||
def _format_dice(roll: MessageMediaDice) -> str:
|
||||
if roll.emoticon == "\U0001F3B0":
|
||||
emojis = {
|
||||
0: "\U0001F36B", # "🍫",
|
||||
1: "\U0001F352", # "🍒",
|
||||
2: "\U0001F34B", # "🍋",
|
||||
3: "7\ufe0f\u20e3", # "7️⃣",
|
||||
}
|
||||
res = roll.value - 1
|
||||
slot1, slot2, slot3 = emojis[res % 4], emojis[res // 4 % 4], emojis[res // 16]
|
||||
return f"{slot1} {slot2} {slot3} ({roll.value})"
|
||||
elif roll.emoticon == "\u26BD":
|
||||
results = {
|
||||
1: "miss",
|
||||
2: "hit the woodwork",
|
||||
3: "goal", # seems to go in through the center
|
||||
4: "goal",
|
||||
5: "goal 🎉", # seems to go in through the top right corner, includes confetti
|
||||
}
|
||||
elif roll.emoticon == "\U0001F3B3":
|
||||
results = {
|
||||
1: "miss",
|
||||
2: "1 pin down",
|
||||
3: "3 pins down, split",
|
||||
4: "4 pins down, split",
|
||||
5: "5 pins down",
|
||||
6: "strike 🎉",
|
||||
}
|
||||
# elif roll.emoticon == "\U0001F3C0":
|
||||
# results = {
|
||||
# 2: "rolled off",
|
||||
# 3: "stuck",
|
||||
# }
|
||||
# elif roll.emoticon == "\U0001F3AF":
|
||||
# results = {
|
||||
# 1: "bounced off",
|
||||
# 2: "outer rim",
|
||||
#
|
||||
# 6: "bullseye",
|
||||
# }
|
||||
else:
|
||||
return str(roll.value)
|
||||
return f"{results[roll.value]} ({roll.value})"
|
||||
|
||||
|
||||
def make_dice_event_content(roll: MessageMediaDice) -> TextMessageEventContent:
|
||||
emoji_text = {
|
||||
"\U0001F3AF": " Dart throw",
|
||||
"\U0001F3B2": " Dice roll",
|
||||
"\U0001F3C0": " Basketball throw",
|
||||
"\U0001F3B0": " Slot machine",
|
||||
"\U0001F3B3": " Bowling",
|
||||
"\u26BD": " Football kick",
|
||||
}
|
||||
text = f"{roll.emoticon}{emoji_text.get(roll.emoticon, '')} result: {_format_dice(roll)}"
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT, format=Format.HTML, body=text, formatted_body=f"<h4>{text}</h4>"
|
||||
)
|
||||
content["net.maunium.telegram.dice"] = {"emoticon": roll.emoticon, "value": roll.value}
|
||||
return content
|
||||
|
||||
|
||||
async def make_contact_event_content(
|
||||
source: au.AbstractUser, contact: MessageMediaContact
|
||||
) -> TextMessageEventContent:
|
||||
name = " ".join(x for x in [contact.first_name, contact.last_name] if x)
|
||||
formatted_phone = f"+{contact.phone_number}"
|
||||
if phonenumbers is not None:
|
||||
try:
|
||||
parsed = phonenumbers.parse(formatted_phone)
|
||||
fmt = phonenumbers.PhoneNumberFormat.INTERNATIONAL
|
||||
formatted_phone = phonenumbers.format_number(parsed, fmt)
|
||||
except phonenumbers.NumberParseException:
|
||||
pass
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT,
|
||||
body=f"Shared contact info for {name}: {formatted_phone}",
|
||||
)
|
||||
content["net.maunium.telegram.contact"] = {
|
||||
"user_id": contact.user_id,
|
||||
"first_name": contact.first_name,
|
||||
"last_name": contact.last_name,
|
||||
"phone_number": contact.phone_number,
|
||||
"vcard": contact.vcard,
|
||||
}
|
||||
|
||||
puppet = await pu.Puppet.get_by_tgid(TelegramID(contact.user_id))
|
||||
if not puppet.displayname:
|
||||
try:
|
||||
entity = await source.client.get_entity(PeerUser(contact.user_id))
|
||||
await puppet.update_info(source, entity)
|
||||
except Exception as e:
|
||||
source.log.warning(f"Failed to sync puppet info of received contact: {e}")
|
||||
else:
|
||||
content.format = Format.HTML
|
||||
content.formatted_body = (
|
||||
f"Shared contact info for "
|
||||
f"<a href='https://matrix.to/#/{puppet.mxid}'>{html.escape(name)}</a>: "
|
||||
f"{html.escape(formatted_phone)}"
|
||||
)
|
||||
return content
|
||||
@@ -0,0 +1,762 @@
|
||||
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
||||
# Copyright (C) 2022 Tulir Asokan
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, NamedTuple
|
||||
import base64
|
||||
import codecs
|
||||
import html
|
||||
import mimetypes
|
||||
import unicodedata
|
||||
|
||||
from attr import dataclass
|
||||
from telethon.tl.types import (
|
||||
Document,
|
||||
DocumentAttributeAnimated,
|
||||
DocumentAttributeAudio,
|
||||
DocumentAttributeFilename,
|
||||
DocumentAttributeImageSize,
|
||||
DocumentAttributeSticker,
|
||||
DocumentAttributeVideo,
|
||||
Game,
|
||||
InputPhotoFileLocation,
|
||||
Message,
|
||||
MessageEntityPre,
|
||||
MessageMediaContact,
|
||||
MessageMediaDice,
|
||||
MessageMediaDocument,
|
||||
MessageMediaGame,
|
||||
MessageMediaGeo,
|
||||
MessageMediaGeoLive,
|
||||
MessageMediaPhoto,
|
||||
MessageMediaPoll,
|
||||
MessageMediaUnsupported,
|
||||
MessageMediaVenue,
|
||||
MessageMediaWebPage,
|
||||
PeerChannel,
|
||||
PeerUser,
|
||||
Photo,
|
||||
PhotoCachedSize,
|
||||
PhotoEmpty,
|
||||
PhotoSize,
|
||||
PhotoSizeEmpty,
|
||||
PhotoSizeProgressive,
|
||||
Poll,
|
||||
TypeDocumentAttribute,
|
||||
TypePhotoSize,
|
||||
WebPage,
|
||||
)
|
||||
from telethon.utils import decode_waveform
|
||||
|
||||
from mautrix.appservice import IntentAPI
|
||||
from mautrix.types import (
|
||||
EventType,
|
||||
Format,
|
||||
ImageInfo,
|
||||
LocationMessageEventContent,
|
||||
MediaMessageEventContent,
|
||||
MessageEventContent,
|
||||
MessageType,
|
||||
TextMessageEventContent,
|
||||
ThumbnailInfo,
|
||||
)
|
||||
from mautrix.util.logging import TraceLogger
|
||||
|
||||
from .. import abstract_user as au, formatter, matrix as m, portal as po, puppet as pu, util
|
||||
from ..config import Config
|
||||
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
|
||||
from ..types import TelegramID
|
||||
from ..util import sane_mimetypes
|
||||
|
||||
try:
|
||||
import phonenumbers
|
||||
except ImportError:
|
||||
phonenumbers = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConvertedMessage:
|
||||
content: MessageEventContent
|
||||
caption: MessageEventContent | None = None
|
||||
type: EventType = EventType.ROOM_MESSAGE
|
||||
disappear_in: int | None = None
|
||||
|
||||
|
||||
class DocAttrs(NamedTuple):
|
||||
name: str | None
|
||||
mime_type: str | None
|
||||
is_sticker: bool
|
||||
sticker_alt: str | None
|
||||
width: int
|
||||
height: int
|
||||
is_gif: bool
|
||||
is_audio: bool
|
||||
is_voice: bool
|
||||
duration: int
|
||||
waveform: bytes
|
||||
|
||||
|
||||
BEEPER_LINK_PREVIEWS_KEY = "com.beeper.linkpreviews"
|
||||
BEEPER_IMAGE_ENCRYPTION_KEY = "beeper:image:encryption"
|
||||
|
||||
|
||||
class TelegramMessageConverter:
|
||||
portal: po.Portal
|
||||
matrix: m.MatrixHandler
|
||||
config: Config
|
||||
log: TraceLogger
|
||||
|
||||
def __init__(self, portal: po.Portal) -> None:
|
||||
self.portal = portal
|
||||
self.matrix = portal.matrix
|
||||
self.config = portal.config
|
||||
self.log = portal.log.getChild("msg_conv")
|
||||
|
||||
self._media_converters = {
|
||||
MessageMediaPhoto: self._convert_photo,
|
||||
MessageMediaDocument: self._convert_document,
|
||||
MessageMediaGeo: self._convert_location,
|
||||
MessageMediaGeoLive: self._convert_location,
|
||||
MessageMediaVenue: self._convert_location,
|
||||
MessageMediaPoll: self._convert_poll,
|
||||
MessageMediaDice: self._convert_dice,
|
||||
MessageMediaUnsupported: self._convert_unsupported,
|
||||
MessageMediaGame: self._convert_game,
|
||||
MessageMediaContact: self._convert_contact,
|
||||
}
|
||||
self._allowed_media = tuple(self._media_converters.keys())
|
||||
|
||||
async def convert(
|
||||
self,
|
||||
source: au.AbstractUser,
|
||||
intent: IntentAPI,
|
||||
is_bot: bool,
|
||||
evt: Message,
|
||||
no_reply_fallback: bool = False,
|
||||
) -> ConvertedMessage | None:
|
||||
if hasattr(evt, "media") and isinstance(evt.media, self._allowed_media):
|
||||
convert_media = self._media_converters[type(evt.media)]
|
||||
converted = await convert_media(source=source, intent=intent, evt=evt)
|
||||
elif evt.message:
|
||||
converted = await self._convert_text(source, intent, is_bot, evt)
|
||||
else:
|
||||
self.log.debug("Unhandled Telegram message %d", evt.id)
|
||||
return
|
||||
if converted:
|
||||
converted.content.external_url = self._get_external_url(evt)
|
||||
if converted.caption:
|
||||
converted.caption.external_url = converted.content.external_url
|
||||
if self.portal.get_config("caption_in_message"):
|
||||
self._caption_to_message(converted)
|
||||
await self._set_reply(source, evt, converted.content, no_fallback=no_reply_fallback)
|
||||
return converted
|
||||
|
||||
@staticmethod
|
||||
def _caption_to_message(converted: ConvertedMessage) -> None:
|
||||
content, caption = converted.content, converted.caption
|
||||
converted.caption = None
|
||||
|
||||
content["filename"] = content.body
|
||||
content["org.matrix.msc1767.caption"] = {
|
||||
"org.matrix.msc1767.text": caption.body,
|
||||
}
|
||||
content.body = caption.body
|
||||
if caption.format == Format.HTML:
|
||||
content["org.matrix.msc1767.caption"][
|
||||
"org.matrix.msc1767.html"
|
||||
] = caption.formatted_body
|
||||
content["formatted_body"] = caption.formatted_body
|
||||
content["format"] = Format.HTML.value
|
||||
|
||||
def _get_external_url(self, evt: Message) -> str | None:
|
||||
if self.portal.peer_type == "channel" and self.portal.username is not None:
|
||||
return f"https://t.me/{self.portal.username}/{evt.id}"
|
||||
elif self.portal.peer_type != "user":
|
||||
return f"https://t.me/c/{self.portal.tgid}/{evt.id}"
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _int_to_bytes(i: int) -> bytes:
|
||||
return codecs.decode(f"{i:010x}", "hex")
|
||||
|
||||
def _encode_msgid(self, source: au.AbstractUser, evt: Message) -> str:
|
||||
if self.portal.peer_type == "channel":
|
||||
play_id = b"c" + self._int_to_bytes(self.portal.tgid) + self._int_to_bytes(evt.id)
|
||||
elif self.portal.peer_type == "chat":
|
||||
play_id = (
|
||||
b"g"
|
||||
+ self._int_to_bytes(self.portal.tgid)
|
||||
+ self._int_to_bytes(evt.id)
|
||||
+ self._int_to_bytes(source.tgid)
|
||||
)
|
||||
elif self.portal.peer_type == "user":
|
||||
play_id = b"u" + self._int_to_bytes(self.portal.tgid) + self._int_to_bytes(evt.id)
|
||||
else:
|
||||
raise ValueError("Portal has invalid peer type")
|
||||
return base64.b64encode(play_id).decode("utf-8").rstrip("=")
|
||||
|
||||
async def _set_reply(
|
||||
self,
|
||||
source: au.AbstractUser,
|
||||
evt: Message,
|
||||
content: MessageEventContent,
|
||||
no_fallback: bool = False,
|
||||
) -> None:
|
||||
if not evt.reply_to:
|
||||
return
|
||||
space = (
|
||||
evt.peer_id.channel_id
|
||||
if isinstance(evt, Message) and isinstance(evt.peer_id, PeerChannel)
|
||||
else source.tgid
|
||||
)
|
||||
msg = await DBMessage.get_one_by_tgid(TelegramID(evt.reply_to.reply_to_msg_id), space)
|
||||
if not msg or msg.mx_room != self.portal.mxid:
|
||||
return
|
||||
elif not isinstance(content, TextMessageEventContent) or no_fallback:
|
||||
# Not a text message, just set the reply metadata and return
|
||||
content.set_reply(msg.mxid)
|
||||
return
|
||||
|
||||
# Text message, try to fetch original message to generate reply fallback.
|
||||
try:
|
||||
event = await self.portal.main_intent.get_event(msg.mx_room, msg.mxid)
|
||||
if event.type == EventType.ROOM_ENCRYPTED and source.bridge.matrix.e2ee:
|
||||
event = await source.bridge.matrix.e2ee.decrypt(event)
|
||||
if isinstance(event.content, TextMessageEventContent):
|
||||
event.content.trim_reply_fallback()
|
||||
puppet = await pu.Puppet.get_by_mxid(event.sender, create=False)
|
||||
content.set_reply(event, displayname=puppet.displayname if puppet else event.sender)
|
||||
except Exception:
|
||||
self.log.exception("Failed to get event to add reply fallback")
|
||||
content.set_reply(msg.mxid)
|
||||
|
||||
@staticmethod
|
||||
def _photo_size_key(photo: TypePhotoSize) -> int:
|
||||
if isinstance(photo, PhotoSize):
|
||||
return photo.size
|
||||
elif isinstance(photo, PhotoSizeProgressive):
|
||||
return max(photo.sizes)
|
||||
elif isinstance(photo, PhotoSizeEmpty):
|
||||
return 0
|
||||
else:
|
||||
return len(photo.bytes)
|
||||
|
||||
@classmethod
|
||||
def get_largest_photo_size(
|
||||
cls, photo: Photo | Document
|
||||
) -> tuple[InputPhotoFileLocation | None, TypePhotoSize | None]:
|
||||
if (
|
||||
not photo
|
||||
or isinstance(photo, PhotoEmpty)
|
||||
or (isinstance(photo, Document) and not photo.thumbs)
|
||||
):
|
||||
return None, None
|
||||
|
||||
largest = max(
|
||||
photo.thumbs if isinstance(photo, Document) else photo.sizes, key=cls._photo_size_key
|
||||
)
|
||||
return (
|
||||
InputPhotoFileLocation(
|
||||
id=photo.id,
|
||||
access_hash=photo.access_hash,
|
||||
file_reference=photo.file_reference,
|
||||
thumb_size=largest.type,
|
||||
),
|
||||
largest,
|
||||
)
|
||||
|
||||
async def _webpage_to_beeper_link_preview(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, webpage: WebPage
|
||||
) -> dict[str, Any]:
|
||||
beeper_link_preview: dict[str, Any] = {
|
||||
"matched_url": webpage.url,
|
||||
"og:title": webpage.title,
|
||||
"og:url": webpage.url,
|
||||
"og:description": webpage.description,
|
||||
}
|
||||
|
||||
# Upload an image corresponding to the link preview if it exists.
|
||||
if webpage.photo:
|
||||
loc, largest_size = self.get_largest_photo_size(webpage.photo)
|
||||
if loc is None:
|
||||
return beeper_link_preview
|
||||
beeper_link_preview["og:image:height"] = largest_size.h
|
||||
beeper_link_preview["og:image:width"] = largest_size.w
|
||||
file = await util.transfer_file_to_matrix(
|
||||
source.client,
|
||||
intent,
|
||||
loc,
|
||||
encrypt=self.portal.encrypted,
|
||||
async_upload=self.config["homeserver.async_media"],
|
||||
)
|
||||
|
||||
if file.decryption_info:
|
||||
beeper_link_preview[BEEPER_IMAGE_ENCRYPTION_KEY] = file.decryption_info.serialize()
|
||||
else:
|
||||
beeper_link_preview["og:image"] = file.mxc
|
||||
|
||||
return beeper_link_preview
|
||||
|
||||
async def _convert_text(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, is_bot: bool, evt: Message
|
||||
) -> ConvertedMessage:
|
||||
content = await formatter.telegram_to_matrix(evt, source)
|
||||
if is_bot and self.portal.get_config("bot_messages_as_notices"):
|
||||
content.msgtype = MessageType.NOTICE
|
||||
|
||||
if (
|
||||
hasattr(evt, "media")
|
||||
and isinstance(evt.media, MessageMediaWebPage)
|
||||
and isinstance(evt.media.webpage, WebPage)
|
||||
):
|
||||
content[BEEPER_LINK_PREVIEWS_KEY] = [
|
||||
await self._webpage_to_beeper_link_preview(source, intent, evt.media.webpage)
|
||||
]
|
||||
|
||||
return ConvertedMessage(content=content)
|
||||
|
||||
async def _convert_photo(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message
|
||||
) -> ConvertedMessage | None:
|
||||
media: MessageMediaPhoto = evt.media
|
||||
if media.photo is None and media.ttl_seconds:
|
||||
return ConvertedMessage(
|
||||
content=TextMessageEventContent(
|
||||
msgtype=MessageType.NOTICE, body="Photo has expired"
|
||||
)
|
||||
)
|
||||
loc, largest_size = self.get_largest_photo_size(media.photo)
|
||||
if loc is None:
|
||||
return ConvertedMessage(
|
||||
content=TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT,
|
||||
body="Failed to bridge image",
|
||||
)
|
||||
)
|
||||
file = await util.transfer_file_to_matrix(
|
||||
source.client,
|
||||
intent,
|
||||
loc,
|
||||
encrypt=self.portal.encrypted,
|
||||
async_upload=self.config["homeserver.async_media"],
|
||||
)
|
||||
if not file:
|
||||
return None
|
||||
info = ImageInfo(
|
||||
height=largest_size.h,
|
||||
width=largest_size.w,
|
||||
orientation=0,
|
||||
mimetype=file.mime_type,
|
||||
size=self._photo_size_key(largest_size),
|
||||
)
|
||||
ext = sane_mimetypes.guess_extension(file.mime_type)
|
||||
name = f"disappearing_image{ext}" if media.ttl_seconds else f"image{ext}"
|
||||
content = MediaMessageEventContent(
|
||||
msgtype=MessageType.IMAGE,
|
||||
info=info,
|
||||
body=name,
|
||||
)
|
||||
if file.decryption_info:
|
||||
content.file = file.decryption_info
|
||||
else:
|
||||
content.url = file.mxc
|
||||
caption_content = await formatter.telegram_to_matrix(evt, source) if evt.message else None
|
||||
return ConvertedMessage(
|
||||
content=content,
|
||||
caption=caption_content,
|
||||
disappear_in=media.ttl_seconds,
|
||||
)
|
||||
|
||||
async def _convert_document(
|
||||
self, source: au.AbstractUser, intent: IntentAPI, evt: Message
|
||||
) -> ConvertedMessage | None:
|
||||
document = evt.media.document
|
||||
|
||||
attrs = _parse_document_attributes(document.attributes)
|
||||
|
||||
if document.size > self.matrix.media_config.upload_size:
|
||||
name = attrs.name or ""
|
||||
caption = f"\n{evt.message}" if evt.message else ""
|
||||
return ConvertedMessage(
|
||||
content=TextMessageEventContent(
|
||||
msgtype=MessageType.NOTICE, body=f"Too large file {name}{caption}"
|
||||
)
|
||||
)
|
||||
|
||||
thumb_loc, thumb_size = self.get_largest_photo_size(document)
|
||||
if thumb_size and not isinstance(thumb_size, (PhotoSize, PhotoCachedSize)):
|
||||
self.log.debug(f"Unsupported thumbnail type {type(thumb_size)}")
|
||||
thumb_loc = None
|
||||
thumb_size = None
|
||||
parallel_id = source.tgid if self.config["bridge.parallel_file_transfer"] else None
|
||||
file = await util.transfer_file_to_matrix(
|
||||
source.client,
|
||||
intent,
|
||||
document,
|
||||
thumb_loc,
|
||||
is_sticker=attrs.is_sticker,
|
||||
tgs_convert=self.config["bridge.animated_sticker"],
|
||||
filename=attrs.name,
|
||||
parallel_id=parallel_id,
|
||||
encrypt=self.portal.encrypted,
|
||||
async_upload=self.config["homeserver.async_media"],
|
||||
)
|
||||
if not file:
|
||||
return None
|
||||
|
||||
info, name = _parse_document_meta(evt, file, attrs, thumb_size)
|
||||
|
||||
event_type = EventType.ROOM_MESSAGE
|
||||
# Elements only support images as stickers, so send animated webm stickers as m.video
|
||||
if attrs.is_sticker and file.mime_type.startswith("image/"):
|
||||
event_type = EventType.STICKER
|
||||
# Tell clients to render the stickers as 256x256 if they're bigger
|
||||
if info.width > 256 or info.height > 256:
|
||||
if info.width > info.height:
|
||||
info.height = int(info.height / (info.width / 256))
|
||||
info.width = 256
|
||||
else:
|
||||
info.width = int(info.width / (info.height / 256))
|
||||
info.height = 256
|
||||
if info.thumbnail_info:
|
||||
info.thumbnail_info.width = info.width
|
||||
info.thumbnail_info.height = info.height
|
||||
if attrs.is_gif or (attrs.is_sticker and info.mimetype == "video/webm"):
|
||||
if attrs.is_gif:
|
||||
info["fi.mau.telegram.gif"] = True
|
||||
else:
|
||||
info["fi.mau.telegram.animated_sticker"] = True
|
||||
info["fi.mau.loop"] = True
|
||||
info["fi.mau.autoplay"] = True
|
||||
info["fi.mau.hide_controls"] = True
|
||||
info["fi.mau.no_audio"] = True
|
||||
if not name:
|
||||
ext = sane_mimetypes.guess_extension(file.mime_type)
|
||||
name = "unnamed_file" + ext
|
||||
|
||||
content = MediaMessageEventContent(
|
||||
body=name,
|
||||
info=info,
|
||||
msgtype={
|
||||
"video/": MessageType.VIDEO,
|
||||
"audio/": MessageType.AUDIO,
|
||||
"image/": MessageType.IMAGE,
|
||||
}.get(info.mimetype[:6], MessageType.FILE),
|
||||
)
|
||||
if event_type == EventType.STICKER:
|
||||
content.msgtype = None
|
||||
if attrs.is_audio:
|
||||
content["org.matrix.msc1767.audio"] = {"duration": attrs.duration * 1000}
|
||||
if attrs.waveform:
|
||||
content["org.matrix.msc1767.audio"]["waveform"] = [x << 5 for x in attrs.waveform]
|
||||
if attrs.is_voice:
|
||||
content["org.matrix.msc3245.voice"] = {}
|
||||
if file.decryption_info:
|
||||
content.file = file.decryption_info
|
||||
else:
|
||||
content.url = file.mxc
|
||||
|
||||
caption_content = await formatter.telegram_to_matrix(evt, source) if evt.message else None
|
||||
|
||||
return ConvertedMessage(
|
||||
type=event_type,
|
||||
content=content,
|
||||
caption=caption_content,
|
||||
disappear_in=evt.media.ttl_seconds,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _convert_location(evt: Message, **_) -> ConvertedMessage:
|
||||
long = evt.media.geo.long
|
||||
lat = evt.media.geo.lat
|
||||
long_char = "E" if long > 0 else "W"
|
||||
lat_char = "N" if lat > 0 else "S"
|
||||
geo = f"{round(lat, 6)},{round(long, 6)}"
|
||||
|
||||
body = f"{round(abs(lat), 4)}° {lat_char}, {round(abs(long), 4)}° {long_char}"
|
||||
url = f"https://maps.google.com/?q={geo}"
|
||||
|
||||
if isinstance(evt.media, MessageMediaGeoLive):
|
||||
note = "Live Location (see your Telegram client for live updates)"
|
||||
elif isinstance(evt.media, MessageMediaVenue):
|
||||
note = evt.media.title
|
||||
else:
|
||||
note = "Location"
|
||||
|
||||
content = LocationMessageEventContent(
|
||||
msgtype=MessageType.LOCATION,
|
||||
geo_uri=f"geo:{geo}",
|
||||
body=f"{note}: {body}\n{url}",
|
||||
)
|
||||
content["format"] = str(Format.HTML)
|
||||
content["formatted_body"] = f"{note}: <a href='{url}'>{body}</a>"
|
||||
content["org.matrix.msc3488.location"] = {
|
||||
"uri": content.geo_uri,
|
||||
"description": note,
|
||||
}
|
||||
return ConvertedMessage(content=content)
|
||||
|
||||
@staticmethod
|
||||
async def _convert_unsupported(source: au.AbstractUser, evt: Message, **_) -> ConvertedMessage:
|
||||
override_text = (
|
||||
"This message is not supported on your version of Mautrix-Telegram. "
|
||||
"Please check https://github.com/mautrix/telegram or ask your "
|
||||
"bridge administrator about possible updates."
|
||||
)
|
||||
content = await formatter.telegram_to_matrix(evt, source, override_text=override_text)
|
||||
content.msgtype = MessageType.NOTICE
|
||||
content["fi.mau.telegram.unsupported"] = True
|
||||
return ConvertedMessage(content=content)
|
||||
|
||||
async def _convert_poll(self, source: au.AbstractUser, evt: Message) -> ConvertedMessage:
|
||||
poll: Poll = evt.media.poll
|
||||
poll_id = self._encode_msgid(source, evt)
|
||||
|
||||
_n = 0
|
||||
|
||||
def n() -> int:
|
||||
nonlocal _n
|
||||
_n += 1
|
||||
return _n
|
||||
|
||||
text_answers = "\n".join(f"{n()}. {answer.text}" for answer in poll.answers)
|
||||
html_answers = "\n".join(f"<li>{answer.text}</li>" for answer in poll.answers)
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT,
|
||||
format=Format.HTML,
|
||||
body=(
|
||||
f"Poll: {poll.question}\n{text_answers}\n"
|
||||
f"Vote with !tg vote {poll_id} <choice number>"
|
||||
),
|
||||
formatted_body=(
|
||||
f"<strong>Poll</strong>: {poll.question}<br/>\n"
|
||||
f"<ol>{html_answers}</ol>\n"
|
||||
f"Vote with <code>!tg vote {poll_id} <choice number></code>"
|
||||
),
|
||||
)
|
||||
|
||||
return ConvertedMessage(content=content)
|
||||
|
||||
@staticmethod
|
||||
async def _convert_dice(evt: Message, **_) -> ConvertedMessage:
|
||||
roll: MessageMediaDice = evt.media
|
||||
emoji_text = {
|
||||
"\U0001F3AF": " Dart throw",
|
||||
"\U0001F3B2": " Dice roll",
|
||||
"\U0001F3C0": " Basketball throw",
|
||||
"\U0001F3B0": " Slot machine",
|
||||
"\U0001F3B3": " Bowling",
|
||||
"\u26BD": " Football kick",
|
||||
}
|
||||
text = f"{roll.emoticon}{emoji_text.get(roll.emoticon, '')} result: {_format_dice(roll)}"
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT,
|
||||
format=Format.HTML,
|
||||
body=text,
|
||||
formatted_body=f"<h4>{text}</h4>",
|
||||
)
|
||||
content["fi.mau.telegram.dice"] = {"emoticon": roll.emoticon, "value": roll.value}
|
||||
return ConvertedMessage(content=content)
|
||||
|
||||
async def _convert_game(self, source: au.AbstractUser, evt: Message, **_) -> ConvertedMessage:
|
||||
game: Game = evt.media.game
|
||||
play_id = self._encode_msgid(source, evt)
|
||||
command = f"!tg play {play_id}"
|
||||
override_text = f"Run {command} in your bridge management room to play {game.title}"
|
||||
override_entities = [
|
||||
MessageEntityPre(offset=len("Run "), length=len(command), language="")
|
||||
]
|
||||
|
||||
content = await formatter.telegram_to_matrix(
|
||||
evt, source, override_text=override_text, override_entities=override_entities
|
||||
)
|
||||
content.msgtype = MessageType.NOTICE
|
||||
content["fi.mau.telegram.game"] = play_id
|
||||
|
||||
return ConvertedMessage(content=content)
|
||||
|
||||
@staticmethod
|
||||
async def _convert_contact(source: au.AbstractUser, evt: Message, **_) -> ConvertedMessage:
|
||||
contact: MessageMediaContact = evt.media
|
||||
name = " ".join(x for x in [contact.first_name, contact.last_name] if x)
|
||||
formatted_phone = f"+{contact.phone_number}"
|
||||
if phonenumbers is not None:
|
||||
try:
|
||||
parsed = phonenumbers.parse(formatted_phone)
|
||||
fmt = phonenumbers.PhoneNumberFormat.INTERNATIONAL
|
||||
formatted_phone = phonenumbers.format_number(parsed, fmt)
|
||||
except phonenumbers.NumberParseException:
|
||||
pass
|
||||
content = TextMessageEventContent(
|
||||
msgtype=MessageType.TEXT,
|
||||
body=f"Shared contact info for {name}: {formatted_phone}",
|
||||
)
|
||||
content["fi.mau.telegram.contact"] = {
|
||||
"user_id": contact.user_id,
|
||||
"first_name": contact.first_name,
|
||||
"last_name": contact.last_name,
|
||||
"phone_number": contact.phone_number,
|
||||
"vcard": contact.vcard,
|
||||
}
|
||||
|
||||
puppet = await pu.Puppet.get_by_tgid(TelegramID(contact.user_id))
|
||||
if not puppet.displayname:
|
||||
try:
|
||||
entity = await source.client.get_entity(PeerUser(contact.user_id))
|
||||
await puppet.update_info(source, entity)
|
||||
except Exception as e:
|
||||
source.log.warning(f"Failed to sync puppet info of received contact: {e}")
|
||||
else:
|
||||
content.format = Format.HTML
|
||||
content.formatted_body = (
|
||||
f"Shared contact info for "
|
||||
f"<a href='https://matrix.to/#/{puppet.mxid}'>{html.escape(name)}</a>: "
|
||||
f"{html.escape(formatted_phone)}"
|
||||
)
|
||||
return ConvertedMessage(content=content)
|
||||
|
||||
|
||||
def _parse_document_attributes(attributes: list[TypeDocumentAttribute]) -> DocAttrs:
|
||||
name, mime_type, is_sticker, sticker_alt, width, height = None, None, False, None, 0, 0
|
||||
is_gif, is_audio, is_voice, duration, waveform = False, False, False, 0, bytes()
|
||||
for attr in attributes:
|
||||
if isinstance(attr, DocumentAttributeFilename):
|
||||
name = name or attr.file_name
|
||||
mime_type, _ = mimetypes.guess_type(attr.file_name)
|
||||
elif isinstance(attr, DocumentAttributeSticker):
|
||||
is_sticker = True
|
||||
sticker_alt = attr.alt
|
||||
elif isinstance(attr, DocumentAttributeAnimated):
|
||||
is_gif = True
|
||||
elif isinstance(attr, DocumentAttributeVideo):
|
||||
width, height = attr.w, attr.h
|
||||
elif isinstance(attr, DocumentAttributeImageSize):
|
||||
width, height = attr.w, attr.h
|
||||
elif isinstance(attr, DocumentAttributeAudio):
|
||||
is_audio = True
|
||||
is_voice = attr.voice or False
|
||||
duration = attr.duration
|
||||
waveform = decode_waveform(attr.waveform) if attr.waveform else b""
|
||||
|
||||
return DocAttrs(
|
||||
name,
|
||||
mime_type,
|
||||
is_sticker,
|
||||
sticker_alt,
|
||||
width,
|
||||
height,
|
||||
is_gif,
|
||||
is_audio,
|
||||
is_voice,
|
||||
duration,
|
||||
waveform,
|
||||
)
|
||||
|
||||
|
||||
def _parse_document_meta(
|
||||
evt: Message, file: DBTelegramFile, attrs: DocAttrs, thumb_size: TypePhotoSize
|
||||
) -> tuple[ImageInfo, str]:
|
||||
document = evt.media.document
|
||||
name = attrs.name
|
||||
if attrs.is_sticker:
|
||||
alt = attrs.sticker_alt
|
||||
if len(alt) > 0:
|
||||
try:
|
||||
name = f"{alt} ({unicodedata.name(alt[0]).lower()})"
|
||||
except ValueError:
|
||||
name = alt
|
||||
|
||||
generic_types = ("text/plain", "application/octet-stream")
|
||||
if file.mime_type in generic_types and document.mime_type not in generic_types:
|
||||
mime_type = document.mime_type or file.mime_type
|
||||
elif file.mime_type == "application/ogg":
|
||||
mime_type = "audio/ogg"
|
||||
else:
|
||||
mime_type = file.mime_type or document.mime_type
|
||||
info = ImageInfo(size=file.size, mimetype=mime_type)
|
||||
|
||||
if attrs.mime_type and not file.was_converted:
|
||||
file.mime_type = attrs.mime_type or file.mime_type
|
||||
if file.width and file.height:
|
||||
info.width, info.height = file.width, file.height
|
||||
elif attrs.width and attrs.height:
|
||||
info.width, info.height = attrs.width, attrs.height
|
||||
|
||||
if file.thumbnail:
|
||||
if file.thumbnail.decryption_info:
|
||||
info.thumbnail_file = file.thumbnail.decryption_info
|
||||
else:
|
||||
info.thumbnail_url = file.thumbnail.mxc
|
||||
info.thumbnail_info = ThumbnailInfo(
|
||||
mimetype=file.thumbnail.mime_type,
|
||||
height=file.thumbnail.height or thumb_size.h,
|
||||
width=file.thumbnail.width or thumb_size.w,
|
||||
size=file.thumbnail.size,
|
||||
)
|
||||
elif attrs.is_sticker:
|
||||
# This is a hack for bad clients like Element iOS that require a thumbnail
|
||||
info.thumbnail_info = ImageInfo.deserialize(info.serialize())
|
||||
if file.decryption_info:
|
||||
info.thumbnail_file = file.decryption_info
|
||||
else:
|
||||
info.thumbnail_url = file.mxc
|
||||
|
||||
return info, name
|
||||
|
||||
|
||||
def _format_dice(roll: MessageMediaDice) -> str:
|
||||
if roll.emoticon == "\U0001F3B0":
|
||||
emojis = {
|
||||
0: "\U0001F36B", # "🍫",
|
||||
1: "\U0001F352", # "🍒",
|
||||
2: "\U0001F34B", # "🍋",
|
||||
3: "7\ufe0f\u20e3", # "7️⃣",
|
||||
}
|
||||
res = roll.value - 1
|
||||
slot1, slot2, slot3 = emojis[res % 4], emojis[res // 4 % 4], emojis[res // 16]
|
||||
return f"{slot1} {slot2} {slot3} ({roll.value})"
|
||||
elif roll.emoticon == "\u26BD":
|
||||
results = {
|
||||
1: "miss",
|
||||
2: "hit the woodwork",
|
||||
3: "goal", # seems to go in through the center
|
||||
4: "goal",
|
||||
5: "goal 🎉", # seems to go in through the top right corner, includes confetti
|
||||
}
|
||||
elif roll.emoticon == "\U0001F3B3":
|
||||
results = {
|
||||
1: "miss",
|
||||
2: "1 pin down",
|
||||
3: "3 pins down, split",
|
||||
4: "4 pins down, split",
|
||||
5: "5 pins down",
|
||||
6: "strike 🎉",
|
||||
}
|
||||
# elif roll.emoticon == "\U0001F3C0":
|
||||
# results = {
|
||||
# 2: "rolled off",
|
||||
# 3: "stuck",
|
||||
# }
|
||||
# elif roll.emoticon == "\U0001F3AF":
|
||||
# results = {
|
||||
# 1: "bounced off",
|
||||
# 2: "outer rim",
|
||||
#
|
||||
# 6: "bullseye",
|
||||
# }
|
||||
else:
|
||||
return str(roll.value)
|
||||
return f"{results[roll.value]} ({roll.value})"
|
||||
@@ -83,7 +83,7 @@ async def make_sponsored_message_content(
|
||||
else:
|
||||
sponsor_name = sponsor_name_html = "unknown entity"
|
||||
|
||||
content["net.maunium.telegram.sponsored"] = sponsored_meta
|
||||
content["fi.mau.telegram.sponsored"] = sponsored_meta
|
||||
content.formatted_body += (
|
||||
f"<br/><br/>Sponsored message from {sponsor_name_html} "
|
||||
f"- <a href='{content.external_url}'>{action}</a>"
|
||||
|
||||
Reference in New Issue
Block a user