# mautrix-telegram - A Matrix-Telegram puppeting bridge # Copyright (C) 2021 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations import asyncio import logging import random import string import time from aiohttp import web from mako.template import Template import pkg_resources from mautrix.types import UserID from mautrix.util.signed_token import sign_token, verify_token from ...puppet import Puppet from ...user import User from ..common import AuthAPI class PublicBridgeWebsite(AuthAPI): log: logging.Logger = logging.getLogger("mau.web.public") secret_key: str login: Template mx_login: Template app: web.Application def __init__(self, loop: asyncio.AbstractEventLoop): super().__init__(loop) self.secret_key = "".join(random.choices(string.ascii_lowercase + string.digits, k=64)) self.login = Template( pkg_resources.resource_string("mautrix_telegram", "web/public/login.html.mako") ) self.mx_login = Template( pkg_resources.resource_string("mautrix_telegram", "web/public/matrix-login.html.mako") ) self.app = web.Application(loop=loop) self.app.router.add_route("GET", "/login", self.get_login) self.app.router.add_route("POST", "/login", self.post_login) self.app.router.add_route("GET", "/matrix-login", self.get_matrix_login) self.app.router.add_route("POST", "/matrix-login", self.post_matrix_login) self.app.router.add_static( "/", pkg_resources.resource_filename("mautrix_telegram", "web/public/") ) def make_token(self, mxid: str, endpoint: str = "/login", expires_in: int = 900) -> str: return sign_token( self.secret_key, { "mxid": mxid, "endpoint": endpoint, "expiry": int(time.time()) + expires_in, }, ) def verify_token(self, token: str, endpoint: str = "/login") -> UserID | None: token = verify_token(self.secret_key, token) if token and ( token.get("expiry", 0) > int(time.time()) and token.get("endpoint", None) == endpoint ): return UserID(token.get("mxid", None)) return None async def get_login(self, request: web.Request) -> web.Response: state = "bot_token" if request.rel_url.query.get("mode", "") == "bot" else "request" mxid = self.verify_token(request.rel_url.query.get("token", None), endpoint="/login") if not mxid: return self.get_login_response(status=401, state="invalid-token") user = await User.get_by_mxid(mxid, create=False) if mxid else None if not user: return self.get_login_response(mxid=mxid, state=state) elif not user.puppet_whitelisted: return self.get_login_response( mxid=user.mxid, error="You are not whitelisted.", status=403 ) await user.ensure_started() if not await user.is_logged_in(): return self.get_login_response(mxid=user.mxid, state=state) return self.get_login_response(mxid=user.mxid, human_tg_id=user.human_tg_id) async def get_matrix_login(self, request: web.Request) -> web.Response: mxid = self.verify_token( request.rel_url.query.get("token", None), endpoint="/matrix-login" ) if not mxid: return self.get_mx_login_response(status=401, state="invalid-token") user = await User.get_by_mxid(mxid, create=False) if mxid else None if not user: return self.get_mx_login_response(mxid=mxid) elif not user.puppet_whitelisted: return self.get_mx_login_response( mxid=user.mxid, error="You are not whitelisted.", status=403 ) await user.ensure_started() if not await user.is_logged_in(): return self.get_mx_login_response( mxid=user.mxid, status=403, error="You are not logged in to Telegram." ) puppet = await Puppet.get_by_tgid(user.tgid) if puppet.is_real_user: return self.get_mx_login_response(state="already-logged-in", status=409) return self.get_mx_login_response(mxid=user.mxid) def get_login_response( self, status: int = 200, state: str = "", username: str = "", phone: str = "", human_tg_id: str = "", mxid: str = "", message: str = "", error: str = "", errcode: str = "", ) -> web.Response: return web.Response( status=status, content_type="text/html", text=self.login.render( human_tg_id=human_tg_id, state=state, error=error, message=message, mxid=mxid ), ) def get_mx_login_response( self, status: int = 200, state: str = "", username: str = "", phone: str = "", human_tg_id: str = "", mxid: str = "", message: str = "", error: str = "", errcode: str = "", ) -> web.Response: return web.Response( status=status, content_type="text/html", text=self.mx_login.render( human_tg_id=human_tg_id, state=state, error=error, message=message, mxid=mxid ), ) async def post_matrix_login(self, request: web.Request) -> web.Response: mxid = self.verify_token( request.rel_url.query.get("token", None), endpoint="/matrix-login" ) if not mxid: return self.get_mx_login_response(status=401, state="invalid-token") data = await request.post() user = await User.get_and_start_by_mxid(mxid) if not user.puppet_whitelisted: return self.get_mx_login_response( mxid=user.mxid, error="You are not whitelisted.", status=403 ) elif not await user.is_logged_in(): return self.get_mx_login_response( mxid=user.mxid, status=403, error="You are not logged in to Telegram." ) mode = data.get("mode", "access_token") if mode == "password": return await self.post_matrix_password(user, data["value"]) elif mode == "access_token": return await self.post_matrix_token(user, data["value"]) return self.get_mx_login_response( mxid=user.mxid, status=400, error="You must provide an access token or password." ) async def post_login(self, request: web.Request) -> web.Response: mxid = self.verify_token(request.rel_url.query.get("token", None), endpoint="/login") if not mxid: return self.get_login_response(status=401, state="invalid-token") data = await request.post() user = await User.get_and_start_by_mxid(mxid, even_if_no_session=True) if not user.puppet_whitelisted: return self.get_login_response( mxid=user.mxid, error="You are not whitelisted.", status=403 ) elif await user.is_logged_in(): return self.get_login_response(mxid=user.mxid, human_tg_id=user.human_tg_id) await user.ensure_started(even_if_no_session=True) if "phone" in data: return await self.post_login_phone(user, data["phone"]) elif "bot_token" in data: return await self.post_login_token(user, data["bot_token"]) elif "code" in data: try: code = int(data["code"].strip()) except ValueError: return self.get_login_response( mxid=user.mxid, state="code", status=400, errcode="phone_code_invalid", error="Phone code must be a number.", ) resp = await self.post_login_code(user, code, password_in_data="password" in data) if resp or "password" not in data: return resp elif "password" not in data: return self.get_login_response(error="No data given.", status=400) if "password" in data: return await self.post_login_password(user, data["password"]) return self.get_login_response(error="This should never happen.", status=500)