diff --git a/mautrix_telegram/formatter/from_matrix/__init__.py b/mautrix_telegram/formatter/from_matrix/__init__.py
index a041109f..b4062445 100644
--- a/mautrix_telegram/formatter/from_matrix/__init__.py
+++ b/mautrix_telegram/formatter/from_matrix/__init__.py
@@ -19,23 +19,24 @@ import logging
from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, MessageEntityItalic,
TypeMessageEntity)
+from telethon.helpers import add_surrogate, del_surrogate
+
+from mautrix.types import RoomID
from ... import puppet as pu
-from ...types import TelegramID, MatrixRoomID
+from ...types import TelegramID
from ...db import Message as DBMessage
-from ..util import (add_surrogates, remove_surrogates, trim_reply_fallback_html,
- trim_reply_fallback_text)
from .parser import ParsedMessage, parse_html
if TYPE_CHECKING:
from ...context import Context
-log = logging.getLogger("mau.fmt.mx") # type: logging.Logger
-should_bridge_plaintext_highlights = False # type: bool
+log: logging.Logger = logging.getLogger("mau.fmt.mx")
+should_bridge_plaintext_highlights: bool = False
-command_regex = re.compile(r"^!([A-Za-z0-9@]+)") # type: Pattern
-not_command_regex = re.compile(r"^\\(![A-Za-z0-9@]+)") # type: Pattern
-plain_mention_regex = None # type: Optional[Pattern]
+command_regex: Pattern = re.compile(r"^!([A-Za-z0-9@]+)")
+not_command_regex: Pattern = re.compile(r"^\\(![A-Za-z0-9@]+)")
+plain_mention_regex: Optional[Pattern] = None
def plain_mention_to_html(match: Match) -> str:
@@ -75,8 +76,8 @@ def matrix_to_telegram(html: str) -> ParsedMessage:
if should_bridge_plaintext_highlights:
html = plain_mention_regex.sub(plain_mention_to_html, html)
- text, entities = parse_html(add_surrogates(html))
- text = remove_surrogates(text.strip())
+ text, entities = parse_html(add_surrogate(html))
+ text = del_surrogate(text.strip())
text, entities = cut_long_message(text, entities)
return text, entities
@@ -85,7 +86,7 @@ def matrix_to_telegram(html: str) -> ParsedMessage:
def matrix_reply_to_telegram(content: Dict[str, Any], tg_space: TelegramID,
- room_id: Optional[MatrixRoomID] = None) -> Optional[TelegramID]:
+ room_id: Optional[RoomID] = None) -> Optional[TelegramID]:
relates_to = content.get("m.relates_to", None) or {}
if not relates_to:
return None
diff --git a/mautrix_telegram/formatter/from_matrix/html_reader.py b/mautrix_telegram/formatter/from_matrix/html_reader.py
deleted file mode 100644
index 0828aa89..00000000
--- a/mautrix_telegram/formatter/from_matrix/html_reader.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# mautrix-telegram - A Matrix-Telegram puppeting bridge
-# Copyright (C) 2019 Tulir Asokan
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-from typing import Dict, List, Tuple
-
-from html.parser import HTMLParser
-
-
-class HTMLNode(list):
- def __init__(self, tag: str, attrs: List[Tuple[str, str]]):
- super().__init__()
- self.tag = tag # type: str
- self.text = "" # type: str
- self.tail = "" # type: str
- self.attrib = dict(attrs) # type: Dict[str, str]
-
-
-class NodeifyingParser(HTMLParser):
- # From https://www.w3.org/TR/html5/syntax.html#writing-html-documents-elements
- void_tags = ("area", "base", "br", "col", "command", "embed", "hr", "img", "input", "link",
- "meta", "param", "source", "track", "wbr")
-
- def __init__(self):
- super().__init__()
- self.stack = [HTMLNode("html", [])] # type: List[HTMLNode]
-
- def handle_starttag(self, tag, attrs):
- node = HTMLNode(tag, attrs)
- self.stack[-1].append(node)
- if tag not in self.void_tags:
- self.stack.append(node)
-
- def handle_startendtag(self, tag, attrs):
- self.stack[-1].append(HTMLNode(tag, attrs))
-
- def handle_endtag(self, tag):
- if tag == self.stack[-1].tag:
- self.stack.pop()
-
- def handle_data(self, data):
- if len(self.stack[-1]) > 0:
- self.stack[-1][-1].tail += data
- else:
- self.stack[-1].text += data
-
- def error(self, message):
- pass
-
-
-def read_html(data: str) -> HTMLNode:
- parser = NodeifyingParser()
- parser.feed(data)
- return parser.stack[0]
diff --git a/mautrix_telegram/formatter/from_matrix/html_reader.pyi b/mautrix_telegram/formatter/from_matrix/html_reader.pyi
deleted file mode 100644
index d292ff3c..00000000
--- a/mautrix_telegram/formatter/from_matrix/html_reader.pyi
+++ /dev/null
@@ -1,11 +0,0 @@
-from typing import Dict, List
-
-
-class HTMLNode(List['HTMLNode']):
- tag: str
- text: str
- tail: str
- attrib: Dict[str, str]
-
-
-def read_html(data: str) -> HTMLNode: ...
diff --git a/mautrix_telegram/formatter/from_matrix/parser.py b/mautrix_telegram/formatter/from_matrix/parser.py
index 3429126e..fdcf5aac 100644
--- a/mautrix_telegram/formatter/from_matrix/parser.py
+++ b/mautrix_telegram/formatter/from_matrix/parser.py
@@ -13,240 +13,77 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import List, Tuple, Pattern
-import re
+from typing import List, Tuple, Optional
-from telethon.tl.types import (MessageEntityMention as Mention, MessageEntityBotCommand as Command,
- MessageEntityMentionName as MentionName, MessageEntityUrl as URL,
- MessageEntityEmail as Email, MessageEntityTextUrl as TextURL,
- MessageEntityBold as Bold, MessageEntityItalic as Italic,
- MessageEntityCode as Code, MessageEntityPre as Pre,
- MessageEntityStrike as Strike, MessageEntityUnderline as Underline,
- MessageEntityBlockquote as Blockquote, TypeMessageEntity)
+from telethon.tl.types import TypeMessageEntity
+
+from mautrix.types import UserID, RoomID
+from mautrix.util.formatter import MatrixParser as BaseMatrixParser, RecursionContext
+from mautrix.util.formatter.html_reader_htmlparser import read_html, HTMLNode
from ... import user as u, puppet as pu, portal as po
-from ...types import MatrixUserID
-from .telegram_message import TelegramMessage, Entity, offset_length_multiply
+from .telegram_message import TelegramMessage, TelegramEntityType
-from .html_reader import HTMLNode, read_html
ParsedMessage = Tuple[str, List[TypeMessageEntity]]
def parse_html(input_html: str) -> ParsedMessage:
- return MatrixParser.parse(input_html)
+ msg = MatrixParser.parse(input_html)
+ return msg.text, msg.telegram_entities
-class RecursionContext:
- def __init__(self, strip_linebreaks: bool = True, ul_depth: int = 0):
- self.strip_linebreaks = strip_linebreaks # type: bool
- self.ul_depth = ul_depth # type: int
- self._inited = True # type: bool
-
- def __setattr__(self, key, value):
- if getattr(self, "_inited", False) is True:
- raise TypeError("'RecursionContext' object is immutable")
- super(RecursionContext, self).__setattr__(key, value)
-
- def enter_list(self) -> 'RecursionContext':
- return RecursionContext(strip_linebreaks=self.strip_linebreaks, ul_depth=self.ul_depth + 1)
-
- def enter_code_block(self) -> 'RecursionContext':
- return RecursionContext(strip_linebreaks=False, ul_depth=self.ul_depth)
-
-
-class MatrixParser:
- mention_regex = re.compile("https://matrix.to/#/(@.+:.+)") # type: Pattern
- room_regex = re.compile("https://matrix.to/#/(#.+:.+)") # type: Pattern
- block_tags = ("p", "pre", "blockquote",
- "ol", "ul", "li",
- "h1", "h2", "h3", "h4", "h5", "h6",
- "div", "hr", "table") # type: Tuple[str, ...]
- list_bullets = ("●", "○", "■", "‣") # type: Tuple[str, ...]
+class MatrixParser(BaseMatrixParser[TelegramMessage]):
+ e = TelegramEntityType
+ fs = TelegramMessage
+ read_html = read_html
@classmethod
- def list_bullet(cls, depth: int) -> str:
- return cls.list_bullets[(depth - 1) % len(cls.list_bullets)] + " "
-
- @classmethod
- def list_to_tmessage(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
- ordered = node.tag == "ol"
- tagged_children = cls.node_to_tagged_tmessages(node, ctx)
- counter = 1
- indent_length = 0
- if ordered:
- try:
- counter = int(node.attrib.get("start", "1"))
- except ValueError:
- counter = 1
-
- longest_index = counter - 1 + len(tagged_children)
- indent_length = len(str(longest_index))
- indent = (indent_length + 4) * " "
- children = [] # type: List[TelegramMessage]
- for child, tag in tagged_children:
- if tag != "li":
- continue
-
- if ordered:
- prefix = f"{counter}. "
- counter += 1
- else:
- prefix = cls.list_bullet(ctx.ul_depth)
- child = child.prepend(prefix)
- parts = child.split("\n")
- parts = parts[:1] + [part.prepend(indent) for part in parts[1:]]
- child = TelegramMessage.join(parts, "\n")
- children.append(child)
- return TelegramMessage.join(children, "\n")
-
- @classmethod
- def header_to_tmessage(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
- children = cls.node_to_tmessages(node, ctx)
- length = int(node.tag[1])
- prefix = "#" * length + " "
- return TelegramMessage.join(children, "").prepend(prefix).format(Bold)
-
- @classmethod
- def basic_format_to_tmessage(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
+ def custom_node_to_fstring(cls, node: HTMLNode, ctx: RecursionContext
+ ) -> Optional[TelegramMessage]:
msg = cls.tag_aware_parse_node(node, ctx)
- if node.tag in ("b", "strong"):
- msg.format(Bold)
- elif node.tag in ("i", "em"):
- msg.format(Italic)
- elif node.tag in ("s", "strike", "del"):
- msg.format(Strike)
- elif node.tag in ("u", "ins"):
- msg.format(Underline)
- elif node == "blockquote":
- msg.format(Blockquote)
- elif node.tag == "command":
- msg.format(Command)
+ if node.tag == "command":
+ msg.format(TelegramEntityType.COMMAND)
+ return None
+ @classmethod
+ def user_pill_to_fstring(cls, msg: TelegramMessage, user_id: UserID) -> TelegramMessage:
+ user = (pu.Puppet.get_by_mxid(user_id)
+ or u.User.get_by_mxid(user_id, create=False))
+ if not user:
+ return msg
+ if user.username:
+ return TelegramMessage(f"@{user.username}").format(TelegramEntityType.MENTION)
+ elif user.tgid:
+ displayname = user.plain_displayname or msg.text
+ return TelegramMessage(displayname).format(TelegramEntityType.MENTION_NAME,
+ user_id=user.tgid)
return msg
@classmethod
- def link_to_tstring(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
- msg = cls.tag_aware_parse_node(node, ctx)
- href = node.attrib.get("href", "")
- if not href:
- return msg
-
- if href.startswith("mailto:"):
- return TelegramMessage(href[len("mailto:"):]).format(Email)
-
- mention = cls.mention_regex.match(href)
- if mention:
- mxid = MatrixUserID(mention.group(1))
- user = (pu.Puppet.get_by_mxid(mxid)
- or u.User.get_by_mxid(mxid, create=False))
- if not user:
- return msg
- if user.username:
- return TelegramMessage(f"@{user.username}").format(Mention)
- elif user.tgid:
- displayname = user.plain_displayname or msg.text
- return TelegramMessage(displayname).format(MentionName, user_id=user.tgid)
- return msg
-
- room = cls.room_regex.match(href)
- if room:
- username = po.Portal.get_username_from_mx_alias(room.group(1))
- portal = po.Portal.find_by_username(username)
- if portal and portal.username:
- return TelegramMessage(f"@{portal.username}").format(Mention)
-
- return (msg.format(URL)
- if msg.text == href
- else msg.format(TextURL, url=href))
+ def url_to_fstring(cls, msg: TelegramMessage, url: str) -> TelegramMessage:
+ if url == msg.text:
+ return msg.format(cls.e.URL)
+ else:
+ return msg.format(cls.e.INLINE_URL, url=url)
@classmethod
- def blockquote_to_tmessage(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
+ def room_pill_to_fstring(cls, msg: TelegramMessage, room_id: RoomID) -> TelegramMessage:
+ username = po.Portal.get_username_from_mx_alias(room_id)
+ portal = po.Portal.find_by_username(username)
+ if portal and portal.username:
+ return TelegramMessage(f"@{portal.username}").format(TelegramEntityType.MENTION)
+
+ @classmethod
+ def header_to_fstring(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
+ children = cls.node_to_fstrings(node, ctx)
+ length = int(node.tag[1])
+ prefix = "#" * length + " "
+ return TelegramMessage.join(children, "").prepend(prefix).format(TelegramEntityType.BOLD)
+
+ @classmethod
+ def blockquote_to_fstring(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
msg = cls.tag_aware_parse_node(node, ctx)
children = msg.trim().split("\n")
children = [child.prepend("> ") for child in children]
return TelegramMessage.join(children, "\n")
-
- @classmethod
- def node_to_tmessage(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
- if node.tag == "mx-reply":
- return TelegramMessage("")
- elif node.tag == "ol":
- return cls.list_to_tmessage(node, ctx)
- elif node.tag == "ul":
- return cls.list_to_tmessage(node, ctx.enter_list())
- elif node.tag in ("h1", "h2", "h3", "h4", "h5", "h6"):
- return cls.header_to_tmessage(node, ctx)
- elif node.tag == "br":
- return TelegramMessage("\n")
- elif node.tag in ("b", "strong", "i", "em", "s", "del", "u", "ins", "command"):
- return cls.basic_format_to_tmessage(node, ctx)
- elif node.tag == "blockquote":
- # Telegram already has blockquote entities in the protocol schema, but it strips them
- # server-side and none of the official clients support them.
- # TODO once Telegram changes that, use the above if block for blockquotes too.
- return cls.blockquote_to_tmessage(node, ctx)
- elif node.tag == "a":
- return cls.link_to_tstring(node, ctx)
- elif node.tag == "p":
- return cls.tag_aware_parse_node(node, ctx).append("\n")
- elif node.tag == "pre":
- lang = ""
- try:
- if node[0].tag == "code":
- node = node[0]
- lang = node.attrib["class"][len("language-"):]
- except (IndexError, KeyError):
- pass
- return cls.parse_node(node, ctx.enter_code_block()).format(Pre, language=lang)
- elif node.tag == "code":
- return cls.parse_node(node, ctx.enter_code_block()).format(Code)
- return cls.tag_aware_parse_node(node, ctx)
-
- @staticmethod
- def text_to_tmessage(text: str, ctx: RecursionContext) -> TelegramMessage:
- if ctx.strip_linebreaks:
- text = text.replace("\n", "")
- return TelegramMessage(text)
-
- @classmethod
- def node_to_tagged_tmessages(cls, node: HTMLNode, ctx: RecursionContext
- ) -> List[Tuple[TelegramMessage, str]]:
- output = []
-
- if node.text:
- output.append((cls.text_to_tmessage(node.text, ctx), "text"))
- for child in node:
- output.append((cls.node_to_tmessage(child, ctx), child.tag))
- if child.tail:
- output.append((cls.text_to_tmessage(child.tail, ctx), "text"))
- return output
-
- @classmethod
- def node_to_tmessages(cls, node: HTMLNode, ctx: RecursionContext
- ) -> List[TelegramMessage]:
- return [msg for (msg, tag) in cls.node_to_tagged_tmessages(node, ctx)]
-
- @classmethod
- def tag_aware_parse_node(cls, node: HTMLNode, ctx: RecursionContext
- ) -> TelegramMessage:
- msgs = cls.node_to_tagged_tmessages(node, ctx)
- output = TelegramMessage()
- prev_was_block = False
- for msg, tag in msgs:
- if tag in cls.block_tags:
- msg = msg.append("\n")
- if not prev_was_block:
- msg = msg.prepend("\n")
- prev_was_block = True
- output = output.append(msg)
- return output.trim()
-
- @classmethod
- def parse_node(cls, node: HTMLNode, ctx: RecursionContext) -> TelegramMessage:
- return TelegramMessage.join(cls.node_to_tmessages(node, ctx))
-
- @classmethod
- def parse(cls, data: str) -> ParsedMessage:
- msg = cls.node_to_tmessage(read_html(f"
{data}"), RecursionContext())
- return msg.text, msg.entities
diff --git a/mautrix_telegram/formatter/from_matrix/telegram_message.py b/mautrix_telegram/formatter/from_matrix/telegram_message.py
index c30e5f72..c5cb8daa 100644
--- a/mautrix_telegram/formatter/from_matrix/telegram_message.py
+++ b/mautrix_telegram/formatter/from_matrix/telegram_message.py
@@ -13,145 +13,84 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Callable, List, Optional, Sequence, Type, Union
+from typing import Optional, Union, Any, List, Type, Dict
+from enum import Enum
-from telethon.tl.types import (MessageEntityMentionName as MentionName,
- MessageEntityTextUrl as TextURL, MessageEntityPre as Pre,
- TypeMessageEntity, InputMessageEntityMentionName as InputMentionName)
+from telethon.tl.types import (MessageEntityMention as Mention, MessageEntityBotCommand as Command,
+ MessageEntityMentionName as MentionName, MessageEntityUrl as URL,
+ MessageEntityEmail as Email, MessageEntityTextUrl as TextURL,
+ MessageEntityBold as Bold, MessageEntityItalic as Italic,
+ MessageEntityCode as Code, MessageEntityPre as Pre,
+ MessageEntityStrike as Strike, MessageEntityUnderline as Underline,
+ MessageEntityBlockquote as Blockquote, TypeMessageEntity,
+ InputMessageEntityMentionName as InputMentionName)
+
+from mautrix.util.formatter import EntityString, SemiAbstractEntity
-class Entity:
- @staticmethod
- def copy(entity: TypeMessageEntity) -> Optional[TypeMessageEntity]:
- if not entity:
- return None
- kwargs = {
- "offset": entity.offset,
- "length": entity.length,
- }
- if isinstance(entity, Pre):
- kwargs["language"] = entity.language
- elif isinstance(entity, TextURL):
- kwargs["url"] = entity.url
- elif isinstance(entity, (MentionName, InputMentionName)):
- kwargs["user_id"] = entity.user_id
- return entity.__class__(**kwargs)
+class TelegramEntityType(Enum):
+ """EntityType is a Matrix formatting entity type."""
+ BOLD = Bold
+ ITALIC = Italic
+ STRIKETHROUGH = Strike
+ UNDERLINE = Underline
+ URL = URL
+ INLINE_URL = TextURL
+ EMAIL = Email
+ PREFORMATTED = Pre
+ INLINE_CODE = Code
+ BLOCKQUOTE = Blockquote
+ MENTION = Mention
+ MENTION_NAME = MentionName
+ COMMAND = Command
- @classmethod
- def adjust(cls, entity: Union[TypeMessageEntity, List[TypeMessageEntity]],
- func: Callable[[TypeMessageEntity], None]
- ) -> Union[Optional[TypeMessageEntity], List[TypeMessageEntity]]:
- if isinstance(entity, list):
- return [Entity.adjust(element, func) for element in entity if entity]
- elif not entity:
- return None
- entity = cls.copy(entity)
- func(entity)
- if entity.offset < 0:
- entity.length += entity.offset
- entity.offset = 0
- return entity
+ USER_MENTION = 1
+ ROOM_MENTION = 2
+ HEADER = 3
-def offset_diff(amount: int) -> Callable[[TypeMessageEntity], None]:
- def func(entity: TypeMessageEntity) -> None:
- entity.offset += amount
+class TelegramEntity(SemiAbstractEntity):
+ internal: TypeMessageEntity
- return func
+ def __init__(self, type: Union[TelegramEntityType, Type[TypeMessageEntity]],
+ offset: int, length: int, extra_info: Dict[str, Any]) -> None:
+ if isinstance(type, TelegramEntityType):
+ if isinstance(type.value, int):
+ raise ValueError(f"Can't create Entity with non-Telegram EntityType {type}")
+ type = type.value
+ self.internal = type(offset=offset, length=length, **extra_info)
+
+ def copy(self) -> Optional['TelegramEntity']:
+ extra_info = {}
+ if isinstance(self.internal, Pre):
+ extra_info["language"] = self.internal.language
+ elif isinstance(self.internal, TextURL):
+ extra_info["url"] = self.internal.url
+ elif isinstance(self.internal, (MentionName, InputMentionName)):
+ extra_info["user_id"] = self.internal.user_id
+ return TelegramEntity(type(self.internal), offset=self.internal.offset,
+ length=self.internal.length, extra_info=extra_info)
+
+ @property
+ def offset(self) -> int:
+ return self.internal.offset
+
+ @offset.setter
+ def offset(self, value: int) -> None:
+ self.internal.offset = value
+
+ @property
+ def length(self) -> int:
+ return self.internal.length
+
+ @length.setter
+ def length(self, value: int) -> None:
+ self.internal.length = value
-def offset_length_multiply(amount: int) -> Callable[[TypeMessageEntity], None]:
- def func(entity: TypeMessageEntity) -> None:
- entity.offset *= amount
- entity.length *= amount
+class TelegramMessage(EntityString[TelegramEntity, TelegramEntityType]):
+ entity_class = TelegramEntity
- return func
-
-
-class TelegramMessage:
- def __init__(self, text: str = "", entities: Optional[List[TypeMessageEntity]] = None) -> None:
- self.text = text # type: str
- self.entities = entities or [] # type: List[TypeMessageEntity]
-
- def offset_entities(self, offset: int) -> 'TelegramMessage':
- def apply_offset(entity: TypeMessageEntity, inner_offset: int
- ) -> Optional[TypeMessageEntity]:
- entity = Entity.copy(entity)
- entity.offset += inner_offset
- if entity.offset < 0:
- entity.offset = 0
- elif entity.offset > len(self.text):
- return None
- elif entity.offset + entity.length > len(self.text):
- entity.length = len(self.text) - entity.offset
- return entity
-
- self.entities = [apply_offset(entity, offset) for entity in self.entities if entity]
- self.entities = [x for x in self.entities if x is not None]
- return self
-
- def append(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
- for msg in args:
- if isinstance(msg, str):
- msg = TelegramMessage(text=msg)
- self.entities += Entity.adjust(msg.entities, offset_diff(len(self.text)))
- self.text += msg.text
- return self
-
- def prepend(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
- for msg in args:
- if isinstance(msg, str):
- msg = TelegramMessage(text=msg)
- self.entities = msg.entities + Entity.adjust(self.entities, offset_diff(len(msg.text)))
- self.text = msg.text + self.text
- return self
-
- def format(self, entity_type: Type[TypeMessageEntity], offset: int = None, length: int = None,
- **kwargs) -> 'TelegramMessage':
- self.entities.append(entity_type(offset=offset or 0,
- length=length if length is not None else len(self.text),
- **kwargs))
- return self
-
- def concat(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
- return TelegramMessage().append(self, *args)
-
- def trim(self) -> 'TelegramMessage':
- orig_len = len(self.text)
- self.text = self.text.lstrip()
- diff = orig_len - len(self.text)
- self.text = self.text.rstrip()
- self.offset_entities(-diff)
- return self
-
- def split(self, separator, max_items: int = 0) -> List['TelegramMessage']:
- text_parts = self.text.split(separator, max_items - 1)
- output = [] # type: List[TelegramMessage]
-
- offset = 0
- for part in text_parts:
- msg = TelegramMessage(part)
- for entity in self.entities:
- start_in_range = len(part) > entity.offset - offset >= 0
- end_in_range = len(part) >= entity.offset - offset + entity.length > 0
- if start_in_range and end_in_range:
- msg.entities.append(Entity.adjust(entity, offset_diff(-offset)))
- output.append(msg)
-
- offset += len(part)
- offset += len(separator)
-
- return output
-
- @staticmethod
- def join(items: Sequence[Union[str, 'TelegramMessage']],
- separator: str = " ") -> 'TelegramMessage':
- main = TelegramMessage()
- for msg in items:
- if isinstance(msg, str):
- msg = TelegramMessage(text=msg)
- main.entities += Entity.adjust(msg.entities, offset_diff(len(main.text)))
- main.text += msg.text + separator
- if len(separator) > 0:
- main.text = main.text[:-len(separator)]
- return main
+ @property
+ def telegram_entities(self) -> List[TypeMessageEntity]:
+ return [entity.internal for entity in self.entities]
diff --git a/mautrix_telegram/formatter/from_telegram.py b/mautrix_telegram/formatter/from_telegram.py
index fb23528b..e1d3c4c6 100644
--- a/mautrix_telegram/formatter/from_telegram.py
+++ b/mautrix_telegram/formatter/from_telegram.py
@@ -25,6 +25,7 @@ from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, M
MessageEntityPhone, TypeMessageEntity, Message, PeerChannel,
MessageEntityBlockquote, MessageEntityStrike, MessageFwdHeader,
MessageEntityUnderline, PeerUser)
+from telethon.helpers import add_surrogate, del_surrogate
from mautrix.errors import MatrixRequestError
from mautrix.appservice import IntentAPI
@@ -34,7 +35,6 @@ from mautrix.types import (TextMessageEventContent, RelatesTo, RelationType, For
from .. import user as u, puppet as pu, portal as po
from ..types import TelegramID
from ..db import Message as DBMessage
-from .util import (add_surrogates, remove_surrogates)
if TYPE_CHECKING:
from ..abstract_user import AbstractUser
@@ -136,7 +136,7 @@ async def telegram_to_matrix(evt: Message, source: "AbstractUser",
no_reply_fallback: bool = False) -> TextMessageEventContent:
content = TextMessageEventContent(
msgtype=MessageType.TEXT,
- body=add_surrogates(override_text or evt.message),
+ body=add_surrogate(override_text or evt.message),
)
entities = override_entities or evt.entities
if entities:
@@ -163,11 +163,10 @@ async def telegram_to_matrix(evt: Message, source: "AbstractUser",
content.body += f"\n- {evt.post_author}"
content.formatted_body += f"
- {evt.post_author}"
- if content.formatted_body:
- content.formatted_body = content.formatted_body.replace("\n", "
")
+ content.body = del_surrogate(content.body)
- content.body = remove_surrogates(content.body)
- content.formatted_body = remove_surrogates(content.formatted_body)
+ if content.formatted_body:
+ content.formatted_body = del_surrogate(content.formatted_body.replace("\n", "
"))
return content
@@ -284,8 +283,8 @@ def _parse_name_mention(html: List[str], entity_text: str, user_id: TelegramID)
return False
-message_link_regex = re.compile(
- r"https?://t(?:elegram)?\.(?:me|dog)/([A-Za-z][A-Za-z0-9_]{3,}[A-Za-z0-9])/([0-9]{1,50})")
+message_link_regex = re.compile(r"https?://t(?:elegram)?\.(?:me|dog)/"
+ r"([A-Za-z][A-Za-z0-9_]{3,}[A-Za-z0-9])/([0-9]{1,50})")
def _parse_url(html: List[str], entity_text: str, url: str) -> bool:
diff --git a/mautrix_telegram/formatter/util.py b/mautrix_telegram/formatter/util.py
deleted file mode 100644
index bde76adb..00000000
--- a/mautrix_telegram/formatter/util.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# mautrix-telegram - A Matrix-Telegram puppeting bridge
-# Copyright (C) 2019 Tulir Asokan
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-from typing import Optional, Pattern
-import struct
-import re
-
-
-# add_surrogates and remove_surrogates are unicode surrogate utility functions from Telethon.
-# Licensed under the MIT license.
-# https://github.com/LonamiWebs/Telethon/blob/7cce7aa3e4c6c7019a55530391b1761d33e5a04e/telethon/helpers.py
-def add_surrogates(text: Optional[str]) -> Optional[str]:
- if text is None:
- return None
- return "".join("".join(chr(y) for y in struct.unpack(" Optional[str]:
- if text is None:
- return None
- return text.encode("utf-16", "surrogatepass").decode("utf-16")
diff --git a/mautrix_telegram/util/__init__.py b/mautrix_telegram/util/__init__.py
index 7071b2d6..5245ef9a 100644
--- a/mautrix_telegram/util/__init__.py
+++ b/mautrix_telegram/util/__init__.py
@@ -1,4 +1,3 @@
from .file_transfer import transfer_file_to_matrix, convert_image
from .format_duration import format_duration
-from .signed_token import sign_token, verify_token
from .recursive_dict import recursive_del, recursive_set, recursive_get
diff --git a/mautrix_telegram/util/file_transfer.py b/mautrix_telegram/util/file_transfer.py
index c4c01930..93d095d0 100644
--- a/mautrix_telegram/util/file_transfer.py
+++ b/mautrix_telegram/util/file_transfer.py
@@ -38,6 +38,7 @@ try:
from PIL import Image
except ImportError:
Image = None
+
try:
from moviepy.editor import VideoFileClip
import random
@@ -47,7 +48,7 @@ try:
except ImportError:
VideoFileClip = random = string = os = mimetypes = None
-log = logging.getLogger("mau.util") # type: logging.Logger
+log: logging.Logger = logging.getLogger("mau.util")
TypeLocation = Union[Document, InputDocumentFileLocation, InputPeerPhotoFileLocation,
InputFileLocation, InputPhotoFileLocation]
@@ -59,7 +60,7 @@ def convert_image(file: bytes, source_mime: str = "image/webp", target_type: str
if not Image:
return source_mime, file, None, None
try:
- image = Image.open(BytesIO(file)).convert("RGBA") # type: Image.Image
+ image: Image.Image = Image.open(BytesIO(file)).convert("RGBA")
if thumbnail_to:
image.thumbnail(thumbnail_to, Image.ANTIALIAS)
new_file = BytesIO()
@@ -134,7 +135,7 @@ async def transfer_thumbnail_to_matrix(client: MautrixTelegramClient, intent: In
width, height = None, None
mime_type = magic.from_buffer(file, mime=True)
- content_uri = await intent.upload_file(file, mime_type)
+ content_uri = await intent.upload_media(file, mime_type)
db_file = DBTelegramFile(id=loc_id, mxc=content_uri, mime_type=mime_type,
was_converted=False, timestamp=int(time.time()), size=len(file),
@@ -148,7 +149,7 @@ async def transfer_thumbnail_to_matrix(client: MautrixTelegramClient, intent: In
return db_file
-transfer_locks = {} # type: Dict[str, asyncio.Lock]
+transfer_locks: Dict[str, asyncio.Lock] = {}
TypeThumbnail = Optional[Union[TypeLocation, TypePhotoSize]]
@@ -202,7 +203,7 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
mime_type = new_mime_type
thumbnail = None
- content_uri = await intent.upload_file(file, mime_type)
+ content_uri = await intent.upload_media(file, mime_type)
db_file = DBTelegramFile(id=loc_id, mxc=content_uri,
mime_type=mime_type, was_converted=image_converted,
diff --git a/mautrix_telegram/util/signed_token.py b/mautrix_telegram/util/signed_token.py
deleted file mode 100644
index 86cf9f8b..00000000
--- a/mautrix_telegram/util/signed_token.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# mautrix-telegram - A Matrix-Telegram puppeting bridge
-# Copyright (C) 2019 Tulir Asokan
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-from typing import Dict, Optional
-import json
-import base64
-import hashlib
-
-
-def _get_checksum(key: str, payload: bytes) -> str:
- hasher = hashlib.sha256()
- hasher.update(payload)
- hasher.update(key.encode("utf-8"))
- checksum = hasher.hexdigest()
- return checksum
-
-
-def sign_token(key: str, payload: Dict) -> str:
- payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
- checksum = _get_checksum(key, payload_b64)
- return f"{checksum}:{payload_b64.decode('utf-8')}"
-
-
-def verify_token(key: str, data: str) -> Optional[Dict]:
- if not data:
- return None
-
- try:
- checksum, payload = data.split(":", 1)
- except ValueError:
- return None
-
- if checksum != _get_checksum(key, payload.encode("utf-8")):
- return None
-
- payload = base64.urlsafe_b64decode(payload).decode("utf-8")
- try:
- return json.loads(payload)
- except json.JSONDecodeError:
- return None
diff --git a/mautrix_telegram/web/public/__init__.py b/mautrix_telegram/web/public/__init__.py
index 31dc07e3..71f7de88 100644
--- a/mautrix_telegram/web/public/__init__.py
+++ b/mautrix_telegram/web/public/__init__.py
@@ -25,8 +25,8 @@ from aiohttp import web
import pkg_resources
from mautrix.types import UserID
+from mautrix.util.signed_token import sign_token, verify_token
-from ...util import sign_token, verify_token
from ...user import User
from ...puppet import Puppet
from ..common import AuthAPI