# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"
f"{content.formatted_body}
")
if require_html:
force_html()
if prefix_html:
force_html()
content.formatted_body = prefix_html + content.formatted_body
if prefix_text:
content.body = prefix_text + content.body
if getattr(evt, "fwd_from", None):
await _add_forward_header(source, content, evt.fwd_from)
if getattr(evt, "reply_to", None) and not no_reply_fallback:
await _add_reply_header(source, content, evt, main_intent)
if isinstance(evt, Message) and evt.post and evt.post_author:
force_html()
content.body += f"\n- {evt.post_author}"
content.formatted_body += f"
- {evt.post_author}"
return content
async def _telegram_entities_to_matrix_catch(text: str, entities: list[TypeMessageEntity]) -> str:
    """Convert Telegram entities to HTML, never letting a conversion error escape.

    Delegates to ``_telegram_entities_to_matrix``. If that raises, the failure
    is logged together with the offending text and entities, and a fixed
    placeholder string is returned in place of the converted HTML.
    """
    try:
        converted = await _telegram_entities_to_matrix(text, entities)
    except Exception:
        # Deliberately broad: a formatting failure is logged and replaced with
        # a placeholder rather than propagated to the caller.
        log.exception(
            "Failed to convert Telegram format:\nmessage=%s\nentities=%s", text, entities
        )
        converted = "[failed conversion in _telegram_entities_to_matrix]"
    return converted
async def _telegram_entities_to_matrix(
    text: str,
    entities: list[TypeMessageEntity],
    offset: int = 0,
    length: int = None,
    in_codeblock: bool = False,
) -> str:
    """Render ``text`` with its Telegram ``entities`` as Matrix-flavoured HTML.

    The function recurses for nested entities: the text covered by an entity
    is converted by a recursive call that receives the remaining entities, and
    the result is then wrapped in the HTML tag matching the entity type.

    Args:
        text: The slice of the message being converted. Entity offsets are
            made relative to it by subtracting ``offset``.
        entities: Entities to apply. The loop stops at the first entity that
            starts past the end of the slice, so this assumes the list is
            ordered by offset (TODO confirm against the caller).
        offset: Absolute offset of ``text[0]`` within the whole message.
        length: Length of the slice; defaults to ``len(text)``.
        in_codeblock: When true, newlines in plain text are kept as-is instead
            of being turned into ``<br/>``.

    Returns:
        The HTML string for this slice.
    """

    def text_to_html(
        val: str, _in_codeblock: bool = in_codeblock, escape_html: bool = True
    ) -> str:
        # Escape plain text, and outside code blocks turn newlines into <br/>.
        if escape_html:
            val = escape(val)
        if not _in_codeblock:
            val = val.replace("\n", "<br/>")
        return val

    if not entities:
        return text_to_html(text)
    if length is None:
        length = len(text)

    html = []
    last_offset = 0
    for i, entity in enumerate(entities):
        if entity.offset > offset + length:
            # Entity starts beyond this slice; nothing further is handled here.
            break
        relative_offset = entity.offset - offset
        if relative_offset > last_offset:
            # Plain text between the previous entity and this one.
            html.append(text_to_html(text[last_offset:relative_offset]))
        elif relative_offset < last_offset:
            # Starts inside a span that has already been emitted (for example
            # by the recursive call made for an enclosing entity).
            continue

        skip_entity = False
        is_code_entity = isinstance(entity, (MessageEntityCode, MessageEntityPre))
        # Convert the covered text first so nested entities are rendered.
        entity_text = await _telegram_entities_to_matrix(
            text=text[relative_offset : relative_offset + entity.length],
            entities=entities[i + 1 :],
            offset=entity.offset,
            length=entity.length,
            in_codeblock=is_code_entity,
        )
        # Already escaped by the recursive call; only newline handling remains.
        entity_text = text_to_html(entity_text, is_code_entity, escape_html=False)

        entity_type = type(entity)
        if entity_type == MessageEntityBold:
            html.append(f"<strong>{entity_text}</strong>")
        elif entity_type == MessageEntityItalic:
            html.append(f"<em>{entity_text}</em>")
        elif entity_type == MessageEntityUnderline:
            html.append(f"<u>{entity_text}</u>")
        elif entity_type == MessageEntityStrike:
            html.append(f"<del>{entity_text}</del>")
        elif entity_type == MessageEntityBlockquote:
            html.append(f"<blockquote>{entity_text}</blockquote>")
        elif entity_type == MessageEntityCode:
            # Multi-line inline code is promoted to a preformatted block.
            html.append(
                f"<pre><code>{entity_text}</code></pre>"
                if "\n" in entity_text
                else f"<code>{entity_text}</code>"
            )
        elif entity_type == MessageEntityPre:
            skip_entity = _parse_pre(html, entity_text, entity.language)
        elif entity_type == MessageEntityMention:
            skip_entity = await _parse_mention(html, entity_text)
        elif entity_type == MessageEntityMentionName:
            skip_entity = await _parse_name_mention(html, entity_text, TelegramID(entity.user_id))
        elif entity_type == MessageEntityEmail:
            html.append(f"<a href='mailto:{entity_text}'>{entity_text}</a>")
        elif entity_type in (MessageEntityTextUrl, MessageEntityUrl):
            skip_entity = await _parse_url(
                html, entity_text, entity.url if entity_type == MessageEntityTextUrl else None
            )
        elif entity_type in (
            MessageEntityBotCommand,
            MessageEntityHashtag,
            MessageEntityCashtag,
            MessageEntityPhone,
        ):
            html.append(f"<font color='blue'>{entity_text}</font>")
        elif entity_type == MessageEntitySpoiler:
            html.append(f"<span data-mx-spoiler>{entity_text}</span>")
        else:
            skip_entity = True

        # A skipped entity emitted nothing, so its text is left to be output
        # as plain text by a later iteration or by the trailing append below.
        last_offset = relative_offset + (0 if skip_entity else entity.length)

    html.append(text_to_html(text[last_offset:]))
    return "".join(html)
def _parse_pre(html: list[str], entity_text: str, language: str) -> bool:
    """Append a preformatted code block for ``entity_text`` to ``html``.

    Args:
        html: Output list of HTML fragments; one fragment is appended.
        entity_text: Already-converted content of the code block.
        language: Language name attached to the entity. When non-empty it is
            emitted as a ``language-<name>`` class on the ``<code>`` element.

    Returns:
        Always ``False``: the entity is never skipped.
    """
    if language:
        # The language name is placed inside an attribute, so escape it to
        # keep quotes or angle brackets from breaking out of the attribute.
        html.append(
            f'<pre><code class="language-{escape(language)}">{entity_text}</code></pre>'
        )
    else:
        html.append(f"<pre><code>{entity_text}</code></pre>")
    return False
async def _parse_mention(html: list[str], entity_text: str) -> bool:
    """Turn a username mention into a matrix.to link when the target is known.

    The username is looked up as a user, then as a puppet, then as a portal
    (using the portal's alias if it has one, otherwise its room ID).

    Args:
        html: Output list of HTML fragments; a link is appended on success.
        entity_text: Converted text of the mention entity.

    Returns:
        ``False`` if a link was appended, ``True`` if no Matrix ID could be
        resolved and nothing was appended (the caller then skips the entity).
    """
    # Drop the first character (presumably the leading "@") to get the name.
    username = entity_text[1:]
    user = await u.User.find_by_username(username) or await pu.Puppet.find_by_username(username)
    if user:
        mxid = user.mxid
    else:
        portal = await po.Portal.find_by_username(username)
        mxid = (portal.alias or portal.mxid) if portal else None

    if not mxid:
        return True
    html.append(f"<a href='https://matrix.to/#/{mxid}'>{entity_text}</a>")
    return False
async def _parse_name_mention(html: list[str], entity_text: str, user_id: TelegramID) -> bool:
    """Turn a mention-by-ID into a matrix.to link when the user is known.

    The Telegram ID is looked up as a user first and, failing that, as an
    existing puppet (no puppet is created by the lookup).

    Args:
        html: Output list of HTML fragments; a link is appended on success.
        entity_text: Converted text of the mention entity.
        user_id: Telegram ID of the mentioned user.

    Returns:
        ``False`` if a link was appended, ``True`` if no Matrix ID could be
        resolved and nothing was appended (the caller then skips the entity).
    """
    user = await u.User.get_by_tgid(user_id)
    if user:
        mxid = user.mxid
    else:
        puppet = await pu.Puppet.get_by_tgid(user_id, create=False)
        mxid = puppet.mxid if puppet else None

    if not mxid:
        return True
    html.append(f"<a href='https://matrix.to/#/{mxid}'>{entity_text}</a>")
    return False
# Matches a link to a single Telegram message and captures two groups:
# the chat part (a name, or "c/<digits>") and the numeric message ID.
message_link_regex = re.compile(
    "".join(
        (
            # Scheme and host: t.me / telegram.me / t.dog / telegram.dog
            r"https?://t(?:elegram)?\.(?:me|dog)",
            # Chat: /username or /c/id
            r"/([A-Za-z][A-Za-z0-9_]{3,31}[A-Za-z0-9]|[Cc]/[0-9]{1,20})",
            # Message: /messageid
            r"/([0-9]{1,20})",
        )
    )
)
async def _parse_url(html: list[str], entity_text: str, url: str) -> bool:
    """Append a hyperlink for a URL entity to ``html``.

    Links that point at a Telegram message which is known to the bridge are
    rewritten to the matching matrix.to event link.

    Args:
        html: Output list of HTML fragments; one ``<a>`` element is appended.
        entity_text: Converted text of the entity, used as the link label.
        url: Explicit link target, or a falsy value (the caller passes
            ``None``) when the entity text itself is the URL.

    Returns:
        Always ``False``: the entity is never skipped.
    """
    # An explicit target is raw input and must be escaped; the entity text was
    # already converted by the caller.
    url = escape(url) if url else entity_text
    # Magnet URIs have the form "magnet:?..." (no "//"), so match on the bare
    # scheme; otherwise they would be mangled into "http://magnet:?...".
    if not url.startswith(("https://", "http://", "ftp://", "magnet:")):
        url = "http://" + url

    message_link_match = message_link_regex.match(url)
    if message_link_match:
        group, msgid_str = message_link_match.groups()
        msgid = int(msgid_str)
        if group.lower().startswith("c/"):
            # "c/<id>" form: look the portal up by numeric Telegram ID.
            portal = await po.Portal.get_by_tgid(TelegramID(int(group[2:])))
        else:
            portal = await po.Portal.find_by_username(group)
        if portal:
            message = await DBMessage.get_one_by_tgid(TelegramID(msgid), portal.tgid)
            if message:
                # Known message: link to the Matrix event instead.
                url = f"https://matrix.to/#/{portal.mxid}/{message.mxid}"

    html.append(f"<a href='{url}'>{entity_text}</a>")
    return False