# -*- coding: future_fstrings -*-
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from abc import abstractmethod
from typing import Optional
from aiohttp import web
import abc
import asyncio
import logging

from telethon.errors import *

from ...commands.telegram.auth import enter_password
from ...util import format_duration, ignore_coro
from ...puppet import Puppet, PuppetError
from ...user import User


class AuthAPI(abc.ABC):
    """Abstract base for the web login endpoints.

    The ``post_*`` coroutines implement the individual login steps (Matrix access token,
    Telegram phone number, bot token, phone code and 2FA password) and translate the errors
    raised by each step into HTTP-style responses. Subclasses decide how those responses are
    rendered by implementing :meth:`get_login_response` and :meth:`get_mx_login_response`.
    """
    log = logging.getLogger("mau.web.auth")  # type: logging.Logger

    def __init__(self, loop: asyncio.AbstractEventLoop):
        """Store the event loop used to schedule the post-login background task."""
        self.loop = loop  # type: asyncio.AbstractEventLoop

    @abstractmethod
    def get_login_response(self, status: int = 200, state: str = "", username: str = "",
                           phone: str = "", human_tg_id: str = "", mxid: str = "",
                           message: str = "", error: str = "", errcode: str = ""
                           ) -> web.Response:
        """Build the response for a Telegram login step.

        Must be implemented by subclasses. Within this class it is called with ``status`` as
        the HTTP status code, ``state`` as the login step the client is in afterwards
        (``"request"``, ``"code"``, ``"token"``, ``"password"`` or ``"logged-in"``),
        ``message`` on success and ``error``/``errcode`` on failure.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_mx_login_response(self, status: int = 200, state: str = "", username: str = "",
                              phone: str = "", human_tg_id: str = "", mxid: str = "",
                              message: str = "", error: str = "", errcode: str = ""
                              ) -> web.Response:
        """Build the response for a Matrix login step.

        Must be implemented by subclasses. Takes the same arguments as
        :meth:`get_login_response`.
        """
        raise NotImplementedError()

    async def post_matrix_token(self, user: User, token: str) -> web.Response:
        """Switch the user's puppet to their own Matrix account using an access token.

        Args:
            user: The bridge user performing the login.
            token: The Matrix access token to hand to ``Puppet.switch_mxid``.

        Returns:
            A 409 response if the puppet is already a real user, 403/401 for the known
            ``PuppetError`` failures, or a 200 ``logged-in`` response on success.

        Raises:
            AssertionError: If ``switch_mxid`` returns a ``PuppetError`` that is not handled
                here.
        """
        puppet = Puppet.get(user.tgid)
        if puppet.is_real_user:
            return self.get_mx_login_response(state="already-logged-in", status=409,
                                              error="You have already logged in with your Matrix "
                                                    "account.", errcode="already-logged-in")

        resp = await puppet.switch_mxid(token, user.mxid)
        if resp == PuppetError.OnlyLoginSelf:
            return self.get_mx_login_response(status=403, errcode="only-login-self",
                                              error="You can only log in as your own Matrix user.")
        elif resp == PuppetError.InvalidAccessToken:
            return self.get_mx_login_response(status=401, errcode="invalid-access-token",
                                              error="Failed to verify access token.")
        # Raised explicitly instead of via `assert` so the check is not stripped under
        # `python -O`, where an unhandled error would otherwise be reported as a success.
        if resp != PuppetError.Success:
            raise AssertionError("Encountered an unhandled PuppetError.")
        return self.get_mx_login_response(mxid=user.mxid, status=200, state="logged-in")

    async def post_matrix_password(self, user: User, password: str) -> web.Response:
        """Reject Matrix password login with a 501 response; it is not implemented yet."""
        return self.get_mx_login_response(mxid=user.mxid, status=501, error="Not yet implemented",
                                          errcode="not-yet-implemented")

    async def post_login_phone(self, user: User, phone: str) -> web.Response:
        """Request a Telegram login code for the given phone number.

        Args:
            user: The bridge user whose Telegram client is used to sign in.
            phone: The phone number to sign in with. A falsy value is replaced with
                ``"+123"`` before calling ``sign_in`` (presumably so that Telegram rejects
                it as an invalid number -- confirm against Telethon's behavior).

        Returns:
            A 200 response with state ``code`` on success, otherwise an error response with
            state ``request`` describing why the code could not be requested.
        """
        try:
            await user.client.sign_in(phone or "+123")
            return self.get_login_response(mxid=user.mxid, state="code", status=200,
                                           message="Code requested successfully.")
        except PhoneNumberInvalidError:
            return self.get_login_response(mxid=user.mxid, state="request", status=400,
                                           errcode="phone_number_invalid",
                                           error="Invalid phone number.")
        except PhoneNumberBannedError:
            return self.get_login_response(mxid=user.mxid, state="request", status=403,
                                           errcode="phone_number_banned",
                                           error="Your phone number is banned from Telegram.")
        except PhoneNumberAppSignupForbiddenError:
            return self.get_login_response(mxid=user.mxid, state="request", status=403,
                                           errcode="phone_number_app_signup_forbidden",
                                           error="You have disabled 3rd party apps on your "
                                                 "account.")
        except PhoneNumberUnoccupiedError:
            return self.get_login_response(mxid=user.mxid, state="request", status=404,
                                           errcode="phone_number_unoccupied",
                                           error="That phone number has not been registered.")
        except PhoneNumberFloodError:
            return self.get_login_response(
                mxid=user.mxid, state="request", status=429, errcode="phone_number_flood",
                error="Your phone number has been temporarily blocked for flooding. "
                      "The ban is usually applied for around a day.")
        except FloodWaitError as e:
            return self.get_login_response(
                mxid=user.mxid, state="request", status=429, errcode="flood_wait",
                error="Your phone number has been temporarily blocked for flooding. "
                      f"Please wait for {format_duration(e.seconds)} before trying again.")
        except Exception:
            self.log.exception("Error requesting phone code")
            return self.get_login_response(mxid=user.mxid, state="request", status=500,
                                           errcode="unknown_error",
                                           error="Internal server error while requesting code.")

    async def postprocess_login(self, user: User, user_info) -> None:
        """Finish a successful Telegram sign-in.

        Logs out any other bridge user that is registered for the same Telegram ID,
        schedules ``user.post_login`` on the event loop without awaiting it, and clears the
        user's pending ``"Login"`` command status if there is one.

        Args:
            user: The bridge user that just signed in.
            user_info: The object returned by ``user.client.sign_in``; its ``id`` attribute
                is read here.
        """
        existing_user = User.get_by_tgid(user_info.id)
        if existing_user and existing_user != user:
            await existing_user.log_out()
        ignore_coro(asyncio.ensure_future(user.post_login(user_info), loop=self.loop))
        if user.command_status and user.command_status["action"] == "Login":
            user.command_status = None

    async def post_login_token(self, user: User, token: str) -> web.Response:
        """Sign in to Telegram with a bot token.

        Args:
            user: The bridge user whose Telegram client is used to sign in.
            token: The bot token passed to ``sign_in`` as ``bot_token``.

        Returns:
            A 200 ``logged-in`` response on success, otherwise an error response with state
            ``token``.
        """
        try:
            user_info = await user.client.sign_in(bot_token=token)
            await self.postprocess_login(user, user_info)
            return self.get_login_response(mxid=user.mxid, state="logged-in", status=200,
                                           username=user_info.username, phone=None,
                                           human_tg_id=f"@{user_info.username}")
        except AccessTokenInvalidError:
            return self.get_login_response(mxid=user.mxid, state="token", status=401,
                                           errcode="bot_token_invalid",
                                           error="Bot token invalid.")
        except AccessTokenExpiredError:
            return self.get_login_response(mxid=user.mxid, state="token", status=403,
                                           errcode="bot_token_expired",
                                           error="Bot token expired.")
        except Exception:
            self.log.exception("Error sending bot token")
            # errcode added for consistency with the other catch-all handlers in this class.
            return self.get_login_response(mxid=user.mxid, state="token", status=500,
                                           errcode="unknown_error",
                                           error="Internal server error while sending token.")

    async def post_login_code(self, user: User, code: int, password_in_data: bool
                              ) -> Optional[web.Response]:
        """Sign in to Telegram with the phone code that was sent to the user.

        Args:
            user: The bridge user whose Telegram client is used to sign in.
            code: The login code passed to ``sign_in``.
            password_in_data: Whether the caller also has a 2FA password available. This
                only matters when Telegram reports that a password is needed.

        Returns:
            A 200 ``logged-in`` response on success or an error response with state
            ``code`` on failure. If a 2FA password is required, returns a 202 response with
            state ``password`` when ``password_in_data`` is false, and ``None`` when it is
            true, leaving the next step to the caller.
        """
        try:
            user_info = await user.client.sign_in(code=code)
            await self.postprocess_login(user, user_info)
            human_tg_id = f"@{user_info.username}" if user_info.username else f"+{user_info.phone}"
            return self.get_login_response(mxid=user.mxid, state="logged-in", status=200,
                                           username=user_info.username, phone=user_info.phone,
                                           human_tg_id=human_tg_id)
        except PhoneCodeInvalidError:
            return self.get_login_response(mxid=user.mxid, state="code", status=401,
                                           errcode="phone_code_invalid",
                                           error="Incorrect phone code.")
        except PhoneCodeExpiredError:
            return self.get_login_response(mxid=user.mxid, state="code", status=403,
                                           errcode="phone_code_expired",
                                           error="Phone code expired.")
        except SessionPasswordNeededError:
            if not password_in_data:
                # Advance an in-progress "Login" command to its password-entry step.
                if user.command_status and user.command_status["action"] == "Login":
                    user.command_status = {
                        "next": enter_password,
                        "action": "Login (password entry)",
                    }
                return self.get_login_response(
                    mxid=user.mxid, state="password", status=202,
                    message="Code accepted, but you have 2-factor authentication enabled.")
            return None
        except Exception:
            self.log.exception("Error sending phone code")
            return self.get_login_response(mxid=user.mxid, state="code", status=500,
                                           errcode="unknown_error",
                                           error="Internal server error while sending code.")

    async def post_login_password(self, user: User, password: str) -> web.Response:
        """Complete a Telegram sign-in with the 2-factor authentication password.

        Args:
            user: The bridge user whose Telegram client is used to sign in.
            password: The password passed to ``sign_in``.

        Returns:
            A 200 ``logged-in`` response on success, otherwise an error response with state
            ``password``.
        """
        try:
            user_info = await user.client.sign_in(password=password)
            await self.postprocess_login(user, user_info)
            human_tg_id = f"@{user_info.username}" if user_info.username else f"+{user_info.phone}"
            return self.get_login_response(mxid=user.mxid, state="logged-in", status=200,
                                           username=user_info.username, phone=user_info.phone,
                                           human_tg_id=human_tg_id)
        except PasswordEmptyError:
            return self.get_login_response(mxid=user.mxid, state="password", status=400,
                                           errcode="password_empty",
                                           error="Empty password.")
        except PasswordHashInvalidError:
            return self.get_login_response(mxid=user.mxid, state="password", status=401,
                                           errcode="password_invalid",
                                           error="Incorrect password.")
        except Exception:
            self.log.exception("Error sending password")
            return self.get_login_response(mxid=user.mxid, state="password", status=500,
                                           errcode="unknown_error",
                                           error="Internal server error while sending password.")