# -*- coding: future_fstrings -*- # mautrix-telegram - A Matrix-Telegram puppeting bridge # Copyright (C) 2018 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import logging import asyncio import re from telethon.tl.types import * from telethon.tl.types.contacts import ContactsNotModified from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest from mautrix_appservice import MatrixRequestError from .db import User as DBUser, Message as DBMessage, Contact as DBContact from .abstract_user import AbstractUser from . import portal as po, puppet as pu config = None class User(AbstractUser): log = logging.getLogger("mau.user") by_mxid = {} by_tgid = {} def __init__(self, mxid, tgid=None, username=None, db_contacts=None, saved_contacts=0, db_portals=None): super().__init__() self.mxid = mxid self.tgid = tgid self.username = username self.contacts = [] self.saved_contacts = saved_contacts self.db_contacts = db_contacts self.portals = {} self.db_portals = db_portals self.command_status = None self.is_admin = self.mxid in config.get("bridge.admins", []) whitelist = config.get("bridge.whitelist", None) or [self.mxid] self.whitelisted = not whitelist or self.mxid in whitelist if not self.whitelisted: homeserver = self.mxid[self.mxid.index(":") + 1:] self.whitelisted = homeserver in whitelist self.by_mxid[mxid] = self if tgid: self.by_tgid[tgid] = self self._init_client() @property def name(self): return self.mxid @property def displayname(self): # TODO show better username match = re.compile("@(.+):(.+)").match(self.mxid) return match.group(1) @property def db_contacts(self): return [self.db.merge(DBContact(user=self.tgid, contact=puppet.id)) for puppet in self.contacts] @db_contacts.setter def db_contacts(self, contacts): if contacts: self.contacts = [pu.Puppet.get(entry.contact) for entry in contacts] else: self.contacts = [] @property def db_portals(self): return [portal.to_db(merge=False) for _, portal in self.portals.items()] @db_portals.setter def db_portals(self, portals): if portals: self.portals = {(portal.tgid, portal.tg_receiver): po.Portal.get_by_tgid(portal.tgid, portal.tg_receiver) for portal in portals} else: self.portals = {} # region Database conversion def to_db(self): return self.db.merge( DBUser(mxid=self.mxid, tgid=self.tgid, tg_username=self.username, contacts=self.db_contacts, saved_contacts=self.saved_contacts, portals=self.db_portals)) def save(self): self.to_db() self.db.commit() def delete(self): try: del self.by_mxid[self.mxid] del self.by_tgid[self.tgid] except KeyError: pass self.db.delete(self.to_db()) self.db.commit() @classmethod def from_db(cls, db_user): return User(db_user.mxid, db_user.tgid, db_user.tg_username, db_user.contacts, db_user.saved_contacts, db_user.portals) # endregion # region Telegram connection management async def start(self, delete_unless_authenticated=False): await super().start() if self.logged_in: self.log.debug(f"Ensuring post_login() for {self.name}") asyncio.ensure_future(self.post_login(), loop=self.loop) elif delete_unless_authenticated: self.log.debug(f"Unauthenticated user {self.name} start()ed, deleting...") # User not logged in -> forget user self.client.disconnect() self.client.session.delete() self.delete() return self async def post_login(self, info=None): try: await self.update_info(info) await self.sync_dialogs() await self.sync_contacts() except Exception: self.log.exception("Failed to run post-login functions") # endregion # region Telegram actions that need custom methods async def update_info(self, info=None): info = info or await self.client.get_me() changed = False if self.username != info.username: self.username = info.username changed = True if self.tgid != info.id: self.tgid = info.id self.by_tgid[self.tgid] = self if changed: self.save() async def log_out(self): for _, portal in self.portals.items(): try: await portal.main_intent.kick(portal.mxid, self.mxid, "Logged out of Telegram.") except MatrixRequestError: pass self.portals = {} self.contacts = [] self.save() if self.tgid: try: del self.by_tgid[self.tgid] except KeyError: pass self.tgid = None self.save() ok = await self.client.log_out() if not ok: return False self.delete() return True def _search_local(self, query, max_results=5, min_similarity=45): results = [] for contact in self.contacts: similarity = contact.similarity(query) if similarity >= min_similarity: results.append((contact, similarity)) results.sort(key=lambda tup: tup[1], reverse=True) return results[0:max_results] async def _search_remote(self, query, max_results=5): if len(query) < 5: return [] server_results = await self.client(SearchRequest(q=query, limit=max_results)) results = [] for user in server_results.users: puppet = pu.Puppet.get(user.id) await puppet.update_info(self, user) results.append((puppet, puppet.similarity(query))) results.sort(key=lambda tup: tup[1], reverse=True) return results[0:max_results] async def search(self, query, force_remote=False): if force_remote: return await self._search_remote(query), True results = self._search_local(query) if results: return results, False return await self._search_remote(query), True async def sync_dialogs(self): creators = [] for entity in await self._get_dialogs(limit=30): portal = po.Portal.get_by_entity(entity) self.portals[portal.tgid_full] = portal creators.append(portal.create_matrix_room(self, entity, invites=[self.mxid])) self.save() await asyncio.gather(*creators, loop=self.loop) def register_portal(self, portal): try: if self.portals[portal.tgid_full] == portal: return except KeyError: pass self.portals[portal.tgid_full] = portal self.save() def unregister_portal(self, portal): try: del self.portals[portal.tgid_full] self.save() except KeyError: pass def _hash_contacts(self): acc = 0 for id in sorted([self.saved_contacts] + [contact.id for contact in self.contacts]): acc = (acc * 20261 + id) & 0xffffffff return acc & 0x7fffffff async def sync_contacts(self): response = await self.client(GetContactsRequest(hash=self._hash_contacts())) if isinstance(response, ContactsNotModified): return self.log.debug("Updating contacts...") self.contacts = [] self.saved_contacts = response.saved_count for user in response.users: puppet = pu.Puppet.get(user.id) await puppet.update_info(self, user) self.contacts.append(puppet) self.save() # endregion # region Telegram update handling async def update(self, update): if isinstance(update, (UpdateShortChatMessage, UpdateShortMessage, UpdateNewChannelMessage, UpdateNewMessage, UpdateEditMessage, UpdateEditChannelMessage)): await self.update_message(update) elif isinstance(update, (UpdateChatUserTyping, UpdateUserTyping)): await self.update_typing(update) elif isinstance(update, UpdateUserStatus): await self.update_status(update) elif isinstance(update, (UpdateChatAdmins, UpdateChatParticipantAdmin)): await self.update_admin(update) elif isinstance(update, UpdateChatParticipants): portal = po.Portal.get_by_tgid(update.participants.chat_id) if portal and portal.mxid: await portal.update_telegram_participants(update.participants.participants) elif isinstance(update, UpdateChannelPinnedMessage): portal = po.Portal.get_by_tgid(update.channel_id) if portal and portal.mxid: await portal.update_telegram_pin(self, update.id) elif isinstance(update, (UpdateUserName, UpdateUserPhoto)): await self.update_others_info(update) elif isinstance(update, UpdateReadHistoryOutbox): await self.update_read_receipt(update) else: self.log.debug("Unhandled update: %s", update) async def update_read_receipt(self, update): if not isinstance(update.peer, PeerUser): self.log.debug("Unexpected read receipt peer: %s", update.peer) return portal = po.Portal.get_by_tgid(update.peer.user_id, self.tgid) if not portal or not portal.mxid: return # We check that these are user read receipts, so tg_space is always the user ID. message = DBMessage.query.get((update.max_id, self.tgid)) if not message: return puppet = pu.Puppet.get(update.peer.user_id) await puppet.intent.mark_read(portal.mxid, message.mxid) async def update_admin(self, update): portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") if isinstance(update, UpdateChatAdmins): await portal.set_telegram_admins_enabled(update.enabled) elif isinstance(update, UpdateChatParticipantAdmin): puppet = pu.Puppet.get(update.user_id) user = await User.get_by_tgid(update.user_id).ensure_started() await portal.set_telegram_admin(puppet, user) async def update_typing(self, update): if isinstance(update, UpdateUserTyping): portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user") else: portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") sender = pu.Puppet.get(update.user_id) await portal.handle_telegram_typing(sender, update) async def update_others_info(self, update): puppet = pu.Puppet.get(update.user_id) if isinstance(update, UpdateUserName): if await puppet.update_displayname(self, update): puppet.save() elif isinstance(update, UpdateUserPhoto): if await puppet.update_avatar(self, update.photo.photo_big): puppet.save() async def update_status(self, update): puppet = pu.Puppet.get(update.user_id) if isinstance(update.status, UserStatusOnline): await puppet.intent.set_presence("online") elif isinstance(update.status, UserStatusOffline): await puppet.intent.set_presence("offline") else: self.log.warning("Unexpected user status update: %s", update) return def get_message_details(self, update): if isinstance(update, UpdateShortChatMessage): portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") sender = pu.Puppet.get(update.from_id) elif isinstance(update, UpdateShortMessage): portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user") sender = pu.Puppet.get(self.tgid if update.out else update.user_id) elif isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage, UpdateEditMessage, UpdateEditChannelMessage)): update = update.message if isinstance(update.to_id, PeerUser) and not update.out: portal = po.Portal.get_by_tgid(update.from_id, peer_type="user", tg_receiver=self.tgid) else: portal = po.Portal.get_by_entity(update.to_id, receiver_id=self.tgid) sender = pu.Puppet.get(update.from_id) if update.from_id else None else: self.log.warning( f"Unexpected message type in User#get_message_details: {type(update)}") return update, None, None return update, sender, portal def update_message(self, original_update): update, sender, portal = self.get_message_details(original_update) if isinstance(update, MessageService): if isinstance(update.action, MessageActionChannelMigrateFrom): self.log.debug(f"Ignoring action %s to %s by %d", update.action, portal.tgid_log, sender.id) return self.log.debug("Handling action %s to %s by %d", update.action, portal.tgid_log, sender.id) return portal.handle_telegram_action(self, sender, update.action) user = sender.tgid if sender else "admin" if isinstance(original_update, (UpdateEditMessage, UpdateEditChannelMessage)): self.log.debug("Handling edit %s to %s by %s", update, portal.tgid_log, user) return portal.handle_telegram_edit(self, sender, update) self.log.debug("Handling message %s to %s by %s", update, portal.tgid_log, user) return portal.handle_telegram_message(self, sender, update) # endregion # region Class instance lookup @classmethod def get_by_mxid(cls, mxid, create=True): try: return cls.by_mxid[mxid] except KeyError: pass user = DBUser.query.get(mxid) if user: user = cls.from_db(user) # asyncio.ensure_future(user.start(), loop=cls.loop) return user if create: user = cls(mxid) cls.db.add(user.to_db()) cls.db.commit() # asyncio.ensure_future(user.start(), loop=cls.loop) return user return None @classmethod def get_by_tgid(cls, tgid): try: return cls.by_tgid[tgid] except KeyError: pass user = DBUser.query.filter(DBUser.tgid == tgid).one_or_none() if user: user = cls.from_db(user) # asyncio.ensure_future(user.start(), loop=cls.loop) return user return None @classmethod def find_by_username(cls, username): for _, user in cls.by_tgid.items(): if user.username == username: return user puppet = DBUser.query.filter(DBUser.tg_username == username).one_or_none() if puppet: return cls.from_db(puppet) return None # endregion def init(context): global config config = context.config users = [User.from_db(user) for user in DBUser.query.all()] return [user.start(delete_unless_authenticated=True) for user in users]