From ae88aa0553655d94ef39cbf549e3a3fd7cc4ddbd Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sat, 10 Mar 2018 10:23:50 +0200 Subject: [PATCH] Add type hints to formatter --- mautrix_telegram/formatter/__init__.py | 3 +- mautrix_telegram/formatter/from_matrix.py | 40 ++++++++++++--------- mautrix_telegram/formatter/from_telegram.py | 39 ++++++++++++-------- mautrix_telegram/formatter/util.py | 17 ++++----- 4 files changed, 60 insertions(+), 39 deletions(-) diff --git a/mautrix_telegram/formatter/__init__.py b/mautrix_telegram/formatter/__init__.py index 6252455a..7cb102f7 100644 --- a/mautrix_telegram/formatter/__init__.py +++ b/mautrix_telegram/formatter/__init__.py @@ -1,8 +1,9 @@ from .from_matrix import (matrix_reply_to_telegram, matrix_to_telegram, matrix_text_to_telegram, init_mx) from .from_telegram import (telegram_reply_to_matrix, telegram_to_matrix, init_tg) +from ..context import Context -def init(context): +def init(context: Context): init_mx(context) init_tg(context) diff --git a/mautrix_telegram/formatter/from_matrix.py b/mautrix_telegram/formatter/from_matrix.py index ffbc6878..aadc7b26 100644 --- a/mautrix_telegram/formatter/from_matrix.py +++ b/mautrix_telegram/formatter/from_matrix.py @@ -17,12 +17,18 @@ from html import unescape from html.parser import HTMLParser from collections import deque +from typing import Optional, List, Tuple, Type, Callable, Dict import math import re import logging -from telethon_aio.tl.types import * +from telethon_aio.tl.types import (TypeMessageEntity, MessageEntityMention, + InputMessageEntityMentionName, MessageEntityEmail, + MessageEntityUrl, MessageEntityTextUrl, MessageEntityBold, + MessageEntityItalic, MessageEntityCode, MessageEntityPre, + MessageEntityBotCommand, InputUser) +from ..context import Context from .. import user as u, puppet as pu, portal as po from ..db import Message as DBMessage from .util import (add_surrogates, remove_surrogates, trim_reply_fallback_html, @@ -51,7 +57,8 @@ class MatrixParser(HTMLParser): self._line_is_new = True self._list_entry_is_new = False - def _parse_url(self, url, args): + def _parse_url(self, url: str, args: Dict[str, str] + ) -> Tuple[Optional[Type[TypeMessageEntity]], Optional[str]]: mention = self.mention_regex.match(url) if mention: mxid = mention.group(1) @@ -80,7 +87,7 @@ class MatrixParser(HTMLParser): args["url"] = url return MessageEntityTextUrl, None - def handle_starttag(self, tag, attrs): + def handle_starttag(self, tag: str, attrs: List[Tuple[str, str]]): self._open_tags.appendleft(tag) self._open_tags_meta.appendleft(0) @@ -127,7 +134,7 @@ class MatrixParser(HTMLParser): self._building_entities[tag] = entity_type(offset=offset, length=0, **args) @property - def _list_indent(self): + def _list_indent(self) -> int: indent = 0 first_skipped = False for index, tag in enumerate(self._open_tags): @@ -143,7 +150,7 @@ class MatrixParser(HTMLParser): indent += 3 return indent - def _newline(self, allow_multi=False): + def _newline(self, allow_multi: bool = False): if self._line_is_new and not allow_multi: return self.text += "\n" @@ -151,7 +158,7 @@ class MatrixParser(HTMLParser): for entity in self._building_entities.values(): entity.length += 1 - def _handle_special_previous_tags(self, text): + def _handle_special_previous_tags(self, text: str) -> str: if "pre" not in self._open_tags and "code" not in self._open_tags: text = text.replace("\n", "") else: @@ -166,7 +173,7 @@ class MatrixParser(HTMLParser): text = f"/{text}" return text - def _html_to_unicode(self, text): + def _html_to_unicode(self, text: str) -> str: strikethrough, underline = "del" in self._open_tags, "u" in self._open_tags if strikethrough and underline: text = html_to_unicode(text, "\u0336\u0332") @@ -176,7 +183,7 @@ class MatrixParser(HTMLParser): text = html_to_unicode(text, "\u0332") return text - def _handle_tags_for_data(self, text): + def _handle_tags_for_data(self, text: str) -> Tuple[str, int]: extra_offset = 0 list_entry_handled_once = False # In order to maintain order of things like blockquotes in lists or lists in blockquotes, @@ -207,12 +214,12 @@ class MatrixParser(HTMLParser): list_entry_handled_once = True return text, extra_offset - def _extend_entities_in_construction(self, text, extra_offset): + def _extend_entities_in_construction(self, text: str, extra_offset: int): for tag, entity in self._building_entities.items(): entity.length += len(text) - extra_offset entity.offset += extra_offset - def handle_data(self, text): + def handle_data(self, text: str): text = unescape(text) text = self._handle_special_previous_tags(text) text = self._html_to_unicode(text) @@ -221,7 +228,7 @@ class MatrixParser(HTMLParser): self._line_is_new = False self.text += text - def handle_endtag(self, tag): + def handle_endtag(self, tag: str): try: self._open_tags.popleft() self._open_tags_meta.popleft() @@ -250,7 +257,7 @@ def plain_mention_to_html(match): return "".join(match.groups()) -def matrix_to_telegram(html): +def matrix_to_telegram(html: str) -> Tuple[str, List[TypeMessageEntity]]: try: parser = MatrixParser() html = command_regex.sub(r"\1\2", html) @@ -263,7 +270,8 @@ def matrix_to_telegram(html): log.exception("Failed to convert Matrix format:\nhtml=%s", html) -def matrix_reply_to_telegram(content, tg_space, room_id=None): +def matrix_reply_to_telegram(content: dict, tg_space: int, room_id: Optional[str] = None + ) -> Optional[int]: try: reply = content["m.relates_to"]["m.in_reply_to"] room_id = room_id or reply["room_id"] @@ -286,7 +294,7 @@ def matrix_reply_to_telegram(content, tg_space, room_id=None): return None -def matrix_text_to_telegram(text): +def matrix_text_to_telegram(text: str) -> Tuple[str, List[TypeMessageEntity]]: text = command_regex.sub(r"\1/\2", text) if should_bridge_plaintext_highlights: entities, pmr_replacer = plain_mention_to_text() @@ -296,7 +304,7 @@ def matrix_text_to_telegram(text): return text, entities -def plain_mention_to_text(): +def plain_mention_to_text() -> Tuple[List[TypeMessageEntity], Callable[[str], str]]: entities = [] def replacer(match): @@ -318,7 +326,7 @@ def plain_mention_to_text(): return entities, replacer -def init_mx(context): +def init_mx(context: Context): global plain_mention_regex, should_bridge_plaintext_highlights config = context.config dn_template = config.get("bridge.displayname_template", "{displayname} (Telegram)") diff --git a/mautrix_telegram/formatter/from_telegram.py b/mautrix_telegram/formatter/from_telegram.py index 7a2bf442..2ec2d8f9 100644 --- a/mautrix_telegram/formatter/from_telegram.py +++ b/mautrix_telegram/formatter/from_telegram.py @@ -15,6 +15,8 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from html import escape +from typing import Optional, List, Tuple + try: from lxml.html.diff import htmldiff except ImportError: @@ -22,10 +24,16 @@ except ImportError: import logging import re -from telethon_aio.tl.types import * +from telethon_aio.tl.types import (MessageEntityMention, MessageEntityMentionName, + MessageEntityEmail, MessageEntityUrl, MessageEntityTextUrl, + MessageEntityBold, MessageEntityItalic, MessageEntityCode, + MessageEntityPre, MessageEntityBotCommand, Message, PeerChannel, + MessageEntityHashtag, TypeMessageEntity) from mautrix_appservice import MatrixRequestError +from mautrix_appservice.intent_api import IntentAPI from .. import user as u, puppet as pu, portal as po +from ..context import Context from ..db import Message as DBMessage from .util import (add_surrogates, remove_surrogates, trim_reply_fallback_html, trim_reply_fallback_text, unicode_to_html) @@ -34,7 +42,7 @@ log = logging.getLogger("mau.fmt.tg") should_highlight_edits = False -def telegram_reply_to_matrix(evt, source): +def telegram_reply_to_matrix(evt: Message, source: u.User) -> dict: if evt.reply_to_msg_id: space = (evt.to_id.channel_id if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel) @@ -50,7 +58,8 @@ def telegram_reply_to_matrix(evt, source): return {} -async def _add_forward_header(source, text, html, fwd_from_id): +async def _add_forward_header(source, text: str, html: Optional[str], + fwd_from_id: Optional[int]) -> Tuple[str, str]: if not html: html = escape(text) user = u.User.get_by_tgid(fwd_from_id) @@ -74,7 +83,7 @@ async def _add_forward_header(source, text, html, fwd_from_id): return text, html -def highlight_edits(new_html, old_html): +def highlight_edits(new_html: str, old_html: str) -> str: # Don't include `Edit:` text in diff. if old_html.startswith("Edit: "): old_html = old_html[len("Edit: "):] @@ -89,7 +98,8 @@ def highlight_edits(new_html, old_html): return new_html -async def _add_reply_header(source, text, html, evt, relates_to, main_intent, is_edit): +async def _add_reply_header(source: u.User, text: str, html: str, evt: Message, relates_to: dict, + main_intent: IntentAPI, is_edit: bool) -> Tuple[str, str]: space = (evt.to_id.channel_id if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel) else source.tgid) @@ -145,8 +155,9 @@ async def _add_reply_header(source, text, html, evt, relates_to, main_intent, is return text_with_quote, html -async def telegram_to_matrix(evt, source, main_intent=None, is_edit=False, prefix_text=None, - prefix_html=None): +async def telegram_to_matrix(evt: Message, source: u.User, main_intent: Optional[IntentAPI] = None, + is_edit: bool = False, prefix_text: Optional[str] = None, + prefix_html: Optional[str] = None) -> Tuple[str, str, dict]: text = add_surrogates(evt.message) html = _telegram_entities_to_matrix_catch(text, evt.entities) if evt.entities else None relates_to = {} @@ -178,7 +189,7 @@ async def telegram_to_matrix(evt, source, main_intent=None, is_edit=False, prefi return remove_surrogates(text), remove_surrogates(html), relates_to -def _telegram_entities_to_matrix_catch(text, entities): +def _telegram_entities_to_matrix_catch(text: str, entities: List[TypeMessageEntity]) -> str: try: return _telegram_entities_to_matrix(text, entities) except Exception: @@ -188,7 +199,7 @@ def _telegram_entities_to_matrix_catch(text, entities): text, entities) -def _telegram_entities_to_matrix(text, entities): +def _telegram_entities_to_matrix(text: str, entities: List[TypeMessageEntity]) -> str: if not entities: return text html = [] @@ -232,7 +243,7 @@ def _telegram_entities_to_matrix(text, entities): return "".join(html) -def _parse_pre(html, entity_text, language): +def _parse_pre(html: List[str], entity_text: str, language: str) -> bool: if language: html.append("
"
                     f"{entity_text}"
@@ -242,7 +253,7 @@ def _parse_pre(html, entity_text, language):
     return False
 
 
-def _parse_mention(html, entity_text):
+def _parse_mention(html: List[str], entity_text: str) -> bool:
     username = entity_text[1:]
 
     user = u.User.find_by_username(username) or pu.Puppet.find_by_username(username)
@@ -259,7 +270,7 @@ def _parse_mention(html, entity_text):
     return False
 
 
-def _parse_name_mention(html, entity_text, user_id):
+def _parse_name_mention(html: List[str], entity_text: str, user_id: int) -> bool:
     user = u.User.get_by_tgid(user_id)
     if user:
         mxid = user.mxid
@@ -273,7 +284,7 @@ def _parse_name_mention(html, entity_text, user_id):
     return False
 
 
-def _parse_url(html, entity_text, url):
+def _parse_url(html: List[str], entity_text: str, url: str) -> bool:
     url = escape(url) if url else entity_text
     if not url.startswith(("https://", "http://", "ftp://", "magnet://")):
         url = "http://" + url
@@ -281,6 +292,6 @@ def _parse_url(html, entity_text, url):
     return False
 
 
-def init_tg(context):
+def init_tg(context: Context):
     global should_highlight_edits
     should_highlight_edits = htmldiff and context.config["bridge.highlight_edits"]
diff --git a/mautrix_telegram/formatter/util.py b/mautrix_telegram/formatter/util.py
index a9806d53..f10146b4 100644
--- a/mautrix_telegram/formatter/util.py
+++ b/mautrix_telegram/formatter/util.py
@@ -15,6 +15,7 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see .
 from html import escape
+from typing import Optional
 import struct
 import re
 
@@ -22,20 +23,20 @@ import re
 # add_surrogates and remove_surrogates are unicode surrogate utility functions from Telethon.
 # Licensed under the MIT license.
 # https://github.com/LonamiWebs/Telethon/blob/master/telethon/extensions/markdown.py
-def add_surrogates(text):
+def add_surrogates(text: Optional[str]) -> Optional[str]:
     if text is None:
         return None
     return "".join("".join(chr(y) for y in struct.unpack(" Optional[str]:
     if text is None:
         return None
     return text.encode("utf-16", "surrogatepass").decode("utf-16")
 
 
-def trim_reply_fallback_text(text):
+def trim_reply_fallback_text(text: str) -> str:
     if not text.startswith("> ") or "\n" not in text:
         return text
     lines = text.split("\n")
@@ -44,14 +45,14 @@ def trim_reply_fallback_text(text):
     return "\n".join(lines)
 
 
-HTML_REPLY_FALLBACK_REGEX = re.compile(r"^
[\s\S]+?
") +html_reply_fallback_regex = re.compile(r"^
[\s\S]+?
") -def trim_reply_fallback_html(html): - return HTML_REPLY_FALLBACK_REGEX.sub("", html) +def trim_reply_fallback_html(html: str) -> str: + return html_reply_fallback_regex.sub("", html) -def unicode_to_html(text, html, ctrl, tag): +def unicode_to_html(text: str, html: str, ctrl: str, tag: str) -> str: if ctrl not in text: return html if not html: @@ -79,5 +80,5 @@ def unicode_to_html(text, html, ctrl, tag): return html -def html_to_unicode(text, ctrl): +def html_to_unicode(text: str, ctrl: str) -> str: return ctrl.join(text) + ctrl