# -*- coding: future_fstrings -*- # mautrix-telegram - A Matrix-Telegram puppeting bridge # Copyright (C) 2018 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from collections import deque from datetime import datetime from string import Template from html import escape as escape_html import asyncio import random import mimetypes import unicodedata import hashlib import logging import magic from sqlalchemy.exc import IntegrityError, InvalidRequestError from sqlalchemy.orm.exc import FlushError from telethon.tl.functions.messages import * from telethon.tl.functions.channels import * from telethon.errors import * from telethon.tl.types import * from mautrix_appservice import MatrixRequestError, IntentError from .db import Portal as DBPortal, Message as DBMessage from . import puppet as p, user as u, formatter, util mimetypes.init() config = None class Portal: log = logging.getLogger("mau.portal") db = None az = None bot = None loop = None filter_mode = None filter_list = None bridge_notices = False alias_template = None mx_alias_regex = None hs_domain = None by_mxid = {} by_tgid = {} def __init__(self, tgid, peer_type, tg_receiver=None, mxid=None, username=None, megagroup=False, title=None, about=None, photo_id=None, db_instance=None): self.mxid = mxid self.tgid = tgid self.tg_receiver = tg_receiver or tgid self.peer_type = peer_type self.username = username self.megagroup = megagroup self.title = title self.about = about self.photo_id = photo_id self._db_instance = db_instance self._main_intent = None self._room_create_lock = asyncio.Lock() self._temp_pinned_message_id = None self._temp_pinned_message_sender = None self._dedup = deque() self._dedup_mxid = {} self._dedup_action = deque() self._send_locks = {} if tgid: self.by_tgid[self.tgid_full] = self if mxid: self.by_mxid[mxid] = self # region Propegrties @property def tgid_full(self): return self.tgid, self.tg_receiver @property def tgid_log(self): if self.tgid == self.tg_receiver: return self.tgid return f"{self.tg_receiver}<->{self.tgid}" @property def peer(self): if self.peer_type == "user": return PeerUser(user_id=self.tgid) elif self.peer_type == "chat": return PeerChat(chat_id=self.tgid) elif self.peer_type == "channel": return PeerChannel(channel_id=self.tgid) @property def has_bot(self): return self.bot and self.bot.is_in_chat(self.tgid) @property def main_intent(self): if not self._main_intent: direct = self.peer_type == "user" puppet = p.Puppet.get(self.tgid) if direct else None self._main_intent = puppet.intent if direct else self.az.intent return self._main_intent # endregion # region Filtering def allow_bridging(self, tgid=None): tgid = tgid or self.tgid if self.peer_type == "user": return True elif self.filter_mode == "whitelist": return tgid in self.filter_list elif self.filter_mode == "blacklist": return tgid not in self.filter_list return True # endregion # region Deduplication @staticmethod def _hash_event(event): # Non-channel messages are unique per-user (wtf telegram), so we have no other choice than # to deduplicate based on a hash of the message content. # The timestamp is only accurate to the second, so we can't rely solely on that either. if isinstance(event, MessageService): hash_content = [event.date.timestamp(), event.from_id, event.action] else: hash_content = [event.date.timestamp(), event.message] if event.fwd_from: hash_content += [event.fwd_from.from_id, event.fwd_from.channel_id] elif isinstance(event, Message) and event.media: try: hash_content += { MessageMediaContact: lambda media: [media.user_id], MessageMediaDocument: lambda media: [media.document.id], MessageMediaPhoto: lambda media: [media.photo.id], MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat], }[type(event.media)](event.media) except KeyError: pass return hashlib.md5("-" .join(str(a) for a in hash_content) .encode("utf-8") ).hexdigest() def is_duplicate_action(self, event): hash = self._hash_event(event) if self.peer_type != "channel" else event.id if hash in self._dedup_action: return True self._dedup_action.append(hash) if len(self._dedup_action) > 20: self._dedup_action.popleft() return False def update_duplicate(self, event, mxid=None, expected_mxid=None, force_hash=False): hash = self._hash_event(event) if self.peer_type != "channel" or force_hash else event.id try: found_mxid = self._dedup_mxid[hash] except KeyError: return 0, "None" if found_mxid != expected_mxid: return found_mxid self._dedup_mxid[hash] = mxid return None def is_duplicate(self, event, mxid=None, force_hash=False): hash = self._hash_event(event) if self.peer_type != "channel" or force_hash else event.id if hash in self._dedup: return self._dedup_mxid[hash] self._dedup_mxid[hash] = mxid self._dedup.append(hash) if len(self._dedup) > 20: del self._dedup_mxid[self._dedup.popleft()] return None def get_input_entity(self, user): return user.client.get_input_entity(self.peer) # endregion # region Matrix room info updating async def invite_to_matrix(self, users): if isinstance(users, str): await self.main_intent.invite(self.mxid, users, check_cache=True) elif isinstance(users, list): for user in users: await self.main_intent.invite(self.mxid, user, check_cache=True) else: raise ValueError("Invalid invite identifier given to invite_matrix()") async def update_matrix_room(self, user, entity, direct, puppet=None, levels=None, users=None, participants=None): if not direct: await self.update_info(user, entity) if not users or not participants: users, participants = await self._get_users(user, entity) await self.sync_telegram_users(user, users) await self.update_telegram_participants(participants, levels) else: if not puppet: puppet = p.Puppet.get(self.tgid) await puppet.update_info(user, entity) await puppet.intent.join_room(self.mxid) async def create_matrix_room(self, user, entity=None, invites=None, update_if_exists=True, synchronous=False): if self.mxid: if update_if_exists: if not entity: entity = await user.client.get_entity(self.peer) update = self.update_matrix_room(user, entity, self.peer_type == "user") if synchronous: await update else: asyncio.ensure_future(update, loop=self.loop) await self.invite_to_matrix(invites or []) return self.mxid async with self._room_create_lock: return await self._create_matrix_room(user, entity, invites) async def _create_matrix_room(self, user, entity, invites): direct = self.peer_type == "user" if self.mxid: return self.mxid if not self.allow_bridging(): return None if not entity: entity = await user.client.get_entity(self.peer) self.log.debug("Fetched data: %s", entity) self.log.debug(f"Creating room for {self.tgid_log}") try: self.title = entity.title except AttributeError: self.title = None puppet = p.Puppet.get(self.tgid) if direct else None self._main_intent = puppet.intent if direct else self.az.intent if self.peer_type == "channel": self.megagroup = entity.megagroup if self.peer_type == "channel" and entity.username: public = config["bridge.public_portals"] alias = self._get_alias_localpart(entity.username) self.username = entity.username else: public = False # TODO invite link alias? alias = None if alias: # TODO? properly handle existing room aliases await self.main_intent.remove_room_alias(alias) power_levels = self._get_base_power_levels({}, entity) users = participants = None if not direct: users, participants = await self._get_users(user, entity) self._participants_to_power_levels(participants, power_levels) initial_state = [{ "type": "m.room.power_levels", "content": power_levels, }] room_id = await self.main_intent.create_room(alias=alias, is_public=public, is_direct=direct, invitees=invites or [], name=self.title, initial_state=initial_state) if not room_id: raise Exception(f"Failed to create room for {self.tgid_log}") self.mxid = room_id self.by_mxid[self.mxid] = self self.save() self.az.state_store.set_power_levels(self.mxid, power_levels) user.register_portal(self) asyncio.ensure_future(self.update_matrix_room(user, entity, direct, puppet, levels=power_levels, users=users, participants=participants), loop=self.loop) def _get_base_power_levels(self, levels=None, entity=None): levels = levels or {} power_level_requirement = (0 if self.peer_type == "chat" and not entity.admins_enabled else 50) levels["ban"] = 99 levels["invite"] = power_level_requirement if self.peer_type == "chat" else 75 if "events" not in levels: levels["events"] = {} levels["events"]["m.room.name"] = power_level_requirement levels["events"]["m.room.avatar"] = power_level_requirement levels["events"]["m.room.topic"] = 50 if self.peer_type == "channel" else 99 levels["events"]["m.room.power_levels"] = 75 levels["events"]["m.room.history_visibility"] = 75 levels["state_default"] = 50 levels["users_default"] = 0 levels["events_default"] = (50 if self.peer_type == "channel" and not entity.megagroup else 0) if "users" not in levels: levels["users"] = { self.main_intent.mxid: 100 } else: levels["users"][self.main_intent.mxid] = 100 return levels @property def alias(self): if not self.username: return None return f"#{self._get_alias_localpart()}:{self.hs_domain}" def _get_alias_localpart(self, username=None): username = username or self.username if not username: return None return self.alias_template.format(groupname=username) def add_bot_chat(self, entity): if self.bot and entity.id == self.bot.tgid: self.bot.add_chat(self.tgid, self.peer_type) return user = u.User.get_by_tgid(entity.id) if user and user.is_bot: user.register_portal(self) async def sync_telegram_users(self, source, users): allowed_tgids = set() for entity in users: puppet = p.Puppet.get(entity.id) if entity.bot: self.add_bot_chat(entity) allowed_tgids.add(entity.id) await puppet.intent.ensure_joined(self.mxid) await puppet.update_info(source, entity) user = u.User.get_by_tgid(entity.id) if user: await self.invite_to_matrix(user.mxid) # We can't trust the member list if any of the following cases is true: # * There are close to 10 000 users, because Telegram might not be sending all members. # * The member sync count is limited, because then we might ignore some members. # * It's a channel, because non-admins don't have access to the member list. trust_member_list = (len(allowed_tgids) < 9900 and config["bridge.max_initial_member_sync"] == -1 and (self.megagroup or self.peer_type != "channel")) if trust_member_list: joined_mxids = await self.main_intent.get_room_members(self.mxid) for user in joined_mxids: if user == self.az.bot_mxid: continue puppet_id = p.Puppet.get_id_from_mxid(user) if puppet_id and puppet_id not in allowed_tgids: if self.bot and puppet_id == self.bot.tgid: self.bot.remove_chat(self.tgid) await self.main_intent.kick(self.mxid, user, "User had left this Telegram chat.") continue mx_user = u.User.get_by_mxid(user, create=False) if mx_user and mx_user.is_bot and mx_user.tgid not in allowed_tgids: mx_user.unregister_portal(self) if mx_user and not self.has_bot and mx_user.tgid not in allowed_tgids: await self.main_intent.kick(self.mxid, mx_user.mxid, "You had left this Telegram chat.") continue async def add_telegram_user(self, user_id, source=None): puppet = p.Puppet.get(user_id) if source: entity = await source.client.get_entity(PeerUser(user_id)) await puppet.update_info(source, entity) await puppet.intent.join_room(self.mxid) user = u.User.get_by_tgid(user_id) if user: user.register_portal(self) await self.invite_to_matrix(user.mxid) async def delete_telegram_user(self, user_id, sender): puppet = p.Puppet.get(user_id) user = u.User.get_by_tgid(user_id) kick_message = (f"Kicked by {sender.displayname}" if sender and sender.tgid != puppet.tgid else "Left Telegram chat") if sender and sender.tgid != puppet.tgid: await self.main_intent.kick(self.mxid, puppet.mxid, kick_message) else: await puppet.intent.leave_room(self.mxid) if user: user.unregister_portal(self) await self.main_intent.kick(self.mxid, user.mxid, kick_message) async def update_info(self, user, entity=None): if self.peer_type == "user": self.log.warning(f"Called update_info() for direct chat portal {self.tgid_log}") return self.log.debug(f"Updating info of {self.tgid_log}") if not entity: entity = await user.client.get_entity(self.peer) self.log.debug("Fetched data: %s", entity) changed = False if self.peer_type == "channel": changed = await self.update_username(entity.username) or changed # TODO update about text # changed = self.update_about(entity.about) or changed changed = await self.update_title(entity.title) or changed if isinstance(entity.photo, ChatPhoto): changed = await self.update_avatar(user, entity.photo.photo_big) or changed if changed: self.save() async def update_username(self, username, save=False): if self.username != username: if self.username: await self.main_intent.remove_room_alias(self._get_alias_localpart()) self.username = username or None if self.username: await self.main_intent.add_room_alias(self.mxid, self._get_alias_localpart()) if config["bridge.public_portals"]: await self.main_intent.set_join_rule(self.mxid, "public") else: await self.main_intent.set_join_rule(self.mxid, "invite") if save: self.save() return True return False async def update_about(self, about, save=False): if self.about != about: self.about = about await self.main_intent.set_room_topic(self.mxid, self.about) if save: self.save() return True return False async def update_title(self, title, save=False): if self.title != title: self.title = title await self.main_intent.set_room_name(self.mxid, self.title) if save: self.save() return True return False @staticmethod def _get_largest_photo_size(photo): return max(photo.sizes, key=(lambda photo2: ( len(photo2.bytes) if isinstance(photo2, PhotoCachedSize) else photo2.size))) async def remove_avatar(self, user, save=False): await self.main_intent.set_room_avatar(self.mxid, None) self.photo_id = None if save: self.save() async def update_avatar(self, user, photo, save=False): photo_id = f"{photo.volume_id}-{photo.local_id}" if self.photo_id != photo_id: file = await util.transfer_file_to_matrix(self.db, user.client, self.main_intent, photo) if file: await self.main_intent.set_room_avatar(self.mxid, file.mxc) self.photo_id = photo_id if save: self.save() return True return False async def _get_users(self, user, entity): if self.peer_type == "chat": chat = await user.client(GetFullChatRequest(chat_id=self.tgid)) return chat.users, chat.full_chat.participants.participants elif self.peer_type == "channel": limit = config["bridge.max_initial_member_sync"] if limit == 0: return [], [] try: if 0 < limit <= 200: response = await user.client(GetParticipantsRequest( entity, ChannelParticipantsRecent(), offset=0, limit=limit, hash=0)) return response.users, response.participants elif limit > 200 or limit == -1: users, participants = [], [] offset = 0 remaining_quota = limit if limit > 0 else 1000000 query = (ChannelParticipantsSearch("") if limit == -1 else ChannelParticipantsRecent()) while True: if remaining_quota <= 0: break response = await user.client(GetParticipantsRequest( entity, query, offset=offset, limit=min(remaining_quota, 100), hash=0)) if not response.users: break participants += response.participants users += response.users offset += len(response.participants) remaining_quota -= len(response.participants) return users, participants except ChatAdminRequiredError: return [], [] elif self.peer_type == "user": return [entity], [] async def get_invite_link(self, user): if self.peer_type == "user": raise ValueError("You can't invite users to private chats.") elif self.peer_type == "chat": link = await user.client(ExportChatInviteRequest(chat_id=self.tgid)) elif self.peer_type == "channel": if self.username: return f"https://t.me/{self.username}" link = await user.client( ExportInviteRequest(channel=await self.get_input_entity(user))) else: raise ValueError(f"Invalid peer type '{self.peer_type}' for invite link.") if isinstance(link, ChatInviteEmpty): raise ValueError("Failed to get invite link.") return link.link async def get_authenticated_matrix_users(self): try: members = await self.main_intent.get_room_members(self.mxid) except MatrixRequestError: return [] authenticated = [] has_bot = self.has_bot for member in members: if p.Puppet.get_id_from_mxid(member) or member == self.main_intent.mxid: continue user = await u.User.get_by_mxid(member).ensure_started() if (has_bot and user.relaybot_whitelisted) or await user.has_full_access( allow_bot=True): authenticated.append(user) return authenticated @staticmethod async def cleanup_room(intent, room_id, message="Portal deleted", puppets_only=False): try: members = await intent.get_room_members(room_id) except MatrixRequestError: members = [] for user in members: puppet = p.Puppet.get_by_mxid(user, create=False) if user != intent.mxid and (not puppets_only or puppet): try: if puppet: await puppet.intent.leave_room(room_id) else: await intent.kick(room_id, user, message) except (MatrixRequestError, IntentError): pass await intent.leave_room(room_id) async def unbridge(self): await self.cleanup_room(self.main_intent, self.mxid, "Room unbridged", puppets_only=True) self.delete() async def cleanup_and_delete(self): await self.cleanup_room(self.main_intent, self.mxid) self.delete() # endregion # region Matrix event handling @staticmethod def _get_file_meta(body, mime): try: current_extension = body[body.rindex("."):] if mimetypes.types_map[current_extension] == mime: return body except (ValueError, KeyError): pass if mime: return f"matrix_upload{mimetypes.guess_extension(mime)}" else: return "" async def _get_state_change_message(self, event, user, arguments=None): tpl = config[f"bridge.state_event_formats.{event}"] if len(tpl) == 0: # Empty format means they don't want the message return None displayname = await self.get_displayname(user) tpl_args = dict(mxid=user.mxid, username=user.mxid_localpart, displayname=displayname) tpl_args = {**tpl_args, **(arguments or {})} message = Template(tpl).safe_substitute(tpl_args) return { "format": "org.matrix.custom.html", "formatted_body": message, } async def name_change_matrix(self, user, displayname, prev_displayname, event_id): async with self.require_send_lock(self.bot.tgid): message = await self._get_state_change_message( "name_change", user, dict(displayname=displayname, prev_displayname=prev_displayname)) if not message: return response = await self.bot.client.send_message( self.peer, message, parse_mode=self._matrix_event_to_entities) space = self.tgid if self.peer_type == "channel" else self.bot.tgid self.is_duplicate(response, (event_id, space)) async def get_displayname(self, user): return (await self.main_intent.get_displayname(self.mxid, user.mxid) or user.mxid_localpart) async def leave_matrix(self, user, source, event_id): if await user.needs_relaybot(self): async with self.require_send_lock(self.bot.tgid): message = await self._get_state_change_message("leave", user) if not message: return response = await self.bot.client.send_message( self.peer, message, parse_mode=self._matrix_event_to_entities) space = self.tgid if self.peer_type == "channel" else self.bot.tgid self.is_duplicate(response, (event_id, space)) return if self.peer_type == "user": await self.main_intent.leave_room(self.mxid) self.delete() try: del self.by_tgid[self.tgid_full] del self.by_mxid[self.mxid] except KeyError: pass elif source and source.tgid != user.tgid: if self.peer_type == "chat": await source.client(DeleteChatUserRequest(chat_id=self.tgid, user_id=user.tgid)) else: channel = await self.get_input_entity(source) rights = ChannelBannedRights(datetime.fromtimestamp(0), True) await source.client(EditBannedRequest(channel=channel, user_id=user.tgid, banned_rights=rights)) elif self.peer_type == "chat": await user.client(DeleteChatUserRequest(chat_id=self.tgid, user_id=InputUserSelf())) elif self.peer_type == "channel": channel = await self.get_input_entity(user) await user.client(LeaveChannelRequest(channel=channel)) async def join_matrix(self, user, event_id): if await user.needs_relaybot(self): async with self.require_send_lock(self.bot.tgid): message = await self._get_state_change_message("join", user) if not message: return response = await self.bot.client.send_message( self.peer, message, parse_mode=self._matrix_event_to_entities) space = self.tgid if self.peer_type == "channel" else self.bot.tgid self.is_duplicate(response, (event_id, space)) return if self.peer_type == "channel" and not user.is_bot: await user.client(JoinChannelRequest(channel=await self.get_input_entity(user))) else: # We'll just assume the user is already in the chat. pass async def _apply_msg_format(self, sender, msgtype, message): if "formatted_body" not in message: message["format"] = "org.matrix.custom.html" message["formatted_body"] = escape_html(message.get("body", "")) body = message["formatted_body"] tpl = config["bridge.message_formats"].get(msgtype, "$sender_displayname: $message") displayname = await self.get_displayname(sender) tpl_args = dict(sender_mxid=sender.mxid, sender_username=sender.mxid_localpart, sender_displayname=displayname, message=body) message["formatted_body"] = Template(tpl).safe_substitute(tpl_args) async def _preprocess_matrix_message(self, sender, use_relaybot, message): msgtype = message.get("msgtype", "m.text") if msgtype == "m.emote": await self._apply_msg_format(sender, msgtype, message) message["msgtype"] = "m.text" elif use_relaybot: await self._apply_msg_format(sender, msgtype, message) def _matrix_event_to_entities(self, event): try: if event.get("format", None) == "org.matrix.custom.html": message, entities = formatter.matrix_to_telegram(event["formatted_body"]) else: message, entities = formatter.matrix_text_to_telegram(event["body"]) except KeyError: message, entities = None, None return message, entities def require_send_lock(self, id): if id is None: return None try: return self._send_locks[id] except KeyError: self._send_locks[id] = asyncio.Lock() return self._send_locks[id] def optional_send_lock(self, id): if id is None: return None try: return self._send_locks[id] except KeyError: return None async def _handle_matrix_text(self, sender_id, event_id, space, client, message, reply_to): lock = self.require_send_lock(sender_id) async with lock: response = await client.send_message(self.peer, message, reply_to=reply_to, parse_mode=self._matrix_event_to_entities) self._add_telegram_message_to_db(event_id, space, response) async def _handle_matrix_file(self, type, sender_id, event_id, space, client, message, reply_to): file = await self.main_intent.download_file(message["url"]) info = message.get("info", {}) mime = info.get("mimetype", None) w, h = None, None if type == "m.sticker": if mime != "image/gif": mime, file, w, h = util.convert_image(file, source_mime=mime, target_type="webp") else: # Remove sticker description message["mxtg_filename"] = "sticker.gif" message["body"] = "" elif "w" in info and "h" in info: w, h = info["w"], info["h"] file_name = self._get_file_meta(message["mxtg_filename"], mime) attributes = [DocumentAttributeFilename(file_name=file_name)] if w and h: attributes.append(DocumentAttributeImageSize(w, h)) caption = message["body"] if message["body"] != file_name else None media = await client.upload_file(file, mime, attributes, file_name) lock = self.require_send_lock(sender_id) async with lock: response = await client.send_media(self.peer, media, reply_to=reply_to, caption=caption) self._add_telegram_message_to_db(event_id, space, response) async def _handle_matrix_location(self, sender_id, event_id, space, client, message, reply_to): try: lat, long = message["geo_uri"][len("geo:"):].split(",") lat, long = float(lat), float(long) except (KeyError, ValueError): self.log.exception("Failed to parse location") return None message, entities = self._matrix_event_to_entities(message) media = MessageMediaGeo(geo=GeoPoint(lat, long)) lock = self.require_send_lock(sender_id) async with lock: response = await client.send_media(self.peer, media, reply_to=reply_to, caption=message, entities=entities) self._add_telegram_message_to_db(event_id, space, response) def _add_telegram_message_to_db(self, event_id, space, response): self.log.debug("Handled Matrix message: %s", response) self.is_duplicate(response, (event_id, space)) self.db.add(DBMessage( tgid=response.id, tg_space=space, mx_room=self.mxid, mxid=event_id)) self.db.commit() async def handle_matrix_message(self, sender: u.User, message: dict, event_id: str): puppet = p.Puppet.get_by_custom_mxid(sender.mxid) if puppet and message.get("net.maunium.telegram.puppet", False): self.log.debug("Ignoring puppet-sent message by confirmed puppet user %s", sender.mxid) return logged_in = not await sender.needs_relaybot(self) client = sender.client if logged_in else self.bot.client sender_id = sender.tgid if logged_in else self.bot.tgid space = (self.tgid if self.peer_type == "channel" # Channels have their own ID space else (sender.tgid if logged_in else self.bot.tgid)) reply_to = formatter.matrix_reply_to_telegram(message, space, room_id=self.mxid) message["mxtg_filename"] = message["body"] await self._preprocess_matrix_message(sender, not logged_in, message) type = message["msgtype"] if type == "m.text" or (self.bridge_notices and type == "m.notice"): await self._handle_matrix_text(sender_id, event_id, space, client, message, reply_to) elif type == "m.location": await self._handle_matrix_location(sender_id, event_id, space, client, message, reply_to) elif type in ("m.sticker", "m.image", "m.file", "m.audio", "m.video"): await self._handle_matrix_file(type, sender_id, event_id, space, client, message, reply_to) else: self.log.debug(f"Unhandled Matrix event: {message}") async def handle_matrix_pin(self, sender, pinned_message): if self.peer_type != "channel": return try: if not pinned_message: await sender.client(UpdatePinnedMessageRequest(channel=self.peer, id=0)) else: message = DBMessage.query.filter(DBMessage.mxid == pinned_message, DBMessage.tg_space == self.tgid, DBMessage.mx_room == self.mxid).one_or_none() await sender.client(UpdatePinnedMessageRequest(channel=self.peer, id=message.tgid)) except ChatNotModifiedError: pass async def handle_matrix_deletion(self, deleter, event_id): deleter = deleter if not await deleter.needs_relaybot(self) else self.bot space = self.tgid if self.peer_type == "channel" else deleter.tgid message = DBMessage.query.filter(DBMessage.mxid == event_id, DBMessage.tg_space == space, DBMessage.mx_room == self.mxid).one_or_none() if not message: return await deleter.client.delete_messages(self.peer, [message.tgid]) async def _update_telegram_power_level(self, sender, user_id, level): if self.peer_type == "chat": await sender.client(EditChatAdminRequest( chat_id=self.tgid, user_id=user_id, is_admin=level >= 50)) elif self.peer_type == "channel": moderator = level >= 50 admin = level >= 75 rights = ChannelAdminRights(change_info=moderator, post_messages=moderator, edit_messages=moderator, delete_messages=moderator, ban_users=moderator, invite_users=moderator, invite_link=moderator, pin_messages=moderator, add_admins=admin) await sender.client( EditAdminRequest(channel=await self.get_input_entity(sender), user_id=user_id, admin_rights=rights)) async def handle_matrix_power_levels(self, sender, new_users, old_users): # TODO handle all power level changes and bridge exact admin rights to supergroups/channels for user, level in new_users.items(): if not user or user == self.main_intent.mxid or user == sender.mxid: continue user_id = p.Puppet.get_id_from_mxid(user) if not user_id: mx_user = u.User.get_by_mxid(user, create=False) if not mx_user or not mx_user.tgid: continue user_id = mx_user.tgid if not user_id or user_id == sender.tgid: continue if user not in old_users or level != old_users[user]: await self._update_telegram_power_level(sender, user_id, level) async def handle_matrix_about(self, sender, about): if self.peer_type not in {"channel"}: return channel = await self.get_input_entity(sender) await sender.client(EditAboutRequest(channel=channel, about=about)) self.about = about self.save() async def handle_matrix_title(self, sender, title): if self.peer_type not in {"chat", "channel"}: return if self.peer_type == "chat": response = await sender.client(EditChatTitleRequest(chat_id=self.tgid, title=title)) else: channel = await self.get_input_entity(sender) response = await sender.client(EditTitleRequest(channel=channel, title=title)) self._register_outgoing_actions_for_dedup(response) self.title = title self.save() async def handle_matrix_avatar(self, sender, url): if self.peer_type not in {"chat", "channel"}: # Invalid peer type return file = await self.main_intent.download_file(url) mime = magic.from_buffer(file, mime=True) ext = mimetypes.guess_extension(mime) uploaded = await sender.client.upload_file(file, file_name=f"avatar{ext}") photo = InputChatUploadedPhoto(file=uploaded) if self.peer_type == "chat": response = await sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo)) else: channel = await self.get_input_entity(sender) response = await sender.client(EditPhotoRequest(channel=channel, photo=photo)) self._register_outgoing_actions_for_dedup(response) for update in response.updates: is_photo_update = (isinstance(update, UpdateNewMessage) and isinstance(update.message, MessageService) and isinstance(update.message.action, MessageActionChatEditPhoto)) if is_photo_update: loc = self._get_largest_photo_size(update.message.action.photo).location self.photo_id = f"{loc.volume_id}-{loc.local_id}" self.save() break def _register_outgoing_actions_for_dedup(self, response): for update in response.updates: check_dedup = (isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage)) and isinstance(update.message, MessageService)) if check_dedup: self.is_duplicate_action(update.message) # endregion # region Telegram chat info updating async def _get_telegram_users_in_matrix_room(self): user_tgids = set() user_mxids = await self.main_intent.get_room_members(self.mxid, ("join", "invite")) for user in user_mxids: if user == self.az.bot_mxid: continue mx_user = u.User.get_by_mxid(user, create=False) if mx_user and mx_user.tgid: user_tgids.add(mx_user.tgid) puppet_id = p.Puppet.get_id_from_mxid(user) if puppet_id: user_tgids.add(puppet_id) return list(user_tgids) async def upgrade_telegram_chat(self, source): if self.peer_type != "chat": raise ValueError("Only normal group chats are upgradable to supergroups.") updates = await source.client(MigrateChatRequest(chat_id=self.tgid)) entity = None for chat in updates.chats: if isinstance(chat, Channel): entity = chat break if not entity: raise ValueError("Upgrade may have failed: output channel not found.") self.peer_type = "channel" self.migrate_and_save(entity.id) await self.update_info(source, entity) async def set_telegram_username(self, source, username): if self.peer_type != "channel": raise ValueError("Only channels and supergroups have usernames.") await source.client( UpdateUsernameRequest(await self.get_input_entity(source), username)) if await self.update_username(username): self.save() async def create_telegram_chat(self, source, supergroup=False): if not self.mxid: raise ValueError("Can't create Telegram chat for portal without Matrix room.") elif self.tgid: raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.") invites = await self._get_telegram_users_in_matrix_room() if len(invites) < 2: raise ValueError("Not enough Telegram users to create a chat") if self.peer_type == "chat": updates = await source.client(CreateChatRequest(title=self.title, users=invites)) entity = updates.chats[0] elif self.peer_type == "channel": updates = await source.client(CreateChannelRequest(title=self.title, about=self.about or "", megagroup=supergroup)) entity = updates.chats[0] await source.client(InviteToChannelRequest( channel=await source.client.get_input_entity(entity), users=invites)) else: raise ValueError("Invalid peer type for Telegram chat creation") self.tgid = entity.id self.tg_receiver = self.tgid self.by_tgid[self.tgid_full] = self await self.update_info(source, entity) self.db.add(self.db_instance) self.save() if self.bot and self.bot.tgid in invites: self.bot.add_chat(self.tgid, self.peer_type) levels = await self.main_intent.get_power_levels(self.mxid) bot_level = self._get_bot_level(levels) if bot_level == 100: levels = self._get_base_power_levels(levels, entity) await self.main_intent.set_power_levels(self.mxid, levels) await self.handle_matrix_power_levels(source, levels["users"], {}) async def invite_telegram(self, source, puppet): if self.peer_type == "chat": await source.client( AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0)) elif self.peer_type == "channel": await source.client(InviteToChannelRequest(channel=self.peer, users=[puppet.tgid])) else: raise ValueError("Invalid peer type for Telegram user invite") # endregion # region Telegram event handling async def handle_telegram_typing(self, user, event): if self.mxid: await user.intent.set_typing(self.mxid, is_typing=True) def get_external_url(self, evt: Message): if self.peer_type == "channel" and self.username is not None: return f"https://t.me/{self.username}/{evt.id}" return None async def handle_telegram_photo(self, source: u.User, intent, evt: Message, relates_to=None): largest_size = self._get_largest_photo_size(evt.media.photo) file = await util.transfer_file_to_matrix(self.db, source.client, intent, largest_size.location) if not file: return None if config["bridge.inline_images"] and (evt.message or evt.fwd_from or evt.reply_to_msg_id): text, html, relates_to = await formatter.telegram_to_matrix( evt, source, self.main_intent, prefix_html=f"Inline Telegram photo
", prefix_text="Inline image: ") await intent.set_typing(self.mxid, is_typing=False) return await intent.send_text(self.mxid, text, html=html, relates_to=relates_to, timestamp=evt.date, external_url=self.get_external_url(evt)) info = { "h": largest_size.h, "w": largest_size.w, "size": len(largest_size.bytes) if ( isinstance(largest_size, PhotoCachedSize)) else largest_size.size, "orientation": 0, "mimetype": file.mime_type, } name = evt.message await intent.set_typing(self.mxid, is_typing=False) return await intent.send_image(self.mxid, file.mxc, info=info, text=name, relates_to=relates_to, timestamp=evt.date, external_url=self.get_external_url(evt)) @staticmethod def _parse_telegram_document_attributes(attributes): attrs = { "name": None, "mime_type": None, "is_sticker": False, "sticker_alt": None, "width": None, "height": None, } for attr in attributes: if isinstance(attr, DocumentAttributeFilename): attrs["name"] = attrs["name"] or attr.file_name attrs["mime_type"], _ = mimetypes.guess_type(attr.file_name) elif isinstance(attr, DocumentAttributeSticker): attrs["is_sticker"] = True attrs["sticker_alt"] = attr.alt elif isinstance(attr, DocumentAttributeVideo): attrs["width"], attrs["height"] = attr.w, attr.h return attrs @staticmethod def _parse_telegram_document_meta(evt, file, attrs): document = evt.media.document name = evt.message or attrs["name"] if attrs["is_sticker"]: alt = attrs["sticker_alt"] if len(alt) > 0: name = f"{alt} ({unicodedata.name(alt[0]).lower()})" mime_type = document.mime_type or file.mime_type info = { "size": file.size, "mimetype": mime_type, } if attrs["mime_type"] and not file.was_converted: file.mime_type = attrs["mime_type"] or file.mime_type if file.width and file.height: info["w"], info["h"] = file.width, file.height elif attrs["width"] and attrs["height"]: info["w"], info["h"] = attrs["width"], attrs["height"] if file.thumbnail: info["thumbnail_url"] = file.thumbnail.mxc info["thumbnail_info"] = { "mimetype": file.thumbnail.mime_type, "h": file.thumbnail.height or document.thumb.h, "w": file.thumbnail.width or document.thumb.w, "size": file.thumbnail.size, } return info, name async def handle_telegram_document(self, source, intent, evt: Message, relates_to=None): document = evt.media.document attrs = self._parse_telegram_document_attributes(document.attributes) file = await util.transfer_file_to_matrix(self.db, source.client, intent, document, document.thumb, is_sticker=attrs["is_sticker"]) if not file: return None info, name = self._parse_telegram_document_meta(evt, file, attrs) await intent.set_typing(self.mxid, is_typing=False) kwargs = { "room_id": self.mxid, "url": file.mxc, "info": info, "text": name, "relates_to": relates_to, "timestamp": evt.date, "external_url": self.get_external_url(evt) } if attrs["is_sticker"] and config["bridge.native_stickers"]: return await intent.send_sticker(**kwargs) mime_type = info["mimetype"] if mime_type.startswith("video/"): kwargs["file_type"] = "m.video" elif mime_type.startswith("audio/"): kwargs["file_type"] = "m.audio" elif mime_type.startswith("image/"): kwargs["file_type"] = "m.image" else: kwargs["file_type"] = "m.file" return await intent.send_file(**kwargs) def handle_telegram_location(self, source, intent, evt, relates_to=None): location = evt.media.geo long = location.long lat = location.lat long_char = "E" if long > 0 else "W" lat_char = "N" if lat > 0 else "S" rounded_long = abs(round(long * 100000) / 100000) rounded_lat = abs(round(lat * 100000) / 100000) body = f"{rounded_lat}° {lat_char}, {rounded_long}° {long_char}" url = f"https://maps.google.com/?q={lat},{long}" formatted_body = f"Location: {body}" # At least riot-web ignores formatting in m.location messages, # so we'll add a plaintext link. body = f"Location: {body}\n{url}" return intent.send_message(self.mxid, { "msgtype": "m.location", "geo_uri": f"geo:{lat},{long}", "body": body, "format": "org.matrix.custom.html", "formatted_body": formatted_body, "m.relates_to": relates_to or None, }, timestamp=evt.date, external_url=self.get_external_url(evt)) async def handle_telegram_text(self, source, intent, is_bot, evt): self.log.debug(f"Sending {evt.message} to {self.mxid} by {intent.mxid}") text, html, relates_to = await formatter.telegram_to_matrix(evt, source, self.main_intent) await intent.set_typing(self.mxid, is_typing=False) msgtype = "m.notice" if is_bot and config["bridge.bot_messages_as_notices"] else "m.text" return await intent.send_text(self.mxid, text, html=html, relates_to=relates_to, msgtype=msgtype, timestamp=evt.date, external_url=self.get_external_url(evt)) async def handle_telegram_edit(self, source, sender, evt): if not self.mxid: return elif not config["bridge.edits_as_replies"]: self.log.debug("Edits as replies disabled, ignoring edit event...") return lock = self.optional_send_lock(sender.tgid if sender else None) if lock: async with lock: pass tg_space = self.tgid if self.peer_type == "channel" else source.tgid temporary_identifier = f"${random.randint(1000000000000,9999999999999)}TGBRIDGEDITEMP" duplicate_found = self.is_duplicate(evt, (temporary_identifier, tg_space), force_hash=True) if duplicate_found: mxid, other_tg_space = duplicate_found if tg_space != other_tg_space: msg = DBMessage.query.get((evt.id, tg_space)) msg.mxid = mxid msg.mx_room = self.mxid self.db.commit() return evt.reply_to_msg_id = evt.id text, html, relates_to = await formatter.telegram_to_matrix(evt, source, self.main_intent, is_edit=True) intent = sender.intent if sender else self.main_intent await intent.set_typing(self.mxid, is_typing=False) response = await intent.send_text(self.mxid, text, html=html, relates_to=relates_to, external_url=self.get_external_url(evt)) mxid = response["event_id"] msg = DBMessage.query.get((evt.id, tg_space)) if not msg: # Oh crap return msg.mxid = mxid msg.mx_room = self.mxid DBMessage.query \ .filter(DBMessage.mx_room == self.mxid, DBMessage.mxid == temporary_identifier) \ .update({"mxid": mxid}) self.db.commit() async def handle_telegram_message(self, source, sender, evt): if not self.mxid: await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False) lock = self.optional_send_lock(sender.tgid if sender else None) if lock: async with lock: pass tg_space = self.tgid if self.peer_type == "channel" else source.tgid temporary_identifier = f"${random.randint(1000000000000,9999999999999)}TGBRIDGETEMP" duplicate_found = self.is_duplicate(evt, (temporary_identifier, tg_space)) if duplicate_found: mxid, other_tg_space = duplicate_found self.log.debug(f"Ignoring message {evt.id}@{tg_space} (src {source.tgid}) " f"as it was already handled (in space {other_tg_space})") if tg_space != other_tg_space: self.db.add( DBMessage(tgid=evt.id, mx_room=self.mxid, mxid=mxid, tg_space=tg_space)) self.db.commit() return if sender and not sender.displayname: self.log.debug(f"Telegram user {sender.tgid} sent a message, but doesn't have a" "displayname, updating info...") entity = await source.client.get_entity(PeerUser(sender.tgid)) await sender.update_info(source, entity) allowed_media = (MessageMediaPhoto, MessageMediaDocument, MessageMediaGeo) media = evt.media if hasattr(evt, "media") and isinstance(evt.media, allowed_media) else None intent = sender.intent if sender else self.main_intent if not media and evt.message: is_bot = sender.is_bot if sender else False response = await self.handle_telegram_text(source, intent, is_bot, evt) elif media: relates_to = formatter.telegram_reply_to_matrix(evt, source) if isinstance(media, MessageMediaPhoto): response = await self.handle_telegram_photo(source, intent, evt, relates_to) elif isinstance(media, MessageMediaDocument): response = await self.handle_telegram_document(source, intent, evt, relates_to) elif isinstance(media, MessageMediaGeo): response = await self.handle_telegram_location(source, intent, evt, relates_to) else: self.log.debug("Unhandled Telegram media: %s", media) return else: self.log.debug("Unhandled Telegram message: %s", evt) return if not response: return mxid = response["event_id"] prev_id = self.update_duplicate(evt, (mxid, tg_space), (temporary_identifier, tg_space)) if prev_id: self.log.debug(f"Sent message {evt.id}@{tg_space} to Matrix as {mxid}. " f"Temporary dedup identifier was {temporary_identifier}, " f"but dedup map contained {prev_id[1]} instead! -- " "This was probably a race condition caused by Telegram sending updates" "to other clients before responding to the sender. I'll just redact " "the likely duplicate message now.") await intent.redact(self.mxid, mxid) return self.log.debug("Handled Telegram message: %s", evt) try: self.db.add(DBMessage(tgid=evt.id, mx_room=self.mxid, mxid=mxid, tg_space=tg_space)) self.db.commit() DBMessage.query \ .filter(DBMessage.mx_room == self.mxid, DBMessage.mxid == temporary_identifier) \ .update({"mxid": mxid}) except FlushError as e: self.log.exception(f"{e.__class__.__name__} while saving message mapping.") await intent.redact(self.mxid, mxid) except (IntegrityError, InvalidRequestError) as e: self.log.exception(f"{e.__class__.__name__} while saving message mapping.") self.db.rollback() await intent.redact(self.mxid, mxid) async def _create_room_on_action(self, source, action): if source.is_relaybot: return False create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate) create_and_continue = (MessageActionChatAddUser, MessageActionChatJoinedByLink) if isinstance(action, create_and_exit + create_and_continue): await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=isinstance(action, create_and_exit)) if not isinstance(action, create_and_continue): return False return True async def handle_telegram_action(self, source, sender, update): action = update.action should_ignore = ((not self.mxid and not await self._create_room_on_action(source, action)) or self.is_duplicate_action(update)) if should_ignore or not self.mxid: return # TODO figure out how to see changes to about text / channel username if isinstance(action, MessageActionChatEditTitle): await self.update_title(action.title, save=True) elif isinstance(action, MessageActionChatEditPhoto): largest_size = self._get_largest_photo_size(action.photo) await self.update_avatar(source, largest_size.location, save=True) elif isinstance(action, MessageActionChatDeletePhoto): await self.remove_avatar(source, save=True) elif isinstance(action, MessageActionChatAddUser): for user_id in action.users: await self.add_telegram_user(user_id, source) elif isinstance(action, MessageActionChatJoinedByLink): await self.add_telegram_user(sender.id, source) elif isinstance(action, MessageActionChatDeleteUser): await self.delete_telegram_user(action.user_id, sender) elif isinstance(action, MessageActionChatMigrateTo): self.peer_type = "channel" self.migrate_and_save(action.channel_id) await sender.intent.send_emote(self.mxid, "upgraded this group to a supergroup.") elif isinstance(action, MessageActionPinMessage): await self.receive_telegram_pin_sender(sender) else: self.log.debug("Unhandled Telegram action in %s: %s", self.title, action) async def set_telegram_admin(self, user_id): puppet = p.Puppet.get(user_id) user = await u.User.get_by_tgid(user_id) levels = await self.main_intent.get_power_levels(self.mxid) if user: levels["users"][user.mxid] = 50 if puppet: levels["users"][puppet.mxid] = 50 await self.main_intent.set_power_levels(self.mxid, levels) async def receive_telegram_pin_sender(self, sender): self._temp_pinned_message_sender = sender if self._temp_pinned_message_id: await self.update_telegram_pin() async def update_telegram_pin(self): intent = (self._temp_pinned_message_sender.intent if self._temp_pinned_message_sender else self.main_intent) id = self._temp_pinned_message_id self._temp_pinned_message_id = None self._temp_pinned_message_sender = None message = DBMessage.query.get((id, self.tgid)) if message: await intent.set_pinned_messages(self.mxid, [message.mxid]) else: await intent.set_pinned_messages(self.mxid, []) async def receive_telegram_pin_id(self, id): if id == 0: return await self.update_telegram_pin() self._temp_pinned_message_id = id if self._temp_pinned_message_sender: await self.update_telegram_pin() @staticmethod def _get_level_from_participant(participant, _): # TODO use the power level requirements to get better precision in channels if isinstance(participant, (ChatParticipantAdmin, ChannelParticipantAdmin)): return 50 elif isinstance(participant, (ChatParticipantCreator, ChannelParticipantCreator)): return 95 return 0 @staticmethod def _participant_to_power_levels(levels, user, new_level, bot_level): new_level = min(new_level, bot_level) default_level = levels["users_default"] if "users_default" in levels else 0 try: user_level = int(levels["users"][user.mxid]) except (ValueError, KeyError): user_level = default_level if user_level != new_level and user_level < bot_level: levels["users"][user.mxid] = new_level return True return False def _get_bot_level(self, levels): try: return levels["users"][self.main_intent.mxid] except KeyError: try: return levels["users_default"] except KeyError: return 0 @staticmethod def _get_powerlevel_level(levels): try: return levels["events"]["m.room.power_levels"] except KeyError: try: return levels["state_default"] except KeyError: return 50 def _participants_to_power_levels(self, participants, levels): bot_level = self._get_bot_level(levels) if bot_level < self._get_powerlevel_level(levels): return False changed = False admin_power_level = min(75 if self.peer_type == "channel" else 50, bot_level) if levels["events"]["m.room.power_levels"] != admin_power_level: changed = True levels["events"]["m.room.power_levels"] = admin_power_level for participant in participants: puppet = p.Puppet.get(participant.user_id) user = u.User.get_by_tgid(participant.user_id) new_level = self._get_level_from_participant(participant, levels) if user: user.register_portal(self) changed = self._participant_to_power_levels(levels, user, new_level, bot_level) or changed if puppet: changed = self._participant_to_power_levels(levels, puppet, new_level, bot_level) or changed return changed async def update_telegram_participants(self, participants, levels=None): if not levels: levels = await self.main_intent.get_power_levels(self.mxid) if self._participants_to_power_levels(participants, levels): await self.main_intent.set_power_levels(self.mxid, levels) async def set_telegram_admins_enabled(self, enabled): level = 50 if enabled else 10 levels = await self.main_intent.get_power_levels(self.mxid) levels["invite"] = level levels["events"]["m.room.name"] = level levels["events"]["m.room.avatar"] = level await self.main_intent.set_power_levels(self.mxid, levels) # endregion # region Database conversion @property def db_instance(self): if not self._db_instance: self._db_instance = self.new_db_instance() return self._db_instance def new_db_instance(self): return DBPortal(tgid=self.tgid, tg_receiver=self.tg_receiver, peer_type=self.peer_type, mxid=self.mxid, username=self.username, megagroup=self.megagroup, title=self.title, about=self.about, photo_id=self.photo_id) def migrate_and_save(self, new_id): existing = DBPortal.query.get(self.tgid_full) if existing: self.db.delete(existing) try: del self.by_tgid[self.tgid_full] except KeyError: pass self.tgid = new_id self.tg_receiver = new_id self.by_tgid[self.tgid_full] = self self.save() def save(self): self.db_instance.mxid = self.mxid self.db_instance.username = self.username self.db_instance.title = self.title self.db_instance.about = self.about self.db_instance.photo_id = self.photo_id self.db.commit() def delete(self): try: del self.by_tgid[self.tgid_full] except KeyError: pass try: del self.by_mxid[self.mxid] except KeyError: pass if self._db_instance: self.db.delete(self._db_instance) self.db.commit() @classmethod def from_db(cls, db_portal): return Portal(tgid=db_portal.tgid, tg_receiver=db_portal.tg_receiver, peer_type=db_portal.peer_type, mxid=db_portal.mxid, username=db_portal.username, megagroup=db_portal.megagroup, title=db_portal.title, about=db_portal.about, photo_id=db_portal.photo_id, db_instance=db_portal) # endregion # region Class instance lookup @classmethod def get_by_mxid(cls, mxid): try: return cls.by_mxid[mxid] except KeyError: pass portal = DBPortal.query.filter(DBPortal.mxid == mxid).one_or_none() if portal: return cls.from_db(portal) return None @classmethod def get_username_from_mx_alias(cls, alias): match = cls.mx_alias_regex.match(alias) if match: return match.group(1) return None @classmethod def find_by_username(cls, username): if not username: return None for _, portal in cls.by_tgid.items(): if portal.username and portal.username.lower() == username.lower(): return portal portal = DBPortal.query.filter(DBPortal.username == username).one_or_none() if portal: return cls.from_db(portal) return None @classmethod def get_by_tgid(cls, tgid, tg_receiver=None, peer_type=None): tg_receiver = tg_receiver or tgid tgid_full = (tgid, tg_receiver) try: return cls.by_tgid[tgid_full] except KeyError: pass portal = DBPortal.query.get(tgid_full) if portal: return cls.from_db(portal) if peer_type: portal = Portal(tgid, peer_type=peer_type, tg_receiver=tg_receiver) cls.db.add(portal.db_instance) cls.db.commit() return portal return None @classmethod def get_by_entity(cls, entity, receiver_id=None, create=True): entity_type = type(entity) if entity_type in {Chat, ChatFull}: type_name = "chat" id = entity.id elif entity_type in {PeerChat, InputPeerChat}: type_name = "chat" id = entity.chat_id elif entity_type in {Channel, ChannelFull}: type_name = "channel" id = entity.id elif entity_type in {PeerChannel, InputPeerChannel, InputChannel}: type_name = "channel" id = entity.channel_id elif entity_type in {User, UserFull}: type_name = "user" id = entity.id elif entity_type in {PeerUser, InputPeerUser, InputUser}: type_name = "user" id = entity.user_id else: raise ValueError(f"Unknown entity type {entity_type.__name__}") return cls.get_by_tgid(id, receiver_id if type_name == "user" else id, type_name if create else None) # endregion def init(context): global config Portal.az, Portal.db, config, Portal.loop, Portal.bot = context Portal.bridge_notices = config["bridge.bridge_notices"] Portal.filter_mode = config["bridge.filter.mode"] Portal.filter_list = config["bridge.filter.list"] Portal.alias_template = config.get("bridge.alias_template", "telegram_{groupname}") Portal.hs_domain = config["homeserver.domain"] localpart = Portal.alias_template.format(groupname="(.+)") Portal.mx_alias_regex = re.compile(f"#{localpart}:{Portal.hs_domain}")