diff --git a/mautrix_telegram/formatter/from_matrix/__init__.py b/mautrix_telegram/formatter/from_matrix/__init__.py
index 47eafaef..99a9a70a 100644
--- a/mautrix_telegram/formatter/from_matrix/__init__.py
+++ b/mautrix_telegram/formatter/from_matrix/__init__.py
@@ -68,6 +68,10 @@ def cut_long_message(message: str, entities: List[TypeMessageEntity]) -> ParsedM
return message, entities
+class FormatError(Exception):
+    """Signals that a Matrix-formatted message could not be converted for Telegram."""
+
+
def matrix_to_telegram(html: str) -> ParsedMessage:
try:
html = command_regex.sub(r"\1", html)
@@ -82,8 +86,8 @@ def matrix_to_telegram(html: str) -> ParsedMessage:
text, entities = cut_long_message(text, entities)
return text, entities
- except Exception:
- log.exception("Failed to convert Matrix format:\nhtml=%s", html)
+ except Exception as e:
+ raise FormatError(f"Failed to convert Matrix format: {html}") from e
def matrix_reply_to_telegram(content: dict, tg_space: int, room_id: Optional[str] = None
diff --git a/mautrix_telegram/formatter/from_matrix/parser_lxml.py b/mautrix_telegram/formatter/from_matrix/parser_lxml.py
index 94531d31..5ce01d59 100644
--- a/mautrix_telegram/formatter/from_matrix/parser_lxml.py
+++ b/mautrix_telegram/formatter/from_matrix/parser_lxml.py
@@ -14,39 +14,312 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
-from lxml import etree
-from typing import Optional, List, Tuple, Type, Callable, Dict, Any
-import math
-import re
-import logging
+from typing import Optional, List, Tuple, Union
+from lxml import html
-from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, MessageEntityEmail,
- MessageEntityUrl, MessageEntityTextUrl, MessageEntityBold,
- MessageEntityItalic, MessageEntityCode, MessageEntityPre,
- MessageEntityBotCommand, TypeMessageEntity)
+from telethon.tl.types import (MessageEntityMention as Mention,
+ MessageEntityMentionName as MentionName, MessageEntityEmail as Email,
+ MessageEntityUrl as URL, MessageEntityTextUrl as TextURL,
+ MessageEntityBold as Bold, MessageEntityItalic as Italic,
+ MessageEntityCode as Code, MessageEntityPre as Pre,
+ MessageEntityBotCommand as Command, TypeMessageEntity,
+ InputMessageEntityMentionName as InputMentionName)
-from ...context import Context
from ... import user as u, puppet as pu, portal as po
-from ...db import Message as DBMessage
-from ...formatter.util import (add_surrogates, remove_surrogates, trim_reply_fallback_html,
- trim_reply_fallback_text, html_to_unicode)
from .parser_common import MatrixParserCommon, ParsedMessage
-class MatrixParser(MatrixParserCommon):
- def __init__(self):
- self.text = "" # type: str
- self.entities = [] # type: List[TypeMessageEntity]
-
- def parse_node(self, node) -> ParsedMessage:
- pass
-
- def feed(self, html: str):
- document = etree.parse(html)
- self.text, self.entities = self.parse_node(document)
-
-
def parse_html(html: str) -> ParsedMessage:
- parser = MatrixParser()
- parser.feed(html)
- return parser.text, parser.entities
+ # Module-level entry point: the conversion itself is done by MatrixParser.parse.
+ return MatrixParser.parse(html)
+
+
+class Entity:
+ """Helpers for duplicating Telegram message entities and changing their position."""
+
+ @staticmethod
+ def copy(entity: TypeMessageEntity) -> TypeMessageEntity:
+ """Return a new entity of the same class with the same offset and length.
+
+ ``language`` (Pre), ``url`` (TextURL) and ``user_id`` (MentionName and
+ InputMentionName) are carried over as well; no other fields are copied.
+ """
+ kwargs = {
+ "offset": entity.offset,
+ "length": entity.length,
+ }
+ if isinstance(entity, Pre):
+ kwargs["language"] = entity.language
+ elif isinstance(entity, TextURL):
+ kwargs["url"] = entity.url
+ elif isinstance(entity, (MentionName, InputMentionName)):
+ kwargs["user_id"] = entity.user_id
+ return entity.__class__(**kwargs)
+
+ @classmethod
+ def adjust(cls, entity: Union[TypeMessageEntity, List[TypeMessageEntity]], *,
+ length: int = None, offset: int = None, offset_diff: int = None
+ ) -> Union[TypeMessageEntity, List[TypeMessageEntity]]:
+ """Return an adjusted copy of an entity, or a list of adjusted copies for a list.
+
+ The input is never modified. ``length`` and ``offset`` replace the copied values when
+ given; ``offset_diff`` is then added to the offset. When applying ``offset_diff``
+ leaves the offset negative, the offset is set to 0 and the length is shortened by
+ the overshoot.
+ """
+ if isinstance(entity, list):
+ # Lists are handled element by element with the same keyword arguments.
+ return [Entity.adjust(element, length=length, offset=offset, offset_diff=offset_diff)
+ for element in entity]
+ entity = cls.copy(entity)
+ if length is not None:
+ entity.length = length
+ if offset is not None:
+ entity.offset = offset
+ if offset_diff is not None:
+ entity.offset += offset_diff
+ if entity.offset < 0:
+ # entity.offset is negative here, so this shortens the entity.
+ # NOTE(review): the length is not floored at 0 and can become negative when the
+ # overshoot is larger than the length.
+ entity.length += entity.offset
+ entity.offset = 0
+ return entity
+
+
+class TelegramMessage:
+    """Plain text bundled with the Telegram entities that format it.
+
+    Entity offsets and lengths are measured in the same units as ``len(self.text)``.
+    ``append``, ``prepend``, ``format``, ``trim`` and ``offset_entities`` modify the
+    instance in place and return it so calls can be chained; ``concat``, ``split`` and
+    ``join`` build new instances.
+    """
+
+    def __init__(self, text: str = "", entities: Optional[List[TypeMessageEntity]] = None):
+        self.text = text  # type: str
+        self.entities = entities or []  # type: List[TypeMessageEntity]
+
+    def offset_entities(self, offset: int) -> "TelegramMessage":
+        """Shift every entity by ``offset`` and clip it to the current text.
+
+        Each surviving entity is replaced by a copy. An entity that would start past the
+        end of the text is dropped. An entity that would start before the text is cut at
+        position 0, and one that would run past the end is shortened to end with the text.
+
+        :param offset: amount to add to every entity offset (may be negative).
+        :return: this message.
+        """
+        text_length = len(self.text)
+        shifted = []  # type: List[TypeMessageEntity]
+        for entity in self.entities:
+            if not entity:
+                continue
+            entity = Entity.copy(entity)
+            entity.offset += offset
+            if entity.offset > text_length:
+                # Nothing of this entity is left inside the text.
+                continue
+            if entity.offset < 0:
+                # The part in front of the text is gone, so the entity gets shorter too.
+                entity.length += entity.offset
+                entity.offset = 0
+            entity.length = max(0, min(entity.length, text_length - entity.offset))
+            shifted.append(entity)
+        self.entities = shifted
+        return self
+
+    def append(self, *args: Union[str, "TelegramMessage"]) -> "TelegramMessage":
+        """Add each argument, in order, to the end of this message.
+
+        Entities of appended messages are copied and moved behind the existing text.
+        """
+        for msg in args:
+            if isinstance(msg, str):
+                msg = TelegramMessage(text=msg)
+            self.entities += Entity.adjust(msg.entities, offset_diff=len(self.text))
+            self.text += msg.text
+        return self
+
+    def prepend(self, *args: Union[str, "TelegramMessage"]) -> "TelegramMessage":
+        """Put each argument in front of this message.
+
+        Every argument is placed before everything already present, so with several
+        arguments the last one ends up first. Existing entities are replaced by shifted
+        copies; the prepended message's entity objects are reused as they are.
+        """
+        for msg in args:
+            if isinstance(msg, str):
+                msg = TelegramMessage(text=msg)
+            self.entities = msg.entities + Entity.adjust(self.entities, offset_diff=len(msg.text))
+            self.text = msg.text + self.text
+        return self
+
+    def format(self, entity_type: type(TypeMessageEntity), offset: int = None, length: int = None,
+               **kwargs) -> "TelegramMessage":
+        """Add an entity of ``entity_type`` to this message.
+
+        :param entity_type: called as ``entity_type(offset=..., length=..., **kwargs)``.
+        :param offset: start of the entity; ``None`` (or 0) means the start of the text.
+        :param length: length of the entity; ``None`` means the whole current text length.
+        :param kwargs: extra constructor arguments for the entity.
+        :return: this message.
+        """
+        self.entities.append(entity_type(offset=offset or 0,
+                                         length=length if length is not None else len(self.text),
+                                         **kwargs))
+        return self
+
+    def concat(self, *args: Union[str, "TelegramMessage"]) -> "TelegramMessage":
+        """Return a new message made of this message followed by the arguments."""
+        return TelegramMessage().append(self, *args)
+
+    def trim(self) -> "TelegramMessage":
+        """Strip leading and trailing whitespace and re-align the entities.
+
+        Entities are shifted left by the number of leading characters removed and clipped
+        to the trimmed text (see ``offset_entities``).
+        """
+        untrimmed_length = len(self.text)
+        self.text = self.text.lstrip()
+        removed_in_front = untrimmed_length - len(self.text)
+        self.text = self.text.rstrip()
+        self.offset_entities(-removed_in_front)
+        return self
+
+    def split(self, separator, max_items: int = 0) -> List["TelegramMessage"]:
+        """Split the text on ``separator`` into new messages.
+
+        An entity that lies completely inside one part is copied into that part with its
+        offset re-based; an entity that crosses a separator is not carried over.
+
+        :param separator: string to split on.
+        :param max_items: upper bound on the number of parts; the default 0 means no limit
+                          (``max_items - 1`` is handed to ``str.split`` as ``maxsplit``).
+        :return: the parts, in order.
+        """
+        text_parts = self.text.split(separator, max_items - 1)
+        output = []  # type: List[TelegramMessage]
+
+        # Position of the current part inside the original text.
+        offset = 0
+        for part in text_parts:
+            msg = TelegramMessage(part)
+            for entity in self.entities:
+                start_in_range = len(part) > entity.offset - offset >= 0
+                end_in_range = len(part) >= entity.offset - offset + entity.length > 0
+                if start_in_range and end_in_range:
+                    msg.entities.append(Entity.adjust(entity, offset_diff=-offset))
+            output.append(msg)
+
+            offset += len(part)
+            offset += len(separator)
+
+        return output
+
+    @staticmethod
+    def join(items: List[Union[str, "TelegramMessage"]], separator: str = " ") -> "TelegramMessage":
+        """Concatenate ``items`` into a new message with ``separator`` between them.
+
+        :param items: strings and/or messages; entities of messages are copied and shifted
+                      to their position in the joined text.
+        :param separator: text placed between consecutive items; may be empty.
+        :return: the joined message (empty when ``items`` is empty).
+        """
+        main = TelegramMessage()
+        for index, msg in enumerate(items):
+            if isinstance(msg, str):
+                msg = TelegramMessage(text=msg)
+            if index > 0:
+                # Only put the separator between items. Appending it after every item and
+                # slicing it off afterwards breaks for "", because text[:-0] is empty.
+                main.text += separator
+            main.entities += Entity.adjust(msg.entities, offset_diff=len(main.text))
+            main.text += msg.text
+        return main
+
+
+class MatrixParser(MatrixParserCommon):
+    """Converts an lxml HTML tree into a TelegramMessage (text plus Telegram entities).
+
+    Every method is a class or static method, so the class is used without instantiating
+    it. ``mention_regex``, ``room_regex`` and ``block_tags`` are read from ``cls`` but are
+    not defined here; presumably they come from MatrixParserCommon (verify there).
+    """
+
+    @classmethod
+    def list_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
+        """Render an ``<ol>``/``<ul>`` node with one entry per ``<li>`` child.
+
+        Ordered entries are prefixed with ``N. `` starting at the ``start`` attribute
+        (1 when missing or not an integer); unordered entries are prefixed with ``● ``.
+        Continuation lines of a multi-line entry are indented. Children other than
+        ``<li>`` are ignored.
+        """
+        ordered = node.tag == "ol"
+        tagged_children = cls.node_to_tagged_tmessages(node, strip_linebreaks)
+        counter = 1
+        indent_length = 0
+        if ordered:
+            try:
+                counter = int(node.attrib.get("start", "1"))
+            except ValueError:
+                counter = 1
+
+            # Counts every child (text included), so this is an upper bound on the
+            # highest number that will actually be printed.
+            longest_index = counter - 1 + len(tagged_children)
+            indent_length = len(str(longest_index))
+        indent = (indent_length + 4) * " "
+        children = []  # type: List[TelegramMessage]
+        for child, tag in tagged_children:
+            if tag != "li":
+                continue
+
+            if ordered:
+                prefix = f"{counter}. "
+                counter += 1
+            else:
+                prefix = "● "
+            child = child.prepend(prefix)
+            # Indent everything after the first line of the entry.
+            parts = child.split("\n")
+            parts = parts[:1] + [part.prepend(indent) for part in parts[1:]]
+            child = TelegramMessage.join(parts, "\n")
+            children.append(child)
+        return TelegramMessage.join(children, "\n")
+
+    @classmethod
+    def blockquote_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
+        """Render a ``<blockquote>`` node with ``> `` in front of every line."""
+        msg = cls.tag_aware_parse_node(node, strip_linebreaks)
+        children = msg.trim().split("\n")
+        children = [child.prepend("> ") for child in children]
+        return TelegramMessage.join(children, "\n")
+
+    @classmethod
+    def header_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
+        """Render an ``<h1>``..``<h6>`` node as its content prefixed with 1-6 ``#`` and a space."""
+        children = cls.node_to_tmessages(node, strip_linebreaks)
+        # The second character of the tag name is the header level.
+        length = int(node.tag[1])
+        prefix = "#" * length + " "
+        # Concatenate with append rather than join(children, ""), so the result does not
+        # depend on how join treats an empty separator.
+        return TelegramMessage().append(*children).prepend(prefix)
+
+    @classmethod
+    def basic_format_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
+        """Render a simple inline formatting node.
+
+        ``b``/``strong`` become Bold, ``i``/``em`` Italic and ``command`` Command entities
+        over the whole content. ``s``/``del``/``u``/``ins`` currently add no entity.
+        """
+        msg = cls.tag_aware_parse_node(node, strip_linebreaks)
+        if node.tag in ("b", "strong"):
+            msg.format(Bold)
+        elif node.tag in ("i", "em"):
+            msg.format(Italic)
+        elif node.tag == "command":
+            msg.format(Command)
+        elif node.tag in ("s", "del"):
+            pass  # TODO
+        elif node.tag in ("u", "ins"):
+            pass  # TODO
+        return msg
+
+    @classmethod
+    def link_to_tstring(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
+        """Render an ``<a>`` node.
+
+        Checked in this order:
+
+        * no ``href``: the plain content is returned.
+        * ``mailto:`` link: the address as an Email entity.
+        * ``href`` matching ``cls.mention_regex``: ``@username`` as a Mention, or the
+          display name (falling back to the link text) as a MentionName; the plain
+          content when no puppet/user is found or it has neither username nor tgid.
+        * ``href`` matching ``cls.room_regex`` whose portal has a username: ``@username``
+          as a Mention.
+        * anything else: a URL entity when the link text equals the ``href``, otherwise
+          a TextURL entity pointing at the ``href``.
+        """
+        msg = cls.tag_aware_parse_node(node, strip_linebreaks)
+        href = node.attrib.get("href", "")
+        if not href:
+            return msg
+
+        if href.startswith("mailto:"):
+            return TelegramMessage(href[len("mailto:"):]).format(Email)
+
+        mention = cls.mention_regex.match(href)
+        if mention:
+            mxid = mention.group(1)
+            user = (pu.Puppet.get_by_mxid(mxid)
+                    or u.User.get_by_mxid(mxid, create=False))
+            if not user:
+                return msg
+            if user.username:
+                return TelegramMessage(f"@{user.username}").format(Mention)
+            elif user.tgid:
+                return TelegramMessage(user.displayname or msg.text).format(MentionName,
+                                                                            user_id=user.tgid)
+            return msg
+
+        room = cls.room_regex.match(href)
+        if room:
+            username = po.Portal.get_username_from_mx_alias(room.group(1))
+            portal = po.Portal.find_by_username(username)
+            if portal and portal.username:
+                return TelegramMessage(f"@{portal.username}").format(Mention)
+
+        return (msg.format(URL)
+                if msg.text == href
+                else msg.format(TextURL, url=href))
+
+    @classmethod
+    def node_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage:
+        """Convert one element by dispatching on its tag name.
+
+        Tags without special handling are rendered as their content only.
+        """
+        if node.tag == "blockquote":
+            return cls.blockquote_to_tmessage(node, strip_linebreaks)
+        elif node.tag in ("ol", "ul"):
+            return cls.list_to_tmessage(node, strip_linebreaks)
+        elif node.tag in ("h1", "h2", "h3", "h4", "h5", "h6"):
+            return cls.header_to_tmessage(node, strip_linebreaks)
+        elif node.tag == "br":
+            return TelegramMessage("\n")
+        elif node.tag in ("b", "strong", "i", "em", "s", "del", "u", "ins", "command"):
+            return cls.basic_format_to_tmessage(node, strip_linebreaks)
+        elif node.tag == "a":
+            return cls.link_to_tstring(node, strip_linebreaks)
+        elif node.tag == "p":
+            return cls.tag_aware_parse_node(node, strip_linebreaks).append("\n")
+        elif node.tag == "pre":
+            lang = ""
+            try:
+                if node[0].tag == "code":
+                    # Unwrap <pre><code> before looking at the class attribute, so a
+                    # <code> without a class is still unwrapped and does not get a Code
+                    # entity nested inside the Pre entity.
+                    node = node[0]
+                    # Drops the first len("language-") characters; the prefix itself is
+                    # not checked.
+                    lang = node.attrib["class"][len("language-"):]
+            except (IndexError, KeyError):
+                # No child element, or no class attribute: keep the empty language.
+                pass
+            return cls.parse_node(node, strip_linebreaks=False).format(Pre, language=lang)
+        elif node.tag == "code":
+            return cls.parse_node(node, strip_linebreaks=False).format(Code)
+        return cls.tag_aware_parse_node(node, strip_linebreaks)
+
+    @staticmethod
+    def text_to_tmessage(text: str, strip_linebreaks: bool = True) -> TelegramMessage:
+        """Wrap plain text in a TelegramMessage, removing ``\\n`` when asked to."""
+        if strip_linebreaks:
+            text = text.replace("\n", "")
+        return TelegramMessage(text)
+
+    @classmethod
+    def node_to_tagged_tmessages(cls, node: html.HtmlElement, strip_linebreaks: bool = True
+                                 ) -> List[Tuple[TelegramMessage, str]]:
+        """Convert the content of ``node`` into ``(message, tag)`` pairs in document order.
+
+        Text directly inside ``node`` and text following a child element is tagged
+        ``"text"``; converted child elements are tagged with their own tag name.
+        """
+        output = []
+
+        if node.text:
+            output.append((cls.text_to_tmessage(node.text, strip_linebreaks), "text"))
+        for child in node:
+            output.append((cls.node_to_tmessage(child, strip_linebreaks), child.tag))
+            if child.tail:
+                output.append((cls.text_to_tmessage(child.tail, strip_linebreaks), "text"))
+        return output
+
+    @classmethod
+    def node_to_tmessages(cls, node: html.HtmlElement, strip_linebreaks) -> List[
+        TelegramMessage]:
+        """Same as ``node_to_tagged_tmessages`` but without the tag names."""
+        return [msg for (msg, tag) in cls.node_to_tagged_tmessages(node, strip_linebreaks)]
+
+    @classmethod
+    def tag_aware_parse_node(cls, node: html.HtmlElement,
+                             strip_linebreaks: bool = True) -> TelegramMessage:
+        """Concatenate the content of ``node``, putting a newline on both sides of every
+        child whose tag is in ``cls.block_tags``, and trim the result."""
+        msgs = cls.node_to_tagged_tmessages(node, strip_linebreaks)
+        output = TelegramMessage()
+        for msg, tag in msgs:
+            if tag in cls.block_tags:
+                msg = msg.append("\n").prepend("\n")
+            output = output.append(msg)
+        return output.trim()
+
+    @classmethod
+    def parse_node(cls, node: html.HtmlElement, strip_linebreaks: bool = True) -> TelegramMessage:
+        """Join the converted content of ``node`` without block handling or trimming.
+
+        NOTE(review): this uses the default separator of ``TelegramMessage.join`` (a
+        single space), so a space is inserted between adjacent pieces; confirm that this
+        is intended.
+        """
+        return TelegramMessage.join(cls.node_to_tmessages(node, strip_linebreaks))
+
+    @classmethod
+    def parse(cls, data: str) -> ParsedMessage:
+        """Parse an HTML string and return ``(text, entities)``."""
+        document = html.fromstring(f"{data}")
+        msg = cls.parse_node(document)
+        return msg.text, msg.entities