From 3cca11a997e1056c767cd0c15289a212ccb9312e Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Wed, 25 Jul 2018 21:45:25 -0400 Subject: [PATCH] Implement lxml parser --- .../formatter/from_matrix/__init__.py | 8 +- .../formatter/from_matrix/parser_lxml.py | 331 ++++++++++++++++-- 2 files changed, 308 insertions(+), 31 deletions(-) diff --git a/mautrix_telegram/formatter/from_matrix/__init__.py b/mautrix_telegram/formatter/from_matrix/__init__.py index 47eafaef..99a9a70a 100644 --- a/mautrix_telegram/formatter/from_matrix/__init__.py +++ b/mautrix_telegram/formatter/from_matrix/__init__.py @@ -68,6 +68,10 @@ def cut_long_message(message: str, entities: List[TypeMessageEntity]) -> ParsedM return message, entities +class FormatError(Exception): + pass + + def matrix_to_telegram(html: str) -> ParsedMessage: try: html = command_regex.sub(r"\1", html) @@ -82,8 +86,8 @@ def matrix_to_telegram(html: str) -> ParsedMessage: text, entities = cut_long_message(text, entities) return text, entities - except Exception: - log.exception("Failed to convert Matrix format:\nhtml=%s", html) + except Exception as e: + raise FormatError(f"Failed to convert Matrix format: {html}") from e def matrix_reply_to_telegram(content: dict, tg_space: int, room_id: Optional[str] = None diff --git a/mautrix_telegram/formatter/from_matrix/parser_lxml.py b/mautrix_telegram/formatter/from_matrix/parser_lxml.py index 94531d31..5ce01d59 100644 --- a/mautrix_telegram/formatter/from_matrix/parser_lxml.py +++ b/mautrix_telegram/formatter/from_matrix/parser_lxml.py @@ -14,39 +14,312 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from lxml import etree -from typing import Optional, List, Tuple, Type, Callable, Dict, Any -import math -import re -import logging +from typing import Optional, List, Tuple, Union +from lxml import html -from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, MessageEntityEmail, - MessageEntityUrl, MessageEntityTextUrl, MessageEntityBold, - MessageEntityItalic, MessageEntityCode, MessageEntityPre, - MessageEntityBotCommand, TypeMessageEntity) +from telethon.tl.types import (MessageEntityMention as Mention, + MessageEntityMentionName as MentionName, MessageEntityEmail as Email, + MessageEntityUrl as URL, MessageEntityTextUrl as TextURL, + MessageEntityBold as Bold, MessageEntityItalic as Italic, + MessageEntityCode as Code, MessageEntityPre as Pre, + MessageEntityBotCommand as Command, TypeMessageEntity, + InputMessageEntityMentionName as InputMentionName) -from ...context import Context from ... import user as u, puppet as pu, portal as po -from ...db import Message as DBMessage -from ...formatter.util import (add_surrogates, remove_surrogates, trim_reply_fallback_html, - trim_reply_fallback_text, html_to_unicode) from .parser_common import MatrixParserCommon, ParsedMessage -class MatrixParser(MatrixParserCommon): - def __init__(self): - self.text = "" # type: str - self.entities = [] # type: List[TypeMessageEntity] - - def parse_node(self, node) -> ParsedMessage: - pass - - def feed(self, html: str): - document = etree.parse(html) - self.text, self.entities = self.parse_node(document) - - def parse_html(html: str) -> ParsedMessage: - parser = MatrixParser() - parser.feed(html) - return parser.text, parser.entities + return MatrixParser.parse(html) + + +class Entity: + @staticmethod + def copy(entity: TypeMessageEntity) -> TypeMessageEntity: + kwargs = { + "offset": entity.offset, + "length": entity.length, + } + if isinstance(entity, Pre): + kwargs["language"] = entity.language + elif isinstance(entity, TextURL): + kwargs["url"] = entity.url + elif isinstance(entity, (MentionName, InputMentionName)): + kwargs["user_id"] = entity.user_id + return entity.__class__(**kwargs) + + @classmethod + def adjust(cls, entity: Union[TypeMessageEntity, List[TypeMessageEntity]], *, + length: int = None, offset: int = None, offset_diff: int = None + ) -> Union[TypeMessageEntity, List[TypeMessageEntity]]: + if isinstance(entity, list): + return [Entity.adjust(element, length=length, offset=offset, offset_diff=offset_diff) + for element in entity] + entity = cls.copy(entity) + if length is not None: + entity.length = length + if offset is not None: + entity.offset = offset + if offset_diff is not None: + entity.offset += offset_diff + if entity.offset < 0: + entity.length += entity.offset + entity.offset = 0 + return entity + + +class TelegramMessage: + def __init__(self, text: str = "", entities: Optional[List[TypeMessageEntity]] = None): + self.text = text # type: str + self.entities = entities or [] # type: List[TypeMessageEntity] + + def offset_entities(self, offset: int) -> "TelegramMessage": + def apply_offset(entity: TypeMessageEntity, offset: int): + entity = Entity.copy(entity) + entity.offset += offset + if entity.offset < 0: + entity.offset = 0 + elif entity.offset > len(self.text): + return None + elif entity.offset + entity.length > len(self.text): + entity.length = len(self.text) - entity.offset + return entity + + self.entities = [apply_offset(entity, offset) for entity in self.entities if entity] + return self + + def append(self, *args: Union[str, "TelegramMessage"]) -> "TelegramMessage": + for msg in args: + if isinstance(msg, str): + msg = TelegramMessage(text=msg) + self.entities += Entity.adjust(msg.entities, offset_diff=len(self.text)) + self.text += msg.text + return self + + def prepend(self, *args: Union[str, "TelegramMessage"]) -> "TelegramMessage": + for msg in args: + if isinstance(msg, str): + msg = TelegramMessage(text=msg) + self.entities = msg.entities + Entity.adjust(self.entities, offset_diff=len(msg.text)) + self.text = msg.text + self.text + return self + + def format(self, entity_type: type(TypeMessageEntity), offset: int = None, length: int = None, + **kwargs) -> "TelegramMessage": + self.entities.append(entity_type(offset=offset or 0, + length=length if length is not None else len(self.text), + **kwargs)) + return self + + def concat(self, *args: Union[str, "TelegramMessage"]) -> "TelegramMessage": + return TelegramMessage().append(self, *args) + + def trim(self) -> "TelegramMessage": + orig_len = len(self.text) + self.text = self.text.lstrip() + diff = orig_len - len(self.text) + self.text = self.text.rstrip() + self.offset_entities(-diff) + return self + + def split(self, separator, max_items: int = 0) -> List["TelegramMessage"]: + text_parts = self.text.split(separator, max_items - 1) + output = [] # type: List[TelegramMessage] + + offset = 0 + for part in text_parts: + msg = TelegramMessage(part) + for entity in self.entities: + start_in_range = len(part) > entity.offset - offset >= 0 + end_in_range = len(part) >= entity.offset - offset + entity.length > 0 + if start_in_range and end_in_range: + msg.entities.append(Entity.adjust(entity, offset_diff=-offset)) + output.append(msg) + + offset += len(part) + offset += len(separator) + + return output + + @staticmethod + def join(items: List[Union[str, "TelegramMessage"]], separator: str = " ") -> "TelegramMessage": + main = TelegramMessage() + for msg in items: + if isinstance(msg, str): + msg = TelegramMessage(text=msg) + main.entities += Entity.adjust(msg.entities, offset_diff=len(main.text)) + main.text += msg.text + separator + main.text = main.text[:-len(separator)] + return main + + +class MatrixParser(MatrixParserCommon): + @classmethod + def list_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage: + ordered = node.tag == "ol" + tagged_children = cls.node_to_tagged_tmessages(node, strip_linebreaks) + counter = 1 + indent_length = 0 + if ordered: + try: + counter = int(node.attrib.get("start", "1")) + except ValueError: + counter = 1 + + longest_index = counter - 1 + len(tagged_children) + indent_length = len(str(longest_index)) + indent = (indent_length + 4) * " " + children = [] # type: List[TelegramMessage] + for child, tag in tagged_children: + if tag != "li": + continue + + if ordered: + prefix = f"{counter}. " + counter += 1 + else: + prefix = "● " + child = child.prepend(prefix) + parts = child.split("\n") + parts = parts[:1] + [part.prepend(indent) for part in parts[1:]] + child = TelegramMessage.join(parts, "\n") + children.append(child) + return TelegramMessage.join(children, "\n") + + @classmethod + def blockquote_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage: + msg = cls.tag_aware_parse_node(node, strip_linebreaks) + children = msg.trim().split("\n") + children = [child.prepend("> ") for child in children] + return TelegramMessage.join(children, "\n") + + @classmethod + def header_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage: + children = cls.node_to_tmessages(node, strip_linebreaks) + length = int(node.tag[1]) + prefix = "#" * length + " " + return TelegramMessage.join(children, "").prepend(prefix) + + @classmethod + def basic_format_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage: + msg = cls.tag_aware_parse_node(node, strip_linebreaks) + if node.tag in ("b", "strong"): + msg.format(Bold) + elif node.tag in ("i", "em"): + msg.format(Italic) + elif node.tag == "command": + msg.format(Command) + elif node.tag in ("s", "del"): + pass # TODO + elif node.tag in ("u", "ins"): + pass # TODO + return msg + + @classmethod + def link_to_tstring(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage: + msg = cls.tag_aware_parse_node(node, strip_linebreaks) + href = node.attrib.get("href", "") + if not href: + return msg + + if href.startswith("mailto:"): + return TelegramMessage(href[len("mailto:"):]).format(Email) + + mention = cls.mention_regex.match(href) + if mention: + mxid = mention.group(1) + user = (pu.Puppet.get_by_mxid(mxid) + or u.User.get_by_mxid(mxid, create=False)) + if not user: + return msg + if user.username: + return TelegramMessage(f"@{user.username}").format(Mention) + elif user.tgid: + return TelegramMessage(user.displayname or msg.text).format(MentionName, + user_id=user.tgid) + return msg + + room = cls.room_regex.match(href) + if room: + username = po.Portal.get_username_from_mx_alias(room.group(1)) + portal = po.Portal.find_by_username(username) + if portal and portal.username: + return TelegramMessage(f"@{portal.username}").format(Mention) + + return (msg.format(URL) + if msg.text == href + else msg.format(TextURL, url=href)) + + @classmethod + def node_to_tmessage(cls, node: html.HtmlElement, strip_linebreaks) -> TelegramMessage: + if node.tag == "blockquote": + return cls.blockquote_to_tmessage(node, strip_linebreaks) + elif node.tag in ("ol", "ul"): + return cls.list_to_tmessage(node, strip_linebreaks) + elif node.tag in ("h1", "h2", "h3", "h4", "h5", "h6"): + return cls.header_to_tmessage(node, strip_linebreaks) + elif node.tag == "br": + return TelegramMessage("\n") + elif node.tag in ("b", "strong", "i", "em", "s", "del", "u", "ins", "command"): + return cls.basic_format_to_tmessage(node, strip_linebreaks) + elif node.tag == "a": + return cls.link_to_tstring(node, strip_linebreaks) + elif node.tag == "p": + return cls.tag_aware_parse_node(node, strip_linebreaks).append("\n") + elif node.tag == "pre": + lang = "" + try: + if node[0].tag == "code": + lang = node[0].attrib["class"][len("language-"):] + node = node[0] + except (IndexError, KeyError): + pass + return cls.parse_node(node, strip_linebreaks=False).format(Pre, language=lang) + elif node.tag == "code": + return cls.parse_node(node, strip_linebreaks=False).format(Code) + return cls.tag_aware_parse_node(node, strip_linebreaks) + + @staticmethod + def text_to_tmessage(text: str, strip_linebreaks: bool = True) -> TelegramMessage: + if strip_linebreaks: + text = text.replace("\n", "") + return TelegramMessage(text) + + @classmethod + def node_to_tagged_tmessages(cls, node: html.HtmlElement, strip_linebreaks: bool = True + ) -> List[Tuple[TelegramMessage, str]]: + output = [] + + if node.text: + output.append((cls.text_to_tmessage(node.text, strip_linebreaks), "text")) + for child in node: + output.append((cls.node_to_tmessage(child, strip_linebreaks), child.tag)) + if child.tail: + output.append((cls.text_to_tmessage(child.tail, strip_linebreaks), "text")) + return output + + @classmethod + def node_to_tmessages(cls, node: html.HtmlElement, strip_linebreaks) -> List[ + TelegramMessage]: + return [msg for (msg, tag) in cls.node_to_tagged_tmessages(node, strip_linebreaks)] + + @classmethod + def tag_aware_parse_node(cls, node: html.HtmlElement, + strip_linebreaks: bool = True) -> TelegramMessage: + msgs = cls.node_to_tagged_tmessages(node, strip_linebreaks) + output = TelegramMessage() + for msg, tag in msgs: + if tag in cls.block_tags: + msg = msg.append("\n").prepend("\n") + output = output.append(msg) + return output.trim() + + @classmethod + def parse_node(cls, node: html.HtmlElement, strip_linebreaks: bool = True) -> TelegramMessage: + return TelegramMessage.join(cls.node_to_tmessages(node, strip_linebreaks)) + + @classmethod + def parse(cls, data: str) -> ParsedMessage: + document = html.fromstring(f"{data}") + msg = cls.parse_node(document) + return msg.text, msg.entities