From 2cf9205cdae9a97f2410855416d4f602de40c591 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 14 Aug 2022 14:07:48 +0300 Subject: [PATCH] Add command to ban relaybot users from Telegram Fixes #357 Closes #819 --- mautrix_telegram/bot.py | 233 ++++++++++++++++++++++++++++------------ 1 file changed, 162 insertions(+), 71 deletions(-) diff --git a/mautrix_telegram/bot.py b/mautrix_telegram/bot.py index 8d2ea36e..bc9760f7 100644 --- a/mautrix_telegram/bot.py +++ b/mautrix_telegram/bot.py @@ -1,5 +1,5 @@ # mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2021 Tulir Asokan +# Copyright (C) 2022 Tulir Asokan # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -13,8 +13,11 @@ # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Awaitable, Callable, Dict, List, Optional, Tuple +from __future__ import annotations + +from typing import Awaitable, Callable, Literal import logging +import time from telethon.errors import ChannelInvalidError, ChannelPrivateError from telethon.tl.functions.channels import GetChannelsRequest, GetParticipantRequest @@ -35,31 +38,58 @@ from telethon.tl.types import ( PeerChannel, PeerChat, PeerUser, + TypeChannelParticipant, + TypeChatParticipant, + TypeInputPeer, TypePeer, UpdateNewChannelMessage, UpdateNewMessage, User, ) +from telethon.utils import add_surrogate, del_surrogate +from mautrix.errors import MBadState, MForbidden from mautrix.types import UserID from . import portal as po, puppet as pu, user as u from .abstract_user import AbstractUser -from .db import BotChat +from .db import BotChat, Message as DBMessage from .types import TelegramID ReplyFunc = Callable[[str], Awaitable[Message]] +TelegramAdminPermission = Literal[ + "change_info", + "post_messages", + "edit_messages", + "delete_messages", + "ban_users", + "invite_users", + "pin_messages", + "add_admins", + "anonymous", + "manage_call", + "other", +] class Bot(AbstractUser): log: logging.Logger = logging.getLogger("mau.user.bot") token: str - chats: Dict[int, str] - tg_whitelist: List[int] + chats: dict[int, str] + tg_whitelist: list[int] whitelist_group_admins: bool - _me_info: Optional[User] - _me_mxid: Optional[UserID] + _me_info: User | None + _me_mxid: UserID | None + _admin_cache: dict[ + tuple[int, int], + tuple[ChatParticipantAdmin | ChatParticipantCreator | None, float], + ] + required_permissions: dict[str, TelegramAdminPermission] = { + "portal": None, + "invite": "invite_users", + "mxban": "ban_users", + } def __init__(self, token: str) -> None: super().__init__() @@ -73,6 +103,7 @@ class Bot(AbstractUser): self.is_relaybot = True self.is_bot = True self.chats = {} + self._admin_cache = {} self.tg_whitelist = [] self.whitelist_group_admins = ( self.config["bridge.relaybot.whitelist_group_admins"] or False @@ -80,7 +111,7 @@ class Bot(AbstractUser): self._me_info = None self._me_mxid = None - async def get_me(self, use_cache: bool = True) -> Tuple[User, UserID]: + async def get_me(self, use_cache: bool = True) -> tuple[User, UserID]: if not use_cache or not self._me_mxid: self._me_info = await self.client.get_me() self._me_mxid = pu.Puppet.get_mxid_from_id(TelegramID(self._me_info.id)) @@ -98,7 +129,7 @@ class Bot(AbstractUser): if isinstance(user_id, int): self.tg_whitelist.append(user_id) - async def start(self, delete_unless_authenticated: bool = False) -> "Bot": + async def start(self, delete_unless_authenticated: bool = False) -> Bot: self.chats = {chat.id: chat.type for chat in await BotChat.all()} await super().start(delete_unless_authenticated) if not await self.is_logged_in(): @@ -148,7 +179,44 @@ class Bot(AbstractUser): pass await BotChat.delete_by_id(chat_id) - async def _can_use_commands(self, chat: TypePeer, tgid: TelegramID) -> bool: + async def _get_admin_participant( + self, chat: TypePeer | TypeInputPeer, tgid: TelegramID + ) -> TypeChatParticipant | TypeChannelParticipant | None: + chan_id = chat.channel_id if isinstance(chat, PeerChannel) else chat.chat_id + try: + cached, created = self._admin_cache[chan_id, tgid] + if created + 60 < time.time(): + return cached + except KeyError: + pass + if isinstance(chat, PeerChannel): + p = await self.client(GetParticipantRequest(chat, tgid)) + pcp = p.participant + self._admin_cache[chat.channel_id, tgid] = (pcp, time.time()) + return pcp + elif isinstance(chat, PeerChat): + chat = await self.client(GetFullChatRequest(chat.chat_id)) + participants = chat.full_chat.participants.participants + for p in participants: + self._admin_cache[chat.channel_id, tgid] = (p, time.time()) + if p.user_id == tgid: + return p + return None + + @staticmethod + def _has_participant_permission( + pcp: TypeChatParticipant | TypeChannelParticipant | None, + permission: TelegramAdminPermission | None, + ) -> bool: + if isinstance(pcp, (ChannelParticipantCreator, ChannelParticipantAdmin)): + return permission is None or getattr(pcp.admin_rights, permission, False) + elif isinstance(pcp, (ChatParticipantCreator, ChatParticipantAdmin)): + return True + return False + + async def _can_use_commands( + self, chat: TypePeer, tgid: TelegramID, permission: TelegramAdminPermission | None = None + ) -> bool: if tgid in self.tg_whitelist: return True @@ -158,22 +226,20 @@ class Bot(AbstractUser): return True if self.whitelist_group_admins: - if isinstance(chat, PeerChannel): - p = await self.client(GetParticipantRequest(chat, tgid)) - return isinstance( - p.participant, (ChannelParticipantCreator, ChannelParticipantAdmin) - ) - elif isinstance(chat, PeerChat): - chat = await self.client(GetFullChatRequest(chat.chat_id)) - participants = chat.full_chat.participants.participants - for p in participants: - if p.user_id == tgid: - return isinstance(p, (ChatParticipantCreator, ChatParticipantAdmin)) + pcp = await self._get_admin_participant(chat, tgid) + return self._has_participant_permission(pcp, permission) return False - async def check_can_use_commands(self, event: Message, reply: ReplyFunc) -> bool: - # FIXME event.from_id is not int - if not await self._can_use_commands(event.to_id, TelegramID(event.from_id)): + async def check_can_use_command(self, event: Message, reply: ReplyFunc, command: str) -> bool: + if command not in self.required_permissions: + # Unknown command + return False + elif not isinstance(event.from_id, PeerUser): + await reply("Channels can't use commands") + return False + elif not await self._can_use_commands( + event.to_id, TelegramID(event.from_id.user_id), self.required_permissions[command] + ): await reply("You do not have the permission to use that command.") return False return True @@ -215,9 +281,48 @@ class Bot(AbstractUser): f"Just invite [{displayname}](tg://user?id={user.tgid})" ) else: - await portal.invite_to_matrix(user.mxid) + try: + await portal.invite_to_matrix(user.mxid) + except MBadState: + try: + await portal.main_intent.unban_user( + portal.mxid, user.mxid, reason="Invited from Telegram" + ) + except Exception: + return await reply(f"Failed to unban `{user.mxid}` from the portal.") + await portal.invite_to_matrix(user.mxid) + return await reply(f"Unbanned and invited `{user.mxid}` to the portal.") return await reply(f"Invited `{user.mxid}` to the portal.") + async def handle_command_ban( + self, message: Message, portal: po.Portal, reply: ReplyFunc, reason: str + ) -> Message: + if not message.reply_to: + return await reply("You must reply to a relaybot message when using that command") + reply_to_id = TelegramID(message.reply_to.reply_to_msg_id) + tg_space = portal.tgid if portal.peer_type == "channel" else self.tgid + msg = await DBMessage.get_one_by_tgid(reply_to_id, tg_space) + if not msg or msg.sender != self.tgid or not msg.sender_mxid: + return await reply("Target message is not a relayed message") + puppet = await pu.Puppet.get_by_peer(message.from_id) + try: + await puppet.intent_for(portal).ban_user(portal.mxid, msg.sender_mxid, reason) + except MForbidden as e: + self.log.warning( + f"Failed to ban {msg.sender_mxid} from {portal.mxid} as {puppet.mxid}: {e}, " + f"falling back to bridge bot" + ) + reason_prefix = f"Banned by {puppet.displayname or puppet.tgid}" + reason = f"{reason_prefix}: {reason}" if reason else reason_prefix + try: + await self.az.intent.ban_user(portal.mxid, msg.sender_mxid, reason) + except MForbidden as e: + self.log.warning( + f"Failed to ban {msg.sender_mxid} from {portal.mxid} as bridge bot: {e}" + ) + return await reply(f"Failed to ban `{msg.sender_mxid}`") + return await reply(f"Successfully banned `{msg.sender_mxid}`") + @staticmethod def handle_command_id(message: Message, reply: ReplyFunc) -> Awaitable[Message]: # Provide the prefixed ID to the user so that the user wouldn't need to specify whether the @@ -235,53 +340,44 @@ class Bot(AbstractUser): else: return reply("Failed to find chat ID.") - def match_command(self, text: str, command: str) -> bool: - text = text.lower() - command = f"/{command.lower()}" - command_targeted = f"{command}@{self.tg_username.lower()}" + def parse_command(self, message: Message) -> tuple[str | None, str | None]: + if not message.entities or len(message.entities) < 1 or not message.message: + return None, None + cmd_entity = message.entities[0] + if not isinstance(cmd_entity, MessageEntityBotCommand) or cmd_entity.offset != 0: + return None, None + surrogated_text = add_surrogate(message.message) + command: str = del_surrogate(surrogated_text[: cmd_entity.length]).lower() + rest_of_message: str = "" + if len(surrogated_text) > cmd_entity.length + 1: + rest_of_message: str = del_surrogate(surrogated_text[cmd_entity.length + 1 :]) + command, *target = command.split("@", 1) + if not command.startswith("/"): + return None, None + elif target and target[0] != self.tg_username.lower(): + return None, None + return command[1:], rest_of_message - is_plain_command = text == command or text == command_targeted - if is_plain_command: - return True - - is_arg_command = text.startswith(command + " ") or text.startswith(command_targeted + " ") - if is_arg_command: - return True - - return False - - async def handle_command(self, message: Message) -> None: + async def handle_command(self, message: Message, command: str, args: str) -> None: def reply(reply_text: str) -> Awaitable[Message]: return self.client.send_message(message.chat_id, reply_text, reply_to=message.id) - text = message.message - - if self.match_command(text, "start"): + if command == "start": pcm = self.config["bridge.relaybot.private_chat.message"] if pcm: await reply(pcm) - return - elif self.match_command(text, "id"): + elif command == "id": await self.handle_command_id(message, reply) - return - elif message.is_private: - return - - portal = await po.Portal.get_by_entity(message.to_id) - - is_portal_cmd = self.match_command(text, "portal") - is_invite_cmd = self.match_command(text, "invite") - if is_portal_cmd or is_invite_cmd: - if not await self.check_can_use_commands(message, reply): + elif not message.is_private: + if not await self.check_can_use_command(message, reply, command): return - if is_portal_cmd: + portal = await po.Portal.get_by_entity(message.to_id) + if command == "portal": await self.handle_command_portal(portal, reply) - elif is_invite_cmd: - try: - mxid = text[text.index(" ") + 1 :] - except ValueError: - mxid = "" - await self.handle_command_invite(portal, reply, mxid_input=UserID(mxid)) + elif command == "invite": + await self.handle_command_invite(portal, reply, mxid_input=UserID(args)) + elif command == "mxban": + await self.handle_command_ban(message, portal, reply, reason=args) async def handle_service_message(self, message: MessageService) -> None: to_peer = message.to_id @@ -310,15 +406,10 @@ class Bot(AbstractUser): await self.handle_service_message(update.message) return False - is_command = ( - isinstance(update.message, Message) - and update.message.entities - and len(update.message.entities) > 0 - and isinstance(update.message.entities[0], MessageEntityBotCommand) - and update.message.entities[0].offset == 0 - ) - if is_command: - await self.handle_command(update.message) + if isinstance(update.message, Message): + command, args = self.parse_command(update.message) + if command: + await self.handle_command(update.message, command, args) return False def is_in_chat(self, peer_id) -> bool: