diff --git a/mautrix_telegram/web/provisioning/__init__.py b/mautrix_telegram/web/provisioning/__init__.py index 21759ccf..c52101d6 100644 --- a/mautrix_telegram/web/provisioning/__init__.py +++ b/mautrix_telegram/web/provisioning/__init__.py @@ -21,6 +21,8 @@ import json import logging from aiohttp import web +from telethon.errors import SessionPasswordNeededError +from telethon.tl.custom import QRLogin from telethon.tl.functions.messages import GetAllStickersRequest from telethon.tl.types import ChannelForbidden, ChatForbidden, TypeChat, User as TLUser from telethon.utils import get_peer_id, resolve_id @@ -78,6 +80,7 @@ class ProvisioningAPI(AuthAPI): self.app.router.add_route("POST", f"{user_prefix}/retry_takeout", self.retry_takeout) self.app.router.add_route("POST", f"{user_prefix}/logout", self.logout) + self.app.router.add_route("GET", f"{user_prefix}/login/qr", self.login_qr) self.app.router.add_route("POST", f"{user_prefix}/login/bot_token", self.send_bot_token) self.app.router.add_route("POST", f"{user_prefix}/login/request_code", self.request_code) self.app.router.add_route("POST", f"{user_prefix}/login/send_code", self.send_code) @@ -528,6 +531,36 @@ class ProvisioningAPI(AuthAPI): user.takeout_retry_immediate.set() return web.json_response({}, status=200) + async def login_qr(self, request: web.Request) -> web.Response: + _, user, err = await self.get_user_request_info(request, websocket=True) + if err is not None: + return err + + await user.ensure_started(even_if_no_session=True) + qr_login = QRLogin(user.client, ignored_ids=[]) + + ws = web.WebSocketResponse(protocols=["net.maunium.telegram.login"]) + await ws.prepare(request) + + user_info = None + try: + await qr_login.recreate() + await ws.send_json({"code": qr_login.url, "timeout": 30}) + user_info = await qr_login.wait() + except asyncio.TimeoutError: + await ws.send_json({"success": False, "error": "timeout"}) + await ws.close() + return ws + except SessionPasswordNeededError: + await ws.send_json({"success": False, "error": "password-needed"}) + await ws.close() + return ws + + await self.postprocess_login(user, user_info) + await ws.send_json({"success": True}) + await ws.close() + return ws + async def send_bot_token(self, request: web.Request) -> web.Response: data, user, err = await self.get_user_request_info(request) if err is not None: @@ -653,6 +686,15 @@ class ProvisioningAPI(AuthAPI): ) return None + def check_websocket_authorization(self, request: web.Request) -> web.Response | None: + auth_parts = request.headers.get("Sec-WebSocket-Protocol").split(",") + for part in auth_parts: + if part.strip() == f"net.maunium.telegram.auth-{self.secret}": + return None + return self.get_error_response( + error="Shared secret is not valid.", errcode="shared_secret_invalid", status=401 + ) + @staticmethod async def get_data(request: web.Request) -> dict | None: try: @@ -707,8 +749,12 @@ class ProvisioningAPI(AuthAPI): expect_logged_in: bool | None = False, require_puppeting: bool = False, want_data: bool = True, + websocket: bool = False, ) -> tuple[dict | None, User | None, web.Response | None]: - err = self.check_authorization(request) + if not websocket: + err = self.check_authorization(request) + else: + err = self.check_websocket_authorization(request) if err is not None: return None, None, err