# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2019 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
from typing import Dict, Set, Tuple, Union, Iterable, TYPE_CHECKING
from mautrix.bridge import BaseMatrixHandler
from mautrix.types import (Event, EventType, RoomID, UserID, EventID, ReceiptEvent, ReceiptType,
ReceiptEventContent, PresenceEvent, PresenceState, TypingEvent,
MessageEvent, StateEvent, RedactionEvent, RoomNameStateEventContent,
RoomAvatarStateEventContent, RoomTopicStateEventContent,
MemberStateEventContent)
from mautrix.errors import MatrixError
from . import user as u, portal as po, puppet as pu, commands as com
if TYPE_CHECKING:
from .context import Context
from .bot import Bot
try:
from prometheus_client import Histogram
EVENT_TIME = Histogram("matrix_event", "Time spent processing Matrix events", ["event_type"])
except ImportError:
Histogram = None
EVENT_TIME = None
RoomMetaStateEventContent = Union[RoomNameStateEventContent, RoomAvatarStateEventContent,
RoomTopicStateEventContent]
class MatrixHandler(BaseMatrixHandler):
bot: 'Bot'
commands: 'com.CommandProcessor'
previously_typing: Dict[RoomID, Set[UserID]]
def __init__(self, context: 'Context') -> None:
super(MatrixHandler, self).__init__(context.az, context.config, loop=context.loop,
command_processor=com.CommandProcessor(context))
self.bot = context.bot
self.previously_typing = {}
async def get_user(self, user_id: UserID) -> 'u.User':
return await u.User.get_by_mxid(user_id).ensure_started()
async def get_portal(self, room_id: RoomID) -> 'po.Portal':
return po.Portal.get_by_mxid(room_id)
async def get_puppet(self, user_id: UserID) -> 'pu.Puppet':
return pu.Puppet.get_by_mxid(user_id)
async def handle_puppet_invite(self, room_id: RoomID, puppet: pu.Puppet, inviter: u.User,
event_id: EventID) -> None:
intent = puppet.default_mxid_intent
self.log.debug(f"{inviter} invited puppet for {puppet.tgid} to {room_id}")
if not await inviter.is_logged_in():
await intent.error_and_leave(
room_id, text="Please log in before inviting Telegram puppets.")
return
portal = po.Portal.get_by_mxid(room_id)
if portal:
if portal.peer_type == "user":
await intent.error_and_leave(
room_id, text="You can not invite additional users to private chats.")
return
await portal.invite_telegram(inviter, puppet)
await intent.join_room(room_id)
return
try:
members = await self.az.intent.get_room_members(room_id)
except MatrixError:
members = []
if self.az.bot_mxid not in members:
if len(members) > 1:
await intent.error_and_leave(room_id, text=None, html=(
f"Please invite "
f"the bridge bot "
f"first if you want to create a Telegram chat."))
return
await intent.join_room(room_id)
portal = po.Portal.get_by_tgid(puppet.tgid, inviter.tgid, "user")
if portal.mxid:
try:
await intent.invite_user(portal.mxid, inviter.mxid)
await intent.send_notice(
room_id, text=f"You already have a private chat with me: {portal.mxid}",
html=("You already have a private chat with me: "
f"Link to room"))
await intent.leave_room(room_id)
return
except MatrixError:
pass
portal.mxid = room_id
portal.save()
inviter.register_portal(portal)
await intent.send_notice(room_id, "Portal to private chat created.")
else:
await intent.join_room(room_id)
await intent.send_notice(room_id, "This puppet will remain inactive until a "
"Telegram chat is created for this room.")
async def send_welcome_message(self, room_id: RoomID, inviter: 'u.User') -> None:
try:
is_management = len(await self.az.intent.get_room_members(room_id)) == 2
except MatrixError:
# The AS bot is not in the room.
return
cmd_prefix = self.commands.command_prefix
text = html = "Hello, I'm a Telegram bridge bot. "
if is_management and inviter.puppet_whitelisted and not await inviter.is_logged_in():
text += f"Use `{cmd_prefix} help` for help or `{cmd_prefix} login` to log in."
html += (f"Use {cmd_prefix} help for help"
f" or {cmd_prefix} login to log in.")
else:
text += f"Use `{cmd_prefix} help` for help."
html += f"Use {cmd_prefix} help for help."
await self.az.intent.send_notice(room_id, text=text, html=html)
async def handle_invite(self, room_id: RoomID, user_id: UserID, inviter: 'u.User',
event_id: EventID) -> None:
user = u.User.get_by_mxid(user_id, create=False)
if not user:
return
await user.ensure_started()
portal = po.Portal.get_by_mxid(room_id)
if user and await user.has_full_access(allow_bot=True) and portal:
await portal.invite_telegram(inviter, user)
return
async def handle_join(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
user = await u.User.get_by_mxid(user_id).ensure_started()
portal = po.Portal.get_by_mxid(room_id)
if not portal:
return
if not user.relaybot_whitelisted:
await portal.main_intent.kick_user(room_id, user.mxid,
"You are not whitelisted on this Telegram bridge.")
return
elif not await user.is_logged_in() and not portal.has_bot:
await portal.main_intent.kick_user(room_id, user.mxid,
"This chat does not have a bot relaying "
"messages for unauthenticated users.")
return
self.log.debug(f"{user} joined {room_id}")
if await user.is_logged_in() or portal.has_bot:
await portal.join_matrix(user, event_id)
async def handle_raw_leave(self, room_id: RoomID, user_id: UserID, sender_id: UserID,
reason: str, event_id: EventID) -> None:
self.log.debug(f"{user_id} left {room_id}")
sender = u.User.get_by_mxid(sender_id, create=False)
if not sender:
return
await sender.ensure_started()
portal = po.Portal.get_by_mxid(room_id)
if not portal:
return
puppet = pu.Puppet.get_by_mxid(user_id)
if puppet:
await portal.kick_matrix(puppet, sender)
return
user = u.User.get_by_mxid(user_id, create=False)
if not user:
return
await user.ensure_started()
if sender_id != user_id:
await portal.kick_matrix(user, sender)
else:
await portal.leave_matrix(user, event_id)
@staticmethod
async def allow_message(user: 'u.User') -> bool:
return user.relaybot_whitelisted
@staticmethod
async def allow_command(user: 'u.User') -> bool:
return user.whitelisted
@staticmethod
async def allow_bridging_message(user: 'u.User', portal: 'po.Portal') -> bool:
return await user.is_logged_in() or portal.has_bot
@staticmethod
async def handle_redaction(evt: RedactionEvent) -> None:
sender = await u.User.get_by_mxid(evt.sender).ensure_started()
if not sender.relaybot_whitelisted:
return
portal = po.Portal.get_by_mxid(evt.room_id)
if not portal:
return
await portal.handle_matrix_deletion(sender, evt.redacts)
@staticmethod
async def handle_power_levels(evt: StateEvent) -> None:
portal = po.Portal.get_by_mxid(evt.room_id)
sender = await u.User.get_by_mxid(evt.sender).ensure_started()
if await sender.has_full_access(allow_bot=True) and portal:
await portal.handle_matrix_power_levels(sender, evt.content.users,
evt.unsigned.prev_content.users)
@staticmethod
async def handle_room_meta(evt_type: EventType, room_id: RoomID, sender_mxid: UserID,
content: RoomMetaStateEventContent) -> None:
portal = po.Portal.get_by_mxid(room_id)
sender = await u.User.get_by_mxid(sender_mxid).ensure_started()
if await sender.has_full_access(allow_bot=True) and portal:
handler, content_key = {
EventType.ROOM_NAME: (portal.handle_matrix_title, "name"),
EventType.ROOM_TOPIC: (portal.handle_matrix_about, "topic"),
EventType.ROOM_AVATAR: (portal.handle_matrix_avatar, "url"),
}[evt_type]
if content_key not in content:
return
await handler(sender, content[content_key])
@staticmethod
async def handle_room_pin(room_id: RoomID, sender_mxid: UserID,
new_events: Set[str], old_events: Set[str]) -> None:
portal = po.Portal.get_by_mxid(room_id)
sender = await u.User.get_by_mxid(sender_mxid).ensure_started()
if await sender.has_full_access(allow_bot=True) and portal:
events = new_events - old_events
if len(events) > 0:
# New event pinned, set that as pinned in Telegram.
await portal.handle_matrix_pin(sender, EventID(events.pop()))
elif len(new_events) == 0:
# All pinned events removed, remove pinned event in Telegram.
await portal.handle_matrix_pin(sender, None)
@staticmethod
async def handle_room_upgrade(room_id: RoomID, new_room_id: RoomID) -> None:
portal = po.Portal.get_by_mxid(room_id)
if portal:
await portal.handle_matrix_upgrade(new_room_id)
async def handle_member_info_change(self, room_id: RoomID, user_id: UserID,
profile: MemberStateEventContent,
prev_profile: MemberStateEventContent,
event_id: EventID) -> None:
if profile.displayname == prev_profile.displayname:
return
portal = po.Portal.get_by_mxid(room_id)
if not portal or not portal.has_bot:
return
user = await u.User.get_by_mxid(user_id).ensure_started()
if await user.needs_relaybot(portal):
await portal.name_change_matrix(user, profile.displayname, prev_profile.displayname,
event_id)
@staticmethod
def parse_read_receipts(content: ReceiptEventContent) -> Iterable[Tuple[UserID, EventID]]:
return ((user_id, event_id)
for event_id, receipts in content.items()
for user_id in receipts.get(ReceiptType.READ, {}))
@staticmethod
async def handle_read_receipts(room_id: RoomID, receipts: Iterable[Tuple[UserID, EventID]]
) -> None:
portal = po.Portal.get_by_mxid(room_id)
if not portal:
return
for user_id, event_id in receipts:
user = await u.User.get_by_mxid(user_id).ensure_started()
if not await user.is_logged_in():
continue
await portal.mark_read(user, event_id)
@staticmethod
async def handle_presence(user_id: UserID, presence: PresenceState) -> None:
user = await u.User.get_by_mxid(user_id).ensure_started()
if not await user.is_logged_in():
return
await user.set_presence(presence == PresenceState.ONLINE)
async def handle_typing(self, room_id: RoomID, now_typing: Set[UserID]) -> None:
portal = po.Portal.get_by_mxid(room_id)
if not portal:
return
previously_typing = self.previously_typing.get(room_id, set())
for user_id in set(previously_typing | now_typing):
is_typing = user_id in now_typing
was_typing = user_id in previously_typing
if is_typing and was_typing:
continue
user = await u.User.get_by_mxid(user_id).ensure_started()
if not await user.is_logged_in():
continue
await portal.set_typing(user, is_typing)
self.previously_typing[room_id] = now_typing
def filter_matrix_event(self, evt: Event) -> bool:
if not isinstance(evt, (MessageEvent, StateEvent)):
return True
return evt.sender and (evt.sender == self.az.bot_mxid
or pu.Puppet.get_id_from_mxid(evt.sender) is not None)
async def handle_ephemeral_event(self, evt: Union[ReceiptEvent, PresenceEvent, TypingEvent]
) -> None:
if evt.type == EventType.RECEIPT:
await self.handle_read_receipts(evt.room_id, self.parse_read_receipts(evt.content))
elif evt.type == EventType.PRESENCE:
await self.handle_presence(evt.sender, evt.content.presence)
elif evt.type == EventType.TYPING:
await self.handle_typing(evt.room_id, set(evt.content.user_ids))
async def handle_event(self, evt: Event) -> None:
if evt.type == EventType.ROOM_REDACTION:
await self.handle_redaction(evt)
async def handle_state_event(self, evt: StateEvent) -> None:
if evt.type == EventType.ROOM_POWER_LEVELS:
await self.handle_power_levels(evt)
elif evt.type in (EventType.ROOM_NAME, EventType.ROOM_AVATAR, EventType.ROOM_TOPIC):
await self.handle_room_meta(evt.type, evt.room_id, evt.sender, evt.content)
elif evt.type == EventType.ROOM_PINNED_EVENTS:
new_events = set(evt.content.pinned)
try:
old_events = set(evt.unsigned.prev_content.pinned)
except (KeyError, ValueError, TypeError, AttributeError):
old_events = set()
await self.handle_room_pin(evt.room_id, evt.sender, new_events, old_events)
elif evt.type == EventType.ROOM_TOMBSTONE:
await self.handle_room_upgrade(evt.room_id, evt.content.replacement_room)
# async def handle_event(self, evt: MatrixEvent) -> None:
# if self.filter_matrix_event(evt):
# return
# start_time = time.time()
#
# if EVENT_TIME:
# EVENT_TIME.labels(event_type=evt_type).observe(time.time() - start_time)