Files
mautrix-telegram/mautrix_telegram/portal.py
T

3103 lines
120 KiB
Python

# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Awaitable,
Callable,
List,
NamedTuple,
Union,
cast,
)
from datetime import datetime
from html import escape as escape_html
from sqlite3 import IntegrityError
from string import Template
import asyncio
import base64
import codecs
import mimetypes
import random
import time
import unicodedata
from asyncpg import UniqueViolationError
from telethon.errors import (
ChatNotModifiedError,
MessageIdInvalidError,
PhotoExtInvalidError,
PhotoInvalidDimensionsError,
PhotoSaveFileInvalidError,
RPCError,
)
from telethon.tl.functions.channels import (
CreateChannelRequest,
EditPhotoRequest,
EditTitleRequest,
InviteToChannelRequest,
JoinChannelRequest,
UpdateUsernameRequest,
ViewSponsoredMessageRequest,
)
from telethon.tl.functions.messages import (
AddChatUserRequest,
CreateChatRequest,
EditChatAboutRequest,
EditChatPhotoRequest,
EditChatTitleRequest,
ExportChatInviteRequest,
MigrateChatRequest,
SetTypingRequest,
UnpinAllMessagesRequest,
UpdatePinnedMessageRequest,
)
from telethon.tl.patched import Message, MessageService
from telethon.tl.types import (
Channel,
ChannelFull,
Chat,
ChatFull,
ChatPhoto,
ChatPhotoEmpty,
Document,
DocumentAttributeAnimated,
DocumentAttributeFilename,
DocumentAttributeImageSize,
DocumentAttributeSticker,
DocumentAttributeVideo,
GeoPoint,
InputChannel,
InputChatUploadedPhoto,
InputMediaUploadedDocument,
InputMediaUploadedPhoto,
InputPeerChannel,
InputPeerChat,
InputPeerPhotoFileLocation,
InputPeerUser,
InputPhotoFileLocation,
InputUser,
MessageActionChannelCreate,
MessageActionChatAddUser,
MessageActionChatCreate,
MessageActionChatDeletePhoto,
MessageActionChatDeleteUser,
MessageActionChatEditPhoto,
MessageActionChatEditTitle,
MessageActionChatJoinedByLink,
MessageActionChatMigrateTo,
MessageActionGameScore,
MessageEntityPre,
MessageMediaContact,
MessageMediaDice,
MessageMediaDocument,
MessageMediaGame,
MessageMediaGeo,
MessageMediaPhoto,
MessageMediaPoll,
MessageMediaUnsupported,
PeerChannel,
PeerChat,
PeerUser,
Photo,
PhotoCachedSize,
PhotoEmpty,
PhotoSize,
PhotoSizeEmpty,
PhotoSizeProgressive,
Poll,
SendMessageCancelAction,
SendMessageTypingAction,
SponsoredMessage,
TypeChannelParticipant,
TypeChat,
TypeChatParticipant,
TypeDocumentAttribute,
TypeInputChannel,
TypeInputPeer,
TypeMessage,
TypeMessageAction,
TypePeer,
TypePhotoSize,
TypeUser,
TypeUserFull,
UpdateChannelUserTyping,
UpdateChatUserTyping,
UpdateNewMessage,
UpdateUserTyping,
User,
UserFull,
UserProfilePhoto,
UserProfilePhotoEmpty,
)
import magic
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
from mautrix.bridge import BasePortal, NotificationDisabler, async_getter_lock
from mautrix.errors import IntentError, MatrixRequestError, MForbidden
from mautrix.types import (
ContentURI,
EventID,
EventType,
Format,
ImageInfo,
JoinRule,
LocationMessageEventContent,
MediaMessageEventContent,
Membership,
MessageEventContent,
MessageType,
PowerLevelStateEventContent,
RelatesTo,
RoomAlias,
RoomAvatarStateEventContent,
RoomCreatePreset,
RoomID,
RoomNameStateEventContent,
RoomTopicStateEventContent,
StateEventContent,
TextMessageEventContent,
ThumbnailInfo,
UserID,
VideoInfo,
)
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
from mautrix.util.simple_lock import SimpleLock
from mautrix.util.simple_template import SimpleTemplate
from . import abstract_user as au, formatter, portal_util as putil, puppet as p, user as u, util
from .config import Config
from .db import Message as DBMessage, Portal as DBPortal, TelegramFile as DBTelegramFile
from .tgclient import MautrixTelegramClient
from .types import TelegramID
from .util import sane_mimetypes
try:
from mautrix.crypto.attachments import decrypt_attachment
except ImportError:
decrypt_attachment = None
if TYPE_CHECKING:
from .__main__ import TelegramBridge
from .bot import Bot
StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
InviteList = Union[UserID, List[UserID]]
UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping]
TypeChatPhoto = Union[ChatPhoto, ChatPhotoEmpty, Photo, PhotoEmpty]
MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]]
class DocAttrs(NamedTuple):
name: str | None
mime_type: str | None
is_sticker: bool
sticker_alt: str | None
width: int
height: int
is_gif: bool
class Portal(DBPortal, BasePortal):
bot: "Bot"
config: Config
# Instance cache
by_mxid: dict[RoomID, Portal] = {}
by_tgid: dict[tuple[TelegramID, TelegramID], Portal] = {}
# Config cache
filter_mode: str
filter_list: list[int]
max_initial_member_sync: int
sync_channel_members: bool
sync_matrix_state: bool
public_portals: bool
private_chat_portal_meta: bool
alias_template: SimpleTemplate[str]
hs_domain: str
# Instance variables
deleted: bool
backfill_lock: SimpleLock
backfill_method_lock: asyncio.Lock
backfill_leave: set[IntentAPI] | None
alias: RoomAlias | None
dedup: putil.PortalDedup
send_lock: putil.PortalSendLock
_pin_lock: asyncio.Lock
_main_intent: IntentAPI | None
_room_create_lock: asyncio.Lock
_sponsored_msg: SponsoredMessage | None
_sponsored_entity: User | Channel | None
_sponsored_msg_ts: float
_sponsored_msg_lock: asyncio.Lock
_sponsored_evt_id: EventID | None
_sponsored_seen: dict[UserID, bool]
_new_messages_after_sponsored: bool
def __init__(
self,
tgid: TelegramID,
tg_receiver: TelegramID,
peer_type: str,
megagroup: bool = False,
mxid: RoomID | None = None,
avatar_url: ContentURI | None = None,
encrypted: bool = False,
sponsored_event_id: EventID | None = None,
sponsored_event_ts: int | None = None,
sponsored_msg_random_id: bytes | None = None,
username: str | None = None,
title: str | None = None,
about: str | None = None,
photo_id: str | None = None,
local_config: dict[str, Any] | None = None,
) -> None:
super().__init__(
tgid=tgid,
tg_receiver=tg_receiver,
peer_type=peer_type,
megagroup=megagroup,
mxid=mxid,
avatar_url=avatar_url,
encrypted=encrypted,
sponsored_event_id=sponsored_event_id,
sponsored_event_ts=sponsored_event_ts,
sponsored_msg_random_id=sponsored_msg_random_id,
username=username,
title=title,
about=about,
photo_id=photo_id,
local_config=local_config or {},
)
self.log = self.log.getChild(self.tgid_log if self.tgid else self.mxid)
self._main_intent = None
self.deleted = False
self.backfill_lock = SimpleLock(
"Waiting for backfilling to finish before handling %s", log=self.log
)
self.backfill_method_lock = asyncio.Lock()
self.backfill_leave = None
self.dedup = putil.PortalDedup(self)
self.send_lock = putil.PortalSendLock()
self._pin_lock = asyncio.Lock()
self._room_create_lock = asyncio.Lock()
self._sponsored_msg = None
self._sponsored_msg_ts = 0
self._sponsored_msg_lock = asyncio.Lock()
self._sponsored_seen = {}
self._new_messages_after_sponsored = True
# region Properties
@property
def tgid_full(self) -> tuple[TelegramID, TelegramID]:
return self.tgid, self.tg_receiver
@property
def tgid_log(self) -> str:
if self.tgid == self.tg_receiver:
return str(self.tgid)
return f"{self.tg_receiver}<->{self.tgid}"
@property
def name(self) -> str:
return self.title
@property
def alias(self) -> RoomAlias | None:
if not self.username:
return None
return RoomAlias(f"#{self.alias_localpart}:{self.hs_domain}")
@property
def alias_localpart(self) -> str | None:
if not self.username:
return None
return self.alias_template.format(self.username)
@property
def peer(self) -> TypePeer | TypeInputPeer:
if self.peer_type == "user":
return PeerUser(user_id=self.tgid)
elif self.peer_type == "chat":
return PeerChat(chat_id=self.tgid)
elif self.peer_type == "channel":
return PeerChannel(channel_id=self.tgid)
@property
def is_direct(self) -> bool:
return self.peer_type == "user"
@property
def has_bot(self) -> bool:
return bool(self.bot) and (
self.bot.is_in_chat(self.tgid)
or (self.peer_type == "user" and self.tg_receiver == self.bot.tgid)
)
@property
def main_intent(self) -> IntentAPI:
if self._main_intent is None:
raise RuntimeError("Portal must be postinit()ed before main_intent can be used")
return self._main_intent
@property
def allow_bridging(self) -> bool:
if self.peer_type == "user":
return True
elif self.filter_mode == "whitelist":
return self.tgid in self.filter_list
elif self.filter_mode == "blacklist":
return self.tgid not in self.filter_list
return True
@classmethod
def init_cls(cls, bridge: "TelegramBridge") -> None:
BasePortal.bridge = bridge
cls.az = bridge.az
cls.config = bridge.config
cls.loop = bridge.loop
cls.matrix = bridge.matrix
cls.bot = bridge.bot
cls.max_initial_member_sync = cls.config["bridge.max_initial_member_sync"]
cls.sync_channel_members = cls.config["bridge.sync_channel_members"]
cls.sync_matrix_state = cls.config["bridge.sync_matrix_state"]
cls.public_portals = cls.config["bridge.public_portals"]
cls.private_chat_portal_meta = cls.config["bridge.private_chat_portal_meta"]
cls.filter_mode = cls.config["bridge.filter.mode"]
cls.filter_list = cls.config["bridge.filter.list"]
cls.hs_domain = cls.config["homeserver.domain"]
cls.alias_template = SimpleTemplate(
cls.config["bridge.alias_template"],
"groupname",
prefix="#",
suffix=f":{cls.hs_domain}",
)
NotificationDisabler.puppet_cls = p.Puppet
NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]
putil.PortalDedup.dedup_pre_db_check = cls.config["bridge.deduplication.pre_db_check"]
putil.PortalDedup.dedup_cache_queue_length = cls.config[
"bridge.deduplication.cache_queue_length"
]
# endregion
# region Matrix -> Telegram metadata
async def get_telegram_users_in_matrix_room(
self, source: u.User
) -> tuple[list[InputPeerUser], list[UserID]]:
user_tgids = {}
user_mxids = await self.main_intent.get_room_members(
self.mxid, (Membership.JOIN, Membership.INVITE)
)
for mxid in user_mxids:
if mxid == self.az.bot_mxid:
continue
mx_user = await u.User.get_by_mxid(mxid, create=False)
if mx_user and mx_user.tgid:
user_tgids[mx_user.tgid] = mxid
puppet_id = p.Puppet.get_id_from_mxid(mxid)
if puppet_id:
user_tgids[puppet_id] = mxid
input_users = []
errors = []
for tgid, mxid in user_tgids.items():
try:
input_users.append(await source.client.get_input_entity(tgid))
except ValueError as e:
source.log.debug(
f"Failed to find the input entity for {tgid} ({mxid}) for "
f"creating a group: {e}"
)
errors.append(mxid)
return input_users, errors
async def upgrade_telegram_chat(self, source: u.User) -> None:
if self.peer_type != "chat":
raise ValueError("Only normal group chats are upgradable to supergroups.")
response = await source.client(MigrateChatRequest(chat_id=self.tgid))
entity = None
for chat in response.chats:
if isinstance(chat, Channel):
entity = chat
break
if not entity:
raise ValueError("Upgrade may have failed: output channel not found.")
await self._migrate_and_save_telegram(TelegramID(entity.id))
await self.update_info(source, entity)
async def _migrate_and_save_telegram(self, new_id: TelegramID) -> None:
try:
del self.by_tgid[self.tgid_full]
except KeyError:
pass
try:
existing = self.by_tgid[(new_id, new_id)]
except KeyError:
existing = None
self.by_tgid[(new_id, new_id)] = self
if existing:
await existing.delete()
old_id = self.tgid
await self.update_id(new_id, "channel")
self.log = self.__class__.log.getChild(self.tgid_log)
self.log.info(f"Telegram chat upgraded from {old_id}")
async def set_telegram_username(self, source: u.User, username: str) -> None:
if self.peer_type != "channel":
raise ValueError("Only channels and supergroups have usernames.")
await source.client(UpdateUsernameRequest(await self.get_input_entity(source), username))
if await self._update_username(username):
await self.save()
async def create_telegram_chat(
self, source: u.User, invites: list[InputUser], supergroup: bool = False
) -> None:
if not self.mxid:
raise ValueError("Can't create Telegram chat for portal without Matrix room.")
elif self.tgid:
raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.")
if len(invites) < 2:
if self.bot is not None:
info, mxid = await self.bot.get_me()
raise ValueError(
"Not enough Telegram users to create a chat. "
"Invite more Telegram ghost users to the room, such as the "
f"relaybot ([{info.first_name}](https://matrix.to/#/{mxid}))."
)
raise ValueError(
"Not enough Telegram users to create a chat. "
"Invite more Telegram ghost users to the room."
)
if self.peer_type == "chat":
response = await source.client(CreateChatRequest(title=self.title, users=invites))
entity = response.chats[0]
elif self.peer_type == "channel":
response = await source.client(
CreateChannelRequest(
title=self.title, about=self.about or "", megagroup=supergroup
)
)
entity = response.chats[0]
await source.client(
InviteToChannelRequest(
channel=await source.client.get_input_entity(entity), users=invites
)
)
else:
raise ValueError("Invalid peer type for Telegram chat creation")
self.tgid = entity.id
self.tg_receiver = self.tgid
await self.postinit()
await self.insert()
await self.update_info(source, entity)
self.log = self.__class__.log.getChild(self.tgid_log)
if self.bot and self.bot.tgid in invites:
await self.bot.add_chat(self.tgid, self.peer_type)
levels = await self.main_intent.get_power_levels(self.mxid)
if levels.get_user_level(self.main_intent.mxid) == 100:
levels = putil.get_base_power_levels(self, levels, entity)
await self.main_intent.set_power_levels(self.mxid, levels)
await self.handle_matrix_power_levels(source, levels.users, {}, None)
await self.update_bridge_info()
async def invite_telegram(self, source: u.User, puppet: p.Puppet | au.AbstractUser) -> None:
if self.peer_type == "chat":
await source.client(
AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0)
)
elif self.peer_type == "channel":
await source.client(InviteToChannelRequest(channel=self.peer, users=[puppet.tgid]))
# We don't care if there are invites for private chat portals with the relaybot.
elif not self.bot or self.tg_receiver != self.bot.tgid:
raise ValueError("Invalid peer type for Telegram user invite")
# endregion
# region Telegram -> Matrix metadata
def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, Any]:
invite_content = {}
if double_puppet:
invite_content["fi.mau.will_auto_accept"] = True
if self.is_direct:
invite_content["is_direct"] = True
return invite_content
async def invite_to_matrix(self, users: InviteList) -> None:
if isinstance(users, list):
for user in users:
await self.invite_to_matrix(user)
else:
puppet = await p.Puppet.get_by_custom_mxid(users)
await self.main_intent.invite_user(
self.mxid, users, check_cache=True, extra_content=self._get_invite_content(puppet)
)
if puppet:
try:
await puppet.intent.ensure_joined(self.mxid)
except Exception:
self.log.exception("Failed to ensure %s is joined to portal", users)
async def update_matrix_room(
self,
user: au.AbstractUser,
entity: TypeChat | User,
direct: bool = None,
puppet: p.Puppet = None,
levels: PowerLevelStateEventContent = None,
users: list[User] = None,
) -> None:
if direct is None:
direct = self.peer_type == "user"
try:
await self._update_matrix_room(user, entity, direct, puppet, levels, users)
except Exception:
self.log.exception("Fatal error updating Matrix room")
async def _update_matrix_room(
self,
user: au.AbstractUser,
entity: TypeChat | User,
direct: bool,
puppet: p.Puppet = None,
levels: PowerLevelStateEventContent = None,
users: list[User] = None,
) -> None:
if not direct:
await self.update_info(user, entity)
if not users:
users = await self._get_users(user, entity)
await self._sync_telegram_users(user, users)
await self.update_power_levels(users, levels)
else:
if not puppet:
puppet = await p.Puppet.get_by_tgid(self.tgid)
await puppet.update_info(user, entity)
await puppet.intent_for(self).join_room(self.mxid)
if self.encrypted or self.private_chat_portal_meta:
# The bridge bot needs to join for e2ee, but that messes up the default name
# generation. If/when canonical DMs happen, this might not be necessary anymore.
changed = await self._update_title(puppet.displayname)
changed = await self._update_avatar(user, entity.photo) or changed
if changed:
await self.save()
await self.update_bridge_info()
puppet = await p.Puppet.get_by_custom_mxid(user.mxid)
if puppet:
try:
did_join = await puppet.intent.ensure_joined(self.mxid)
if isinstance(user, u.User) and did_join and self.peer_type == "user":
await user.update_direct_chats({self.main_intent.mxid: [self.mxid]})
except Exception:
self.log.exception("Failed to ensure %s is joined to portal", user.mxid)
if self.sync_matrix_state:
await self.main_intent.get_joined_members(self.mxid)
async def create_matrix_room(
self,
user: au.AbstractUser,
entity: TypeChat | User = None,
invites: InviteList = None,
update_if_exists: bool = True,
) -> RoomID | None:
if self.mxid:
if update_if_exists:
if not entity:
try:
entity = await self.get_entity(user)
except Exception:
self.log.exception(f"Failed to get entity through {user.tgid} for update")
return self.mxid
update = self.update_matrix_room(user, entity, self.peer_type == "user")
asyncio.create_task(update)
await self.invite_to_matrix(invites or [])
return self.mxid
async with self._room_create_lock:
try:
return await self._create_matrix_room(user, entity, invites)
except Exception:
self.log.exception("Fatal error creating Matrix room")
@property
def bridge_info_state_key(self) -> str:
return f"net.maunium.telegram://telegram/{self.tgid}"
@property
def bridge_info(self) -> dict[str, Any]:
info = {
"bridgebot": self.az.bot_mxid,
"creator": self.main_intent.mxid,
"protocol": {
"id": "telegram",
"displayname": "Telegram",
"avatar_url": self.config["appservice.bot_avatar"],
"external_url": "https://telegram.org",
},
"channel": {
"id": str(self.tgid),
"displayname": self.title,
"avatar_url": self.avatar_url,
},
}
if self.username:
info["channel"]["external_url"] = f"https://t.me/{self.username}"
elif self.peer_type == "user":
# TODO this doesn't feel very reliable
puppet = p.Puppet.by_tgid.get(self.tgid, None)
if puppet and puppet.username:
info["channel"]["external_url"] = f"https://t.me/{puppet.username}"
return info
async def update_bridge_info(self) -> None:
if not self.mxid:
self.log.debug("Not updating bridge info: no Matrix room created")
return
try:
self.log.debug("Updating bridge info...")
await self.main_intent.send_state_event(
self.mxid, StateBridge, self.bridge_info, self.bridge_info_state_key
)
# TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
await self.main_intent.send_state_event(
self.mxid, StateHalfShotBridge, self.bridge_info, self.bridge_info_state_key
)
except Exception:
self.log.warning("Failed to update bridge info", exc_info=True)
async def _create_matrix_room(
self, user: au.AbstractUser, entity: TypeChat | User, invites: InviteList
) -> RoomID | None:
if self.mxid:
return self.mxid
elif not self.allow_bridging:
return None
direct = self.peer_type == "user"
invites = invites or []
if not entity:
entity = await self.get_entity(user)
self.log.trace("Fetched data: %s", entity)
self.log.debug("Creating room")
try:
self.title = entity.title
except AttributeError:
self.title = None
if direct and self.tgid == user.tgid:
self.title = "Telegram Saved Messages"
self.about = "Your Telegram cloud storage chat"
puppet = await p.Puppet.get_by_tgid(self.tgid) if direct else None
if puppet:
await puppet.update_info(user, entity)
self._main_intent = puppet.intent_for(self) if direct else self.az.intent
if self.peer_type == "channel":
self.megagroup = entity.megagroup
preset = RoomCreatePreset.PRIVATE
if self.peer_type == "channel" and entity.username:
if self.public_portals:
preset = RoomCreatePreset.PUBLIC
self.username = entity.username
alias = self.alias_localpart
else:
# TODO invite link alias?
alias = None
if alias:
# TODO? properly handle existing room aliases
await self.main_intent.remove_room_alias(alias)
power_levels = putil.get_base_power_levels(self, entity=entity)
users = None
if not direct:
users = await self._get_users(user, entity)
if self.has_bot:
extra_invites = self.config["bridge.relaybot.group_chat_invite"]
invites += extra_invites
for invite in extra_invites:
power_levels.users.setdefault(invite, 100)
await putil.participants_to_power_levels(self, users, power_levels)
elif self.bot and self.tg_receiver == self.bot.tgid:
invites = self.config["bridge.relaybot.private_chat.invite"]
for invite in invites:
power_levels.users.setdefault(invite, 100)
self.title = puppet.displayname
initial_state = [
{
"type": EventType.ROOM_POWER_LEVELS.serialize(),
"content": power_levels.serialize(),
},
{
"type": str(StateBridge),
"state_key": self.bridge_info_state_key,
"content": self.bridge_info,
},
{
# TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
"type": str(StateHalfShotBridge),
"state_key": self.bridge_info_state_key,
"content": self.bridge_info,
},
]
create_invites = []
if self.config["bridge.encryption.default"] and self.matrix.e2ee:
self.encrypted = True
initial_state.append(
{
"type": "m.room.encryption",
"content": {"algorithm": "m.megolm.v1.aes-sha2"},
}
)
if direct:
create_invites.append(self.az.bot_mxid)
if direct and (self.encrypted or self.private_chat_portal_meta):
self.title = puppet.displayname
if self.config["appservice.community_id"]:
initial_state.append(
{
"type": "m.room.related_groups",
"content": {"groups": [self.config["appservice.community_id"]]},
}
)
creation_content = {}
if not self.config["bridge.federate_rooms"]:
creation_content["m.federate"] = False
with self.backfill_lock:
room_id = await self.main_intent.create_room(
alias_localpart=alias,
preset=preset,
is_direct=direct,
invitees=create_invites,
name=self.title,
topic=self.about,
initial_state=initial_state,
creation_content=creation_content,
)
if not room_id:
raise Exception(f"Failed to create room")
if self.encrypted and self.matrix.e2ee and direct:
try:
await self.az.intent.ensure_joined(room_id)
except Exception:
self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")
self.mxid = room_id
self.by_mxid[self.mxid] = self
await self.save()
await self.az.state_store.set_power_levels(self.mxid, power_levels)
await user.register_portal(self)
await self.invite_to_matrix(invites)
update_room = asyncio.create_task(
self.update_matrix_room(
user, entity, direct, puppet, levels=power_levels, users=users
)
)
if self.config["bridge.backfill.initial_limit"] > 0:
self.log.debug(
"Initial backfill is enabled. Waiting for room members to sync "
"and then starting backfill"
)
await update_room
try:
if isinstance(user, u.User):
await self.backfill(user, is_initial=True)
except Exception:
self.log.exception("Failed to backfill new portal")
return self.mxid
async def _get_users(
self,
user: au.AbstractUser,
entity: TypeInputPeer | InputUser | TypeChat | TypeUser | InputChannel,
) -> list[TypeUser]:
if self.peer_type == "channel" and not self.megagroup and not self.sync_channel_members:
return []
limit = self.max_initial_member_sync
if limit == 0:
return []
return await putil.get_users(user.client, self.tgid, entity, limit, self.peer_type)
async def update_power_levels(
self,
users: list[TypeUser | TypeChatParticipant | TypeChannelParticipant],
levels: PowerLevelStateEventContent = None,
) -> None:
if not levels:
levels = await self.main_intent.get_power_levels(self.mxid)
if await putil.participants_to_power_levels(self, users, levels):
await self.main_intent.set_power_levels(self.mxid, levels)
async def _add_bot_chat(self, bot: User) -> None:
if self.bot and bot.id == self.bot.tgid:
await self.bot.add_chat(self.tgid, self.peer_type)
return
user = await u.User.get_by_tgid(TelegramID(bot.id))
if user and user.is_bot:
await user.register_portal(self)
async def _sync_telegram_users(self, source: au.AbstractUser, users: list[User]) -> None:
allowed_tgids = set()
skip_deleted = self.config["bridge.skip_deleted_members"]
for entity in users:
puppet = await p.Puppet.get_by_tgid(TelegramID(entity.id))
if entity.bot:
await self._add_bot_chat(entity)
allowed_tgids.add(entity.id)
await puppet.update_info(source, entity)
if skip_deleted and entity.deleted:
continue
await puppet.intent_for(self).ensure_joined(self.mxid)
user = await u.User.get_by_tgid(TelegramID(entity.id))
if user:
await self.invite_to_matrix(user.mxid)
# We can't trust the member list if any of the following cases is true:
# * There are close to 10 000 users, because Telegram might not be sending all members.
# * The member sync count is limited, because then we might ignore some members.
# * It's a channel, because non-admins don't have access to the member list.
trust_member_list = (
len(allowed_tgids) < 9900
if self.max_initial_member_sync < 0
else len(allowed_tgids) < self.max_initial_member_sync - 10
) and (self.megagroup or self.peer_type != "channel")
if not trust_member_list:
return
for user_mxid in await self.main_intent.get_room_members(self.mxid):
if user_mxid == self.az.bot_mxid:
continue
puppet_id = p.Puppet.get_id_from_mxid(user_mxid)
if puppet_id:
if puppet_id in allowed_tgids:
continue
if self.bot and puppet_id == self.bot.tgid:
await self.bot.remove_chat(self.tgid)
try:
await self.main_intent.kick_user(
self.mxid, user_mxid, "User had left this Telegram chat."
)
except MForbidden:
pass
continue
mx_user = await u.User.get_by_mxid(user_mxid, create=False)
if mx_user:
if mx_user.tgid in allowed_tgids:
continue
if mx_user.is_bot:
await mx_user.unregister_portal(*self.tgid_full)
if not self.has_bot:
try:
await self.main_intent.kick_user(
self.mxid, mx_user.mxid, "You had left this Telegram chat."
)
except MForbidden:
pass
async def _add_telegram_user(
self, user_id: TelegramID, source: au.AbstractUser | None = None
) -> None:
puppet = await p.Puppet.get_by_tgid(user_id)
if source:
entity: User = await source.client.get_entity(PeerUser(user_id))
await puppet.update_info(source, entity)
await puppet.intent_for(self).ensure_joined(self.mxid)
user = await u.User.get_by_tgid(user_id)
if user:
await user.register_portal(self)
await self.invite_to_matrix(user.mxid)
async def _delete_telegram_user(self, user_id: TelegramID, sender: p.Puppet) -> None:
puppet = await p.Puppet.get_by_tgid(user_id)
user = await u.User.get_by_tgid(user_id)
kick_message = (
f"Kicked by {sender.displayname}"
if sender and sender.tgid != puppet.tgid
else "Left Telegram chat"
)
puppet_extra_content = None
if sender.is_real_user:
puppet_extra_content = {DOUBLE_PUPPET_SOURCE_KEY: self.bridge.name}
if sender.tgid != puppet.tgid:
try:
await sender.intent_for(self).kick_user(
self.mxid, puppet.mxid, extra_content=puppet_extra_content
)
except MForbidden:
await self.main_intent.kick_user(self.mxid, puppet.mxid, kick_message)
else:
await puppet.intent_for(self).leave_room(self.mxid, extra_content=puppet_extra_content)
if user:
await user.unregister_portal(*self.tgid_full)
if sender.tgid != puppet.tgid:
try:
await sender.intent_for(self).kick_user(
self.mxid, user.mxid, extra_content=puppet_extra_content
)
return
except MForbidden:
pass
try:
await self.main_intent.kick_user(self.mxid, user.mxid, kick_message)
except MForbidden as e:
self.log.warning(f"Failed to kick {user.mxid}: {e}")
async def update_info(self, user: au.AbstractUser, entity: TypeChat = None) -> None:
if self.peer_type == "user":
self.log.warning("Called update_info() for direct chat portal")
return
changed = False
self.log.debug("Updating info")
try:
if not entity:
entity = await self.get_entity(user)
self.log.trace("Fetched data: %s", entity)
if self.peer_type == "channel":
changed = self.megagroup != entity.megagroup or changed
self.megagroup = entity.megagroup
changed = await self._update_username(entity.username) or changed
if hasattr(entity, "about"):
changed = self._update_about(entity.about) or changed
changed = await self._update_title(entity.title) or changed
if isinstance(entity.photo, ChatPhoto):
changed = await self._update_avatar(user, entity.photo) or changed
except Exception:
self.log.exception(f"Failed to update info from source {user.tgid}")
if changed:
await self.save()
await self.update_bridge_info()
async def _update_username(self, username: str, save: bool = False) -> bool:
if self.username == username:
return False
if self.username:
await self.main_intent.remove_room_alias(self.alias_localpart)
self.username = username or None
if self.username:
await self.main_intent.add_room_alias(self.mxid, self.alias_localpart, override=True)
if self.public_portals:
await self.main_intent.set_join_rule(self.mxid, JoinRule.PUBLIC)
else:
await self.main_intent.set_join_rule(self.mxid, JoinRule.INVITE)
if save:
await self.save()
return True
async def _try_set_state(
self, sender: p.Puppet | None, evt_type: EventType, content: StateEventContent
) -> None:
if sender:
try:
intent = sender.intent_for(self)
if sender.is_real_user:
content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name
await intent.send_state_event(self.mxid, evt_type, content)
except MForbidden:
await self.main_intent.send_state_event(self.mxid, evt_type, content)
else:
await self.main_intent.send_state_event(self.mxid, evt_type, content)
async def _update_about(
self, about: str, sender: p.Puppet | None = None, save: bool = False
) -> bool:
if self.about == about:
return False
self.about = about
await self._try_set_state(
sender, EventType.ROOM_TOPIC, RoomTopicStateEventContent(topic=self.about)
)
if save:
await self.save()
return True
async def _update_title(
self, title: str, sender: p.Puppet | None = None, save: bool = False
) -> bool:
if self.title == title:
return False
self.title = title
await self._try_set_state(
sender, EventType.ROOM_NAME, RoomNameStateEventContent(name=self.title)
)
if save:
await self.save()
return True
async def _update_avatar(
self,
user: au.AbstractUser,
photo: TypeChatPhoto,
sender: p.Puppet | None = None,
save: bool = False,
) -> bool:
if isinstance(photo, (ChatPhoto, UserProfilePhoto)):
loc = InputPeerPhotoFileLocation(
peer=await self.get_input_entity(user), photo_id=photo.photo_id, big=True
)
photo_id = str(photo.photo_id)
elif isinstance(photo, Photo):
loc, _ = self._get_largest_photo_size(photo)
photo_id = str(loc.id)
elif isinstance(photo, (UserProfilePhotoEmpty, ChatPhotoEmpty, PhotoEmpty, type(None))):
photo_id = ""
loc = None
else:
raise ValueError(f"Unknown photo type {type(photo)}")
if (
self.peer_type == "user"
and not photo_id
and not self.config["bridge.allow_avatar_remove"]
):
return False
if self.photo_id != photo_id:
if not photo_id:
await self._try_set_state(
sender, EventType.ROOM_AVATAR, RoomAvatarStateEventContent(url=None)
)
self.photo_id = ""
self.avatar_url = None
if save:
await self.save()
return True
file = await util.transfer_file_to_matrix(user.client, self.main_intent, loc)
if file:
await self._try_set_state(
sender, EventType.ROOM_AVATAR, RoomAvatarStateEventContent(url=file.mxc)
)
self.photo_id = photo_id
self.avatar_url = file.mxc
if save:
await self.save()
return True
return False
# endregion
# region Matrix -> Telegram bridging
async def _send_delivery_receipt(
self, event_id: EventID, room_id: RoomID | None = None
) -> None:
# TODO maybe check if the bot is in the room rather than assuming based on self.encrypted
if (
event_id
and self.config["bridge.delivery_receipts"]
and (self.encrypted or self.peer_type != "user")
):
try:
await self.az.intent.mark_read(room_id or self.mxid, event_id)
except Exception:
self.log.exception("Failed to send delivery receipt for %s", event_id)
async def _get_state_change_message(
self, event: str, user: u.User, **kwargs: Any
) -> str | None:
tpl = self.get_config(f"state_event_formats.{event}")
if len(tpl) == 0:
# Empty format means they don't want the message
return None
displayname = await self.get_displayname(user)
tpl_args = {
"mxid": user.mxid,
"username": user.mxid_localpart,
"displayname": escape_html(displayname),
**kwargs,
}
return Template(tpl).safe_substitute(tpl_args)
async def _send_state_change_message(
self, event: str, user: u.User, event_id: EventID, **kwargs: Any
) -> None:
if not self.has_bot:
return
elif (
self.peer_type == "user"
and not self.config["bridge.relaybot.private_chat.state_changes"]
):
return
async with self.send_lock(self.bot.tgid):
message = await self._get_state_change_message(event, user, **kwargs)
if not message:
return
message, entities = await formatter.matrix_to_telegram(self.bot.client, html=message)
response = await self.bot.client.send_message(
self.peer, message, formatting_entities=entities
)
space = self.tgid if self.peer_type == "channel" else self.bot.tgid
self.dedup.check(response, (event_id, space))
async def name_change_matrix(
self, user: u.User, displayname: str, prev_displayname: str, event_id: EventID
) -> None:
await self._send_state_change_message(
"name_change",
user,
event_id,
displayname=displayname,
prev_displayname=prev_displayname,
)
async def get_displayname(self, user: u.User) -> str:
return await self.main_intent.get_room_displayname(self.mxid, user.mxid) or user.mxid
def set_typing(
self, user: u.User, typing: bool = True, action: type = SendMessageTypingAction
) -> Awaitable[bool]:
return user.client(
SetTypingRequest(self.peer, action() if typing else SendMessageCancelAction())
)
async def _get_sponsored_message(
self, user: u.User
) -> tuple[SponsoredMessage | None, Channel | User | None]:
if user.is_bot:
return None, None
elif self._sponsored_msg_ts + 5 * 60 > time.monotonic():
return self._sponsored_msg, self._sponsored_entity
self.log.trace(f"Fetching a new sponsored message through {user.mxid}")
self._sponsored_msg, t_id, self._sponsored_entity = await putil.get_sponsored_message(
user, await self.get_input_entity(user)
)
self._sponsored_msg_ts = time.monotonic()
if self._sponsored_msg is not None and self._sponsored_entity is None:
self.log.warning(f"GetSponsoredMessages didn't return entity for {t_id}")
return self._sponsored_msg, self._sponsored_entity
async def _send_sponsored_msg(self, user: u.User) -> None:
msg, entity = await self._get_sponsored_message(user)
if msg is None:
self.log.trace("Didn't get a sponsored message")
return
if self.sponsored_event_id is not None:
self.log.debug(
f"Redacting old sponsored {self.sponsored_event_id}"
" in preparation for sending new one"
)
await self.main_intent.redact(self.mxid, self.sponsored_event_id)
content = await putil.make_sponsored_message_content(user, msg, entity)
self.log.trace("Sending sponsored message")
self.sponsored_event_id = await self._send_message(self.main_intent, content)
self.sponsored_event_ts = int(time.time())
self.sponsored_msg_random_id = msg.random_id
self._new_messages_after_sponsored = False
self._sponsored_seen = {}
await self.save()
self.log.debug(
f"Sent sponsored message {base64.b64encode(self.sponsored_msg_random_id)} "
f"to Matrix {self.sponsored_event_id} / {self.sponsored_event_ts}"
)
@property
def _sponsored_is_expired(self) -> bool:
return (
self.sponsored_event_id is None
or self.sponsored_event_ts + 24 * 60 * 60 < int(time.time())
) and self._new_messages_after_sponsored
async def _try_handle_read_for_sponsored_msg(
self, user: u.User, event_id: EventID, timestamp: int
) -> None:
try:
await self._handle_read_for_sponsored_msg(user, event_id, timestamp)
except Exception:
self.log.warning(
"Error handling read receipt for sponsored message processing", exc_info=True
)
async def _handle_read_for_sponsored_msg(
self, user: u.User, event_id: EventID, timestamp: int
) -> None:
if user.is_bot or not self.username:
return
if self._sponsored_is_expired:
self.log.trace("Sponsored message is expired, sending new one")
async with self._sponsored_msg_lock:
if self._sponsored_is_expired:
await self._send_sponsored_msg(user)
return
if (
self.sponsored_event_id == event_id or self.sponsored_event_ts <= timestamp
) and not self._sponsored_seen.get(user.mxid, False):
self._sponsored_seen[user.mxid] = True
self.log.debug(
f"Marking sponsored message {self.sponsored_event_id} as seen by {user.mxid}"
)
await user.client(
ViewSponsoredMessageRequest(
channel=await self.get_input_entity(user),
random_id=self.sponsored_msg_random_id,
)
)
async def mark_read(self, user: u.User, event_id: EventID, timestamp: int) -> None:
if user.is_bot:
return
space = self.tgid if self.peer_type == "channel" else user.tgid
message = await DBMessage.get_by_mxid(event_id, self.mxid, space)
if not message:
message = await DBMessage.find_last(self.mxid, space)
if not message:
self.log.debug(
f"Dropping Matrix read receipt from {user.mxid}: "
f"target message {event_id} not known and last message in chat not found"
)
return
else:
self.log.debug(
f"Matrix read receipt target {event_id} not known, marking "
f"messages up to most recent ({message.mxid}/{message.tgid}) "
f"as read by {user.mxid}/{user.tgid}"
)
else:
self.log.debug(
"Handling Matrix read receipt: marking messages up to "
f"{message.mxid}/{message.tgid} as read by {user.mxid}/{user.tgid}"
)
await user.client.send_read_acknowledge(
self.peer, max_id=message.tgid, clear_mentions=True
)
if self.peer_type == "channel" and not self.megagroup:
asyncio.create_task(self._try_handle_read_for_sponsored_msg(user, event_id, timestamp))
async def _preproc_kick_ban(
self, user: u.User | p.Puppet, source: u.User
) -> au.AbstractUser | None:
if user.tgid == source.tgid:
return None
if self.peer_type == "user" and user.tgid == self.tgid:
await self.delete()
return None
if isinstance(user, u.User) and await user.needs_relaybot(self):
if not self.bot:
return None
# TODO kick message
return None
if await source.needs_relaybot(self):
if not self.has_bot:
return None
return self.bot
return source
async def kick_matrix(self, user: u.User | p.Puppet, source: u.User) -> None:
source = await self._preproc_kick_ban(user, source)
if source is not None:
await source.client.kick_participant(self.peer, user.peer)
async def ban_matrix(self, user: u.User | p.Puppet, source: u.User):
source = await self._preproc_kick_ban(user, source)
if source is not None:
await source.client.edit_permissions(self.peer, user.peer, view_messages=False)
async def leave_matrix(self, user: u.User, event_id: EventID) -> None:
if await user.needs_relaybot(self):
await self._send_state_change_message("leave", user, event_id)
return
if self.peer_type == "user":
await self.main_intent.leave_room(self.mxid)
await self.delete()
try:
del self.by_tgid[self.tgid_full]
del self.by_mxid[self.mxid]
except KeyError:
pass
elif self.config["bridge.kick_on_logout"]:
await user.client.delete_dialog(self.peer)
async def join_matrix(self, user: u.User, event_id: EventID) -> None:
if await user.needs_relaybot(self):
await self._send_state_change_message("join", user, event_id)
return
if self.peer_type == "channel" and not user.is_bot:
await user.client(JoinChannelRequest(channel=await self.get_input_entity(user)))
else:
# We'll just assume the user is already in the chat.
pass
async def _apply_msg_format(self, sender: u.User, content: MessageEventContent) -> None:
if not isinstance(content, TextMessageEventContent) or content.format != Format.HTML:
content.format = Format.HTML
content.formatted_body = escape_html(content.body).replace("\n", "<br/>")
tpl = (
self.get_config(f"message_formats.[{content.msgtype.value}]")
or "<b>$sender_displayname</b>: $message"
)
displayname = await self.get_displayname(sender)
tpl_args = dict(
sender_mxid=sender.mxid,
sender_username=sender.mxid_localpart,
sender_displayname=escape_html(displayname),
message=content.formatted_body,
body=content.body,
formatted_body=content.formatted_body,
)
content.formatted_body = Template(tpl).safe_substitute(tpl_args)
async def _apply_emote_format(self, sender: u.User, content: TextMessageEventContent) -> None:
if content.format != Format.HTML:
content.format = Format.HTML
content.formatted_body = escape_html(content.body).replace("\n", "<br/>")
tpl = self.get_config("emote_format")
puppet = await p.Puppet.get_by_tgid(sender.tgid)
content.formatted_body = Template(tpl).safe_substitute(
dict(
sender_mxid=sender.mxid,
sender_username=sender.mxid_localpart,
sender_displayname=escape_html(await self.get_displayname(sender)),
mention=f"<a href='https://matrix.to/#/{puppet.mxid}'>{puppet.displayname}</a>",
username=sender.tg_username,
displayname=puppet.displayname,
body=content.body,
formatted_body=content.formatted_body,
)
)
content.msgtype = MessageType.TEXT
async def _pre_process_matrix_message(
self, sender: u.User, use_relaybot: bool, content: MessageEventContent
) -> None:
if use_relaybot:
await self._apply_msg_format(sender, content)
elif content.msgtype == MessageType.EMOTE:
await self._apply_emote_format(sender, content)
async def _handle_matrix_text(
self,
sender: u.User,
logged_in: bool,
event_id: EventID,
space: TelegramID,
client: MautrixTelegramClient,
content: TextMessageEventContent,
reply_to: TelegramID | None,
) -> None:
message, entities = await formatter.matrix_to_telegram(
client, text=content.body, html=content.formatted(Format.HTML)
)
sender_id = sender.tgid if logged_in else self.bot.tgid
async with self.send_lock(sender_id):
lp = self.get_config("telegram_link_preview")
if content.get_edit():
orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
if orig_msg:
response = await client.edit_message(
self.peer,
orig_msg.tgid,
message,
formatting_entities=entities,
link_preview=lp,
)
await self._add_telegram_message_to_db(event_id, space, -1, response)
return
try:
response = await client.send_message(
self.peer,
message,
reply_to=reply_to,
formatting_entities=entities,
link_preview=lp,
)
except Exception:
raise
else:
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
await self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
async def _handle_matrix_file(
self,
sender: u.User,
logged_in: bool,
event_id: EventID,
space: TelegramID,
client: MautrixTelegramClient,
content: MediaMessageEventContent,
reply_to: TelegramID,
caption: TextMessageEventContent = None,
) -> None:
sender_id = sender.tgid if logged_in else self.bot.tgid
mime = content.info.mimetype
if isinstance(content.info, (ImageInfo, VideoInfo)):
w, h = content.info.width, content.info.height
else:
w = h = None
file_name = content["net.maunium.telegram.internal.filename"]
max_image_size = self.config["bridge.image_as_file_size"] * 1000 ** 2
if self.config["bridge.parallel_file_transfer"] and content.url:
file_handle, file_size = await util.parallel_transfer_to_telegram(
client, self.main_intent, content.url, sender_id
)
else:
if content.file:
if not decrypt_attachment:
raise Exception(
f"Can't bridge encrypted media event {event_id}: "
"encryption dependencies not installed"
)
file = await self.main_intent.download_media(content.file.url)
file = decrypt_attachment(
file, content.file.key.key, content.file.hashes.get("sha256"), content.file.iv
)
else:
file = await self.main_intent.download_media(content.url)
if content.msgtype == MessageType.STICKER:
if mime != "image/gif":
mime, file, w, h = util.convert_image(
file, source_mime=mime, target_type="webp"
)
else:
# Remove sticker description
file_name = "sticker.gif"
file_handle = await client.upload_file(file)
file_size = len(file)
file_handle.name = file_name
attributes = [DocumentAttributeFilename(file_name=file_name)]
if w and h:
attributes.append(DocumentAttributeImageSize(w, h))
if (mime == "image/png" or mime == "image/jpeg") and file_size < max_image_size:
media = InputMediaUploadedPhoto(file_handle)
else:
media = InputMediaUploadedDocument(
file=file_handle,
attributes=attributes,
mime_type=mime or "application/octet-stream",
)
capt, entities = (
await formatter.matrix_to_telegram(
client, text=caption.body, html=caption.formatted(Format.HTML)
)
if caption
else (None, None)
)
async with self.send_lock(sender_id):
if await self._matrix_document_edit(client, content, space, capt, media, event_id):
return
try:
try:
response = await client.send_media(
self.peer, media, reply_to=reply_to, caption=capt, entities=entities
)
except (
PhotoInvalidDimensionsError,
PhotoSaveFileInvalidError,
PhotoExtInvalidError,
):
media = InputMediaUploadedDocument(
file=media.file, mime_type=mime, attributes=attributes
)
response = await client.send_media(
self.peer, media, reply_to=reply_to, caption=capt, entities=entities
)
except Exception:
raise
else:
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
await self._add_telegram_message_to_db(event_id, space, 0, response)
await self._send_delivery_receipt(event_id)
async def _matrix_document_edit(
self,
client: MautrixTelegramClient,
content: MessageEventContent,
space: TelegramID,
caption: str,
media: Any,
event_id: EventID,
) -> bool:
if content.get_edit():
orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
if orig_msg:
response = await client.edit_message(self.peer, orig_msg.tgid, caption, file=media)
await self._add_telegram_message_to_db(event_id, space, -1, response)
await self._send_delivery_receipt(event_id)
return True
return False
async def _handle_matrix_location(
self,
sender: u.User,
logged_in: bool,
event_id: EventID,
space: TelegramID,
client: MautrixTelegramClient,
content: LocationMessageEventContent,
reply_to: TelegramID,
) -> None:
sender_id = sender.tgid if logged_in else self.bot.tgid
try:
lat, long = content.geo_uri[len("geo:") :].split(";")[0].split(",")
lat, long = float(lat), float(long)
except (KeyError, ValueError):
self.log.exception("Failed to parse location")
return None
caption, entities = await formatter.matrix_to_telegram(client, text=content.body)
media = MessageMediaGeo(geo=GeoPoint(lat=lat, long=long, access_hash=0))
async with self.send_lock(sender_id):
if await self._matrix_document_edit(client, content, space, caption, media, event_id):
return
try:
response = await client.send_media(
self.peer, media, reply_to=reply_to, caption=caption, entities=entities
)
except Exception:
raise
else:
await self._add_telegram_message_to_db(event_id, space, 0, response)
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
event_id,
self.mxid,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
await self._send_delivery_receipt(event_id)
async def _add_telegram_message_to_db(
self, event_id: EventID, space: TelegramID, edit_index: int, response: TypeMessage
) -> None:
self.log.trace("Handled Matrix message: %s", response)
self.dedup.check(response, (event_id, space), force_hash=edit_index != 0)
if edit_index < 0:
prev_edit = await DBMessage.get_one_by_tgid(TelegramID(response.id), space, -1)
edit_index = prev_edit.edit_index + 1
await DBMessage(
tgid=TelegramID(response.id),
tg_space=space,
mx_room=self.mxid,
mxid=event_id,
edit_index=edit_index,
).insert()
async def _send_bridge_error(
self,
sender: u.User,
err: Exception,
event_id: EventID,
event_type: EventType,
message_type: MessageType | None = None,
msg: str | None = None,
) -> None:
sender.send_remote_checkpoint(
MessageSendCheckpointStatus.PERM_FAILURE,
event_id,
self.mxid,
event_type,
message_type=message_type,
error=err,
)
if msg and self.config["bridge.delivery_error_reports"]:
await self._send_message(
self.main_intent, TextMessageEventContent(msgtype=MessageType.NOTICE, body=msg)
)
async def handle_matrix_message(
self, sender: u.User, content: MessageEventContent, event_id: EventID
) -> None:
try:
await self._handle_matrix_message(sender, content, event_id)
except RPCError as e:
self.log.exception(f"RPCError while bridging {event_id}: {e}")
await self._send_bridge_error(
sender,
e,
event_id,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
msg=f"\u26a0 Your message may not have been bridged: {e}",
)
raise
except Exception as e:
self.log.exception(f"Failed to bridge {event_id}: {e}")
await self._send_bridge_error(
sender,
e,
event_id,
EventType.ROOM_MESSAGE,
message_type=content.msgtype,
)
async def _handle_matrix_message(
self, sender: u.User, content: MessageEventContent, event_id: EventID
) -> None:
if not content.body or not content.msgtype:
self.log.debug(f"Ignoring message {event_id} in {self.mxid} without body or msgtype")
return
logged_in = not await sender.needs_relaybot(self)
client = sender.client if logged_in else self.bot.client
space = (
self.tgid
if self.peer_type == "channel" # Channels have their own ID space
else (sender.tgid if logged_in else self.bot.tgid)
)
reply_to = await formatter.matrix_reply_to_telegram(content, space, room_id=self.mxid)
media = (
MessageType.STICKER,
MessageType.IMAGE,
MessageType.FILE,
MessageType.AUDIO,
MessageType.VIDEO,
)
if content.msgtype == MessageType.NOTICE:
bridge_notices = self.get_config("bridge_notices.default")
excepted = sender.mxid in self.get_config("bridge_notices.exceptions")
if not bridge_notices and not excepted:
raise Exception("Notices are not configured to be bridged.")
if content.msgtype in (MessageType.TEXT, MessageType.EMOTE, MessageType.NOTICE):
await self._pre_process_matrix_message(sender, not logged_in, content)
await self._handle_matrix_text(
sender, logged_in, event_id, space, client, content, reply_to
)
elif content.msgtype == MessageType.LOCATION:
await self._pre_process_matrix_message(sender, not logged_in, content)
await self._handle_matrix_location(
sender, logged_in, event_id, space, client, content, reply_to
)
elif content.msgtype in media:
content["net.maunium.telegram.internal.filename"] = content.body
try:
caption_content: MessageEventContent = sender.command_status["caption"]
reply_to = reply_to or await formatter.matrix_reply_to_telegram(
caption_content, space, room_id=self.mxid
)
sender.command_status = None
except (KeyError, TypeError):
caption_content = None if logged_in else TextMessageEventContent(body=content.body)
if caption_content:
caption_content.msgtype = content.msgtype
await self._pre_process_matrix_message(sender, not logged_in, caption_content)
await self._handle_matrix_file(
sender, logged_in, event_id, space, client, content, reply_to, caption_content
)
else:
self.log.debug(
f"Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}"
)
self.log.trace("Unhandled Matrix event content: %s", content)
raise Exception(f"Unhandled msgtype {content.msgtype}")
async def handle_matrix_unpin_all(self, sender: u.User, pin_event_id: EventID) -> None:
await sender.client(UnpinAllMessagesRequest(peer=self.peer))
await self._send_delivery_receipt(pin_event_id)
async def handle_matrix_pin(
self, sender: u.User, changes: dict[EventID, bool], pin_event_id: EventID
) -> None:
tg_space = self.tgid if self.peer_type == "channel" else sender.tgid
ids = {
msg.mxid: msg.tgid
for msg in await DBMessage.get_by_mxids(
list(changes.keys()), mx_room=self.mxid, tg_space=tg_space
)
}
for event_id, pinned in changes.items():
try:
await sender.client(
UpdatePinnedMessageRequest(peer=self.peer, id=ids[event_id], unpin=not pinned)
)
except (ChatNotModifiedError, MessageIdInvalidError, KeyError):
pass
await self._send_delivery_receipt(pin_event_id)
async def handle_matrix_deletion(
self, deleter: u.User, event_id: EventID, redaction_event_id: EventID
) -> None:
try:
await self._handle_matrix_deletion(deleter, event_id)
except Exception as e:
self.log.debug(str(e))
await self._send_bridge_error(deleter, e, redaction_event_id, EventType.ROOM_REDACTION)
else:
deleter.send_remote_checkpoint(
MessageSendCheckpointStatus.SUCCESS,
redaction_event_id,
self.mxid,
EventType.ROOM_REDACTION,
)
await self._send_delivery_receipt(redaction_event_id)
async def _handle_matrix_deletion(self, deleter: u.User, event_id: EventID) -> None:
real_deleter = deleter if not await deleter.needs_relaybot(self) else self.bot
space = self.tgid if self.peer_type == "channel" else real_deleter.tgid
message = await DBMessage.get_by_mxid(event_id, self.mxid, space)
if not message:
raise Exception(f"Ignoring Matrix redaction of unknown event {event_id}")
elif message.redacted:
raise Exception(
"Ignoring Matrix redaction of already redacted event "
f"{message.mxid} in {message.mx_room}"
)
elif message.edit_index != 0:
await message.mark_redacted()
raise Exception(
f"Ignoring Matrix redaction of edit event {message.mxid} in {message.mx_room}"
)
else:
await message.mark_redacted()
await real_deleter.client.delete_messages(self.peer, [message.tgid])
async def _update_telegram_power_level(
self, sender: u.User, user_id: TelegramID, level: int
) -> None:
moderator = level >= 50
admin = level >= 75
await sender.client.edit_admin(
self.peer,
user_id,
change_info=moderator,
post_messages=moderator,
edit_messages=moderator,
delete_messages=moderator,
ban_users=moderator,
invite_users=moderator,
pin_messages=moderator,
add_admins=admin,
)
async def handle_matrix_power_levels(
self,
sender: u.User,
new_users: dict[UserID, int],
old_users: dict[UserID, int],
event_id: EventID | None,
) -> None:
# TODO handle all power level changes and bridge exact admin rights to supergroups/channels
for user, level in new_users.items():
if not user or user == self.main_intent.mxid or user == sender.mxid:
continue
user_id = p.Puppet.get_id_from_mxid(user)
if not user_id:
mx_user = await u.User.get_by_mxid(user, create=False)
if not mx_user or not mx_user.tgid:
continue
user_id = mx_user.tgid
if not user_id or user_id == sender.tgid:
continue
if user not in old_users or level != old_users[user]:
await self._update_telegram_power_level(sender, user_id, level)
async def handle_matrix_about(self, sender: u.User, about: str, event_id: EventID) -> None:
if self.peer_type not in ("chat", "channel"):
return
peer = await self.get_input_entity(sender)
await sender.client(EditChatAboutRequest(peer=peer, about=about))
self.about = about
await self.save()
await self._send_delivery_receipt(event_id)
async def handle_matrix_title(self, sender: u.User, title: str, event_id: EventID) -> None:
if self.peer_type not in ("chat", "channel"):
return
if self.peer_type == "chat":
response = await sender.client(EditChatTitleRequest(chat_id=self.tgid, title=title))
else:
channel = await self.get_input_entity(sender)
response = await sender.client(EditTitleRequest(channel=channel, title=title))
self.dedup.register_outgoing_actions(response)
self.title = title
await self.save()
await self._send_delivery_receipt(event_id)
await self.update_bridge_info()
async def handle_matrix_avatar(
self, sender: u.User, url: ContentURI, event_id: EventID
) -> None:
if self.peer_type not in ("chat", "channel"):
# Invalid peer type
return
elif self.avatar_url == url:
return
self.avatar_url = url
file = await self.main_intent.download_media(url)
mime = magic.from_buffer(file, mime=True)
ext = sane_mimetypes.guess_extension(mime)
uploaded = await sender.client.upload_file(file, file_name=f"avatar{ext}")
photo = InputChatUploadedPhoto(file=uploaded)
if self.peer_type == "chat":
response = await sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo))
else:
channel = await self.get_input_entity(sender)
response = await sender.client(EditPhotoRequest(channel=channel, photo=photo))
self.dedup.register_outgoing_actions(response)
for update in response.updates:
is_photo_update = (
isinstance(update, UpdateNewMessage)
and isinstance(update.message, MessageService)
and isinstance(update.message.action, MessageActionChatEditPhoto)
)
if is_photo_update:
loc, size = self._get_largest_photo_size(update.message.action.photo)
self.photo_id = str(loc.id)
await self.save()
break
await self._send_delivery_receipt(event_id)
await self.update_bridge_info()
async def handle_matrix_upgrade(
self, sender: UserID, new_room: RoomID, event_id: EventID
) -> None:
_, server = self.main_intent.parse_user_id(sender)
old_room = self.mxid
await self.migrate_and_save_matrix(new_room)
await self.main_intent.join_room(new_room, servers=[server])
entity: TypeChat | User | None = None
user: au.AbstractUser | None = None
if self.bot and self.has_bot:
user = self.bot
entity = await self.get_entity(self.bot)
if not entity:
user_mxids = await self.main_intent.get_room_members(self.mxid)
for user_str in user_mxids:
user_id = UserID(user_str)
if user_id == self.az.bot_mxid:
continue
user = await u.User.get_by_mxid(user_id, create=False)
if user and user.tgid:
entity = await self.get_entity(user)
if entity:
break
if not entity:
self.log.error(
"Failed to fully migrate to upgraded Matrix room: no Telegram user found."
)
return
await self.update_matrix_room(user, entity, direct=self.peer_type == "user")
self.log.info(f"{sender} upgraded room from {old_room} to {self.mxid}")
await self._send_delivery_receipt(event_id, room_id=old_room)
async def migrate_and_save_matrix(self, new_id: RoomID) -> None:
try:
del self.by_mxid[self.mxid]
except KeyError:
pass
self.mxid = new_id
self.by_mxid[self.mxid] = self
await self.save()
async def enable_dm_encryption(self) -> bool:
ok = await super().enable_dm_encryption()
if ok:
try:
puppet = await p.Puppet.get_by_tgid(self.tgid)
await self.main_intent.set_room_name(self.mxid, puppet.displayname)
except Exception:
self.log.warning(f"Failed to set room name", exc_info=True)
return ok
# endregion
# region Telegram -> Matrix bridging
async def handle_telegram_typing(self, user: p.Puppet, update: UpdateTyping) -> None:
if user.is_real_user:
# Ignore typing notifications from double puppeted users to avoid echoing
return
is_typing = isinstance(update.action, SendMessageTypingAction)
await user.default_mxid_intent.set_typing(self.mxid, is_typing=is_typing)
def _get_external_url(self, evt: Message) -> str | None:
if self.peer_type == "channel" and self.username is not None:
return f"https://t.me/{self.username}/{evt.id}"
elif self.peer_type != "user":
return f"https://t.me/c/{self.tgid}/{evt.id}"
return None
async def _expire_telegram_photo(self, intent: IntentAPI, event_id: EventID, ttl: int) -> None:
try:
content = TextMessageEventContent(msgtype=MessageType.NOTICE, body="Photo has expired")
content.set_edit(event_id)
await asyncio.sleep(ttl)
await self._send_message(intent, content)
except Exception:
self.log.warning("Failed to expire Telegram photo %s", event_id, exc_info=True)
async def _handle_telegram_photo(
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> EventID | None:
media: MessageMediaPhoto = evt.media
if media.photo is None and media.ttl_seconds:
return await self._send_message(
intent,
TextMessageEventContent(msgtype=MessageType.NOTICE, body="Photo has expired"),
)
loc, largest_size = self._get_largest_photo_size(media.photo)
if loc is None:
content = TextMessageEventContent(
msgtype=MessageType.TEXT,
body="Failed to bridge image",
external_url=self._get_external_url(evt),
)
return await self._send_message(intent, content, timestamp=evt.date)
file = await util.transfer_file_to_matrix(
source.client, intent, loc, encrypt=self.encrypted
)
if not file:
return None
if self.get_config("inline_images") and (evt.message or evt.fwd_from or evt.reply_to):
content = await formatter.telegram_to_matrix(
evt,
source,
self.main_intent,
prefix_html=f"<img src='{file.mxc}' alt='Inline Telegram photo'/><br/>",
prefix_text="Inline image: ",
)
content.external_url = self._get_external_url(evt)
await intent.set_typing(self.mxid, is_typing=False)
return await self._send_message(intent, content, timestamp=evt.date)
info = ImageInfo(
height=largest_size.h,
width=largest_size.w,
orientation=0,
mimetype=file.mime_type,
size=self._photo_size_key(largest_size),
)
ext = sane_mimetypes.guess_extension(file.mime_type)
name = f"disappearing_image{ext}" if media.ttl_seconds else f"image{ext}"
await intent.set_typing(self.mxid, is_typing=False)
content = MediaMessageEventContent(
msgtype=MessageType.IMAGE,
info=info,
body=name,
relates_to=relates_to,
external_url=self._get_external_url(evt),
)
if file.decryption_info:
content.file = file.decryption_info
else:
content.url = file.mxc
result = await self._send_message(intent, content, timestamp=evt.date)
if media.ttl_seconds:
asyncio.create_task(self._expire_telegram_photo(intent, result, media.ttl_seconds))
if evt.message:
caption_content = await formatter.telegram_to_matrix(
evt, source, self.main_intent, no_reply_fallback=True
)
caption_content.external_url = content.external_url
result = await self._send_message(intent, caption_content, timestamp=evt.date)
return result
@staticmethod
def _parse_telegram_document_attributes(attributes: list[TypeDocumentAttribute]) -> DocAttrs:
name, mime_type, is_sticker, sticker_alt, width, height = None, None, False, None, 0, 0
is_gif = False
for attr in attributes:
if isinstance(attr, DocumentAttributeFilename):
name = name or attr.file_name
mime_type, _ = mimetypes.guess_type(attr.file_name)
elif isinstance(attr, DocumentAttributeSticker):
is_sticker = True
sticker_alt = attr.alt
elif isinstance(attr, DocumentAttributeAnimated):
is_gif = True
elif isinstance(attr, DocumentAttributeVideo):
width, height = attr.w, attr.h
elif isinstance(attr, DocumentAttributeImageSize):
width, height = attr.w, attr.h
return DocAttrs(name, mime_type, is_sticker, sticker_alt, width, height, is_gif)
@staticmethod
def _parse_telegram_document_meta(
evt: Message, file: DBTelegramFile, attrs: DocAttrs, thumb_size: TypePhotoSize
) -> tuple[ImageInfo, str]:
document = evt.media.document
name = attrs.name
if attrs.is_sticker:
alt = attrs.sticker_alt
if len(alt) > 0:
try:
name = f"{alt} ({unicodedata.name(alt[0]).lower()})"
except ValueError:
name = alt
generic_types = ("text/plain", "application/octet-stream")
if file.mime_type in generic_types and document.mime_type not in generic_types:
mime_type = document.mime_type or file.mime_type
elif file.mime_type == "application/ogg":
mime_type = "audio/ogg"
else:
mime_type = file.mime_type or document.mime_type
info = ImageInfo(size=file.size, mimetype=mime_type)
if attrs.mime_type and not file.was_converted:
file.mime_type = attrs.mime_type or file.mime_type
if file.width and file.height:
info.width, info.height = file.width, file.height
elif attrs.width and attrs.height:
info.width, info.height = attrs.width, attrs.height
if file.thumbnail:
if file.thumbnail.decryption_info:
info.thumbnail_file = file.thumbnail.decryption_info
else:
info.thumbnail_url = file.thumbnail.mxc
info.thumbnail_info = ThumbnailInfo(
mimetype=file.thumbnail.mime_type,
height=file.thumbnail.height or thumb_size.h,
width=file.thumbnail.width or thumb_size.w,
size=file.thumbnail.size,
)
elif attrs.is_sticker:
# This is a hack for bad clients like Element iOS that require a thumbnail
info.thumbnail_info = ImageInfo.deserialize(info.serialize())
if file.decryption_info:
info.thumbnail_file = file.decryption_info
else:
info.thumbnail_url = file.mxc
return info, name
async def _handle_telegram_document(
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> EventID | None:
document = evt.media.document
attrs = self._parse_telegram_document_attributes(document.attributes)
if document.size > self.config["bridge.max_document_size"] * 1000 ** 2:
name = attrs.name or ""
caption = f"\n{evt.message}" if evt.message else ""
# TODO encrypt
return await intent.send_notice(self.mxid, f"Too large file {name}{caption}")
thumb_loc, thumb_size = self._get_largest_photo_size(document)
if thumb_size and not isinstance(thumb_size, (PhotoSize, PhotoCachedSize)):
self.log.debug(f"Unsupported thumbnail type {type(thumb_size)}")
thumb_loc = None
thumb_size = None
parallel_id = source.tgid if self.config["bridge.parallel_file_transfer"] else None
file = await util.transfer_file_to_matrix(
source.client,
intent,
document,
thumb_loc,
is_sticker=attrs.is_sticker,
tgs_convert=self.config["bridge.animated_sticker"],
filename=attrs.name,
parallel_id=parallel_id,
encrypt=self.encrypted,
)
if not file:
return None
info, name = self._parse_telegram_document_meta(evt, file, attrs, thumb_size)
await intent.set_typing(self.mxid, is_typing=False)
event_type = EventType.ROOM_MESSAGE
# Elements only support images as stickers, so send animated webm stickers as m.video
if attrs.is_sticker and file.mime_type.startswith("image/"):
event_type = EventType.STICKER
# Tell clients to render the stickers as 256x256 if they're bigger
if info.width > 256 or info.height > 256:
if info.width > info.height:
info.height = int(info.height / (info.width / 256))
info.width = 256
else:
info.width = int(info.width / (info.height / 256))
info.height = 256
if info.thumbnail_info:
info.thumbnail_info.width = info.width
info.thumbnail_info.height = info.height
if attrs.is_gif or (attrs.is_sticker and info.mimetype == "video/webm"):
if attrs.is_gif:
info["fi.mau.telegram.gif"] = True
else:
info["fi.mau.telegram.animated_sticker"] = True
info["fi.mau.loop"] = True
info["fi.mau.autoplay"] = True
info["fi.mau.hide_controls"] = True
info["fi.mau.no_audio"] = True
if not name:
ext = sane_mimetypes.guess_extension(file.mime_type)
name = "unnamed_file" + ext
content = MediaMessageEventContent(
body=name,
info=info,
relates_to=relates_to,
external_url=self._get_external_url(evt),
msgtype={
"video/": MessageType.VIDEO,
"audio/": MessageType.AUDIO,
"image/": MessageType.IMAGE,
}.get(info.mimetype[:6], MessageType.FILE),
)
if file.decryption_info:
content.file = file.decryption_info
else:
content.url = file.mxc
res = await self._send_message(intent, content, event_type=event_type, timestamp=evt.date)
if evt.message:
caption_content = await formatter.telegram_to_matrix(
evt, source, self.main_intent, no_reply_fallback=True
)
caption_content.external_url = content.external_url
res = await self._send_message(intent, caption_content, timestamp=evt.date)
return res
def _handle_telegram_location(
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> Awaitable[EventID]:
long = evt.media.geo.long
lat = evt.media.geo.lat
long_char = "E" if long > 0 else "W"
lat_char = "N" if lat > 0 else "S"
geo = f"{round(lat, 6)},{round(long, 6)}"
body = f"{round(abs(lat), 4)}° {lat_char}, {round(abs(long), 4)}° {long_char}"
url = f"https://maps.google.com/?q={geo}"
content = LocationMessageEventContent(
msgtype=MessageType.LOCATION,
geo_uri=f"geo:{geo}",
body=f"Location: {body}\n{url}",
relates_to=relates_to,
external_url=self._get_external_url(evt),
)
content["format"] = str(Format.HTML)
content["formatted_body"] = f"Location: <a href='{url}'>{body}</a>"
return self._send_message(intent, content, timestamp=evt.date)
async def _handle_telegram_text(
self, source: au.AbstractUser, intent: IntentAPI, is_bot: bool, evt: Message
) -> EventID:
self.log.trace(f"Sending {evt.message} to {self.mxid} by {intent.mxid}")
content = await formatter.telegram_to_matrix(evt, source, self.main_intent)
content.external_url = self._get_external_url(evt)
if is_bot and self.get_config("bot_messages_as_notices"):
content.msgtype = MessageType.NOTICE
await intent.set_typing(self.mxid, is_typing=False)
return await self._send_message(intent, content, timestamp=evt.date)
async def _handle_telegram_unsupported(
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> EventID:
override_text = (
"This message is not supported on your version of Mautrix-Telegram. "
"Please check https://github.com/mautrix/telegram or ask your "
"bridge administrator about possible updates."
)
content = await formatter.telegram_to_matrix(
evt, source, self.main_intent, override_text=override_text
)
content.msgtype = MessageType.NOTICE
content.external_url = self._get_external_url(evt)
content["net.maunium.telegram.unsupported"] = True
await intent.set_typing(self.mxid, is_typing=False)
return await self._send_message(intent, content, timestamp=evt.date)
async def _handle_telegram_poll(
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> EventID:
poll: Poll = evt.media.poll
poll_id = self._encode_msgid(source, evt)
_n = 0
def n() -> int:
nonlocal _n
_n += 1
return _n
text_answers = "\n".join(f"{n()}. {answer.text}" for answer in poll.answers)
html_answers = "\n".join(f"<li>{answer.text}</li>" for answer in poll.answers)
content = TextMessageEventContent(
msgtype=MessageType.TEXT,
format=Format.HTML,
body=(
f"Poll: {poll.question}\n{text_answers}\n"
f"Vote with !tg vote {poll_id} <choice number>"
),
formatted_body=(
f"<strong>Poll</strong>: {poll.question}<br/>\n"
f"<ol>{html_answers}</ol>\n"
f"Vote with <code>!tg vote {poll_id} &lt;choice number&gt;</code>"
),
relates_to=relates_to,
external_url=self._get_external_url(evt),
)
await intent.set_typing(self.mxid, is_typing=False)
return await self._send_message(intent, content, timestamp=evt.date)
async def _handle_telegram_dice(
self, _: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> EventID:
content = putil.make_dice_event_content(evt.media)
content.relates_to = relates_to
content.external_url = self._get_external_url(evt)
await intent.set_typing(self.mxid, is_typing=False)
return await self._send_message(intent, content, timestamp=evt.date)
@staticmethod
def _int_to_bytes(i: int) -> bytes:
hex_value = f"{i:010x}".encode("utf-8")
return codecs.decode(hex_value, "hex_codec")
def _encode_msgid(self, source: au.AbstractUser, evt: Message) -> str:
if self.peer_type == "channel":
play_id = b"c" + self._int_to_bytes(self.tgid) + self._int_to_bytes(evt.id)
elif self.peer_type == "chat":
play_id = (
b"g"
+ self._int_to_bytes(self.tgid)
+ self._int_to_bytes(evt.id)
+ self._int_to_bytes(source.tgid)
)
elif self.peer_type == "user":
play_id = b"u" + self._int_to_bytes(self.tgid) + self._int_to_bytes(evt.id)
else:
raise ValueError("Portal has invalid peer type")
return base64.b64encode(play_id).decode("utf-8").rstrip("=")
async def _handle_telegram_game(
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> EventID:
game = evt.media.game
play_id = self._encode_msgid(source, evt)
command = f"!tg play {play_id}"
override_text = f"Run {command} in your bridge management room to play {game.title}"
override_entities = [
MessageEntityPre(offset=len("Run "), length=len(command), language="")
]
content = await formatter.telegram_to_matrix(
evt,
source,
self.main_intent,
override_text=override_text,
override_entities=override_entities,
)
content.msgtype = MessageType.NOTICE
content.external_url = self._get_external_url(evt)
content.relates_to = relates_to
content["net.maunium.telegram.game"] = play_id
await intent.set_typing(self.mxid, is_typing=False)
return await self._send_message(intent, content, timestamp=evt.date)
async def _handle_telegram_contact(
self, source: au.AbstractUser, intent: IntentAPI, evt: Message, relates_to: RelatesTo
) -> EventID:
content = await putil.make_contact_event_content(source, evt.media)
content.relates_to = relates_to
content.external_url = self._get_external_url(evt)
await intent.set_typing(self.mxid, is_typing=False)
return await self._send_message(intent, content, timestamp=evt.date)
async def handle_telegram_edit(
self, source: au.AbstractUser, sender: p.Puppet, evt: Message
) -> None:
if not self.mxid:
self.log.trace("Ignoring edit to %d as chat has no Matrix room", evt.id)
return
elif hasattr(evt, "media") and isinstance(evt.media, MessageMediaGame):
self.log.debug("Ignoring game message edit event")
return
async with self.send_lock(sender.tgid if sender else None, required=False):
tg_space = self.tgid if self.peer_type == "channel" else source.tgid
temporary_identifier = EventID(
f"${random.randint(1000000000000, 9999999999999)}TGBRIDGEDITEMP"
)
duplicate_found = self.dedup.check(
evt, (temporary_identifier, tg_space), force_hash=True
)
if duplicate_found:
mxid, other_tg_space = duplicate_found
if tg_space != other_tg_space:
prev_edit_msg = await DBMessage.get_one_by_tgid(
TelegramID(evt.id), tg_space, edit_index=-1
)
if not prev_edit_msg:
return
await DBMessage(
mxid=mxid,
mx_room=self.mxid,
tg_space=tg_space,
tgid=TelegramID(evt.id),
edit_index=prev_edit_msg.edit_index + 1,
).insert()
return
content = await formatter.telegram_to_matrix(
evt, source, self.main_intent, no_reply_fallback=True
)
editing_msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if not editing_msg:
self.log.info(
f"Didn't find edited message {evt.id}@{tg_space} (src {source.tgid}) "
"in database."
)
return
content.msgtype = (
MessageType.NOTICE
if (sender and sender.is_bot and self.get_config("bot_messages_as_notices"))
else MessageType.TEXT
)
content.external_url = self._get_external_url(evt)
content.set_edit(editing_msg.mxid)
intent = sender.intent_for(self) if sender else self.main_intent
await intent.set_typing(self.mxid, is_typing=False)
event_id = await self._send_message(intent, content)
prev_edit_msg = (
await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg
)
await DBMessage(
mxid=event_id,
mx_room=self.mxid,
tg_space=tg_space,
tgid=TelegramID(evt.id),
edit_index=prev_edit_msg.edit_index + 1,
).insert()
await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
@property
def _takeout_options(self) -> dict[str, bool | int]:
return {
"files": True,
"megagroups": self.megagroup,
"chats": self.peer_type == "chat",
"users": self.peer_type == "user",
"channels": (self.peer_type == "channel" and not self.megagroup),
"max_file_size": min(self.config["bridge.max_document_size"], 2000) * 1024 * 1024,
}
async def backfill(
self,
source: u.User,
is_initial: bool = False,
limit: int | None = None,
last_id: int | None = None,
) -> None:
async with self.backfill_method_lock:
await self._locked_backfill(source, is_initial, limit, last_id)
async def _locked_backfill(
self,
source: u.User,
is_initial: bool = False,
limit: int | None = None,
last_id: int | None = None,
) -> None:
limit = limit or (
self.config["bridge.backfill.initial_limit"]
if is_initial
else self.config["bridge.backfill.missed_limit"]
)
if limit == 0:
return
if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat":
return
last = await DBMessage.find_last(
self.mxid, (source.tgid if self.peer_type != "channel" else self.tgid)
)
min_id = last.tgid if last else 0
if last_id is None:
messages = await source.client.get_messages(self.peer, limit=1)
if not messages:
# The chat seems empty
return
last_id = messages[0].id
if last_id <= min_id:
# Nothing to backfill
return
if limit < 0:
limit = last_id - min_id
self.log.debug(
f"Backfilling approximately {last_id - min_id} messages through {source.mxid}"
)
elif self.peer_type == "channel":
# This is a channel or supergroup, so we'll backfill messages based on the ID.
# There are some cases, such as deleted messages, where this may backfill less
# messages than the limit.
min_id = max(last_id - limit, min_id)
self.log.debug(
f"Backfilling messages after ID {min_id} (last message: {last_id}) "
f"through {source.mxid}"
)
else:
# Private chats and normal groups don't have their own message ID namespace,
# which means we'll have to fetch messages a different way.
# The _backfill_messages method will detect min_id=None and not use reverse=True
min_id = None
self.log.debug(f"Backfilling up to {limit} messages through {source.mxid}")
with self.backfill_lock:
await self._backfill(source, min_id, limit)
async def _backfill(self, source: u.User, min_id: int | None, limit: int) -> None:
self.backfill_leave = set()
if (
self.peer_type == "user"
and self.tgid != source.tgid
and self.config["bridge.backfill.invite_own_puppet"]
):
self.log.debug("Adding %s's default puppet to room for backfilling", source.mxid)
sender = await p.Puppet.get_by_tgid(source.tgid)
await self.main_intent.invite_user(self.mxid, sender.default_mxid)
await sender.default_mxid_intent.join_room_by_id(self.mxid)
self.backfill_leave.add(sender.default_mxid_intent)
client = source.client
async with NotificationDisabler(self.mxid, source):
if limit > self.config["bridge.backfill.takeout_limit"]:
self.log.debug(f"Opening takeout client for {source.tgid}")
async with client.takeout(**self._takeout_options) as takeout:
count = await self._backfill_messages(source, min_id, limit, takeout)
else:
count = await self._backfill_messages(source, min_id, limit, client)
for intent in self.backfill_leave:
self.log.trace("Leaving room with %s post-backfill", intent.mxid)
await intent.leave_room(self.mxid)
self.backfill_leave = None
self.log.info("Backfilled %d messages through %s", count, source.mxid)
async def _backfill_messages(
self, source: u.User, min_id: int | None, limit: int, client: MautrixTelegramClient
) -> int:
count = 0
entity = await self.get_input_entity(source)
if min_id is not None:
self.log.debug(f"Iterating all messages starting with {min_id} (approx: {limit})")
messages = client.iter_messages(entity, reverse=True, min_id=min_id)
async for message in messages:
sender = (
await p.Puppet.get_by_tgid(TelegramID(message.from_id.user_id))
if isinstance(message.from_id, PeerUser)
else None
)
# TODO handle service messages?
await self.handle_telegram_message(source, sender, message)
count += 1
else:
self.log.debug(f"Fetching up to {limit} most recent messages")
messages = await client.get_messages(entity, limit=limit)
for message in reversed(messages):
sender = (
await p.Puppet.get_by_tgid(TelegramID(message.from_id.user_id))
if isinstance(message.from_id, PeerUser)
else None
)
await self.handle_telegram_message(source, sender, message)
count += 1
return count
async def handle_telegram_message(
self, source: au.AbstractUser, sender: p.Puppet, evt: Message
) -> None:
if not self.mxid:
self.log.trace("Got telegram message %d, but no room exists, creating...", evt.id)
await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False)
if (
self.peer_type == "user"
and sender
and sender.tgid == self.tg_receiver
and not sender.is_real_user
and not await self.az.state_store.is_joined(self.mxid, sender.mxid)
):
self.log.debug(
f"Ignoring private chat message {evt.id}@{source.tgid} as receiver does"
" not have matrix puppeting and their default puppet isn't in the room"
)
return
async with self.send_lock(sender.tgid if sender else None, required=False):
tg_space = self.tgid if self.peer_type == "channel" else source.tgid
temporary_identifier = EventID(
f"${random.randint(1000000000000, 9999999999999)}TGBRIDGETEMP"
)
duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space))
if duplicate_found:
mxid, other_tg_space = duplicate_found
self.log.debug(
f"Ignoring message {evt.id}@{tg_space} (src {source.tgid}) "
f"as it was already handled (in space {other_tg_space})"
)
if tg_space != other_tg_space:
await DBMessage(
tgid=TelegramID(evt.id),
mx_room=self.mxid,
mxid=mxid,
tg_space=tg_space,
edit_index=0,
).insert()
return
if self.backfill_lock.locked or (self.dedup.pre_db_check and self.peer_type == "channel"):
msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
if msg:
self.log.debug(
f"Ignoring message {evt.id} (src {source.tgid}) as it was already "
f"handled into {msg.mxid}. This duplicate was catched in the db "
"check. If you get this message often, consider increasing "
"bridge.deduplication.cache_queue_length in the config."
)
return
self.log.trace("Handling Telegram message %s", evt)
if sender and not sender.displayname:
self.log.debug(
f"Telegram user {sender.tgid} sent a message, but doesn't have a "
"displayname, updating info..."
)
entity = await source.client.get_entity(PeerUser(sender.tgid))
await sender.update_info(source, entity)
if not sender.displayname:
self.log.debug(
f"Telegram user {sender.tgid} doesn't have a displayname even after"
f" updating with data {entity!s}"
)
allowed_media = (
MessageMediaPhoto,
MessageMediaDocument,
MessageMediaGeo,
MessageMediaGame,
MessageMediaDice,
MessageMediaPoll,
MessageMediaContact,
MessageMediaUnsupported,
)
if sender:
intent = sender.intent_for(self)
if (
self.backfill_lock.locked
and intent != sender.default_mxid_intent
and self.config["bridge.backfill.invite_own_puppet"]
):
intent = sender.default_mxid_intent
self.backfill_leave.add(intent)
else:
intent = self.main_intent
if hasattr(evt, "media") and isinstance(evt.media, allowed_media):
handler: MediaHandler = {
MessageMediaPhoto: self._handle_telegram_photo,
MessageMediaDocument: self._handle_telegram_document,
MessageMediaGeo: self._handle_telegram_location,
MessageMediaPoll: self._handle_telegram_poll,
MessageMediaDice: self._handle_telegram_dice,
MessageMediaUnsupported: self._handle_telegram_unsupported,
MessageMediaGame: self._handle_telegram_game,
MessageMediaContact: self._handle_telegram_contact,
}[type(evt.media)]
relates_to = await formatter.telegram_reply_to_matrix(evt, source)
event_id = await handler(source, intent, evt, relates_to)
elif evt.message:
is_bot = sender.is_bot if sender else False
event_id = await self._handle_telegram_text(source, intent, is_bot, evt)
else:
self.log.debug("Unhandled Telegram message %d", evt.id)
return
if not event_id:
return
self._new_messages_after_sponsored = True
prev_id = self.dedup.update(evt, (event_id, tg_space), (temporary_identifier, tg_space))
if prev_id:
self.log.debug(
f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. "
f"Temporary dedup identifier was {temporary_identifier}, "
f"but dedup map contained {prev_id[1]} instead! -- "
"This was probably a race condition caused by Telegram sending updates"
"to other clients before responding to the sender. I'll just redact "
"the likely duplicate message now."
)
await intent.redact(self.mxid, event_id)
return
self.log.debug("Handled telegram message %d -> %s", evt.id, event_id)
try:
await DBMessage(
tgid=TelegramID(evt.id),
mx_room=self.mxid,
mxid=event_id,
tg_space=tg_space,
edit_index=0,
).insert()
await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
except (IntegrityError, UniqueViolationError) as e:
self.log.exception(
f"{e.__class__.__name__} while saving message mapping. "
"This might mean that an update was handled after it left the "
"dedup cache queue. You can try enabling bridge.deduplication."
"pre_db_check in the config."
)
await intent.redact(self.mxid, event_id)
await self._send_delivery_receipt(event_id)
async def _create_room_on_action(
self, source: au.AbstractUser, action: TypeMessageAction
) -> bool:
if source.is_relaybot and self.config["bridge.ignore_unbridged_group_chat"]:
return False
create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate)
create_and_continue = (MessageActionChatAddUser, MessageActionChatJoinedByLink)
if isinstance(action, create_and_exit) or isinstance(action, create_and_continue):
await self.create_matrix_room(
source, invites=[source.mxid], update_if_exists=isinstance(action, create_and_exit)
)
if not isinstance(action, create_and_continue):
return False
return True
async def handle_telegram_action(
self, source: au.AbstractUser, sender: p.Puppet, update: MessageService
) -> None:
action = update.action
should_ignore = (
not self.mxid and not await self._create_room_on_action(source, action)
) or self.dedup.check_action(update)
if should_ignore or not self.mxid:
return
if isinstance(action, MessageActionChatEditTitle):
await self._update_title(action.title, sender=sender, save=True)
await self.update_bridge_info()
elif isinstance(action, MessageActionChatEditPhoto):
await self._update_avatar(source, action.photo, sender=sender, save=True)
await self.update_bridge_info()
elif isinstance(action, MessageActionChatDeletePhoto):
await self._update_avatar(source, ChatPhotoEmpty(), sender=sender, save=True)
await self.update_bridge_info()
elif isinstance(action, MessageActionChatAddUser):
for user_id in action.users:
await self._add_telegram_user(TelegramID(user_id), source)
elif isinstance(action, MessageActionChatJoinedByLink):
await self._add_telegram_user(sender.id, source)
elif isinstance(action, MessageActionChatDeleteUser):
await self._delete_telegram_user(TelegramID(action.user_id), sender)
elif isinstance(action, MessageActionChatMigrateTo):
await self._migrate_and_save_telegram(TelegramID(action.channel_id))
# TODO encrypt
await sender.intent_for(self).send_emote(
self.mxid, "upgraded this group to a supergroup."
)
await self.update_bridge_info()
elif isinstance(action, MessageActionGameScore):
# TODO handle game score
pass
else:
self.log.trace("Unhandled Telegram action in %s: %s", self.title, action)
async def set_telegram_admin(self, user_id: TelegramID) -> None:
puppet = await p.Puppet.get_by_tgid(user_id)
user = await u.User.get_by_tgid(user_id)
levels = await self.main_intent.get_power_levels(self.mxid)
if user:
levels.users[user.mxid] = 50
if puppet:
levels.users[puppet.mxid] = 50
await self.main_intent.set_power_levels(self.mxid, levels)
async def receive_telegram_pin_ids(
self, msg_ids: list[TelegramID], receiver: TelegramID, remove: bool
) -> None:
async with self._pin_lock:
tg_space = receiver if self.peer_type != "channel" else self.tgid
previously_pinned = await self.main_intent.get_pinned_messages(self.mxid)
currently_pinned_dict = {event_id: True for event_id in previously_pinned}
for message in await DBMessage.get_first_by_tgids(msg_ids, tg_space):
if remove:
currently_pinned_dict.pop(message.mxid, None)
else:
currently_pinned_dict[message.mxid] = True
currently_pinned = list(currently_pinned_dict.keys())
if currently_pinned != previously_pinned:
await self.main_intent.set_pinned_messages(self.mxid, currently_pinned)
async def set_telegram_admins_enabled(self, enabled: bool) -> None:
level = 50 if enabled else 10
levels = await self.main_intent.get_power_levels(self.mxid)
levels.invite = level
levels.events[EventType.ROOM_NAME] = level
levels.events[EventType.ROOM_AVATAR] = level
await self.main_intent.set_power_levels(self.mxid, levels)
# endregion
# region Miscellaneous getters
def get_config(self, key: str) -> Any:
local = util.recursive_get(self.local_config, key)
if local is not None:
return local
return self.config[f"bridge.{key}"]
@staticmethod
def _photo_size_key(photo: TypePhotoSize) -> int:
if isinstance(photo, PhotoSize):
return photo.size
elif isinstance(photo, PhotoSizeProgressive):
return max(photo.sizes)
elif isinstance(photo, PhotoSizeEmpty):
return 0
else:
return len(photo.bytes)
@classmethod
def _get_largest_photo_size(
cls, photo: Photo | Document
) -> tuple[InputPhotoFileLocation | None, TypePhotoSize | None]:
if (
not photo
or isinstance(photo, PhotoEmpty)
or (isinstance(photo, Document) and not photo.thumbs)
):
return None, None
largest = max(
photo.thumbs if isinstance(photo, Document) else photo.sizes, key=cls._photo_size_key
)
return (
InputPhotoFileLocation(
id=photo.id,
access_hash=photo.access_hash,
file_reference=photo.file_reference,
thumb_size=largest.type,
),
largest,
)
async def can_user_perform(self, user: u.User, event: str) -> bool:
if user.is_admin:
return True
if not self.mxid:
# No room for anybody to perform actions in
return False
try:
await self.main_intent.get_power_levels(self.mxid)
except MatrixRequestError:
return False
evt_type = EventType.find(f"net.maunium.telegram.{event}", t_class=EventType.Class.STATE)
return await self.main_intent.state_store.has_power_level(self.mxid, user.mxid, evt_type)
def get_input_entity(
self, user: au.AbstractUser
) -> Awaitable[TypeInputPeer | TypeInputChannel]:
return user.client.get_input_entity(self.peer)
async def get_entity(self, user: au.AbstractUser) -> TypeChat:
try:
return await user.client.get_entity(self.peer)
except ValueError:
if user.is_bot:
self.log.warning(f"Could not find entity with bot {user.tgid}. Failing...")
raise
self.log.warning(
f"Could not find entity with user {user.tgid}. falling back to get_dialogs."
)
async for dialog in user.client.iter_dialogs():
if dialog.entity.id == self.tgid:
return dialog.entity
raise
async def get_invite_link(
self, user: u.User, uses: int | None = None, expire: datetime | None = None
) -> str:
if self.peer_type == "user":
raise ValueError("You can't invite users to private chats.")
if self.username:
return f"https://t.me/{self.username}"
link = await user.client(
ExportChatInviteRequest(
peer=await self.get_input_entity(user), expire_date=expire, usage_limit=uses
)
)
return link.link
# endregion
# region Matrix room cleanup
async def get_authenticated_matrix_users(self) -> list[UserID]:
try:
members = await self.main_intent.get_room_members(self.mxid)
except MatrixRequestError:
return []
authenticated: list[UserID] = []
has_bot = self.has_bot
for member in members:
if p.Puppet.get_id_from_mxid(member) or member == self.az.bot_mxid:
continue
user = await u.User.get_and_start_by_mxid(member)
authenticated_through_bot = has_bot and user.relaybot_whitelisted
if authenticated_through_bot or await user.has_full_access(allow_bot=True):
authenticated.append(user.mxid)
return authenticated
async def cleanup_portal(
self, message: str, puppets_only: bool = False, delete: bool = True
) -> None:
if self.username:
try:
await self.main_intent.remove_room_alias(self.alias_localpart)
except (MatrixRequestError, IntentError):
self.log.warning("Failed to remove alias when cleaning up room", exc_info=True)
await self.cleanup_room(self.main_intent, self.mxid, message, puppets_only)
if delete:
await self.delete()
async def delete(self) -> None:
try:
del self.by_tgid[self.tgid_full]
except KeyError:
pass
try:
del self.by_mxid[self.mxid]
except KeyError:
pass
await super().delete()
await DBMessage.delete_all(self.mxid)
self.deleted = True
# endregion
# region Class instance lookup
async def postinit(self) -> None:
puppet = await p.Puppet.get_by_tgid(self.tgid) if self.is_direct else None
self._main_intent = puppet.intent_for(self) if self.is_direct else self.az.intent
if self.tgid:
self.by_tgid[self.tgid_full] = self
if self.mxid:
self.by_mxid[self.mxid] = self
@classmethod
async def all(cls) -> AsyncGenerator[Portal, None]:
portals = await super().all()
portal: cls
for portal in portals:
try:
yield cls.by_tgid[portal.tgid_full]
except KeyError:
await portal.postinit()
yield portal
@classmethod
async def find_private_chats(cls, tg_receiver: TelegramID) -> AsyncGenerator[Portal, None]:
portals = await super().find_private_chats(tg_receiver)
portal: cls
for portal in portals:
try:
yield cls.by_tgid[portal.tgid_full]
except KeyError:
await portal.postinit()
yield portal
@classmethod
@async_getter_lock
async def get_by_mxid(cls, mxid: RoomID) -> Portal | None:
try:
return cls.by_mxid[mxid]
except KeyError:
pass
portal = cast(cls, await super().get_by_mxid(mxid))
if portal:
await portal.postinit()
return portal
return None
@classmethod
def get_username_from_mx_alias(cls, alias: str) -> str | None:
return cls.alias_template.parse(alias)
@classmethod
async def find_by_username(cls, username: str) -> Portal | None:
if not username:
return None
username = username.lower()
for _, portal in cls.by_tgid.items():
if portal.username and portal.username.lower() == username:
return portal
portal = cast(cls, await super().find_by_username(username))
if portal:
try:
return cls.by_tgid[portal.tgid_full]
except KeyError:
await portal.postinit()
return portal
return None
@classmethod
@async_getter_lock
async def get_by_tgid(
cls, tgid: TelegramID, *, tg_receiver: TelegramID | None = None, peer_type: str = None
) -> Portal | None:
if peer_type == "user" and tg_receiver is None:
raise ValueError('tg_receiver is required when peer_type is "user"')
tg_receiver = tg_receiver or tgid
tgid_full = (tgid, tg_receiver)
try:
return cls.by_tgid[tgid_full]
except KeyError:
pass
portal = cast(cls, await super().get_by_tgid(tgid, tg_receiver))
if portal:
await portal.postinit()
return portal
if peer_type:
cls.log.info(f"Creating portal for {peer_type} {tgid} (receiver {tg_receiver})")
# TODO enable this for non-release builds
# (or add better wrong peer type error handling)
# if peer_type == "chat":
# import traceback
# cls.log.info("Chat portal stack trace:\n" + "".join(traceback.format_stack()))
portal = cls(tgid, peer_type=peer_type, tg_receiver=tg_receiver)
await portal.postinit()
await portal.insert()
return portal
return None
@classmethod
async def get_by_entity(
cls,
entity: TypeChat | TypePeer | TypeUser | TypeUserFull | TypeInputPeer,
tg_receiver: TelegramID | None = None,
create: bool = True,
) -> Portal | None:
entity_type = type(entity)
if entity_type in (Chat, ChatFull):
type_name = "chat"
entity_id = entity.id
elif entity_type in (PeerChat, InputPeerChat):
type_name = "chat"
entity_id = entity.chat_id
elif entity_type in (Channel, ChannelFull):
type_name = "channel"
entity_id = entity.id
elif entity_type in (PeerChannel, InputPeerChannel, InputChannel):
type_name = "channel"
entity_id = entity.channel_id
elif entity_type in (User, UserFull):
type_name = "user"
entity_id = entity.id
elif entity_type in (PeerUser, InputPeerUser, InputUser):
type_name = "user"
entity_id = entity.user_id
else:
raise ValueError(f"Unknown entity type {entity_type.__name__}")
return await cls.get_by_tgid(
TelegramID(entity_id),
tg_receiver=tg_receiver if type_name == "user" else entity_id,
peer_type=type_name if create else None,
)
# endregion