From 828047e27206932fdf331975da35f0335a220c6c Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Fri, 28 Sep 2018 00:49:37 +0300 Subject: [PATCH] Split TelegramMessage helper to separate file --- .../formatter/from_matrix/parser_lxml.py | 145 +--------------- .../formatter/from_matrix/telegram_message.py | 157 ++++++++++++++++++ 2 files changed, 161 insertions(+), 141 deletions(-) create mode 100644 mautrix_telegram/formatter/from_matrix/telegram_message.py diff --git a/mautrix_telegram/formatter/from_matrix/parser_lxml.py b/mautrix_telegram/formatter/from_matrix/parser_lxml.py index 5bfd295d..d95fc3e8 100644 --- a/mautrix_telegram/formatter/from_matrix/parser_lxml.py +++ b/mautrix_telegram/formatter/from_matrix/parser_lxml.py @@ -14,163 +14,26 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Callable, List, Optional, Sequence, Tuple, Type, Union +from typing import List, Tuple from lxml import html -from telethon.tl.types import (MessageEntityMention as Mention, +from telethon.tl.types import (MessageEntityMention as Mention, MessageEntityBotCommand as Command, MessageEntityMentionName as MentionName, MessageEntityEmail as Email, MessageEntityUrl as URL, MessageEntityTextUrl as TextURL, MessageEntityBold as Bold, MessageEntityItalic as Italic, - MessageEntityCode as Code, MessageEntityPre as Pre, - MessageEntityBotCommand as Command, TypeMessageEntity, - InputMessageEntityMentionName as InputMentionName) + MessageEntityCode as Code, MessageEntityPre as Pre) from ... 
import user as u, puppet as pu, portal as po from ...types import MatrixUserID from ..util import html_to_unicode from .parser_common import MatrixParserCommon, ParsedMessage +from .telegram_message import TelegramMessage, Entity, offset_length_multiply def parse_html(input_html: str) -> ParsedMessage: return MatrixParser.parse(input_html) -class Entity: - @staticmethod - def copy(entity: TypeMessageEntity) -> Optional[TypeMessageEntity]: - if not entity: - return None - kwargs = { - "offset": entity.offset, - "length": entity.length, - } - if isinstance(entity, Pre): - kwargs["language"] = entity.language - elif isinstance(entity, TextURL): - kwargs["url"] = entity.url - elif isinstance(entity, (MentionName, InputMentionName)): - kwargs["user_id"] = entity.user_id - return entity.__class__(**kwargs) - - @classmethod - def adjust(cls, entity: Union[TypeMessageEntity, List[TypeMessageEntity]], - func: Callable[[TypeMessageEntity], None] - ) -> Union[Optional[TypeMessageEntity], List[TypeMessageEntity]]: - if isinstance(entity, list): - return [Entity.adjust(element, func) for element in entity if entity] - elif not entity: - return None - entity = cls.copy(entity) - func(entity) - if entity.offset < 0: - entity.length += entity.offset - entity.offset = 0 - return entity - - -def offset_diff(amount: int): - def func(entity: TypeMessageEntity): - entity.offset += amount - - return func - - -def offset_length_multiply(amount: int): - def func(entity: TypeMessageEntity): - entity.offset *= amount - entity.length *= amount - - return func - - -class TelegramMessage: - def __init__(self, text: str = "", entities: Optional[List[TypeMessageEntity]] = None) -> None: - self.text = text # type: str - self.entities = entities or [] # type: List[TypeMessageEntity] - - def offset_entities(self, offset: int) -> 'TelegramMessage': - def apply_offset(entity: TypeMessageEntity, inner_offset: int - ) -> Optional[TypeMessageEntity]: - entity = Entity.copy(entity) - entity.offset += 
inner_offset - if entity.offset < 0: - entity.offset = 0 - elif entity.offset > len(self.text): - return None - elif entity.offset + entity.length > len(self.text): - entity.length = len(self.text) - entity.offset - return entity - - self.entities = [apply_offset(entity, offset) for entity in self.entities if entity] - self.entities = [x for x in self.entities if x is not None] - return self - - def append(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage': - for msg in args: - if isinstance(msg, str): - msg = TelegramMessage(text=msg) - self.entities += Entity.adjust(msg.entities, offset_diff(len(self.text))) - self.text += msg.text - return self - - def prepend(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage': - for msg in args: - if isinstance(msg, str): - msg = TelegramMessage(text=msg) - self.entities = msg.entities + Entity.adjust(self.entities, offset_diff(len(msg.text))) - self.text = msg.text + self.text - return self - - def format(self, entity_type: Type[TypeMessageEntity], offset: int = None, length: int = None, - **kwargs) -> 'TelegramMessage': - self.entities.append(entity_type(offset=offset or 0, - length=length if length is not None else len(self.text), - **kwargs)) - return self - - def concat(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage': - return TelegramMessage().append(self, *args) - - def trim(self) -> 'TelegramMessage': - orig_len = len(self.text) - self.text = self.text.lstrip() - diff = orig_len - len(self.text) - self.text = self.text.rstrip() - self.offset_entities(-diff) - return self - - def split(self, separator, max_items: int = 0) -> List['TelegramMessage']: - text_parts = self.text.split(separator, max_items - 1) - output = [] # type: List[TelegramMessage] - - offset = 0 - for part in text_parts: - msg = TelegramMessage(part) - for entity in self.entities: - start_in_range = len(part) > entity.offset - offset >= 0 - end_in_range = len(part) >= entity.offset - offset + entity.length > 
0 - if start_in_range and end_in_range: - msg.entities.append(Entity.adjust(entity, offset_diff(-offset))) - output.append(msg) - - offset += len(part) - offset += len(separator) - - return output - - @staticmethod - def join(items: Sequence[Union[str, 'TelegramMessage']], - separator: str = " ") -> 'TelegramMessage': - main = TelegramMessage() - for msg in items: - if isinstance(msg, str): - msg = TelegramMessage(text=msg) - main.entities += Entity.adjust(msg.entities, offset_diff(len(main.text))) - main.text += msg.text + separator - main.text = main.text[:-len(separator)] - return main - - class RecursionContext: strip_linebreaks: bool ul_depth: int diff --git a/mautrix_telegram/formatter/from_matrix/telegram_message.py b/mautrix_telegram/formatter/from_matrix/telegram_message.py new file mode 100644 index 00000000..c849cc00 --- /dev/null +++ b/mautrix_telegram/formatter/from_matrix/telegram_message.py @@ -0,0 +1,157 @@ +# -*- coding: future_fstrings -*- +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . 
+from typing import Callable, List, Optional, Sequence, Type, Union
+
+from telethon.tl.types import (MessageEntityMentionName as MentionName,
+                               MessageEntityTextUrl as TextURL, MessageEntityPre as Pre,
+                               TypeMessageEntity, InputMessageEntityMentionName as InputMentionName)
+
+
+class Entity:
+    """Helpers for copying and shifting Telegram message entities."""
+
+    @staticmethod
+    def copy(entity: TypeMessageEntity) -> Optional[TypeMessageEntity]:
+        """Return a new entity of the same class with the same fields.
+
+        ``offset`` and ``length`` are always copied. ``language`` (Pre), ``url`` (TextURL)
+        and ``user_id`` (MentionName / InputMentionName) are carried over for the entity
+        types that have them. A falsy ``entity`` yields ``None``.
+        """
+        if not entity:
+            return None
+        kwargs = {
+            "offset": entity.offset,
+            "length": entity.length,
+        }
+        if isinstance(entity, Pre):
+            kwargs["language"] = entity.language
+        elif isinstance(entity, TextURL):
+            kwargs["url"] = entity.url
+        elif isinstance(entity, (MentionName, InputMentionName)):
+            kwargs["user_id"] = entity.user_id
+        return entity.__class__(**kwargs)
+
+    @classmethod
+    def adjust(cls, entity: Union[TypeMessageEntity, List[TypeMessageEntity]],
+               func: Callable[[TypeMessageEntity], None]
+               ) -> Union[Optional[TypeMessageEntity], List[TypeMessageEntity]]:
+        """Apply ``func`` to a copy of ``entity`` (or to a copy of each entity in a list).
+
+        The input entities are never modified. If ``func`` leaves a copy with a negative
+        offset, the offset is clamped to 0 and the length is reduced by the same amount.
+
+        :param entity: A single entity or a list of entities.
+        :param func: Callback that mutates the copied entity in place.
+        :return: The adjusted copy, a list of adjusted copies (falsy entries skipped),
+                 or ``None`` for a falsy single entity.
+        """
+        if isinstance(entity, list):
+            # Filter on the element rather than on the list itself, so that falsy entries
+            # are skipped instead of ending up as None in the result.
+            return [cls.adjust(element, func) for element in entity if element]
+        elif not entity:
+            return None
+        entity = cls.copy(entity)
+        func(entity)
+        if entity.offset < 0:
+            entity.length += entity.offset
+            entity.offset = 0
+        return entity
+
+
+def offset_diff(amount: int) -> Callable[[TypeMessageEntity], None]:
+    """Build a callback that adds ``amount`` to an entity's offset in place."""
+    def func(entity: TypeMessageEntity) -> None:
+        entity.offset += amount
+
+    return func
+
+
+def offset_length_multiply(amount: int) -> Callable[[TypeMessageEntity], None]:
+    """Build a callback that multiplies an entity's offset and length by ``amount`` in place."""
+    def func(entity: TypeMessageEntity) -> None:
+        entity.offset *= amount
+        entity.length *= amount
+
+    return func
+
+
+class TelegramMessage:
+    """A text body together with the Telegram entities that apply to it.
+
+    ``append``, ``prepend``, ``format``, ``trim`` and ``offset_entities`` modify the message
+    and return ``self`` so calls can be chained. ``concat``, ``split`` and ``join`` build
+    new messages.
+    """
+
+    def __init__(self, text: str = "", entities: Optional[List[TypeMessageEntity]] = None) -> None:
+        self.text = text  # type: str
+        self.entities = entities or []  # type: List[TypeMessageEntity]
+
+    def offset_entities(self, offset: int) -> 'TelegramMessage':
+        """Shift every entity by ``offset`` and clip the result to the current text.
+
+        An entity pushed partly before the start of the text loses the part that fell off,
+        an entity running past the end of the text is shortened, and entities left covering
+        no text at all are dropped. ``self.entities`` is replaced by adjusted copies.
+        """
+        text_len = len(self.text)
+
+        def apply_offset(entity: TypeMessageEntity, inner_offset: int
+                         ) -> Optional[TypeMessageEntity]:
+            entity = Entity.copy(entity)
+            entity.offset += inner_offset
+            if entity.offset < 0:
+                # Shrink by the part that moved before the text, like Entity.adjust does.
+                entity.length += entity.offset
+                entity.offset = 0
+            # Clip the end unconditionally: trim() may have removed trailing text as well,
+            # even for an entity whose start was just clamped to 0.
+            entity.length = min(entity.length, text_len - entity.offset)
+            return entity if entity.length > 0 else None
+
+        shifted = (apply_offset(entity, offset) for entity in self.entities if entity)
+        self.entities = [entity for entity in shifted if entity is not None]
+        return self
+
+    def append(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
+        """Append strings or messages in order, moving their entities behind the current text."""
+        for msg in args:
+            if isinstance(msg, str):
+                msg = TelegramMessage(text=msg)
+            self.entities += Entity.adjust(msg.entities, offset_diff(len(self.text)))
+            self.text += msg.text
+        return self
+
+    def prepend(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
+        """Put each argument in front of the current text, shifting the existing entities.
+
+        Arguments are applied one after another, so the last argument ends up first.
+        """
+        for msg in args:
+            if isinstance(msg, str):
+                msg = TelegramMessage(text=msg)
+            # Copy the incoming entities (as append() does) so that the two messages never
+            # share entity objects.
+            incoming = [Entity.copy(entity) for entity in msg.entities if entity]
+            self.entities = incoming + Entity.adjust(self.entities, offset_diff(len(msg.text)))
+            self.text = msg.text + self.text
+        return self
+
+    def format(self, entity_type: Type[TypeMessageEntity], offset: Optional[int] = None,
+               length: Optional[int] = None, **kwargs) -> 'TelegramMessage':
+        """Add an entity of ``entity_type`` to the message.
+
+        :param entity_type: Entity class to instantiate.
+        :param offset: Start of the entity; defaults to 0.
+        :param length: Length of the entity; defaults to the length of the whole text.
+        :param kwargs: Extra constructor arguments passed to ``entity_type``.
+        """
+        self.entities.append(entity_type(offset=offset or 0,
+                                         length=length if length is not None else len(self.text),
+                                         **kwargs))
+        return self
+
+    def concat(self, *args: Union[str, 'TelegramMessage']) -> 'TelegramMessage':
+        """Return a new message made of this message followed by ``args``; ``self`` is unchanged."""
+        return TelegramMessage().append(self, *args)
+
+    def trim(self) -> 'TelegramMessage':
+        """Strip leading and trailing whitespace from the text and re-fit the entities."""
+        orig_len = len(self.text)
+        self.text = self.text.lstrip()
+        # Only the characters removed from the front move the entities.
+        diff = orig_len - len(self.text)
+        self.text = self.text.rstrip()
+        self.offset_entities(-diff)
+        return self
+
+    def split(self, separator: str, max_items: int = 0) -> List['TelegramMessage']:
+        """Split the message on ``separator`` into separate messages.
+
+        Each part keeps adjusted copies of the entities that lie completely inside it.
+        Entities that do not fit inside a single part are not carried over.
+
+        :param separator: Separator passed to ``str.split``.
+        :param max_items: Maximum number of parts; 0 (the default) means no limit.
+        """
+        text_parts = self.text.split(separator, max_items - 1)
+        output = []  # type: List[TelegramMessage]
+
+        offset = 0
+        for part in text_parts:
+            msg = TelegramMessage(part)
+            for entity in self.entities:
+                start = entity.offset - offset
+                end = start + entity.length
+                # Keep only entities that start and end inside this part.
+                if 0 <= start < len(part) and 0 < end <= len(part):
+                    msg.entities.append(Entity.adjust(entity, offset_diff(-offset)))
+            output.append(msg)
+
+            offset += len(part)
+            offset += len(separator)
+
+        return output
+
+    @staticmethod
+    def join(items: Sequence[Union[str, 'TelegramMessage']],
+             separator: str = " ") -> 'TelegramMessage':
+        """Join strings and messages into one new message with ``separator`` between items.
+
+        Entities of the joined messages are copied and shifted to their new position.
+        An empty ``separator`` concatenates the items directly.
+        """
+        main = TelegramMessage()
+        for index, msg in enumerate(items):
+            if isinstance(msg, str):
+                msg = TelegramMessage(text=msg)
+            if index:
+                # Insert the separator before every item but the first, instead of slicing
+                # a trailing one off afterwards: text[:-0] would erase the whole text.
+                main.text += separator
+            main.entities += Entity.adjust(msg.entities, offset_diff(len(main.text)))
+            main.text += msg.text
+        return main