diff --git a/mautrix_telegram/formatter/from_matrix/parser_lxml.py b/mautrix_telegram/formatter/from_matrix/parser_lxml.py
index 5bfd295d..d95fc3e8 100644
--- a/mautrix_telegram/formatter/from_matrix/parser_lxml.py
+++ b/mautrix_telegram/formatter/from_matrix/parser_lxml.py
@@ -14,163 +14,26 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Callable, List, Optional, Sequence, Tuple, Type, Union
+from typing import List, Tuple
from lxml import html
-from telethon.tl.types import (MessageEntityMention as Mention,
+from telethon.tl.types import (MessageEntityMention as Mention, MessageEntityBotCommand as Command,
MessageEntityMentionName as MentionName, MessageEntityEmail as Email,
MessageEntityUrl as URL, MessageEntityTextUrl as TextURL,
MessageEntityBold as Bold, MessageEntityItalic as Italic,
- MessageEntityCode as Code, MessageEntityPre as Pre,
- MessageEntityBotCommand as Command, TypeMessageEntity,
- InputMessageEntityMentionName as InputMentionName)
+ MessageEntityCode as Code, MessageEntityPre as Pre)
from ... import user as u, puppet as pu, portal as po
from ...types import MatrixUserID
from ..util import html_to_unicode
from .parser_common import MatrixParserCommon, ParsedMessage
+from .telegram_message import TelegramMessage, Entity, offset_length_multiply
def parse_html(input_html: str) -> ParsedMessage:
return MatrixParser.parse(input_html)
-class Entity:
- @staticmethod
- def copy(entity: TypeMessageEntity) -> Optional[TypeMessageEntity]:
- if not entity:
- return None
- kwargs = {
- "offset": entity.offset,
- "length": entity.length,
- }
- if isinstance(entity, Pre):
- kwargs["language"] = entity.language
- elif isinstance(entity, TextURL):
- kwargs["url"] = entity.url
- elif isinstance(entity, (MentionName, InputMentionName)):
- kwargs["user_id"] = entity.user_id
- return entity.__class__(**kwargs)
-
- @classmethod
- def adjust(cls, entity: Union[TypeMessageEntity, List[TypeMessageEntity]],
- func: Callable[[TypeMessageEntity], None]
- ) -> Union[Optional[TypeMessageEntity], List[TypeMessageEntity]]:
- if isinstance(entity, list):
- return [Entity.adjust(element, func) for element in entity if entity]
- elif not entity:
- return None
- entity = cls.copy(entity)
- func(entity)
- if entity.offset < 0:
- entity.length += entity.offset
- entity.offset = 0
- return entity
-
-
-def offset_diff(amount: int):
- def func(entity: TypeMessageEntity):
- entity.offset += amount
-
- return func
-
-
-def offset_length_multiply(amount: int):
- def func(entity: TypeMessageEntity):
- entity.offset *= amount
- entity.length *= amount
-
- return func
-
-
-class TelegramMessage:
- def __init__(self, text: str = "", entities: Optional[List[TypeMessageEntity]] = None) -> None:
- self.text = text # type: str
- self.entities = entities or [] # type: List[TypeMessageEntity]
-
- def offset_entities(self, offset: int) -> 'TelegramMessage':
- def apply_offset(entity: TypeMessageEntity, inner_offset: int
- ) -> Optional[TypeMessageEntity]:
- entity = Entity.copy(entity)
- entity.offset += inner_offset
- if entity.offset < 0:
- entity.offset = 0
- elif entity.offset > len(self.text):
- return None
- elif entity.offset + entity.length > len(self.text):
- entity.length = len(self.text) - entity.offset
- return entity
-
- self.entities = [apply_offset(entity, offset) for entity in self.entities if entity]
- self.entities = [x for x in self.entities if x is not None]
- return self
-
- def append(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
- for msg in args:
- if isinstance(msg, str):
- msg = TelegramMessage(text=msg)
- self.entities += Entity.adjust(msg.entities, offset_diff(len(self.text)))
- self.text += msg.text
- return self
-
- def prepend(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
- for msg in args:
- if isinstance(msg, str):
- msg = TelegramMessage(text=msg)
- self.entities = msg.entities + Entity.adjust(self.entities, offset_diff(len(msg.text)))
- self.text = msg.text + self.text
- return self
-
- def format(self, entity_type: Type[TypeMessageEntity], offset: int = None, length: int = None,
- **kwargs) -> 'TelegramMessage':
- self.entities.append(entity_type(offset=offset or 0,
- length=length if length is not None else len(self.text),
- **kwargs))
- return self
-
- def concat(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
- return TelegramMessage().append(self, *args)
-
- def trim(self) -> 'TelegramMessage':
- orig_len = len(self.text)
- self.text = self.text.lstrip()
- diff = orig_len - len(self.text)
- self.text = self.text.rstrip()
- self.offset_entities(-diff)
- return self
-
- def split(self, separator, max_items: int = 0) -> List['TelegramMessage']:
- text_parts = self.text.split(separator, max_items - 1)
- output = [] # type: List[TelegramMessage]
-
- offset = 0
- for part in text_parts:
- msg = TelegramMessage(part)
- for entity in self.entities:
- start_in_range = len(part) > entity.offset - offset >= 0
- end_in_range = len(part) >= entity.offset - offset + entity.length > 0
- if start_in_range and end_in_range:
- msg.entities.append(Entity.adjust(entity, offset_diff(-offset)))
- output.append(msg)
-
- offset += len(part)
- offset += len(separator)
-
- return output
-
- @staticmethod
- def join(items: Sequence[Union[str, 'TelegramMessage']],
- separator: str = " ") -> 'TelegramMessage':
- main = TelegramMessage()
- for msg in items:
- if isinstance(msg, str):
- msg = TelegramMessage(text=msg)
- main.entities += Entity.adjust(msg.entities, offset_diff(len(main.text)))
- main.text += msg.text + separator
- main.text = main.text[:-len(separator)]
- return main
-
-
class RecursionContext:
strip_linebreaks: bool
ul_depth: int
diff --git a/mautrix_telegram/formatter/from_matrix/telegram_message.py b/mautrix_telegram/formatter/from_matrix/telegram_message.py
new file mode 100644
index 00000000..c849cc00
--- /dev/null
+++ b/mautrix_telegram/formatter/from_matrix/telegram_message.py
@@ -0,0 +1,157 @@
+# -*- coding: future_fstrings -*-
+# mautrix-telegram - A Matrix-Telegram puppeting bridge
+# Copyright (C) 2018 Tulir Asokan
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+from typing import Callable, List, Optional, Sequence, Type, Union
+
+from telethon.tl.types import (MessageEntityMentionName as MentionName,
+ MessageEntityTextUrl as TextURL, MessageEntityPre as Pre,
+ TypeMessageEntity, InputMessageEntityMentionName as InputMentionName)
+
+
+class Entity:
+ @staticmethod
+ def copy(entity: TypeMessageEntity) -> Optional[TypeMessageEntity]:
+ if not entity:
+ return None
+ kwargs = {
+ "offset": entity.offset,
+ "length": entity.length,
+ }
+ if isinstance(entity, Pre):
+ kwargs["language"] = entity.language
+ elif isinstance(entity, TextURL):
+ kwargs["url"] = entity.url
+ elif isinstance(entity, (MentionName, InputMentionName)):
+ kwargs["user_id"] = entity.user_id
+ return entity.__class__(**kwargs)
+
+ @classmethod
+ def adjust(cls, entity: Union[TypeMessageEntity, List[TypeMessageEntity]],
+ func: Callable[[TypeMessageEntity], None]
+ ) -> Union[Optional[TypeMessageEntity], List[TypeMessageEntity]]:
+ if isinstance(entity, list):
+ return [Entity.adjust(element, func) for element in entity if entity]
+ elif not entity:
+ return None
+ entity = cls.copy(entity)
+ func(entity)
+ if entity.offset < 0:
+ entity.length += entity.offset
+ entity.offset = 0
+ return entity
+
+
+def offset_diff(amount: int) -> Callable[[TypeMessageEntity], None]:
+ def func(entity: TypeMessageEntity) -> None:
+ entity.offset += amount
+
+ return func
+
+
+def offset_length_multiply(amount: int) -> Callable[[TypeMessageEntity], None]:
+ def func(entity: TypeMessageEntity) -> None:
+ entity.offset *= amount
+ entity.length *= amount
+
+ return func
+
+
+class TelegramMessage:
+ def __init__(self, text: str = "", entities: Optional[List[TypeMessageEntity]] = None) -> None:
+ self.text = text # type: str
+ self.entities = entities or [] # type: List[TypeMessageEntity]
+
+ def offset_entities(self, offset: int) -> 'TelegramMessage':
+ def apply_offset(entity: TypeMessageEntity, inner_offset: int
+ ) -> Optional[TypeMessageEntity]:
+ entity = Entity.copy(entity)
+ entity.offset += inner_offset
+ if entity.offset < 0:
+ entity.offset = 0
+ elif entity.offset > len(self.text):
+ return None
+ elif entity.offset + entity.length > len(self.text):
+ entity.length = len(self.text) - entity.offset
+ return entity
+
+ self.entities = [apply_offset(entity, offset) for entity in self.entities if entity]
+ self.entities = [x for x in self.entities if x is not None]
+ return self
+
+ def append(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
+ for msg in args:
+ if isinstance(msg, str):
+ msg = TelegramMessage(text=msg)
+ self.entities += Entity.adjust(msg.entities, offset_diff(len(self.text)))
+ self.text += msg.text
+ return self
+
+ def prepend(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
+ for msg in args:
+ if isinstance(msg, str):
+ msg = TelegramMessage(text=msg)
+ self.entities = msg.entities + Entity.adjust(self.entities, offset_diff(len(msg.text)))
+ self.text = msg.text + self.text
+ return self
+
+ def format(self, entity_type: Type[TypeMessageEntity], offset: int = None, length: int = None,
+ **kwargs) -> 'TelegramMessage':
+ self.entities.append(entity_type(offset=offset or 0,
+ length=length if length is not None else len(self.text),
+ **kwargs))
+ return self
+
+ def concat(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
+ return TelegramMessage().append(self, *args)
+
+ def trim(self) -> 'TelegramMessage':
+ orig_len = len(self.text)
+ self.text = self.text.lstrip()
+ diff = orig_len - len(self.text)
+ self.text = self.text.rstrip()
+ self.offset_entities(-diff)
+ return self
+
+ def split(self, separator, max_items: int = 0) -> List['TelegramMessage']:
+ text_parts = self.text.split(separator, max_items - 1)
+ output = [] # type: List[TelegramMessage]
+
+ offset = 0
+ for part in text_parts:
+ msg = TelegramMessage(part)
+ for entity in self.entities:
+ start_in_range = len(part) > entity.offset - offset >= 0
+ end_in_range = len(part) >= entity.offset - offset + entity.length > 0
+ if start_in_range and end_in_range:
+ msg.entities.append(Entity.adjust(entity, offset_diff(-offset)))
+ output.append(msg)
+
+ offset += len(part)
+ offset += len(separator)
+
+ return output
+
+ @staticmethod
+ def join(items: Sequence[Union[str, 'TelegramMessage']],
+ separator: str = " ") -> 'TelegramMessage':
+ main = TelegramMessage()
+ for msg in items:
+ if isinstance(msg, str):
+ msg = TelegramMessage(text=msg)
+ main.entities += Entity.adjust(msg.entities, offset_diff(len(main.text)))
+ main.text += msg.text + separator
+ main.text = main.text[:-len(separator)]
+ return main