Add QR login command. Fixes #399

Requires LonamiWebs/Telethon#1494 until it's merged, then requires using
the master branch of Telethon until a release is made.
This commit is contained in:
Tulir Asokan
2020-06-24 15:04:49 +03:00
parent 3fa6ed74e5
commit a29d9cf4ff
3 changed files with 106 additions and 20 deletions
+99 -17
View File
@@ -15,6 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Any, Dict, Optional
import asyncio
import io
from telethon.errors import ( # isort: skip
AccessTokenExpiredError, AccessTokenInvalidError, FirstNameInvalidError, FloodWaitError,
@@ -22,13 +23,24 @@ from telethon.errors import ( # isort: skip
PhoneNumberAppSignupForbiddenError, PhoneNumberBannedError, PhoneNumberFloodError,
PhoneNumberOccupiedError, PhoneNumberUnoccupiedError, SessionPasswordNeededError,
PhoneNumberInvalidError)
from telethon.tl.types import User
from mautrix.types import EventID
from mautrix.types import (EventID, UserID, MediaMessageEventContent, ImageInfo, MessageType,
TextMessageEventContent)
from ... import user as u
from ...types import TelegramID
from ...commands import command_handler, CommandEvent, SECTION_AUTH
from ...util import format_duration
try:
import qrcode
import PIL as _
from telethon.tl.custom import QRLogin
except ImportError:
qrcode = None
QRLogin = None
@command_handler(needs_auth=False,
help_section=SECTION_AUTH,
@@ -104,18 +116,76 @@ async def enter_code_register(evt: CommandEvent) -> EventID:
"Check console for more details.")
@command_handler(needs_auth=False, management_only=True, help_section=SECTION_AUTH,
help_text="Log in by scanning a QR code.")
async def login_qr(evt: CommandEvent) -> EventID:
login_as = evt.sender
if len(evt.args) > 0 and evt.sender.is_admin:
login_as = u.User.get_by_mxid(UserID(evt.args[0]))
if not qrcode or not QRLogin:
return await evt.reply("This bridge instance does not support logging in with a QR code.")
if await login_as.is_logged_in():
return await evt.reply(f"You are already logged in as {login_as.human_tg_id}.")
await login_as.ensure_started(even_if_no_session=True)
qr_login = QRLogin(login_as.client, ignored_ids=[])
qr_event_id: Optional[EventID] = None
async def upload_qr() -> None:
nonlocal qr_event_id
buffer = io.BytesIO()
image = qrcode.make(qr_login.url)
size = image.pixel_size
image.save(buffer, "PNG")
qr = buffer.getvalue()
mxc = await evt.az.intent.upload_media(qr, "image/png", "login-qr.png", len(qr))
content = MediaMessageEventContent(body=qr_login.url, url=mxc, msgtype=MessageType.IMAGE,
info=ImageInfo(mimetype="image/png", size=len(qr),
width=size, height=size))
if qr_event_id:
content.set_edit(qr_event_id)
await evt.az.intent.send_message(evt.room_id, content)
else:
content.set_reply(evt.event_id)
qr_event_id = await evt.az.intent.send_message(evt.room_id, content)
retries = 4
while retries > 0:
await qr_login.recreate()
await upload_qr()
try:
user = await qr_login.wait()
break
except asyncio.TimeoutError:
retries -= 1
except SessionPasswordNeededError:
evt.sender.command_status = {
"next": enter_password,
"login_as": login_as if login_as != evt.sender else None,
"action": "Login (password entry)",
}
return await evt.reply("Your account has two-factor authentication. "
"Please send your password here.")
else:
timeout = TextMessageEventContent(body="Login timed out", msgtype=MessageType.TEXT)
timeout.set_edit(qr_event_id)
return await evt.az.intent.send_message(evt.room_id, timeout)
return await _finish_sign_in(evt, user, login_as=login_as)
@command_handler(needs_auth=False, management_only=True,
help_section=SECTION_AUTH,
help_text="Get instructions on how to log in.")
async def login(evt: CommandEvent) -> EventID:
override_sender = False
if len(evt.args) > 0 and evt.sender.is_admin:
evt.sender = await u.User.get_by_mxid(evt.args[0]).ensure_started()
evt.sender = await u.User.get_by_mxid(UserID(evt.args[0])).ensure_started()
override_sender = True
if await evt.sender.is_logged_in():
return await evt.reply(f"You are already logged in as {evt.sender.human_tg_id}.")
allow_matrix_login = evt.config.get("bridge.allow_matrix_login", True)
allow_matrix_login = evt.config["bridge.allow_matrix_login"]
if allow_matrix_login and not override_sender:
evt.sender.command_status = {
"next": enter_phone_or_token,
@@ -225,7 +295,8 @@ async def enter_password(evt: CommandEvent) -> Optional[EventID]:
return await evt.reply("This bridge instance does not allow in-Matrix login. "
"Please use `$cmdprefix+sp login` to get login instructions")
try:
await _sign_in(evt, password=" ".join(evt.args))
await _sign_in(evt, login_as=evt.sender.command_status.get("login_as", None),
password=" ".join(evt.args))
except AccessTokenInvalidError:
return await evt.reply("That bot token is not valid.")
except AccessTokenExpiredError:
@@ -237,20 +308,12 @@ async def enter_password(evt: CommandEvent) -> Optional[EventID]:
return None
async def _sign_in(evt: CommandEvent, **sign_in_info) -> EventID:
async def _sign_in(evt: CommandEvent, login_as: 'u.User' = None, **sign_in_info) -> EventID:
login_as = login_as or evt.sender
try:
await evt.sender.ensure_started(even_if_no_session=True)
user = await evt.sender.client.sign_in(**sign_in_info)
existing_user = u.User.get_by_tgid(user.id)
if existing_user and existing_user != evt.sender:
await existing_user.log_out()
await evt.reply(f"[{existing_user.displayname}]"
f"(https://matrix.to/#/{existing_user.mxid})"
" was logged out from the account.")
asyncio.ensure_future(evt.sender.post_login(user, first_login=True), loop=evt.loop)
evt.sender.command_status = None
name = f"@{user.username}" if user.username else f"+{user.phone}"
return await evt.reply(f"Successfully logged in as {name}")
await login_as.ensure_started(even_if_no_session=True)
user = await login_as.client.sign_in(**sign_in_info)
await _finish_sign_in(evt, user)
except PhoneCodeExpiredError:
return await evt.reply("Phone code expired. Try again with `$cmdprefix+sp login`.")
except PhoneCodeInvalidError:
@@ -266,6 +329,25 @@ async def _sign_in(evt: CommandEvent, **sign_in_info) -> EventID:
"Please send your password here.")
async def _finish_sign_in(evt: CommandEvent, user: User, login_as: 'u.User' = None) -> EventID:
login_as = login_as or evt.sender
existing_user = u.User.get_by_tgid(TelegramID(user.id))
if existing_user and existing_user != login_as:
await existing_user.log_out()
await evt.reply(f"[{existing_user.displayname}]"
f"(https://matrix.to/#/{existing_user.mxid})"
" was logged out from the account.")
asyncio.ensure_future(login_as.post_login(user, first_login=True), loop=evt.loop)
evt.sender.command_status = None
name = f"@{user.username}" if user.username else f"+{user.phone}"
if login_as != evt.sender:
msg = (f"Successfully logged in [{login_as.mxid}](https://matrix.to/#/{login_as.mxid})"
f" as {name}")
else:
msg = f"Successfully logged in as {name}"
return await evt.reply(msg)
@command_handler(needs_auth=True,
help_section=SECTION_AUTH,
help_text="Log out from Telegram.")
+2 -2
View File
@@ -151,8 +151,8 @@ bridge:
# Whether or not to automatically sync the Matrix room state (mostly unpuppeted displaynames)
# at startup and when creating a bridge.
sync_matrix_state: true
# Allow logging in within Matrix. If false, the only way to log in is using the out-of-Matrix
# login website (see appservice.public config section)
# Allow logging in within Matrix. If false, users can only log in using login-qr or the
# out-of-Matrix login website (see appservice.public config section)
allow_matrix_login: true
# Whether or not to bridge plaintext highlights.
# Only enable this if your displayname_template has some static part that the bridge can use to
+5 -1
View File
@@ -8,7 +8,11 @@ aiodns
brotli
#/webp_convert
pillow>=4.3,<8
pillow>=4,<8
#/qr_login
pillow>=4,<8
qrcode>=6,<7
#/hq_thumbnails
moviepy>=1,<2