# -*- coding: future_fstrings -*- # mautrix-telegram - A Matrix-Telegram puppeting bridge # Copyright (C) 2018 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from typing import (Awaitable, Coroutine, Dict, List, Iterable, Optional, Pattern, Union, TYPE_CHECKING) from difflib import SequenceMatcher from enum import Enum from aiohttp import ServerDisconnectedError import asyncio import logging import re from sqlalchemy import orm from telethon.tl.types import UserProfilePhoto, User, FileLocation, UpdateUserName, PeerUser from mautrix_appservice import AppService, IntentAPI, IntentError, MatrixRequestError from .types import MatrixUserID, TelegramID from .db import Puppet as DBPuppet from .util import ignore_coro from . import util if TYPE_CHECKING: from .matrix import MatrixHandler from .config import Config from .context import Context from .abstract_user import AbstractUser PuppetError = Enum('PuppetError', 'Success OnlyLoginSelf InvalidAccessToken') config = None # type: Config class Puppet: log = logging.getLogger("mau.puppet") # type: logging.Logger db = None # type: orm.Session az = None # type: AppService mx = None # type: MatrixHandler loop = None # type: asyncio.AbstractEventLoop mxid_regex = None # type: Pattern username_template = None # type: str hs_domain = None # type: str cache = {} # type: Dict[TelegramID, Puppet] by_custom_mxid = {} # type: Dict[str, Puppet] def __init__(self, id: TelegramID, access_token: Optional[str] = None, custom_mxid: Optional[MatrixUserID] = None, username: Optional[str] = None, displayname: Optional[str] = None, displayname_source: Optional[TelegramID] = None, photo_id: Optional[str] = None, is_bot: bool = False, is_registered: bool = False, db_instance: Optional[DBPuppet] = None) -> None: self.id = id # type: TelegramID self.access_token = access_token # type: Optional[str] self.custom_mxid = custom_mxid # type: Optional[MatrixUserID] self.default_mxid = self.get_mxid_from_id(self.id) # type: MatrixUserID self.username = username # type: Optional[str] self.displayname = displayname # type: Optional[str] self.displayname_source = displayname_source # type: Optional[TelegramID] self.photo_id = photo_id # type: Optional[str] self.is_bot = is_bot # type: bool self.is_registered = is_registered # type: bool self._db_instance = db_instance # type: Optional[DBPuppet] self.default_mxid_intent = self.az.intent.user(self.default_mxid) self.intent = self._fresh_intent() # type: IntentAPI self.cache[id] = self if self.custom_mxid: self.by_custom_mxid[self.custom_mxid] = self @property def mxid(self) -> MatrixUserID: return self.custom_mxid or self.default_mxid @property def tgid(self) -> TelegramID: return self.id @property def is_real_user(self) -> bool: """ Is True when the puppet is a real Matrix user. """ return bool(self.custom_mxid and self.access_token) @staticmethod async def is_logged_in() -> bool: """ Is True if the puppet is logged in. """ return True @property def plain_displayname(self) -> str: tpl = config["bridge.displayname_template"] if tpl == "{displayname}": # Template has no extra stuff, no need to parse. return self.displayname regex = re.compile("^" + re.escape(tpl).replace(re.escape("{displayname}"), "(.+?)") + "$") match = regex.match(self.displayname) return match.group(1) or self.displayname # region Custom puppet management def _fresh_intent(self) -> IntentAPI: return (self.az.intent.user(self.custom_mxid, self.access_token) if self.is_real_user else self.default_mxid_intent) async def switch_mxid(self, access_token: Optional[str], mxid: Optional[MatrixUserID]) -> PuppetError: prev_mxid = self.custom_mxid self.custom_mxid = mxid self.access_token = access_token self.intent = self._fresh_intent() err = await self.init_custom_mxid() if err != PuppetError.Success: return err try: del self.by_custom_mxid[prev_mxid] # type: ignore except KeyError: pass if self.mxid != self.default_mxid: self.by_custom_mxid[self.mxid] = self await self.leave_rooms_with_default_user() self.save() return PuppetError.Success async def init_custom_mxid(self) -> PuppetError: if not self.is_real_user: return PuppetError.Success mxid = await self.intent.whoami() if not mxid or mxid != self.custom_mxid: self.custom_mxid = None self.access_token = None self.intent = self._fresh_intent() if mxid != self.custom_mxid: return PuppetError.OnlyLoginSelf return PuppetError.InvalidAccessToken if config["bridge.sync_with_custom_puppets"]: ignore_coro(asyncio.ensure_future(self.sync(), loop=self.loop)) return PuppetError.Success async def leave_rooms_with_default_user(self) -> None: for room_id in await self.default_mxid_intent.get_joined_rooms(): try: await self.default_mxid_intent.leave_room(room_id) await self.intent.ensure_joined(room_id) except (IntentError, MatrixRequestError): pass def create_sync_filter(self) -> Awaitable[str]: return self.intent.client.create_filter(self.custom_mxid, { "room": { "include_leave": False, "state": { "types": [] }, "timeline": { "types": [], }, "ephemeral": { "types": ["m.typing", "m.receipt"], }, "account_data": { "types": [] } }, "account_data": { "types": [], }, "presence": { "types": ["m.presence"], "senders": [self.custom_mxid], }, }) def filter_events(self, events: List[Dict]) -> List: new_events = [] for event in events: evt_type = event.get("type", None) event.setdefault("content", {}) if evt_type == "m.typing": is_typing = self.custom_mxid in event["content"].get("user_ids", []) event["content"]["user_ids"] = [self.custom_mxid] if is_typing else [] elif evt_type == "m.receipt": val = None evt = None for event_id in event["content"]: try: val = event["content"][event_id]["m.read"][self.custom_mxid] evt = event_id break except KeyError: pass if val and evt: event["content"] = {evt: {"m.read": { self.custom_mxid: val }}} else: continue new_events.append(event) return new_events def handle_sync(self, presence: List, ephemeral: Dict) -> None: presence_events = [self.mx.try_handle_event(event) for event in presence] for room_id, events in ephemeral.items(): for event in events: event["room_id"] = room_id ephemeral_events = [self.mx.try_handle_event(event) for events in ephemeral.values() for event in self.filter_events(events)] events = ephemeral_events + presence_events # List[Callable[[int], Awaitable[None]]] coro = asyncio.gather(*events, loop=self.loop) asyncio.ensure_future(coro, loop=self.loop) async def sync(self) -> None: try: await self._sync() except Exception: self.log.exception("Fatal error syncing") async def _sync(self) -> None: if not self.is_real_user: self.log.warning("Called sync() for non-custom puppet.") return custom_mxid = self.custom_mxid access_token_at_start = self.access_token errors = 0 next_batch = None filter_id = await self.create_sync_filter() self.log.debug(f"Starting syncer for {custom_mxid} with sync filter {filter_id}.") while access_token_at_start == self.access_token: try: sync_resp = await self.intent.client.sync(filter=filter_id, since=next_batch, set_presence="offline") # type: Dict errors = 0 if next_batch is not None: presence = sync_resp.get("presence", {}).get("events", []) # type: List ephemeral = {room: data.get("ephemeral", {}).get("events", []) for room, data in sync_resp.get("rooms", {}).get("join", {}).items() } # type: Dict self.handle_sync(presence, ephemeral) next_batch = sync_resp.get("next_batch", None) except (MatrixRequestError, ServerDisconnectedError) as e: wait = min(errors, 11) ** 2 self.log.warning(f"Syncer for {custom_mxid} errored: {e}. " f"Waiting for {wait} seconds...") errors += 1 await asyncio.sleep(wait) self.log.debug(f"Syncer for custom puppet {custom_mxid} stopped.") # endregion # region DB conversion @property def db_instance(self) -> DBPuppet: if not self._db_instance: self._db_instance = self.new_db_instance() return self._db_instance def new_db_instance(self) -> DBPuppet: return DBPuppet(id=self.id, access_token=self.access_token, custom_mxid=self.custom_mxid, username=self.username, displayname=self.displayname, displayname_source=self.displayname_source, photo_id=self.photo_id, is_bot=self.is_bot, matrix_registered=self.is_registered) @classmethod def from_db(cls, db_puppet: DBPuppet) -> 'Puppet': return Puppet(db_puppet.id, db_puppet.access_token, db_puppet.custom_mxid, db_puppet.username, db_puppet.displayname, db_puppet.displayname_source, db_puppet.photo_id, db_puppet.is_bot, db_puppet.matrix_registered, db_instance=db_puppet) def save(self) -> None: self.db_instance.access_token = self.access_token self.db_instance.custom_mxid = self.custom_mxid self.db_instance.username = self.username self.db_instance.displayname = self.displayname self.db_instance.displayname_source = self.displayname_source self.db_instance.photo_id = self.photo_id self.db_instance.is_bot = self.is_bot self.db_instance.matrix_registered = self.is_registered self.db.commit() # endregion # region Info updating def similarity(self, query: str) -> int: username_similarity = (SequenceMatcher(None, self.username, query).ratio() if self.username else 0) displayname_similarity = (SequenceMatcher(None, self.displayname, query).ratio() if self.displayname else 0) similarity = max(username_similarity, displayname_similarity) return int(round(similarity * 100)) @staticmethod def get_displayname(info: User, enable_format: bool = True) -> str: data = { "phone number": info.phone if hasattr(info, "phone") else None, "username": info.username, "full name": " ".join([info.first_name or "", info.last_name or ""]).strip(), "full name reversed": " ".join([info.first_name or "", info.last_name or ""]).strip(), "first name": info.first_name, "last name": info.last_name, } preferences = config["bridge.displayname_preference"] name = None for preference in preferences: name = data[preference] if name: break if isinstance(info, User) and info.deleted: name = f"Deleted account {info.id}" elif not name: name = info.id if not enable_format: return name return config["bridge.displayname_template"].format( displayname=name) async def update_info(self, source: 'AbstractUser', info: User) -> None: changed = False if self.username != info.username: self.username = info.username changed = True changed = await self.update_displayname(source, info) or changed if isinstance(info.photo, UserProfilePhoto): changed = await self.update_avatar(source, info.photo.photo_big) or changed self.is_bot = info.bot if changed: self.save() async def update_displayname(self, source: 'AbstractUser', info: Union[User, UpdateUserName] ) -> bool: ignore_source = (not source.is_relaybot and self.displayname_source is not None and self.displayname_source != source.tgid) if ignore_source: return False if isinstance(info, UpdateUserName): info = await source.client.get_entity(PeerUser(self.tgid)) displayname = self.get_displayname(info) if displayname != self.displayname: await self.default_mxid_intent.set_display_name(displayname) self.displayname = displayname self.displayname_source = source.tgid return True elif source.is_relaybot or self.displayname_source is None: self.displayname_source = source.tgid return True return False async def update_avatar(self, source: 'AbstractUser', photo: FileLocation) -> bool: photo_id = f"{photo.volume_id}-{photo.local_id}" if self.photo_id != photo_id: file = await util.transfer_file_to_matrix(source.client, self.default_mxid_intent, photo) if file: await self.default_mxid_intent.set_avatar(file.mxc) self.photo_id = photo_id return True return False # endregion # region Getters @classmethod def get(cls, tgid: TelegramID, create: bool = True) -> Optional['Puppet']: try: return cls.cache[tgid] except KeyError: pass puppet = DBPuppet.get_by_tgid(tgid) if puppet: return cls.from_db(puppet) if create: puppet = cls(tgid) cls.db.add(puppet.db_instance) cls.db.commit() return puppet return None @classmethod def get_by_mxid(cls, mxid: MatrixUserID, create: bool = True) -> Optional['Puppet']: tgid = cls.get_id_from_mxid(mxid) if tgid: return cls.get(tgid, create) return None @classmethod def get_by_custom_mxid(cls, mxid: MatrixUserID) -> Optional['Puppet']: if not mxid: raise ValueError("Matrix ID can't be empty") try: return cls.by_custom_mxid[mxid] except KeyError: pass puppet = DBPuppet.get_by_custom_mxid(mxid) if puppet: puppet = cls.from_db(puppet) return puppet return None @classmethod def all_with_custom_mxid(cls) -> Iterable['Puppet']: return (cls.by_custom_mxid[puppet.mxid] if puppet.custom_mxid in cls.by_custom_mxid else cls.from_db(puppet) for puppet in DBPuppet.all_with_custom_mxid()) @classmethod def get_id_from_mxid(cls, mxid: MatrixUserID) -> Optional[TelegramID]: match = cls.mxid_regex.match(mxid) if match: return TelegramID(int(match.group(1))) return None @classmethod def get_mxid_from_id(cls, tgid: TelegramID) -> MatrixUserID: return MatrixUserID(f"@{cls.username_template.format(userid=tgid)}:{cls.hs_domain}") @classmethod def find_by_username(cls, username: str) -> Optional['Puppet']: if not username: return None for _, puppet in cls.cache.items(): if puppet.username and puppet.username.lower() == username.lower(): return puppet dbpuppet = DBPuppet.get_by_username(username) if dbpuppet: return cls.from_db(dbpuppet) return None @classmethod def find_by_displayname(cls, displayname: str) -> Optional['Puppet']: if not displayname: return None for _, puppet in cls.cache.items(): if puppet.displayname and puppet.displayname == displayname: return puppet dbpuppet = DBPuppet.get_by_displayname(displayname) if dbpuppet: return cls.from_db(dbpuppet) return None # endregion def init(context: 'Context') -> List[Coroutine]: # [None, None, PuppetError] global config Puppet.az, Puppet.db, config, Puppet.loop, _ = context.core Puppet.mx = context.mx Puppet.username_template = config.get("bridge.username_template", "telegram_{userid}") Puppet.hs_domain = config["homeserver"]["domain"] Puppet.mxid_regex = re.compile( f"@{Puppet.username_template.format(userid='([0-9]+)')}:{Puppet.hs_domain}") return [puppet.init_custom_mxid() for puppet in Puppet.all_with_custom_mxid()]