Initial split to htmlparser/lxml matrix->telegram formatters

This commit is contained in:
Tulir Asokan
2018-06-24 20:14:11 +03:00
parent 2172587286
commit 99f84b5dfe
5 changed files with 246 additions and 137 deletions
@@ -0,0 +1,150 @@
# -*- coding: future_fstrings -*-
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Optional, List, Tuple, Callable
import re
import logging
from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, MessageEntityItalic,
TypeMessageEntity)
from ...context import Context
from ... import puppet as pu
from ...db import Message as DBMessage
from ..util import (add_surrogates, remove_surrogates, trim_reply_fallback_html,
trim_reply_fallback_text)
from .parser_common import ParsedMessage
try:
from mautrix_telegram.formatter.from_matrix.parser_lxml import parse_html
except ImportError:
from mautrix_telegram.formatter.from_matrix.parser_htmlparser import parse_html
log = logging.getLogger("mau.fmt.mx")
should_bridge_plaintext_highlights = False
command_regex = re.compile(r"^!([A-Za-z0-9@]+)")
not_command_regex = re.compile(r"^\\(![A-Za-z0-9@]+)")
plain_mention_regex = None
def plain_mention_to_html(match):
puppet = pu.Puppet.find_by_displayname(match.group(2))
if puppet:
return (f"{match.group(1)}"
f"<a href='https://matrix.to/#/{puppet.mxid}'>"
f"{puppet.displayname}"
"</a>")
return "".join(match.groups())
def cut_long_message(message: str, entities: List[TypeMessageEntity]) -> ParsedMessage:
if len(message) > 4096:
message = message[0:4082] + " [message cut]"
new_entities = []
for entity in entities:
if entity.offset > 4082:
continue
if entity.offset + entity.length > 4082:
entity.length = 4082 - entity.offset
new_entities.append(entity)
new_entities.append(MessageEntityItalic(4082, len(" [message cut]")))
entities = new_entities
return message, entities
def matrix_to_telegram(html: str) -> ParsedMessage:
try:
html = command_regex.sub(r"<command>\1</command>", html)
html = html.replace("\t", " " * 4)
html = not_command_regex.sub(r"\1", html)
if should_bridge_plaintext_highlights:
html = plain_mention_regex.sub(plain_mention_to_html, html)
html = add_surrogates(html)
text, entities = parse_html(add_surrogates(html))
text = remove_surrogates(html.strip())
text, entities = cut_long_message(text, entities)
return text, entities
except Exception:
log.exception("Failed to convert Matrix format:\nhtml=%s", html)
def matrix_reply_to_telegram(content: dict, tg_space: int, room_id: Optional[str] = None
) -> Optional[int]:
try:
reply = content["m.relates_to"]["m.in_reply_to"]
room_id = room_id or reply["room_id"]
event_id = reply["event_id"]
try:
if content["format"] == "org.matrix.custom.html":
content["formatted_body"] = trim_reply_fallback_html(content["formatted_body"])
except KeyError:
pass
content["body"] = trim_reply_fallback_text(content["body"])
message = DBMessage.query.filter(DBMessage.mxid == event_id,
DBMessage.tg_space == tg_space,
DBMessage.mx_room == room_id).one_or_none()
if message:
return message.tgid
except KeyError:
pass
return None
def matrix_text_to_telegram(text: str) -> ParsedMessage:
text = command_regex.sub(r"/\1", text)
text = text.replace("\t", " " * 4)
text = not_command_regex.sub(r"\1", text)
if should_bridge_plaintext_highlights:
entities, pmr_replacer = plain_mention_to_text()
text = plain_mention_regex.sub(pmr_replacer, text)
else:
entities = []
return text, entities
def plain_mention_to_text() -> Tuple[List[TypeMessageEntity], Callable[[str], str]]:
entities = []
def replacer(match) -> str:
puppet = pu.Puppet.find_by_displayname(match.group(2))
if puppet:
offset = match.start()
length = match.end() - offset
if puppet.username:
entity = MessageEntityMention(offset, length)
text = f"@{puppet.username}"
else:
entity = MessageEntityMentionName(offset, length, user_id=puppet.tgid)
text = puppet.displayname
entities.append(entity)
return text
return "".join(match.groups())
return entities, replacer
def init_mx(context: Context):
global plain_mention_regex, should_bridge_plaintext_highlights
config = context.config
dn_template = config.get("bridge.displayname_template", "{displayname} (Telegram)")
dn_template = re.escape(dn_template).replace(re.escape("{displayname}"), "[^>]+")
plain_mention_regex = re.compile(f"(\s|^)({dn_template})")
should_bridge_plaintext_highlights = config["bridge.plaintext_highlights"] or False
@@ -0,0 +1,31 @@
# -*- coding: future_fstrings -*-
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import re
from typing import List, Tuple
from telethon.tl.types import TypeMessageEntity
class MatrixParserCommon:
mention_regex = re.compile("https://matrix.to/#/(@.+:.+)")
room_regex = re.compile("https://matrix.to/#/(#.+:.+)")
block_tags = ("br", "p", "pre", "blockquote",
"ol", "ul", "li",
"h1", "h2", "h3", "h4", "h5", "h6",
"div", "hr", "table")
ParsedMessage = Tuple[str, List[TypeMessageEntity]]
@@ -17,36 +17,28 @@
from html import unescape
from html.parser import HTMLParser
from collections import deque
from typing import Optional, List, Tuple, Type, Callable, Dict, Any
from typing import Optional, List, Tuple, Type, Dict, Any
import math
import re
import logging
from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, MessageEntityEmail,
MessageEntityUrl, MessageEntityTextUrl, MessageEntityBold,
MessageEntityItalic, MessageEntityCode, MessageEntityPre,
MessageEntityBotCommand, TypeMessageEntity)
from ..context import Context
from .. import user as u, puppet as pu, portal as po
from ..db import Message as DBMessage
from .util import (add_surrogates, remove_surrogates, trim_reply_fallback_html,
trim_reply_fallback_text, html_to_unicode)
log = logging.getLogger("mau.fmt.mx")
should_bridge_plaintext_highlights = False
from ... import user as u, puppet as pu, portal as po
from ..util import html_to_unicode
from .parser_common import MatrixParserCommon, ParsedMessage
class MatrixParser(HTMLParser):
mention_regex = re.compile("https://matrix.to/#/(@.+:.+)")
room_regex = re.compile("https://matrix.to/#/(#.+:.+)")
block_tags = ("br", "p", "pre", "blockquote",
"ol", "ul", "li",
"h1", "h2", "h3", "h4", "h5", "h6",
"div", "hr", "table")
def parse_html(html: str) -> ParsedMessage:
parser = MatrixParser()
parser.feed(html)
return parser.text, parser.entities
class MatrixParser(HTMLParser, MatrixParserCommon):
def __init__(self):
super().__init__()
super(HTMLParser, self).__init__()
self.text = ""
self.entities = []
self._building_entities = {}
@@ -242,120 +234,3 @@ class MatrixParser(HTMLParser):
if tag in self.block_tags and tag != "br" and "blockquote" not in self._open_tags:
self._newline(allow_multi=tag == "br")
command_regex = re.compile(r"^!([A-Za-z0-9@]+)")
not_command_regex = re.compile(r"^\\(![A-Za-z0-9@]+)")
plain_mention_regex = None
def plain_mention_to_html(match):
puppet = pu.Puppet.find_by_displayname(match.group(2))
if puppet:
return (f"{match.group(1)}"
f"<a href='https://matrix.to/#/{puppet.mxid}'>"
f"{puppet.displayname}"
"</a>")
return "".join(match.groups())
def cut_long_message(message: str, entities: List[TypeMessageEntity]
) -> Tuple[str, List[TypeMessageEntity]]:
if len(message) > 4096:
message = message[0:4082] + " [message cut]"
new_entities = []
for entity in entities:
if entity.offset > 4082:
continue
if entity.offset + entity.length > 4082:
entity.length = 4082 - entity.offset
new_entities.append(entity)
new_entities.append(MessageEntityItalic(4082, len(" [message cut]")))
entities = new_entities
return message, entities
def matrix_to_telegram(html: str) -> Tuple[str, List[TypeMessageEntity]]:
try:
parser = MatrixParser()
html = command_regex.sub(r"<command>\1</command>", html)
html = html.replace("\t", " " * 4)
html = not_command_regex.sub(r"\1", html)
if should_bridge_plaintext_highlights:
html = plain_mention_regex.sub(plain_mention_to_html, html)
parser.feed(add_surrogates(html))
message_text = remove_surrogates(parser.text.strip())
message_entities = parser.entities
message_text, message_entities = cut_long_message(message_text, message_entities)
return message_text, message_entities
except Exception:
log.exception("Failed to convert Matrix format:\nhtml=%s", html)
def matrix_reply_to_telegram(content: dict, tg_space: int, room_id: Optional[str] = None
) -> Optional[int]:
try:
reply = content["m.relates_to"]["m.in_reply_to"]
room_id = room_id or reply["room_id"]
event_id = reply["event_id"]
try:
if content["format"] == "org.matrix.custom.html":
content["formatted_body"] = trim_reply_fallback_html(content["formatted_body"])
except KeyError:
pass
content["body"] = trim_reply_fallback_text(content["body"])
message = DBMessage.query.filter(DBMessage.mxid == event_id,
DBMessage.tg_space == tg_space,
DBMessage.mx_room == room_id).one_or_none()
if message:
return message.tgid
except KeyError:
pass
return None
def matrix_text_to_telegram(text: str) -> Tuple[str, List[TypeMessageEntity]]:
text = command_regex.sub(r"/\1", text)
text = text.replace("\t", " " * 4)
text = not_command_regex.sub(r"\1", text)
if should_bridge_plaintext_highlights:
entities, pmr_replacer = plain_mention_to_text()
text = plain_mention_regex.sub(pmr_replacer, text)
else:
entities = []
return text, entities
def plain_mention_to_text() -> Tuple[List[TypeMessageEntity], Callable[[str], str]]:
entities = []
def replacer(match):
puppet = pu.Puppet.find_by_displayname(match.group(2))
if puppet:
offset = match.start()
length = match.end() - offset
if puppet.username:
entity = MessageEntityMention(offset, length)
text = f"@{puppet.username}"
else:
entity = MessageEntityMentionName(offset, length, user_id=puppet.tgid)
text = puppet.displayname
entities.append(entity)
return text
return "".join(match.groups())
return entities, replacer
def init_mx(context: Context):
global plain_mention_regex, should_bridge_plaintext_highlights
config = context.config
dn_template = config.get("bridge.displayname_template", "{displayname} (Telegram)")
dn_template = re.escape(dn_template).replace(re.escape("{displayname}"), "[^>]+")
plain_mention_regex = re.compile(f"(\s|^)({dn_template})")
should_bridge_plaintext_highlights = config["bridge.plaintext_highlights"] or False
@@ -0,0 +1,52 @@
# -*- coding: future_fstrings -*-
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from lxml import etree
from typing import Optional, List, Tuple, Type, Callable, Dict, Any
import math
import re
import logging
from telethon.tl.types import (MessageEntityMention, MessageEntityMentionName, MessageEntityEmail,
MessageEntityUrl, MessageEntityTextUrl, MessageEntityBold,
MessageEntityItalic, MessageEntityCode, MessageEntityPre,
MessageEntityBotCommand, TypeMessageEntity)
from ...context import Context
from ... import user as u, puppet as pu, portal as po
from ...db import Message as DBMessage
from ...formatter.util import (add_surrogates, remove_surrogates, trim_reply_fallback_html,
trim_reply_fallback_text, html_to_unicode)
from .parser_common import MatrixParserCommon, ParsedMessage
class MatrixParser(MatrixParserCommon):
def __init__(self):
self.text = ""
self.entities = []
def parse_node(self, node) -> ParsedMessage:
pass
def feed(self, html: str):
document = etree.parse(html)
self.text, self.entities = self.parse_node(document)
def parse_html(html: str) -> ParsedMessage:
parser = MatrixParser()
parser.feed(html)
return parser.text, parser.entities
+2 -1
View File
@@ -4,11 +4,12 @@ import mautrix_telegram
extras = {
"highlight_edits": ["lxml>=4.1.1,<5"],
"better_formatter": ["lxml>=4.1.1,<5"],
"fast_crypto": ["cryptg>=0.1,<0.2"],
"webp_convert": ["Pillow>=5.0.0,<6"],
"hq_thumbnails": ["moviepy>=0.2,<0.3"],
}
extras["all"] = [deps[0] for deps in extras.values()]
extras["all"] = list(set(deps[0] for deps in extras.values()))
setuptools.setup(
name="mautrix-telegram",