# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from typing import Dict, Set, Tuple, Union, Iterable, TYPE_CHECKING

from mautrix.bridge import BaseMatrixHandler
from mautrix.types import (Event, EventType, RoomID, UserID, EventID, ReceiptEvent, ReceiptType,
                           ReceiptEventContent, PresenceEvent, PresenceState, TypingEvent,
                           MessageEvent, StateEvent, RedactionEvent, RoomNameStateEventContent,
                           RoomAvatarStateEventContent, RoomTopicStateEventContent,
                           MemberStateEventContent)
from mautrix.errors import MatrixError

from . import user as u, portal as po, puppet as pu, commands as com

if TYPE_CHECKING:
    from .context import Context
    from .bot import Bot

# prometheus_client is optional: without it, both names are None.
try:
    from prometheus_client import Histogram

    EVENT_TIME = Histogram("matrix_event", "Time spent processing Matrix events", ["event_type"])
except ImportError:
    Histogram = None
    EVENT_TIME = None

# Content types accepted by MatrixHandler.handle_room_meta.
RoomMetaStateEventContent = Union[RoomNameStateEventContent, RoomAvatarStateEventContent,
                                  RoomTopicStateEventContent]


class MatrixHandler(BaseMatrixHandler):
    """Dispatches incoming Matrix events to the matching user, puppet and portal objects."""

    bot: 'Bot'
    commands: 'com.CommandProcessor'
    # Per room, the set of users reported as typing by the last typing event handled.
    previously_typing: Dict[RoomID, Set[UserID]]

    def __init__(self, context: 'Context') -> None:
        """Set up the base handler from ``context`` and create the command processor."""
        super(MatrixHandler, self).__init__(context.az, context.config, loop=context.loop,
                                            command_processor=com.CommandProcessor(context))
        self.bot = context.bot
        self.previously_typing = {}

    async def get_user(self, user_id: UserID) -> 'u.User':
        """Return the user for ``user_id`` after awaiting its ``ensure_started()``."""
        return await u.User.get_by_mxid(user_id).ensure_started()

    async def get_portal(self, room_id: RoomID) -> 'po.Portal':
        """Return the result of ``Portal.get_by_mxid`` for ``room_id``."""
        return po.Portal.get_by_mxid(room_id)

    async def get_puppet(self, user_id: UserID) -> 'pu.Puppet':
        """Return the result of ``Puppet.get_by_mxid`` for ``user_id``."""
        return pu.Puppet.get_by_mxid(user_id)

    async def handle_puppet_invite(self, room_id: RoomID, puppet: 'pu.Puppet', inviter: 'u.User',
                                   event_id: EventID) -> None:
        """Handle ``inviter`` inviting a Telegram puppet into ``room_id``.

        Outcomes, in order of checking:

        * inviter not logged in: the puppet sends an error and leaves;
        * room is already a portal: the puppet refuses for ``"user"`` portals, otherwise
          the invite is forwarded to Telegram and the puppet joins;
        * bridge bot is not in the room: with more than one known member the puppet asks
          for the bot to be invited first; otherwise the room becomes the private chat
          portal between inviter and puppet (or the inviter is pointed at the existing one);
        * bridge bot is in the room: the puppet joins and stays inactive.

        ``event_id`` is accepted for interface compatibility and is not used here.
        """
        intent = puppet.default_mxid_intent
        self.log.debug(f"{inviter} invited puppet for {puppet.tgid} to {room_id}")
        if not await inviter.is_logged_in():
            await intent.error_and_leave(
                room_id, text="Please log in before inviting Telegram puppets.")
            return

        portal = po.Portal.get_by_mxid(room_id)
        if portal:
            if portal.peer_type == "user":
                await intent.error_and_leave(
                    room_id, text="You can not invite additional users to private chats.")
                return
            await portal.invite_telegram(inviter, puppet)
            await intent.join_room(room_id)
            return

        try:
            members = await self.az.intent.get_room_members(room_id)
        except MatrixError:
            # The member list could not be fetched; treat the room as having no known members.
            members = []

        if self.az.bot_mxid in members:
            await intent.join_room(room_id)
            await intent.send_notice(room_id, "This puppet will remain inactive until a "
                                              "Telegram chat is created for this room.")
            return

        if len(members) > 1:
            await intent.error_and_leave(room_id, text=None, html=(
                f"Please invite "
                f"<a href='https://matrix.to/#/{self.az.bot_mxid}'>the bridge bot</a> "
                f"first if you want to create a Telegram chat."))
            return

        await intent.join_room(room_id)
        portal = po.Portal.get_by_tgid(puppet.tgid, inviter.tgid, "user")
        if portal.mxid:
            try:
                await intent.invite_user(portal.mxid, inviter.mxid)
                await intent.send_notice(
                    room_id, text=f"You already have a private chat with me: {portal.mxid}",
                    html=("You already have a private chat with me: "
                          f"<a href='https://matrix.to/#/{portal.mxid}'>Link to room</a>"))
                await intent.leave_room(room_id)
                return
            except MatrixError:
                # Deliberate best-effort: if the old room can't be used, fall through and
                # re-point the portal at the new room below.
                pass
        portal.mxid = room_id
        portal.save()
        inviter.register_portal(portal)
        await intent.send_notice(room_id, "Portal to private chat created.")

    async def send_welcome_message(self, room_id: RoomID, inviter: 'u.User') -> None:
        """Send the bridge bot's greeting to ``room_id``.

        The login hint is added only when the room has exactly two members, the inviter
        is puppet-whitelisted and the inviter is not logged in. Nothing is sent if the
        member list can't be fetched.
        """
        try:
            is_management = len(await self.az.intent.get_room_members(room_id)) == 2
        except MatrixError:
            # The AS bot is not in the room.
            return
        cmd_prefix = self.commands.command_prefix
        text = html = "Hello, I'm a Telegram bridge bot. "
        if is_management and inviter.puppet_whitelisted and not await inviter.is_logged_in():
            text += f"Use `{cmd_prefix} help` for help or `{cmd_prefix} login` to log in."
            html += (f"Use <code>{cmd_prefix} help</code> for help"
                     f" or <code>{cmd_prefix} login</code> to log in.")
        else:
            text += f"Use `{cmd_prefix} help` for help."
            html += f"Use <code>{cmd_prefix} help</code> for help."
        await self.az.intent.send_notice(room_id, text=text, html=html)

    async def handle_invite(self, room_id: RoomID, user_id: UserID, inviter: 'u.User',
                            event_id: EventID) -> None:
        """Forward an invite of an already-known user to Telegram if the room is a portal.

        Users that are not already known (``create=False`` lookup) are ignored.
        """
        user = u.User.get_by_mxid(user_id, create=False)
        if not user:
            return
        await user.ensure_started()
        portal = po.Portal.get_by_mxid(room_id)
        if await user.has_full_access(allow_bot=True) and portal:
            await portal.invite_telegram(inviter, user)

    async def handle_join(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
        """Handle a user joining a portal room, kicking users who may not use the bridge."""
        user = await u.User.get_by_mxid(user_id).ensure_started()

        portal = po.Portal.get_by_mxid(room_id)
        if not portal:
            return

        if not user.relaybot_whitelisted:
            await portal.main_intent.kick_user(room_id, user.mxid,
                                               "You are not whitelisted on this Telegram bridge.")
            return

        # Same predicate as allow_bridging_message; evaluated once for both the kick
        # decision and the join.
        can_bridge = await user.is_logged_in() or portal.has_bot
        if not can_bridge:
            await portal.main_intent.kick_user(room_id, user.mxid,
                                               "This chat does not have a bot relaying "
                                               "messages for unauthenticated users.")
            return

        self.log.debug(f"{user} joined {room_id}")
        await portal.join_matrix(user, event_id)

    async def handle_raw_leave(self, room_id: RoomID, user_id: UserID, sender_id: UserID,
                               reason: str, event_id: EventID) -> None:
        """Handle ``user_id`` leaving ``room_id`` or being removed by ``sender_id``.

        Puppets and users removed by someone else go to ``portal.kick_matrix``; a user
        leaving on their own goes to ``portal.leave_matrix``. Unknown senders, unknown
        users and non-portal rooms are ignored.
        """
        self.log.debug(f"{user_id} left {room_id}")

        sender = u.User.get_by_mxid(sender_id, create=False)
        if not sender:
            return
        await sender.ensure_started()

        portal = po.Portal.get_by_mxid(room_id)
        if not portal:
            return

        puppet = pu.Puppet.get_by_mxid(user_id)
        if puppet:
            await portal.kick_matrix(puppet, sender)
            return

        user = u.User.get_by_mxid(user_id, create=False)
        if not user:
            return
        await user.ensure_started()
        if sender_id != user_id:
            await portal.kick_matrix(user, sender)
        else:
            await portal.leave_matrix(user, event_id)

    @staticmethod
    async def allow_message(user: 'u.User') -> bool:
        """Return ``user.relaybot_whitelisted``."""
        return user.relaybot_whitelisted

    @staticmethod
    async def allow_command(user: 'u.User') -> bool:
        """Return ``user.whitelisted``."""
        return user.whitelisted

    @staticmethod
    async def allow_bridging_message(user: 'u.User', portal: 'po.Portal') -> bool:
        """Return whether ``user`` is logged in or ``portal`` has a bot."""
        return await user.is_logged_in() or portal.has_bot

    @staticmethod
    async def handle_redaction(evt: RedactionEvent) -> None:
        """Pass a redaction by a relaybot-whitelisted sender to the room's portal."""
        sender = await u.User.get_by_mxid(evt.sender).ensure_started()
        if not sender.relaybot_whitelisted:
            return

        portal = po.Portal.get_by_mxid(evt.room_id)
        if not portal:
            return

        await portal.handle_matrix_deletion(sender, evt.redacts)

    @staticmethod
    async def handle_power_levels(evt: StateEvent) -> None:
        """Pass a power level change to the room's portal.

        Only acts when the sender has full access (bots allowed) and the room is a portal.
        """
        portal = po.Portal.get_by_mxid(evt.room_id)
        sender = await u.User.get_by_mxid(evt.sender).ensure_started()
        if not (await sender.has_full_access(allow_bot=True) and portal):
            return
        try:
            old_users = evt.unsigned.prev_content.users
        except AttributeError:
            # No previous content on the event; guarded the same way as the
            # pinned-events branch in handle_state_event.
            # NOTE(review): assumes an empty mapping is an acceptable "no previous levels"
            # value for Portal.handle_matrix_power_levels -- confirm.
            old_users = {}
        await portal.handle_matrix_power_levels(sender, evt.content.users, old_users)

    @staticmethod
    async def handle_room_meta(evt_type: EventType, room_id: RoomID, sender_mxid: UserID,
                               content: RoomMetaStateEventContent) -> None:
        """Pass a room name, topic or avatar change to the matching portal handler.

        ``evt_type`` must be ``ROOM_NAME``, ``ROOM_TOPIC`` or ``ROOM_AVATAR``; any other
        value raises ``KeyError``. Content of an unexpected type is ignored.
        """
        portal = po.Portal.get_by_mxid(room_id)
        sender = await u.User.get_by_mxid(sender_mxid).ensure_started()
        if await sender.has_full_access(allow_bot=True) and portal:
            # event type -> (portal handler, expected content class, content key to forward)
            handler, content_type, content_key = {
                EventType.ROOM_NAME: (portal.handle_matrix_title, RoomNameStateEventContent,
                                      "name"),
                EventType.ROOM_TOPIC: (portal.handle_matrix_about, RoomTopicStateEventContent,
                                       "topic"),
                EventType.ROOM_AVATAR: (portal.handle_matrix_avatar, RoomAvatarStateEventContent,
                                        "url"),
            }[evt_type]
            if not isinstance(content, content_type):
                return
            await handler(sender, content[content_key])

    @staticmethod
    async def handle_room_pin(room_id: RoomID, sender_mxid: UserID, new_events: Set[str],
                              old_events: Set[str]) -> None:
        """Pass a change of the room's pinned events to the portal.

        ``new_events`` is consumed: one newly pinned ID is popped from the difference set.
        """
        portal = po.Portal.get_by_mxid(room_id)
        sender = await u.User.get_by_mxid(sender_mxid).ensure_started()
        if await sender.has_full_access(allow_bot=True) and portal:
            added = new_events - old_events
            if added:
                # New event pinned, set that as pinned in Telegram.
                # Only one of the newly pinned events is forwarded.
                await portal.handle_matrix_pin(sender, EventID(added.pop()))
            elif not new_events:
                # All pinned events removed, remove pinned event in Telegram.
                await portal.handle_matrix_pin(sender, None)

    @staticmethod
    async def handle_room_upgrade(room_id: RoomID, new_room_id: RoomID) -> None:
        """Tell the portal for ``room_id``, if any, that the room was replaced."""
        portal = po.Portal.get_by_mxid(room_id)
        if portal:
            await portal.handle_matrix_upgrade(new_room_id)

    async def handle_member_info_change(self, room_id: RoomID, user_id: UserID,
                                        profile: MemberStateEventContent,
                                        prev_profile: MemberStateEventContent,
                                        event_id: EventID) -> None:
        """Pass a displayname change to the portal for users that need the relaybot.

        Other profile changes, non-portal rooms and portals without a bot are ignored.
        """
        if profile.displayname == prev_profile.displayname:
            return

        portal = po.Portal.get_by_mxid(room_id)
        if not portal or not portal.has_bot:
            return

        user = await u.User.get_by_mxid(user_id).ensure_started()
        if await user.needs_relaybot(portal):
            await portal.name_change_matrix(user, profile.displayname, prev_profile.displayname,
                                            event_id)

    @staticmethod
    def parse_read_receipts(content: ReceiptEventContent) -> Iterable[Tuple[UserID, EventID]]:
        """Lazily yield ``(user_id, event_id)`` for every ``ReceiptType.READ`` entry."""
        return ((user_id, event_id)
                for event_id, receipts in content.items()
                for user_id in receipts.get(ReceiptType.READ, {}))

    @staticmethod
    async def handle_read_receipts(room_id: RoomID, receipts: Iterable[Tuple[UserID, EventID]]
                                   ) -> None:
        """Mark events as read in the portal on behalf of each logged-in user."""
        portal = po.Portal.get_by_mxid(room_id)
        if not portal:
            return

        for user_id, event_id in receipts:
            user = await u.User.get_by_mxid(user_id).ensure_started()
            if not await user.is_logged_in():
                continue
            await portal.mark_read(user, event_id)

    @staticmethod
    async def handle_presence(user_id: UserID, presence: PresenceState) -> None:
        """Set a logged-in user's presence to online or not online."""
        user = await u.User.get_by_mxid(user_id).ensure_started()
        if not await user.is_logged_in():
            return
        await user.set_presence(presence == PresenceState.ONLINE)

    async def handle_typing(self, room_id: RoomID, now_typing: Set[UserID]) -> None:
        """Pass typing state changes in a portal room to the portal.

        Only users whose state differs from the previously stored set for the room are
        forwarded. ``now_typing`` is stored as the room's new state.
        """
        portal = po.Portal.get_by_mxid(room_id)
        if not portal:
            return

        previously_typing = self.previously_typing.get(room_id, set())

        # Symmetric difference: users who started or stopped typing since the last event.
        for user_id in previously_typing ^ now_typing:
            user = await u.User.get_by_mxid(user_id).ensure_started()
            if not await user.is_logged_in():
                continue
            await portal.set_typing(user, user_id in now_typing)

        self.previously_typing[room_id] = now_typing

    def filter_matrix_event(self, evt: Event) -> bool:
        """Return ``True`` for non-message/state/redaction events and bridge-sent events.

        For redaction, message and state events the result is true only when the sender
        is the bridge bot or ``Puppet.get_id_from_mxid`` recognises the sender.
        """
        if not isinstance(evt, (RedactionEvent, MessageEvent, StateEvent)):
            return True
        # bool() keeps the annotated return type when evt.sender is None or empty.
        return bool(evt.sender and (evt.sender == self.az.bot_mxid
                                    or pu.Puppet.get_id_from_mxid(evt.sender) is not None))

    async def handle_ephemeral_event(self, evt: Union[ReceiptEvent, PresenceEvent, TypingEvent]
                                     ) -> None:
        """Route receipt, presence and typing events to their handlers."""
        if evt.type == EventType.RECEIPT:
            await self.handle_read_receipts(evt.room_id, self.parse_read_receipts(evt.content))
        elif evt.type == EventType.PRESENCE:
            await self.handle_presence(evt.sender, evt.content.presence)
        elif evt.type == EventType.TYPING:
            await self.handle_typing(evt.room_id, set(evt.content.user_ids))

    async def handle_event(self, evt: Event) -> None:
        """Route redaction events to ``handle_redaction``; other types are ignored here."""
        if evt.type == EventType.ROOM_REDACTION:
            await self.handle_redaction(evt)

    async def handle_state_event(self, evt: StateEvent) -> None:
        """Route power level, room meta, pinned event and tombstone events."""
        if evt.type == EventType.ROOM_POWER_LEVELS:
            await self.handle_power_levels(evt)
        elif evt.type in (EventType.ROOM_NAME, EventType.ROOM_AVATAR, EventType.ROOM_TOPIC):
            await self.handle_room_meta(evt.type, evt.room_id, evt.sender, evt.content)
        elif evt.type == EventType.ROOM_PINNED_EVENTS:
            new_events = set(evt.content.pinned)
            try:
                old_events = set(evt.unsigned.prev_content.pinned)
            except (KeyError, ValueError, TypeError, AttributeError):
                # No usable previous content: treat every pinned event as new.
                old_events = set()
            await self.handle_room_pin(evt.room_id, evt.sender, new_events, old_events)
        elif evt.type == EventType.ROOM_TOMBSTONE:
            await self.handle_room_upgrade(evt.room_id, evt.content.replacement_room)

    # NOTE: disabled sketch of EVENT_TIME instrumentation. It references `time`,
    # `MatrixEvent` and `evt_type`, none of which are defined in this module.
    # async def handle_event(self, evt: MatrixEvent) -> None:
    #     if self.filter_matrix_event(evt):
    #         return
    #     start_time = time.time()
    #
    #     if EVENT_TIME:
    #         EVENT_TIME.labels(event_type=evt_type).observe(time.time() - start_time)