Add type hints to formatter
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
from .from_matrix import (matrix_reply_to_telegram, matrix_to_telegram, matrix_text_to_telegram,
|
||||
init_mx)
|
||||
from .from_telegram import (telegram_reply_to_matrix, telegram_to_matrix, init_tg)
|
||||
from ..context import Context
|
||||
|
||||
|
||||
def init(context):
|
||||
def init(context: Context):
|
||||
init_mx(context)
|
||||
init_tg(context)
|
||||
|
||||
@@ -17,12 +17,18 @@
|
||||
from html import unescape
|
||||
from html.parser import HTMLParser
|
||||
from collections import deque
|
||||
from typing import Optional, List, Tuple, Type, Callable, Dict
|
||||
import math
|
||||
import re
|
||||
import logging
|
||||
|
||||
from telethon_aio.tl.types import *
|
||||
from telethon_aio.tl.types import (TypeMessageEntity, MessageEntityMention,
|
||||
InputMessageEntityMentionName, MessageEntityEmail,
|
||||
MessageEntityUrl, MessageEntityTextUrl, MessageEntityBold,
|
||||
MessageEntityItalic, MessageEntityCode, MessageEntityPre,
|
||||
MessageEntityBotCommand, InputUser)
|
||||
|
||||
from ..context import Context
|
||||
from .. import user as u, puppet as pu, portal as po
|
||||
from ..db import Message as DBMessage
|
||||
from .util import (add_surrogates, remove_surrogates, trim_reply_fallback_html,
|
||||
@@ -51,7 +57,8 @@ class MatrixParser(HTMLParser):
|
||||
self._line_is_new = True
|
||||
self._list_entry_is_new = False
|
||||
|
||||
def _parse_url(self, url, args):
|
||||
def _parse_url(self, url: str, args: Dict[str, str]
|
||||
) -> Tuple[Optional[Type[TypeMessageEntity]], Optional[str]]:
|
||||
mention = self.mention_regex.match(url)
|
||||
if mention:
|
||||
mxid = mention.group(1)
|
||||
@@ -80,7 +87,7 @@ class MatrixParser(HTMLParser):
|
||||
args["url"] = url
|
||||
return MessageEntityTextUrl, None
|
||||
|
||||
def handle_starttag(self, tag, attrs):
|
||||
def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]):
|
||||
self._open_tags.appendleft(tag)
|
||||
self._open_tags_meta.appendleft(0)
|
||||
|
||||
@@ -127,7 +134,7 @@ class MatrixParser(HTMLParser):
|
||||
self._building_entities[tag] = entity_type(offset=offset, length=0, **args)
|
||||
|
||||
@property
|
||||
def _list_indent(self):
|
||||
def _list_indent(self) -> int:
|
||||
indent = 0
|
||||
first_skipped = False
|
||||
for index, tag in enumerate(self._open_tags):
|
||||
@@ -143,7 +150,7 @@ class MatrixParser(HTMLParser):
|
||||
indent += 3
|
||||
return indent
|
||||
|
||||
def _newline(self, allow_multi=False):
|
||||
def _newline(self, allow_multi: bool = False):
|
||||
if self._line_is_new and not allow_multi:
|
||||
return
|
||||
self.text += "\n"
|
||||
@@ -151,7 +158,7 @@ class MatrixParser(HTMLParser):
|
||||
for entity in self._building_entities.values():
|
||||
entity.length += 1
|
||||
|
||||
def _handle_special_previous_tags(self, text):
|
||||
def _handle_special_previous_tags(self, text: str) -> str:
|
||||
if "pre" not in self._open_tags and "code" not in self._open_tags:
|
||||
text = text.replace("\n", "")
|
||||
else:
|
||||
@@ -166,7 +173,7 @@ class MatrixParser(HTMLParser):
|
||||
text = f"/{text}"
|
||||
return text
|
||||
|
||||
def _html_to_unicode(self, text):
|
||||
def _html_to_unicode(self, text: str) -> str:
|
||||
strikethrough, underline = "del" in self._open_tags, "u" in self._open_tags
|
||||
if strikethrough and underline:
|
||||
text = html_to_unicode(text, "\u0336\u0332")
|
||||
@@ -176,7 +183,7 @@ class MatrixParser(HTMLParser):
|
||||
text = html_to_unicode(text, "\u0332")
|
||||
return text
|
||||
|
||||
def _handle_tags_for_data(self, text):
|
||||
def _handle_tags_for_data(self, text: str) -> Tuple[str, int]:
|
||||
extra_offset = 0
|
||||
list_entry_handled_once = False
|
||||
# In order to maintain order of things like blockquotes in lists or lists in blockquotes,
|
||||
@@ -207,12 +214,12 @@ class MatrixParser(HTMLParser):
|
||||
list_entry_handled_once = True
|
||||
return text, extra_offset
|
||||
|
||||
def _extend_entities_in_construction(self, text, extra_offset):
|
||||
def _extend_entities_in_construction(self, text: str, extra_offset: int):
|
||||
for tag, entity in self._building_entities.items():
|
||||
entity.length += len(text) - extra_offset
|
||||
entity.offset += extra_offset
|
||||
|
||||
def handle_data(self, text):
|
||||
def handle_data(self, text: str):
|
||||
text = unescape(text)
|
||||
text = self._handle_special_previous_tags(text)
|
||||
text = self._html_to_unicode(text)
|
||||
@@ -221,7 +228,7 @@ class MatrixParser(HTMLParser):
|
||||
self._line_is_new = False
|
||||
self.text += text
|
||||
|
||||
def handle_endtag(self, tag):
|
||||
def handle_endtag(self, tag: str):
|
||||
try:
|
||||
self._open_tags.popleft()
|
||||
self._open_tags_meta.popleft()
|
||||
@@ -250,7 +257,7 @@ def plain_mention_to_html(match):
|
||||
return "".join(match.groups())
|
||||
|
||||
|
||||
def matrix_to_telegram(html):
|
||||
def matrix_to_telegram(html: str) -> Tuple[str, List[TypeMessageEntity]]:
|
||||
try:
|
||||
parser = MatrixParser()
|
||||
html = command_regex.sub(r"\1<command>\2</command>", html)
|
||||
@@ -263,7 +270,8 @@ def matrix_to_telegram(html):
|
||||
log.exception("Failed to convert Matrix format:\nhtml=%s", html)
|
||||
|
||||
|
||||
def matrix_reply_to_telegram(content, tg_space, room_id=None):
|
||||
def matrix_reply_to_telegram(content: dict, tg_space: int, room_id: Optional[str] = None
|
||||
) -> Optional[int]:
|
||||
try:
|
||||
reply = content["m.relates_to"]["m.in_reply_to"]
|
||||
room_id = room_id or reply["room_id"]
|
||||
@@ -286,7 +294,7 @@ def matrix_reply_to_telegram(content, tg_space, room_id=None):
|
||||
return None
|
||||
|
||||
|
||||
def matrix_text_to_telegram(text):
|
||||
def matrix_text_to_telegram(text: str) -> Tuple[str, List[TypeMessageEntity]]:
|
||||
text = command_regex.sub(r"\1/\2", text)
|
||||
if should_bridge_plaintext_highlights:
|
||||
entities, pmr_replacer = plain_mention_to_text()
|
||||
@@ -296,7 +304,7 @@ def matrix_text_to_telegram(text):
|
||||
return text, entities
|
||||
|
||||
|
||||
def plain_mention_to_text():
|
||||
def plain_mention_to_text() -> Tuple[List[TypeMessageEntity], Callable[[str], str]]:
|
||||
entities = []
|
||||
|
||||
def replacer(match):
|
||||
@@ -318,7 +326,7 @@ def plain_mention_to_text():
|
||||
return entities, replacer
|
||||
|
||||
|
||||
def init_mx(context):
|
||||
def init_mx(context: Context):
|
||||
global plain_mention_regex, should_bridge_plaintext_highlights
|
||||
config = context.config
|
||||
dn_template = config.get("bridge.displayname_template", "{displayname} (Telegram)")
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from html import escape
|
||||
from typing import Optional, List, Tuple
|
||||
|
||||
try:
|
||||
from lxml.html.diff import htmldiff
|
||||
except ImportError:
|
||||
@@ -22,10 +24,16 @@ except ImportError:
|
||||
import logging
|
||||
import re
|
||||
|
||||
from telethon_aio.tl.types import *
|
||||
from telethon_aio.tl.types import (MessageEntityMention, MessageEntityMentionName,
|
||||
MessageEntityEmail, MessageEntityUrl, MessageEntityTextUrl,
|
||||
MessageEntityBold, MessageEntityItalic, MessageEntityCode,
|
||||
MessageEntityPre, MessageEntityBotCommand, Message, PeerChannel,
|
||||
MessageEntityHashtag, TypeMessageEntity)
|
||||
from mautrix_appservice import MatrixRequestError
|
||||
from mautrix_appservice.intent_api import IntentAPI
|
||||
|
||||
from .. import user as u, puppet as pu, portal as po
|
||||
from ..context import Context
|
||||
from ..db import Message as DBMessage
|
||||
from .util import (add_surrogates, remove_surrogates, trim_reply_fallback_html,
|
||||
trim_reply_fallback_text, unicode_to_html)
|
||||
@@ -34,7 +42,7 @@ log = logging.getLogger("mau.fmt.tg")
|
||||
should_highlight_edits = False
|
||||
|
||||
|
||||
def telegram_reply_to_matrix(evt, source):
|
||||
def telegram_reply_to_matrix(evt: Message, source: u.User) -> dict:
|
||||
if evt.reply_to_msg_id:
|
||||
space = (evt.to_id.channel_id
|
||||
if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel)
|
||||
@@ -50,7 +58,8 @@ def telegram_reply_to_matrix(evt, source):
|
||||
return {}
|
||||
|
||||
|
||||
async def _add_forward_header(source, text, html, fwd_from_id):
|
||||
async def _add_forward_header(source, text: str, html: Optional[str],
|
||||
fwd_from_id: Optional[int]) -> Tuple[str, str]:
|
||||
if not html:
|
||||
html = escape(text)
|
||||
user = u.User.get_by_tgid(fwd_from_id)
|
||||
@@ -74,7 +83,7 @@ async def _add_forward_header(source, text, html, fwd_from_id):
|
||||
return text, html
|
||||
|
||||
|
||||
def highlight_edits(new_html, old_html):
|
||||
def highlight_edits(new_html: str, old_html: str) -> str:
|
||||
# Don't include `Edit:` text in diff.
|
||||
if old_html.startswith("<u>Edit:</u> "):
|
||||
old_html = old_html[len("<u>Edit:</u> "):]
|
||||
@@ -89,7 +98,8 @@ def highlight_edits(new_html, old_html):
|
||||
return new_html
|
||||
|
||||
|
||||
async def _add_reply_header(source, text, html, evt, relates_to, main_intent, is_edit):
|
||||
async def _add_reply_header(source: u.User, text: str, html: str, evt: Message, relates_to: dict,
|
||||
main_intent: IntentAPI, is_edit: bool) -> Tuple[str, str]:
|
||||
space = (evt.to_id.channel_id
|
||||
if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel)
|
||||
else source.tgid)
|
||||
@@ -145,8 +155,9 @@ async def _add_reply_header(source, text, html, evt, relates_to, main_intent, is
|
||||
return text_with_quote, html
|
||||
|
||||
|
||||
async def telegram_to_matrix(evt, source, main_intent=None, is_edit=False, prefix_text=None,
|
||||
prefix_html=None):
|
||||
async def telegram_to_matrix(evt: Message, source: u.User, main_intent: Optional[IntentAPI] = None,
|
||||
is_edit: bool = False, prefix_text: Optional[str] = None,
|
||||
prefix_html: Optional[str] = None) -> Tuple[str, str, dict]:
|
||||
text = add_surrogates(evt.message)
|
||||
html = _telegram_entities_to_matrix_catch(text, evt.entities) if evt.entities else None
|
||||
relates_to = {}
|
||||
@@ -178,7 +189,7 @@ async def telegram_to_matrix(evt, source, main_intent=None, is_edit=False, prefi
|
||||
return remove_surrogates(text), remove_surrogates(html), relates_to
|
||||
|
||||
|
||||
def _telegram_entities_to_matrix_catch(text, entities):
|
||||
def _telegram_entities_to_matrix_catch(text: str, entities: List[TypeMessageEntity]) -> str:
|
||||
try:
|
||||
return _telegram_entities_to_matrix(text, entities)
|
||||
except Exception:
|
||||
@@ -188,7 +199,7 @@ def _telegram_entities_to_matrix_catch(text, entities):
|
||||
text, entities)
|
||||
|
||||
|
||||
def _telegram_entities_to_matrix(text, entities):
|
||||
def _telegram_entities_to_matrix(text: str, entities: List[TypeMessageEntity]) -> str:
|
||||
if not entities:
|
||||
return text
|
||||
html = []
|
||||
@@ -232,7 +243,7 @@ def _telegram_entities_to_matrix(text, entities):
|
||||
return "".join(html)
|
||||
|
||||
|
||||
def _parse_pre(html, entity_text, language):
|
||||
def _parse_pre(html: List[str], entity_text: str, language: str) -> bool:
|
||||
if language:
|
||||
html.append("<pre>"
|
||||
f"<code class='language-{language}'>{entity_text}</code>"
|
||||
@@ -242,7 +253,7 @@ def _parse_pre(html, entity_text, language):
|
||||
return False
|
||||
|
||||
|
||||
def _parse_mention(html, entity_text):
|
||||
def _parse_mention(html: List[str], entity_text: str) -> bool:
|
||||
username = entity_text[1:]
|
||||
|
||||
user = u.User.find_by_username(username) or pu.Puppet.find_by_username(username)
|
||||
@@ -259,7 +270,7 @@ def _parse_mention(html, entity_text):
|
||||
return False
|
||||
|
||||
|
||||
def _parse_name_mention(html, entity_text, user_id):
|
||||
def _parse_name_mention(html: List[str], entity_text: str, user_id: int) -> bool:
|
||||
user = u.User.get_by_tgid(user_id)
|
||||
if user:
|
||||
mxid = user.mxid
|
||||
@@ -273,7 +284,7 @@ def _parse_name_mention(html, entity_text, user_id):
|
||||
return False
|
||||
|
||||
|
||||
def _parse_url(html, entity_text, url):
|
||||
def _parse_url(html: List[str], entity_text: str, url: str) -> bool:
|
||||
url = escape(url) if url else entity_text
|
||||
if not url.startswith(("https://", "http://", "ftp://", "magnet://")):
|
||||
url = "http://" + url
|
||||
@@ -281,6 +292,6 @@ def _parse_url(html, entity_text, url):
|
||||
return False
|
||||
|
||||
|
||||
def init_tg(context):
|
||||
def init_tg(context: Context):
|
||||
global should_highlight_edits
|
||||
should_highlight_edits = htmldiff and context.config["bridge.highlight_edits"]
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from html import escape
|
||||
from typing import Optional
|
||||
import struct
|
||||
import re
|
||||
|
||||
@@ -22,20 +23,20 @@ import re
|
||||
# add_surrogates and remove_surrogates are unicode surrogate utility functions from Telethon.
|
||||
# Licensed under the MIT license.
|
||||
# https://github.com/LonamiWebs/Telethon/blob/master/telethon/extensions/markdown.py
|
||||
def add_surrogates(text):
|
||||
def add_surrogates(text: Optional[str]) -> Optional[str]:
|
||||
if text is None:
|
||||
return None
|
||||
return "".join("".join(chr(y) for y in struct.unpack("<HH", x.encode("utf-16-le")))
|
||||
if (0x10000 <= ord(x) <= 0x10FFFF) else x for x in text)
|
||||
|
||||
|
||||
def remove_surrogates(text):
|
||||
def remove_surrogates(text: Optional[str]) -> Optional[str]:
|
||||
if text is None:
|
||||
return None
|
||||
return text.encode("utf-16", "surrogatepass").decode("utf-16")
|
||||
|
||||
|
||||
def trim_reply_fallback_text(text):
|
||||
def trim_reply_fallback_text(text: str) -> str:
|
||||
if not text.startswith("> ") or "\n" not in text:
|
||||
return text
|
||||
lines = text.split("\n")
|
||||
@@ -44,14 +45,14 @@ def trim_reply_fallback_text(text):
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
HTML_REPLY_FALLBACK_REGEX = re.compile(r"^<blockquote data-mx-reply>[\s\S]+?</blockquote>")
|
||||
html_reply_fallback_regex = re.compile(r"^<blockquote data-mx-reply>[\s\S]+?</blockquote>")
|
||||
|
||||
|
||||
def trim_reply_fallback_html(html):
|
||||
return HTML_REPLY_FALLBACK_REGEX.sub("", html)
|
||||
def trim_reply_fallback_html(html: str) -> str:
|
||||
return html_reply_fallback_regex.sub("", html)
|
||||
|
||||
|
||||
def unicode_to_html(text, html, ctrl, tag):
|
||||
def unicode_to_html(text: str, html: str, ctrl: str, tag: str) -> str:
|
||||
if ctrl not in text:
|
||||
return html
|
||||
if not html:
|
||||
@@ -79,5 +80,5 @@ def unicode_to_html(text, html, ctrl, tag):
|
||||
return html
|
||||
|
||||
|
||||
def html_to_unicode(text, ctrl):
|
||||
def html_to_unicode(text: str, ctrl: str) -> str:
|
||||
return ctrl.join(text) + ctrl
|
||||
|
||||
Reference in New Issue
Block a user