diff --git a/README.md b/README.md index ef35e4b1..57704bd0 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # mautrix-telegram -A Matrix-Telegram puppeting bridge. +A Matrix-Telegram hybrid puppeting/relaybot bridge. ### [Wiki](https://github.com/tulir/mautrix-telegram/wiki) diff --git a/ROADMAP.md b/ROADMAP.md index 89b1763d..78b38cc4 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -25,7 +25,7 @@ * [x] Matrix users who have logged into Telegram * [x] Kicking * [ ] Joining - * [ ] Chat name as alias + * [x] Chat name as alias * [ ] ‡ Chat invite link as alias * [x] Leaving * [x] Room metadata changes (name, topic, avatar) @@ -46,7 +46,7 @@ * [x] Audio messages * [x] Video messages * [x] Documents - * [ ] Message deletions (no way to tell difference between user-specific deletion and global deletion) + * [x] Message deletions * [ ] Message edits (not supported in Matrix) * [x] Avatars * [x] Presence @@ -74,7 +74,7 @@ * [x] At startup * [x] When receiving invite or message * [x] Private chat creation by inviting Matrix puppet of Telegram user to new room - * [ ] Option to use bot to relay messages for unauthenticated Matrix users + * [x] Option to use bot to relay messages for unauthenticated Matrix users * [ ] Option to use own Matrix account for messages sent from other Telegram clients * [Commands](https://github.com/tulir/mautrix-telegram/wiki/Management-commands) * [x] Logging in and out (`login` + code entering) diff --git a/alembic/versions/1b241f7e8530_add_telegramfile_table.py b/alembic/versions/1b241f7e8530_add_telegramfile_table.py new file mode 100644 index 00000000..c9cca21c --- /dev/null +++ b/alembic/versions/1b241f7e8530_add_telegramfile_table.py @@ -0,0 +1,28 @@ +"""Add TelegramFile table + +Revision ID: 1b241f7e8530 +Revises: 97d2a942bcf8 +Create Date: 2018-02-19 23:52:06.605741 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = '1b241f7e8530' +down_revision = '97d2a942bcf8' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('telegram_file', + sa.Column('id', sa.String(), nullable=False), + sa.Column('mxc', sa.String(), nullable=True), + sa.Column('mime_type', sa.String(), nullable=True), + sa.Column('was_converted', sa.Boolean(), nullable=True), + sa.PrimaryKeyConstraint('id')) + + +def downgrade(): + op.drop_table('telegram_file') diff --git a/alembic/versions/7d47d84380b6_add_timestamp_to_telegramfile.py b/alembic/versions/7d47d84380b6_add_timestamp_to_telegramfile.py new file mode 100644 index 00000000..10d59c87 --- /dev/null +++ b/alembic/versions/7d47d84380b6_add_timestamp_to_telegramfile.py @@ -0,0 +1,25 @@ +"""Add timestamp to TelegramFile + +Revision ID: 7d47d84380b6 +Revises: 1b241f7e8530 +Create Date: 2018-02-19 23:53:18.050871 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = '7d47d84380b6' +down_revision = '1b241f7e8530' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('telegram_file', + sa.Column('timestamp', sa.BigInteger(), nullable=False, default=0, + server_default="true")) + + +def downgrade(): + op.drop_column('telegram_file', 'timestamp') diff --git a/alembic/versions/97d2a942bcf8_initial_revision.py b/alembic/versions/97d2a942bcf8_initial_revision.py index 449f7f54..00d3354d 100644 --- a/alembic/versions/97d2a942bcf8_initial_revision.py +++ b/alembic/versions/97d2a942bcf8_initial_revision.py @@ -1,14 +1,13 @@ """initial revision Revision ID: 97d2a942bcf8 -Revises: +Revises: Create Date: 2018-02-11 18:40:55.483842 """ from alembic import op import sqlalchemy as sa - # revision identifiers, used by Alembic. revision = '97d2a942bcf8' down_revision = None @@ -17,12 +16,61 @@ depends_on = None def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - pass - # ### end Alembic commands ### + op.create_table('portal', + sa.Column('tgid', sa.Integer), + sa.Column('tg_receiver', sa.Integer), + sa.Column('peer_type', sa.String, nullable=False, default=""), + sa.Column('mxid', sa.String, nullable=True), + sa.Column('username', sa.String, nullable=True), + sa.Column('title', sa.String, nullable=True), + sa.Column('about', sa.String, nullable=True), + sa.Column('photo_id', sa.String, nullable=True), + sa.PrimaryKeyConstraint('tgid', 'tg_receiver'), + sa.UniqueConstraint('mxid')) + op.create_table('user', + sa.Column('mxid', sa.String), + sa.Column('tgid', sa.Integer, nullable=True), + sa.Column('tg_username', sa.String, nullable=True), + sa.Column('saved_contacts', sa.Integer, nullable=False, default=0), + sa.PrimaryKeyConstraint('mxid')) + op.create_table('puppet', + sa.Column('id', sa.Integer), + sa.Column('displayname', sa.String, nullable=True), + sa.Column('username', sa.String, nullable=True), + sa.Column('photo_id', sa.String, nullable=True), + sa.PrimaryKeyConstraint('id')) + op.create_table('contact', + sa.Column('user', sa.Integer), + sa.Column('contact', sa.Integer), + sa.ForeignKeyConstraint(("user",), ("user.tgid",)), + sa.ForeignKeyConstraint(("contact",), ("puppet.id",)), + sa.PrimaryKeyConstraint('user', 'contact')) + op.create_table('user_portal', + sa.Column('user', sa.Integer), + sa.Column('portal', sa.Integer), + sa.Column('portal_receiver', sa.Integer), + sa.PrimaryKeyConstraint('user', 'portal', 'portal_receiver'), + sa.ForeignKeyConstraint(("user",), ("user.tgid",)), + sa.ForeignKeyConstraint(("portal", "portal_receiver"), + ("portal.tgid", "portal.tg_receiver"))) + op.create_table('message', + sa.Column('mxid', sa.String), + sa.Column('mx_room', sa.String), + sa.Column('tgid', sa.Integer), + sa.Column('tg_space', sa.Integer), + sa.PrimaryKeyConstraint('tgid', 'tg_space'), + sa.UniqueConstraint("mxid", "mx_room", "tg_space", name="_mx_id_room")) + op.create_table('bot_chat', + sa.Column('id', sa.Integer), + sa.Column('type', sa.String, nullable=False, default=""), + sa.PrimaryKeyConstraint('id')) def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - pass - # ### end Alembic commands ### + op.drop_table('bot_chat') + op.drop_table('message') + op.drop_table('user_portal') + op.drop_table('contact') + op.drop_table('puppet') + op.drop_table('user') + op.drop_table('portal') diff --git a/example-config.yaml b/example-config.yaml index 32a178e1..b467086d 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -79,6 +79,11 @@ bridge: # Show message editing as a reply to the original message. # If this is false, message edits are not shown at all, as Matrix does not support editing yet. edits_as_replies: False + # Whether or not Matrix bot messages (type m.notice) should be bridged. + bridge_notices: False + # The maximum number of simultaneous Telegram deletions to handle. + # A large number of simultaneous redactions could put strain on your homeserver. + max_telegram_delete: 10 # The prefix for commands. Only required in non-management rooms. command_prefix: "!tg" @@ -99,3 +104,5 @@ telegram: # Get your own API keys at https://my.telegram.org/apps api_id: 12345 api_hash: tjyd5yge35lbodk1xwzw2jstp90k55qz + # (Optional) Create your own bot at https://t.me/BotFather + #bot_token: 123456789:ABCD-QBPd3VrWRhg623xYh07WUWErYA9eMI diff --git a/mautrix_appservice/appservice.py b/mautrix_appservice/appservice.py index bc237007..bb340607 100644 --- a/mautrix_appservice/appservice.py +++ b/mautrix_appservice/appservice.py @@ -46,7 +46,7 @@ class AppService: self.log = (logging.getLogger(log) if isinstance(log, str) else log or logging.getLogger("mautrix_appservice")) - def default_query_handler(_): + async def default_query_handler(_): return None self.query_user = query_user or default_query_handler diff --git a/mautrix_appservice/errors.py b/mautrix_appservice/errors.py index 8c09936f..02c4f87b 100644 --- a/mautrix_appservice/errors.py +++ b/mautrix_appservice/errors.py @@ -31,7 +31,7 @@ class MatrixRequestError(MatrixError): """ The home server returned an error response. """ def __init__(self, code=0, text="", errcode=None, message=None): - super().__init__("%d: %s" % (code, text)) + super().__init__(f"{code}: {text}") self.code = code self.text = text self.errcode = errcode diff --git a/mautrix_appservice/intent_api.py b/mautrix_appservice/intent_api.py index 0fd002e0..f011f3c0 100644 --- a/mautrix_appservice/intent_api.py +++ b/mautrix_appservice/intent_api.py @@ -194,12 +194,16 @@ class IntentAPI: content = {"displayname": name} return await self.client.request("PUT", f"/profile/{self.mxid}/displayname", content) - async def set_presence(self, status="online"): + async def set_presence(self, status="online", ignore_cache=False): await self.ensure_registered() + if not ignore_cache and self.state_store.has_presence(self.mxid, status): + return content = { "presence": status } - return await self.client.request("PUT", f"/presence/{self.mxid}/status", content) + resp = await self.client.request("PUT", f"/presence/{self.mxid}/status", content) + self.state_store.set_presence(self.mxid, status) + return resp async def set_avatar(self, url): await self.ensure_registered() @@ -223,11 +227,14 @@ class IntentAPI: # region Room actions async def create_room(self, alias=None, is_public=False, name=None, topic=None, - is_direct=False, invitees=None, initial_state=None): + is_direct=False, invitees=None, initial_state=None, + guests_can_join=False): await self.ensure_registered() content = { - "visibility": "public" if is_public else "private", + "visibility": "private", "is_direct": is_direct, + "preset": "public_chat" if is_public else "private_chat", + "guests_can_join": guests_can_join, } if alias: content["room_alias_name"] = alias @@ -326,18 +333,29 @@ class IntentAPI: events.remove(event_id) await self.set_pinned_messages(room_id, events) + async def set_join_rule(self, room_id, join_rule): + if join_rule not in ("public", "knock", "invite", "private"): + raise ValueError(f"Invalid join rule \"{join_rule}\"") + await self.send_state_event(room_id, "m.room.join_rules", { + "join_rule": join_rule, + }) + async def get_event(self, room_id, event_id): await self.ensure_joined(room_id) return await self.client.request("GET", f"/rooms/{room_id}/event/{event_id}") - async def set_typing(self, room_id, is_typing=True, timeout=5000): + async def set_typing(self, room_id, is_typing=True, timeout=5000, ignore_cache=False): await self.ensure_joined(room_id) + if not ignore_cache and is_typing == self.state_store.is_typing(room_id, self.mxid): + return content = { "typing": is_typing } if is_typing: content["timeout"] = timeout - return await self.client.request("PUT", f"/rooms/{room_id}/typing/{self.mxid}", content) + resp = await self.client.request("PUT", f"/rooms/{room_id}/typing/{self.mxid}", content) + self.state_store.set_typing(room_id, self.mxid, is_typing, timeout) + return resp async def mark_read(self, room_id, event_id): await self.ensure_joined(room_id) @@ -407,11 +425,31 @@ class IntentAPI: return self.send_state_event(room_id, "m.room.member", body, state_key=user_id) + def redact(self, room_id, event_id, reason=None, txn_id=None): + txn_id = txn_id or str(self.client.txn_id) + str(int(time() * 1000)) + self.client.txn_id += 1 + content = {} + if reason: + content["reason"] = reason + return self.client.request("PUT", + f"/rooms/{quote(room_id)}/redact/{quote(event_id)}/{txn_id}", + content) + @staticmethod def _get_event_url(room_id, event_type, txn_id): + if not room_id: + raise ValueError("Room ID not given") + elif not event_type: + raise ValueError("Event type not given") + elif not txn_id: + raise ValueError("Transaction ID not given") return f"/rooms/{quote(room_id)}/send/{quote(event_type)}/{quote(txn_id)}" async def send_event(self, room_id, event_type, content, txn_id=None): + if not room_id: + raise ValueError("Room ID not given") + elif not event_type: + raise ValueError("Event type not given") await self.ensure_joined(room_id) await self._ensure_has_power_level_for(room_id, event_type) @@ -424,29 +462,47 @@ class IntentAPI: @staticmethod def _get_state_url(room_id, event_type, state_key=""): + if not room_id: + raise ValueError("Room ID not given") + elif not event_type: + raise ValueError("Event type not given") url = f"/rooms/{quote(room_id)}/state/{quote(event_type)}" if state_key: url += f"/{quote(state_key)}" return url async def send_state_event(self, room_id, event_type, content, state_key=""): + if not room_id: + raise ValueError("Room ID not given") + elif not event_type: + raise ValueError("Event type not given") await self.ensure_joined(room_id) - await self._ensure_has_power_level_for(room_id, event_type) + await self._ensure_has_power_level_for(room_id, event_type, is_state_event=True) url = self._get_state_url(room_id, event_type, state_key) return await self.client.request("PUT", url, content) async def get_state_event(self, room_id, event_type, state_key=""): + if not room_id: + raise ValueError("Room ID not given") + elif not event_type: + raise ValueError("Event type not given") await self.ensure_joined(room_id) url = self._get_state_url(room_id, event_type, state_key) return await self.client.request("GET", url) def join_room(self, room_id): + if not room_id: + raise ValueError("Room ID not given") return self.ensure_joined(room_id, ignore_cache=True) def _join_room_direct(self, room): + if not room: + raise ValueError("Room ID not given") return self.client.request("POST", f"/join/{quote(room)}") def leave_room(self, room_id): + if not room_id: + raise ValueError("Room ID not given") try: self.state_store.left(room_id, self.mxid) return self.client.request("POST", f"/rooms/{quote(room_id)}/leave") @@ -455,6 +511,8 @@ class IntentAPI: raise def get_room_memberships(self, room_id): + if not room_id: + raise ValueError("Room ID not given") return self.client.request("GET", f"/rooms/{quote(room_id)}/members") async def get_room_members(self, room_id, allowed_memberships=("join",)): @@ -472,6 +530,8 @@ class IntentAPI: # region Ensure functions async def ensure_joined(self, room_id, ignore_cache=False): + if not room_id: + raise ValueError("Room ID not given") if not ignore_cache and self.state_store.is_joined(room_id, self.mxid): return await self.ensure_registered() @@ -505,16 +565,22 @@ class IntentAPI: return self.state_store.registered(self.mxid) - async def _ensure_has_power_level_for(self, room_id, event_type): + async def _ensure_has_power_level_for(self, room_id, event_type, is_state_event=False): + if not room_id: + raise ValueError("Room ID not given") + elif not event_type: + raise ValueError("Event type not given") + if not self.state_store.has_power_levels(room_id): await self.get_power_levels(room_id) - if self.state_store.has_power_level(room_id, self.mxid, event_type): + if self.state_store.has_power_level(room_id, self.mxid, event_type, + is_state_event=is_state_event): return elif not self.bot: self.log.warning( f"Power level of {self.mxid} is not enough for {event_type} in {room_id}") # raise IntentError(f"Power level of {self.mxid} is not enough" - # + f"for {event_type} in {room_id}") + # f"for {event_type} in {room_id}") return # TODO implement diff --git a/mautrix_appservice/state_store.py b/mautrix_appservice/state_store.py index 1174dcde..b5421cd5 100644 --- a/mautrix_appservice/state_store.py +++ b/mautrix_appservice/state_store.py @@ -15,14 +15,21 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . import json +import time class StateStore: def __init__(self, autosave_file=None): + self.autosave_file = autosave_file + + # Persistent storage self.registrations = set() self.memberships = {} self.power_levels = {} - self.autosave_file = autosave_file + + # Non-persistent storage + self.presence = {} + self.typing = {} def save(self, file): if isinstance(file, str): @@ -63,6 +70,29 @@ class StateStore: if self.autosave_file: self.save(self.autosave_file) + def set_presence(self, user, presence): + self.presence[user] = presence + + def has_presence(self, user, presence): + try: + return self.presence[user] == presence + except KeyError: + return False + + def set_typing(self, room_id, user, is_typing, timeout=0): + if is_typing: + ts = int(round(time.time() * 1000)) + self.typing[(room_id, user)] = ts + timeout + else: + del self.typing[(room_id, user)] + + def is_typing(self, room_id, user): + ts = int(round(time.time() * 1000)) + try: + return self.typing[(room_id, user)] > ts + except KeyError: + return False + def is_registered(self, user): return user in self.registrations @@ -97,10 +127,12 @@ class StateStore: def get_power_levels(self, room): return self.power_levels[room] - def has_power_level(self, room, user, event): + def has_power_level(self, room, user, event, is_state_event=False): room_levels = self.power_levels.get(room, {}) - required = room_levels.get("events", {}).get(event, 95) - has = room_levels.get("users", {}).get(user, 0) + default_required = (room_levels.get("state_default", 50) if is_state_event + else room_levels.get("events_default", 0)) + required = room_levels.get("events", {}).get(event, default_required) + has = room_levels.get("users", {}).get(user, room_levels.get("users_default", 0)) return has >= required def set_power_level(self, room, user, level): diff --git a/mautrix_telegram/__main__.py b/mautrix_telegram/__main__.py index 27115a88..2f01e1ac 100644 --- a/mautrix_telegram/__main__.py +++ b/mautrix_telegram/__main__.py @@ -29,10 +29,13 @@ from .config import Config from .matrix import MatrixHandler from .db import init as init_db +from .abstract_user import init as init_abstract_user from .user import init as init_user, User +from .bot import init as init_bot from .portal import init as init_portal from .puppet import init as init_puppet from .public import PublicBridgeWebsite +from .context import Context log = logging.getLogger("mau") time_formatter = logging.Formatter("[%(asctime)s] [%(levelname)s@%(name)s] %(message)s") @@ -74,21 +77,30 @@ Base.metadata.bind = db_engine Base.metadata.create_all() loop = asyncio.get_event_loop() -az = AppService(config["homeserver.address"], config["homeserver.domain"], - config["appservice.as_token"], config["appservice.hs_token"], - config["appservice.bot_username"], log="mau.as", loop=loop) -context = (az, db_session, config, loop) + +appserv = AppService(config["homeserver.address"], config["homeserver.domain"], + config["appservice.as_token"], config["appservice.hs_token"], + config["appservice.bot_username"], log="mau.as", loop=loop) + + +context = Context(appserv, db_session, config, loop, None, None) if config["appservice.public.enabled"]: public = PublicBridgeWebsite(loop) - az.app.add_subapp(config.get("appservice.public.prefix", "/public"), public.app) + appserv.app.add_subapp(config.get("appservice.public.prefix", "/public"), public.app) -with az.run(config["appservice.hostname"], config["appservice.port"]) as start: - MatrixHandler(context) +with appserv.run(config["appservice.hostname"], config["appservice.port"]) as start: init_db(db_session) + init_abstract_user(context) + context.bot = init_bot(context) + context.mx = MatrixHandler(context) init_portal(context) init_puppet(context) - startup_actions = init_user(context) + [start] + startup_actions = init_user(context) + [start, context.mx.init_as_bot()] + + if context.bot: + startup_actions.append(context.bot.start()) + try: loop.run_until_complete(asyncio.gather(*startup_actions, loop=loop)) loop.run_forever() diff --git a/mautrix_telegram/abstract_user.py b/mautrix_telegram/abstract_user.py new file mode 100644 index 00000000..7619c432 --- /dev/null +++ b/mautrix_telegram/abstract_user.py @@ -0,0 +1,286 @@ +# -*- coding: future_fstrings -*- +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import platform +import os + +from telethon.tl.types import * +from mautrix_appservice import MatrixRequestError + +from .tgclient import MautrixTelegramClient +from .db import Message as DBMessage +from . import portal as po, puppet as pu, __version__ + +config = None +# Value updated from config in init() +MAX_DELETIONS = 10 + + +class AbstractUser: + loop = None + log = None + db = None + az = None + + def __init__(self): + self.connected = False + self.whitelisted = False + self.client = None + self.tgid = None + self.mxid = None + + def _init_client(self): + self.log.debug(f"Initializing client for {self.name}") + device = f"{platform.system()} {platform.release()}" + sysversion = MautrixTelegramClient.__version__ + self.client = MautrixTelegramClient(self.name, + config["telegram.api_id"], + config["telegram.api_hash"], + loop=self.loop, + app_version=__version__, + system_version=sysversion, + device_model=device) + self.client.add_update_handler(self._update_catch) + + async def update(self, update): + return False + + async def post_login(self): + raise NotImplementedError() + + async def _update_catch(self, update): + try: + if not await self.update(update): + await self._update(update) + except Exception: + self.log.exception("Failed to handle Telegram update") + + async def _get_dialogs(self, limit=None): + dialogs = await self.client.get_dialogs(limit=limit) + return [dialog.entity for dialog in dialogs if ( + not isinstance(dialog.entity, (User, ChatForbidden, ChannelForbidden)) + and not (isinstance(dialog.entity, Chat) + and (dialog.entity.deactivated or dialog.entity.left)))] + + @property + def name(self): + raise NotImplementedError() + + @property + def logged_in(self): + return self.client and self.client.is_user_authorized() + + @property + def has_full_access(self): + return self.logged_in and self.whitelisted + + async def start(self): + self.connected = await self.client.connect() + + async def ensure_started(self, even_if_no_session=False): + if not self.whitelisted: + return self + elif not self.connected and (even_if_no_session or os.path.exists(f"{self.name}.session")): + return await self.start() + return self + + def stop(self): + self.client.disconnect() + self.client = None + self.connected = False + + # region Telegram update handling + + async def _update(self, update): + if isinstance(update, + (UpdateShortChatMessage, UpdateShortMessage, UpdateNewChannelMessage, + UpdateNewMessage, UpdateEditMessage, UpdateEditChannelMessage)): + await self.update_message(update) + elif isinstance(update, UpdateDeleteMessages): + await self.delete_message(update) + elif isinstance(update, UpdateDeleteChannelMessages): + await self.delete_channel_message(update) + elif isinstance(update, (UpdateChatUserTyping, UpdateUserTyping)): + await self.update_typing(update) + elif isinstance(update, UpdateUserStatus): + await self.update_status(update) + elif isinstance(update, (UpdateChatAdmins, UpdateChatParticipantAdmin)): + await self.update_admin(update) + elif isinstance(update, UpdateChatParticipants): + portal = po.Portal.get_by_tgid(update.participants.chat_id) + if portal and portal.mxid: + await portal.update_telegram_participants(update.participants.participants) + elif isinstance(update, UpdateChannelPinnedMessage): + portal = po.Portal.get_by_tgid(update.channel_id) + if portal and portal.mxid: + await portal.update_telegram_pin(self, update.id) + elif isinstance(update, (UpdateUserName, UpdateUserPhoto)): + await self.update_others_info(update) + elif isinstance(update, UpdateReadHistoryOutbox): + await self.update_read_receipt(update) + else: + self.log.debug("Unhandled update: %s", update) + + async def update_read_receipt(self, update): + if not isinstance(update.peer, PeerUser): + self.log.debug("Unexpected read receipt peer: %s", update.peer) + return + + portal = po.Portal.get_by_tgid(update.peer.user_id, self.tgid) + if not portal or not portal.mxid: + return + + # We check that these are user read receipts, so tg_space is always the user ID. + message = DBMessage.query.get((update.max_id, self.tgid)) + if not message: + return + + puppet = pu.Puppet.get(update.peer.user_id) + await puppet.intent.mark_read(portal.mxid, message.mxid) + + async def update_admin(self, update): + # TODO duplication not checked + portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") + if isinstance(update, UpdateChatAdmins): + await portal.set_telegram_admins_enabled(update.enabled) + elif isinstance(update, UpdateChatParticipantAdmin): + await portal.set_telegram_admin(update.user_id) + else: + self.log.warning("Unexpected admin status update: %s", update) + + async def update_typing(self, update): + if isinstance(update, UpdateUserTyping): + portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user") + else: + portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") + sender = pu.Puppet.get(update.user_id) + await portal.handle_telegram_typing(sender, update) + + async def update_others_info(self, update): + # TODO duplication not checked + puppet = pu.Puppet.get(update.user_id) + if isinstance(update, UpdateUserName): + if await puppet.update_displayname(self, update): + puppet.save() + elif isinstance(update, UpdateUserPhoto): + if await puppet.update_avatar(self, update.photo.photo_big): + puppet.save() + else: + self.log.warning("Unexpected other user info update: %s", update) + + async def update_status(self, update): + puppet = pu.Puppet.get(update.user_id) + if isinstance(update.status, UserStatusOnline): + await puppet.intent.set_presence("online") + elif isinstance(update.status, UserStatusOffline): + await puppet.intent.set_presence("offline") + else: + self.log.warning("Unexpected user status update: %s", update) + return + + def get_message_details(self, update): + if isinstance(update, UpdateShortChatMessage): + portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") + sender = pu.Puppet.get(update.from_id) + elif isinstance(update, UpdateShortMessage): + portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user") + sender = pu.Puppet.get(self.tgid if update.out else update.user_id) + elif isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage, + UpdateEditMessage, UpdateEditChannelMessage)): + update = update.message + if isinstance(update.to_id, PeerUser) and not update.out: + portal = po.Portal.get_by_tgid(update.from_id, peer_type="user", + tg_receiver=self.tgid) + else: + portal = po.Portal.get_by_entity(update.to_id, receiver_id=self.tgid) + sender = pu.Puppet.get(update.from_id) if update.from_id else None + else: + self.log.warning( + f"Unexpected message type in User#get_message_details: {type(update)}") + return update, None, None + return update, sender, portal + + @staticmethod + async def _try_redact(portal, message): + if not portal: + return + try: + await portal.main_intent.redact(message.mx_room, message.mxid) + except MatrixRequestError: + pass + + async def delete_message(self, update): + if len(update.messages) > MAX_DELETIONS: + return + + for message in update.messages: + message = DBMessage.query.get((message, self.tgid)) + if not message: + continue + self.db.delete(message) + number_left = DBMessage.query.filter(DBMessage.mxid == message.mxid, + DBMessage.mx_room == message.mx_room).count() + if number_left == 0: + portal = po.Portal.get_by_mxid(message.mx_room) + await self._try_redact(portal, message) + self.db.commit() + + async def delete_channel_message(self, update): + if len(update.messages) > MAX_DELETIONS: + return + + portal = po.Portal.get_by_tgid(update.channel_id) + if not portal: + return + + for message in update.messages: + message = DBMessage.query.get((message, portal.tgid)) + if not message: + continue + self.db.delete(message) + await self._try_redact(portal, message) + self.db.commit() + + def update_message(self, original_update): + update, sender, portal = self.get_message_details(original_update) + + if isinstance(update, MessageService): + if isinstance(update.action, MessageActionChannelMigrateFrom): + self.log.debug(f"Ignoring action %s to %s by %d", update.action, + portal.tgid_log, + sender.id) + return + self.log.debug("Handling action %s to %s by %d", update.action, portal.tgid_log, + sender.id) + return portal.handle_telegram_action(self, sender, update) + + user = sender.tgid if sender else "admin" + if isinstance(original_update, (UpdateEditMessage, UpdateEditChannelMessage)): + if config["bridge.edits_as_replies"]: + self.log.debug("Handling edit %s to %s by %s", update, portal.tgid_log, user) + return portal.handle_telegram_edit(self, sender, update) + return + + self.log.debug("Handling message %s to %s by %s", update, portal.tgid_log, user) + return portal.handle_telegram_message(self, sender, update) + + # endregion + + +def init(context): + global config, MAX_DELETIONS + AbstractUser.az, AbstractUser.db, config, AbstractUser.loop, _ = context + MAX_DELETIONS = config.get("bridge.max_telegram_delete", 10) diff --git a/mautrix_telegram/bot.py b/mautrix_telegram/bot.py new file mode 100644 index 00000000..0a6e71a4 --- /dev/null +++ b/mautrix_telegram/bot.py @@ -0,0 +1,120 @@ +# -*- coding: future_fstrings -*- +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +import logging + +from telethon.tl.types import * +from telethon.errors import ChannelInvalidError, ChannelPrivateError +from telethon.tl.functions.messages import GetChatsRequest +from telethon.tl.functions.channels import GetChannelsRequest + +from .abstract_user import AbstractUser +from .db import BotChat +from . import puppet as pu + +config = None + + +class Bot(AbstractUser): + log = logging.getLogger("mau.bot") + + def __init__(self, token): + super().__init__() + self.token = token + self.whitelisted = True + self._init_client() + self.chats = {chat.id: chat.type for chat in BotChat.query.all()} + + async def start(self): + await super().start() + if not self.logged_in: + await self.client.sign_in(bot_token=self.token) + await self.post_login() + return self + + async def post_login(self): + info = await self.client.get_me() + self.tgid = info.id + self.mxid = pu.Puppet.get_mxid_from_id(self.tgid) + + chat_ids = [id for id, type in self.chats.items() if type == "chat"] + response = await self.client(GetChatsRequest(chat_ids)) + for chat in response.chats: + if isinstance(chat, ChatForbidden) or chat.left or chat.deactivated: + self.remove_chat(chat.id) + + channel_ids = [InputChannel(id, 0) + for id, type in self.chats.items() + if type == "channel"] + for id in channel_ids: + try: + await self.client(GetChannelsRequest([id])) + except (ChannelPrivateError, ChannelInvalidError): + self.remove_chat(id.channel_id) + + def add_chat(self, id, type): + if id not in self.chats: + self.chats[id] = type + self.db.add(BotChat(id=id, type=type)) + self.db.commit() + + def remove_chat(self, id): + try: + del self.chats[id] + except KeyError: + pass + self.db.delete(BotChat.query.get(id)) + self.db.commit() + + async def update(self, update): + if not isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage)): + return + elif not isinstance(update.message, MessageService): + return + + to_id = update.message.to_id + if isinstance(to_id, PeerChannel): + to_id = to_id.channel_id + type = "channel" + elif isinstance(to_id, PeerChat): + to_id = to_id.chat_id + type = "chat" + else: + return + + action = update.message.action + if isinstance(action, MessageActionChatAddUser): + if self.tgid in action.users: + self.add_chat(to_id, type) + elif isinstance(action, MessageActionChatDeleteUser): + if action.user_id == self.tgid: + self.remove_chat(to_id) + + def is_in_chat(self, peer_id): + return peer_id in self.chats + + @property + def name(self): + return "bot" + + +def init(context): + global config + config = context.config + token = config["telegram.bot_token"] + if token: + return Bot(token) + return None diff --git a/mautrix_telegram/commands/auth.py b/mautrix_telegram/commands/auth.py index 3797f651..80d4d39b 100644 --- a/mautrix_telegram/commands/auth.py +++ b/mautrix_telegram/commands/auth.py @@ -19,6 +19,7 @@ import asyncio from telethon.errors import * from . import command_handler +from .. import puppet as pu @command_handler(needs_auth=False) @@ -32,6 +33,18 @@ async def ping(evt): return await evt.reply("You're not logged in.") +@command_handler() +async def ping_bot(evt): + if not evt.tgbot: + return await evt.reply("Telegram message relay bot not configured.") + bot_info = await evt.tgbot.client.get_me() + mxid = pu.Puppet.get_mxid_from_id(bot_info.id) + displayname = bot_info.first_name + return await evt.reply("Telegram message relay bot is active: " + f"[{displayname}](https://matrix.to/#/{mxid}) (ID {bot_info.id})\n\n" + "To use the bot, simply invite it to a portal room.") + + @command_handler(needs_auth=False, management_only=True) def register(evt): return evt.reply("Not yet implemented.") @@ -45,6 +58,7 @@ async def login(evt): return await evt.reply("**Usage:** `$cmdprefix+sp login `") phone_number = evt.args[0] try: + await evt.sender.ensure_started(even_if_no_session=True) await evt.sender.client.sign_in(phone_number) evt.sender.command_status = { "next": enter_code, @@ -60,6 +74,9 @@ async def login(evt): "The ban is usually applied for around a day.") except PhoneNumberBannedError: return await evt.reply("Your phone number has been banned from Telegram.") + except PhoneNumberUnoccupiedError: + return await evt.reply("That phone number has not been registered. " + "Please register with `$cmdprefix+sp register `.") except Exception: evt.log.exception("Error requesting phone code") return await evt.reply("Unhandled exception while requesting code. " @@ -72,13 +89,11 @@ async def enter_code(evt): return await evt.reply("**Usage:** `$cmdprefix+sp enter-code `") try: + await evt.sender.ensure_started(even_if_no_session=True) user = await evt.sender.client.sign_in(code=evt.args[0]) asyncio.ensure_future(evt.sender.post_login(user), loop=evt.loop) evt.sender.command_status = None return await evt.reply(f"Successfully logged in as @{user.username}") - except PhoneNumberUnoccupiedError: - return await evt.reply("That phone number has not been registered. " - "Please register with `$cmdprefix+sp register `.") except PhoneCodeExpiredError: return await evt.reply( "Phone code expired. Try again with `$cmdprefix+sp login `.") @@ -103,6 +118,7 @@ async def enter_password(evt): return await evt.reply("**Usage:** `$cmdprefix+sp enter-password `") try: + await evt.sender.ensure_started(even_if_no_session=True) user = await evt.sender.client.sign_in(password=evt.args[0]) asyncio.ensure_future(evt.sender.post_login(user), loop=evt.loop) evt.sender.command_status = None diff --git a/mautrix_telegram/commands/clean_rooms.py b/mautrix_telegram/commands/clean_rooms.py index 96a2154c..06ebb428 100644 --- a/mautrix_telegram/commands/clean_rooms.py +++ b/mautrix_telegram/commands/clean_rooms.py @@ -52,7 +52,7 @@ async def _find_rooms(intent): return management_rooms, unidentified_rooms, portals, empty_portals -@command_handler(needs_admin=True, name="clean-rooms") +@command_handler(needs_admin=True, needs_auth=False, name="clean-rooms") async def clean_rooms(evt): if not evt.is_management: return await evt.reply("`clean-rooms` is a particularly spammy command. Please don't " @@ -66,7 +66,7 @@ async def clean_rooms(evt): or ["No management rooms found."]) reply.append("#### Active portal rooms (A)") reply += ([f"{n+1}. [P{n+1}](https://matrix.to/#/{portal.mxid}) " - + f"(to Telegram chat \"{portal.title}\")" + f"(to Telegram chat \"{portal.title}\")" for n, portal in enumerate(portals)] or ["No active portal rooms found."]) reply.append("#### Unidentified rooms (U)") @@ -75,7 +75,7 @@ async def clean_rooms(evt): or ["No unidentified rooms found."]) reply.append("#### Inactive portal rooms (I)") reply += ([f"{n}. [E{n}](https://matrix.to/#/{portal.mxid}) " - + f"(to Telegram chat \"{portal.title}\")" + f"(to Telegram chat \"{portal.title}\")" for n, portal in enumerate(empty_portals)] or ["No inactive portal rooms found."]) @@ -141,21 +141,21 @@ async def set_rooms_to_clean(evt, management_rooms, unidentified_rooms, portals, "**Usage:** `$cmdprefix+sp clean-groups <_M|A|U|I_>") else: return await evt.reply(f"Unknown room cleaning action `{command}`. " - + "Use `$cmdprefix+sp cancel` to cancel room " - + "cleaning.") + "Use `$cmdprefix+sp cancel` to cancel room " + "cleaning.") evt.sender.command_status = { "next": lambda confirm: execute_room_cleanup(confirm, rooms_to_clean), "action": "Room cleaning", } await evt.reply(f"To confirm cleaning up {len(rooms_to_clean)} rooms, type" - + "`$cmdprefix+sp confirm-clean`.") + "`$cmdprefix+sp confirm-clean`.") async def execute_room_cleanup(evt, rooms_to_clean): if len(evt.args) > 0 and evt.args[0] == "confirm-clean": await evt.reply(f"Cleaning {len(rooms_to_clean)} rooms. " - + "This might take a while.") + "This might take a while.") cleaned = 0 for room in rooms_to_clean: if isinstance(room, po.Portal): diff --git a/mautrix_telegram/commands/handler.py b/mautrix_telegram/commands/handler.py index a297ea8a..082afadd 100644 --- a/mautrix_telegram/commands/handler.py +++ b/mautrix_telegram/commands/handler.py @@ -29,7 +29,7 @@ def command_handler(needs_auth=True, management_only=False, needs_admin=False, n def wrapper(evt): if management_only and not evt.is_management: return evt.reply(f"`{evt.command}` is a restricted command:" - + "you may only run it in management rooms.") + "you may only run it in management rooms.") elif needs_auth and not evt.sender.logged_in: return evt.reply("This command requires you to be logged in.") elif needs_admin and not evt.sender.is_admin: @@ -47,6 +47,8 @@ class CommandEvent: self.az = handler.az self.log = handler.log self.loop = handler.loop + self.tgbot = handler.tgbot + self.config = handler.config self.command_prefix = handler.command_prefix self.room_id = room self.sender = sender @@ -71,7 +73,7 @@ class CommandHandler: log = logging.getLogger("mau.commands") def __init__(self, context): - self.az, self.db, self.config, self.loop = context + self.az, self.db, self.config, self.loop, self.tgbot = context self.command_prefix = self.config["bridge.command_prefix"] # region Utility functions for handling commands @@ -94,6 +96,6 @@ class CommandHandler: except FloodWaitError as e: return evt.reply(f"Flood error: Please wait {format_duration(e.seconds)}") except Exception: - self.log.exception(f"Fatal error handling command " - + f"{evt.command} {' '.join(args)} from {sender.mxid}") + self.log.exception("Fatal error handling command " + f"{evt.command} {' '.join(args)} from {sender.mxid}") return evt.reply("Fatal error while handling command. Check logs for more details.") diff --git a/mautrix_telegram/commands/meta.py b/mautrix_telegram/commands/meta.py index 6ffff94f..0338b402 100644 --- a/mautrix_telegram/commands/meta.py +++ b/mautrix_telegram/commands/meta.py @@ -65,6 +65,7 @@ def help(evt): `channel` (defaults to `group`). #### Portal management +**ping-bot** - Get info of the message relay Telegram bot. **upgrade** - Upgrade a normal Telegram group to a supergroup. **invite-link** - Get a Telegram invite link to the current chat. **delete-portal** - Forget the current portal room. Only works for group chats; to delete diff --git a/mautrix_telegram/commands/telegram.py b/mautrix_telegram/commands/telegram.py index f77c2110..4774501e 100644 --- a/mautrix_telegram/commands/telegram.py +++ b/mautrix_telegram/commands/telegram.py @@ -51,7 +51,7 @@ async def search(evt): else: reply += ["**Results in contacts:**", ""] reply += [(f"* [{puppet.displayname}](https://matrix.to/#/{puppet.mxid}): " - + f"{puppet.id} ({similarity}% match)") + f"{puppet.id} ({similarity}% match)") for puppet, similarity in results] # TODO somehow show remote channel results when joining by alias is possible? @@ -71,8 +71,8 @@ async def pm(evt): return await evt.reply("That doesn't seem to be a user.") portal = po.Portal.get_by_entity(user, evt.sender.tgid) await portal.create_matrix_room(evt.sender, user, [evt.sender.mxid]) - return await evt.reply( - f"Created private chat room with {pu.Puppet.get_displayname(user, False)}") + return await evt.reply("Created private chat room with " + f"{pu.Puppet.get_displayname(user, False)}") @command_handler() @@ -116,9 +116,9 @@ async def delete_portal(evt): "action": "Portal deletion", } return await evt.reply("Please confirm deletion of portal " - + f"[{room_id}](https://matrix.to/#/{room_id}) " - + f"to Telegram chat \"{portal.title}\" " - + "by typing `$cmdprefix+sp confirm-delete`") + f"[{room_id}](https://matrix.to/#/{room_id}) " + f"to Telegram chat \"{portal.title}\" " + "by typing `$cmdprefix+sp confirm-delete`") @command_handler() @@ -151,11 +151,12 @@ async def join(evt): for chat in updates.chats: portal = po.Portal.get_by_entity(chat) if portal.mxid: + await portal.invite_to_matrix([evt.sender.mxid]) + return await evt.reply(f"Invited you to portal of {portal.title}") + else: + await evt.reply(f"Creating room for {chat.title}... This might take a while.") await portal.create_matrix_room(evt.sender, chat, [evt.sender.mxid]) return await evt.reply(f"Created room for {portal.title}") - else: - await portal.invite_matrix([evt.sender.mxid]) - return await evt.reply(f"Invited you to portal of {portal.title}") @command_handler() @@ -183,16 +184,16 @@ async def create(evt): return await evt.reply("Please set a title before creating a Telegram chat.") elif (not levels or not levels["users"] or evt.az.intent.mxid not in levels["users"] or levels["users"][evt.az.intent.mxid] < 100): - return await evt.reply(f"Please give " - + f"[the bridge bot](https://matrix.to/#/{evt.az.intent.mxid})" - + f" a power level of 100 before creating a Telegram chat.") + return await evt.reply("Please give " + f"[the bridge bot](https://matrix.to/#/{evt.az.intent.mxid})" + " a power level of 100 before creating a Telegram chat.") else: for user, level in levels["users"].items(): if level >= 100 and user != evt.az.intent.mxid: return await evt.reply( f"Please make sure only the bridge bot has power level above" - + f"99 before creating a Telegram chat.\n\n" - + f"Use power level 95 instead of 100 for admins.") + f"99 before creating a Telegram chat.\n\n" + f"Use power level 95 instead of 100 for admins.") supergroup = type == "supergroup" type = { diff --git a/mautrix_telegram/config.py b/mautrix_telegram/config.py index f2f81426..8b4dd5e3 100644 --- a/mautrix_telegram/config.py +++ b/mautrix_telegram/config.py @@ -94,7 +94,7 @@ class Config(DictWithRecursion): self.set("appservice.hs_token", self._new_token()) url = (f"{self['appservice.protocol']}://" - + f"{self['appservice.hostname']}:{self['appservice.port']}") + f"{self['appservice.hostname']}:{self['appservice.port']}") self._registration = { "id": self.get("appservice.id", "telegram"), "as_token": self["appservice.as_token"], diff --git a/mautrix_telegram/context.py b/mautrix_telegram/context.py new file mode 100644 index 00000000..d53f7dc9 --- /dev/null +++ b/mautrix_telegram/context.py @@ -0,0 +1,34 @@ +# -*- coding: future_fstrings -*- +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +class Context: + def __init__(self, az, db, config, loop, bot, mx): + self.az = az + self.db = db + self.config = config + self.loop = loop + self.bot = bot + self.mx = mx + + def __iter__(self): + yield self.az + yield self.db + yield self.config + yield self.loop + yield self.bot + # yield self.mx diff --git a/mautrix_telegram/db.py b/mautrix_telegram/db.py index b8a4b372..81829c7a 100644 --- a/mautrix_telegram/db.py +++ b/mautrix_telegram/db.py @@ -14,7 +14,8 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from sqlalchemy import Column, UniqueConstraint, ForeignKey, ForeignKeyConstraint, Integer, String +from sqlalchemy import (Column, UniqueConstraint, ForeignKey, ForeignKeyConstraint, Integer, + BigInteger, String, Boolean) from sqlalchemy.orm import relationship from .base import Base @@ -48,7 +49,7 @@ class Message(Base): tgid = Column(Integer, primary_key=True) tg_space = Column(Integer, primary_key=True) - __table_args__ = (UniqueConstraint('mxid', 'mx_room', 'tg_space', name='_mx_id_room'),) + __table_args__ = (UniqueConstraint("mxid", "mx_room", "tg_space", name="_mx_id_room"),) class UserPortal(Base): @@ -95,9 +96,30 @@ class Puppet(Base): photo_id = Column(String, nullable=True) +# Fucking Telegram not telling bots what chats they are in 3:< +class BotChat(Base): + query = None + __tablename__ = "bot_chat" + id = Column(Integer, primary_key=True) + type = Column(String, nullable=False) + + +class TelegramFile(Base): + query = None + __tablename__ = "telegram_file" + + id = Column(String, primary_key=True) + mxc = Column(String) + mime_type = Column(String) + was_converted = Column(Boolean) + timestamp = Column(BigInteger, primary_key=True) + + def init(db_session): Portal.query = db_session.query_property() Message.query = db_session.query_property() UserPortal.query = db_session.query_property() User.query = db_session.query_property() Puppet.query = db_session.query_property() + BotChat.query = db_session.query_property() + TelegramFile.query = db_session.query_property() diff --git a/mautrix_telegram/formatter.py b/mautrix_telegram/formatter.py deleted file mode 100644 index 9590b5ec..00000000 --- a/mautrix_telegram/formatter.py +++ /dev/null @@ -1,380 +0,0 @@ -# -*- coding: future_fstrings -*- -# mautrix-telegram - A Matrix-Telegram puppeting bridge -# Copyright (C) 2018 Tulir Asokan -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -from html import escape, unescape -from html.parser import HTMLParser -from collections import deque -import re -import logging - -from mautrix_appservice import MatrixRequestError - -from telethon.tl.types import * - -from . import user as u, puppet as p -from .db import Message as DBMessage - -log = logging.getLogger("mau.formatter") - -# TEXT LEN EXPLANATION: -# Telegram formatting counts two bytes in an UTF-16 string as one character. -# -# For Telegram -> Matrix formatting, we get the same counting mechanism by encoding the input -# text as UTF-16 Little Endian and doubling all the offsets and lengths given by Telegram. With -# those doubled values, we process the input entities and text. The text is converted back to -# native str format before it's inserted into the output HTML. -# -# For Matrix -> Telegram formatting, do the same input encoding, but divide the length by two -# instead of multiplying when generating the lengths and offsets of Telegram entities. -# -# The endianness doesn't matter, but it has to be specified to avoid the two BOM bits messing -# everything up. -TEMP_ENC = "utf-16-le" - - -# region Matrix to Telegram - - -class MatrixParser(HTMLParser): - mention_regex = re.compile("https://matrix.to/#/(@.+)") - - def __init__(self): - super().__init__() - self.text = "" - self.entities = [] - self._building_entities = {} - self._list_counter = 0 - self._open_tags = deque() - self._open_tags_meta = deque() - self._previous_ended_line = True - - def handle_starttag(self, tag, attrs): - self._open_tags.appendleft(tag) - self._open_tags_meta.appendleft(0) - attrs = dict(attrs) - entity_type = None - args = {} - if tag == "strong" or tag == "b": - entity_type = MessageEntityBold - elif tag == "em" or tag == "i": - entity_type = MessageEntityItalic - elif tag == "code": - try: - pre = self._building_entities["pre"] - try: - pre.language = attrs["class"][len("language-"):] - except KeyError: - pass - except KeyError: - entity_type = MessageEntityCode - elif tag == "pre": - entity_type = MessageEntityPre - args["language"] = "" - elif tag == "a": - try: - url = attrs["href"] - except KeyError: - return - mention = self.mention_regex.search(url) - if mention: - mxid = mention.group(1) - user = p.Puppet.get_by_mxid(mxid, create=False) - if not user: - user = u.User.get_by_mxid(mxid, create=False) - if not user: - return - if user.username: - entity_type = MessageEntityMention - url = f"@{user.username}" - else: - entity_type = MessageEntityMentionName - args["user_id"] = user.tgid - elif url.startswith("mailto:"): - url = url[len("mailto:"):] - entity_type = MessageEntityEmail - else: - if self.get_starttag_text() == url: - entity_type = MessageEntityUrl - else: - entity_type = MessageEntityTextUrl - args["url"] = url - url = None - self._open_tags_meta.popleft() - self._open_tags_meta.appendleft(url) - - if entity_type and tag not in self._building_entities: - # See "TEXT LEN EXPLANATION" near start of file - offset = int(len(self.text.encode(TEMP_ENC)) / 2) - self._building_entities[tag] = entity_type(offset=offset, length=0, **args) - - def _list_depth(self): - depth = 0 - for tag in self._open_tags: - if tag == "ol" or tag == "ul": - depth += 1 - return depth - - def handle_data(self, text): - text = unescape(text) - previous_tag = self._open_tags[0] if len(self._open_tags) > 0 else "" - list_format_offset = 0 - if previous_tag == "a": - url = self._open_tags_meta[0] - if url: - text = url - elif len(self._open_tags) > 1 and self._previous_ended_line and previous_tag == "li": - list_type = self._open_tags[1] - indent = (self._list_depth() - 1) * 4 * " " - text = text.strip("\n") - if len(text) == 0: - return - elif list_type == "ul": - text = f"{indent}* {text}" - list_format_offset = len(indent) + 2 - elif list_type == "ol": - n = self._open_tags_meta[1] - n += 1 - self._open_tags_meta[1] = n - text = f"{indent}{n}. {text}" - list_format_offset = len(indent) + 3 - for tag, entity in self._building_entities.items(): - # See "TEXT LEN EXPLANATION" near start of file - entity.length += int(len(text.strip("\n").encode(TEMP_ENC)) / 2) - entity.offset += list_format_offset - - if text.endswith("\n"): - self._previous_ended_line = True - else: - self._previous_ended_line = False - - self.text += text - - def handle_endtag(self, tag): - try: - self._open_tags.popleft() - self._open_tags_meta.popleft() - except IndexError: - pass - if (tag == "ul" or tag == "ol") and self.text.endswith("\n"): - self.text = self.text[:-1] - entity = self._building_entities.pop(tag, None) - if entity: - self.entities.append(entity) - - -def matrix_to_telegram(html): - try: - parser = MatrixParser() - parser.feed(html) - return parser.text, parser.entities - except Exception: - log.exception("Failed to convert Matrix format:\nhtml=%s", html) - - -def matrix_reply_to_telegram(content, tg_space, room_id=None): - try: - reply = content["m.relates_to"]["m.in_reply_to"] - room_id = room_id or reply["room_id"] - event_id = reply["event_id"] - message = DBMessage.query.filter(DBMessage.mxid == event_id and - DBMessage.tg_space == tg_space and - DBMessage.mx_room == room_id).one_or_none() - if message: - return message.tgid - except KeyError: - pass - return None - - -# endregion -# region Telegram to Matrix - -def telegram_reply_to_matrix(evt, source): - if evt.reply_to_msg_id: - space = (evt.to_id.channel_id - if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel) - else source.tgid) - msg = DBMessage.query.get((evt.reply_to_msg_id, space)) - if msg: - return { - "m.in_reply_to": { - "event_id": msg.mxid, - "room_id": msg.mx_room, - } - } - return {} - - -async def telegram_event_to_matrix(evt, source, native_replies=False, message_link_in_reply=False, - main_intent=None, reply_text="Reply"): - text = evt.message - html = telegram_to_matrix(evt.message, evt.entities) if evt.entities else None - relates_to = {} - - if evt.fwd_from: - if not html: - html = escape(text) - from_id = evt.fwd_from.from_id - user = u.User.get_by_tgid(from_id) - if user: - fwd_from = f"{user.mxid}" - else: - puppet = p.Puppet.get(from_id, create=False) - if puppet and puppet.displayname: - fwd_from = f"{puppet.displayname}" - else: - user = await source.client.get_entity(from_id) - if user: - fwd_from = p.Puppet.get_displayname(user, format=False) - else: - fwd_from = None - if not fwd_from: - fwd_from = "Unknown user" - html = (f"Forwarded message from {fwd_from}
" - + f"
{html}
") - - if evt.reply_to_msg_id: - space = (evt.to_id.channel_id - if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel) - else source.tgid) - msg = DBMessage.query.get((evt.reply_to_msg_id, space)) - if msg: - if native_replies: - relates_to["m.in_reply_to"] = { - "event_id": msg.mxid, - "room_id": msg.mx_room, - } - if reply_text == "Edit": - html = "Edit: " + (html or escape(text)) - else: - try: - event = await main_intent.get_event(msg.mx_room, msg.mxid) - content = event["content"] - body = (content["formatted_body"] - if "formatted_body" in content - else content["body"]) - sender = event['sender'] - puppet = p.Puppet.get_by_mxid(sender, create=False) - displayname = puppet.displayname if puppet else sender - reply_to_user = f"{displayname}" - reply_to_msg = (("{reply_text}") - if message_link_in_reply else "Reply") - quote = f"{reply_to_msg} to {reply_to_user}
{body}
" - except (ValueError, KeyError, MatrixRequestError): - quote = "{reply_text} to unknown user (Failed to fetch message):
" - if html: - html = quote + html - else: - html = quote + escape(text) - - if isinstance(evt, Message) and evt.post and evt.post_author: - if not html: - html = escape(text) - text += f"\n- {evt.post_author}" - html += f"
- {evt.post_author}" - - if html: - html = html.replace("\n", "
") - - return text, html, relates_to - - -def telegram_to_matrix(text, entities): - try: - return _telegram_to_matrix(text, entities) - except Exception: - log.exception("Failed to convert Telegram format:\n" - "message=%s\n" - "entities=%s", - text, entities) - - -def _telegram_to_matrix(text, entities): - if not entities: - return text - # See "TEXT LEN EXPLANATION" near start of file - text = text.encode(TEMP_ENC) - html = [] - last_offset = 0 - for entity in entities: - entity.offset *= 2 - entity.length *= 2 - if entity.offset > last_offset: - html.append(escape(text[last_offset:entity.offset].decode(TEMP_ENC))) - elif entity.offset < last_offset: - continue - - skip_entity = False - entity_text = escape(text[entity.offset:entity.offset + entity.length].decode(TEMP_ENC)) - entity_type = type(entity) - - if entity_type == MessageEntityBold: - html.append(f"{entity_text}") - elif entity_type == MessageEntityItalic: - html.append(f"{entity_text}") - elif entity_type == MessageEntityCode: - html.append(f"{entity_text}") - elif entity_type == MessageEntityPre: - if entity.language: - html.append("
"
-                            + f"{entity_text}"
-                            + "
") - else: - html.append(f"
{entity_text}
") - elif entity_type == MessageEntityMention: - username = entity_text[1:] - - user = u.User.find_by_username(username) - if user: - mxid = user.mxid - else: - puppet = p.Puppet.find_by_username(username) - mxid = puppet.mxid if puppet else None - if mxid: - html.append(f"{entity_text}") - else: - skip_entity = True - elif entity_type == MessageEntityMentionName: - user = u.User.get_by_tgid(entity.user_id) - if user: - mxid = user.mxid - else: - puppet = p.Puppet.get(entity.user_id, create=False) - mxid = puppet.mxid if puppet else None - if mxid: - html.append(f"{entity_text}") - else: - skip_entity = True - elif entity_type == MessageEntityEmail: - html.append(f"{entity_text}") - elif entity_type in {MessageEntityTextUrl, MessageEntityUrl}: - url = escape(entity.url) if entity_type == MessageEntityTextUrl else entity_text - if not url.startswith(("https://", "http://", "ftp://", "magnet://")): - url = "http://" + url - html.append(f"{entity_text}") - elif entity_type == MessageEntityBotCommand: - html.append(f"!{entity_text[1:]}") - elif entity_type == MessageEntityHashtag: - html.append(f"{entity_text}") - else: - skip_entity = True - last_offset = entity.offset + (0 if skip_entity else entity.length) - html.append(text[last_offset:].decode(TEMP_ENC)) - - return "".join(html) - -# endregion diff --git a/mautrix_telegram/formatter/__init__.py b/mautrix_telegram/formatter/__init__.py new file mode 100644 index 00000000..cf46d796 --- /dev/null +++ b/mautrix_telegram/formatter/__init__.py @@ -0,0 +1,2 @@ +from .from_matrix import matrix_reply_to_telegram, matrix_to_telegram +from .from_telegram import telegram_reply_to_matrix, telegram_to_matrix diff --git a/mautrix_telegram/formatter/from_matrix.py b/mautrix_telegram/formatter/from_matrix.py new file mode 100644 index 00000000..dfbe490b --- /dev/null +++ b/mautrix_telegram/formatter/from_matrix.py @@ -0,0 +1,178 @@ +# -*- coding: future_fstrings -*- +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from html import unescape +from html.parser import HTMLParser +from collections import deque +import re +import logging + +from telethon.tl.types import * + +from .. import user as u, puppet as p +from ..db import Message as DBMessage +from .util import add_surrogates, remove_surrogates + +log = logging.getLogger("mau.fmt.mx") + + +class MatrixParser(HTMLParser): + mention_regex = re.compile("https://matrix.to/#/(@.+)") + + def __init__(self): + super().__init__() + self.text = "" + self.entities = [] + self._building_entities = {} + self._list_counter = 0 + self._open_tags = deque() + self._open_tags_meta = deque() + self._previous_ended_line = True + + def handle_starttag(self, tag, attrs): + self._open_tags.appendleft(tag) + self._open_tags_meta.appendleft(0) + attrs = dict(attrs) + entity_type = None + args = {} + if tag == "strong" or tag == "b": + entity_type = MessageEntityBold + elif tag == "em" or tag == "i": + entity_type = MessageEntityItalic + elif tag == "code": + try: + pre = self._building_entities["pre"] + try: + pre.language = attrs["class"][len("language-"):] + except KeyError: + pass + except KeyError: + entity_type = MessageEntityCode + elif tag == "pre": + entity_type = MessageEntityPre + args["language"] = "" + elif tag == "a": + try: + url = attrs["href"] + except KeyError: + return + mention = self.mention_regex.search(url) + if mention: + mxid = mention.group(1) + user = p.Puppet.get_by_mxid(mxid, create=False) + if not user: + user = u.User.get_by_mxid(mxid, create=False) + if not user: + return + if user.username: + entity_type = MessageEntityMention + url = f"@{user.username}" + else: + entity_type = MessageEntityMentionName + args["user_id"] = user.tgid + elif url.startswith("mailto:"): + url = url[len("mailto:"):] + entity_type = MessageEntityEmail + else: + if self.get_starttag_text() == url: + entity_type = MessageEntityUrl + else: + entity_type = MessageEntityTextUrl + args["url"] = url + url = None + self._open_tags_meta.popleft() + self._open_tags_meta.appendleft(url) + + if entity_type and tag not in self._building_entities: + offset = len(self.text) + self._building_entities[tag] = entity_type(offset=offset, length=0, **args) + + def _list_depth(self): + depth = 0 + for tag in self._open_tags: + if tag == "ol" or tag == "ul": + depth += 1 + return depth + + def handle_data(self, text): + text = unescape(text) + previous_tag = self._open_tags[0] if len(self._open_tags) > 0 else "" + list_format_offset = 0 + if previous_tag == "a": + url = self._open_tags_meta[0] + if url: + text = url + elif len(self._open_tags) > 1 and self._previous_ended_line and previous_tag == "li": + list_type = self._open_tags[1] + indent = (self._list_depth() - 1) * 4 * " " + text = text.strip("\n") + if len(text) == 0: + return + elif list_type == "ul": + text = f"{indent}* {text}" + list_format_offset = len(indent) + 2 + elif list_type == "ol": + n = self._open_tags_meta[1] + n += 1 + self._open_tags_meta[1] = n + text = f"{indent}{n}. {text}" + list_format_offset = len(indent) + 3 + for tag, entity in self._building_entities.items(): + entity.length += len(text.strip("\n")) + entity.offset += list_format_offset + + if text.endswith("\n"): + self._previous_ended_line = True + else: + self._previous_ended_line = False + + self.text += text + + def handle_endtag(self, tag): + try: + self._open_tags.popleft() + self._open_tags_meta.popleft() + except IndexError: + pass + if (tag == "ul" or tag == "ol") and self.text.endswith("\n"): + self.text = self.text[:-1] + entity = self._building_entities.pop(tag, None) + if entity: + self.entities.append(entity) + + +def matrix_to_telegram(html): + try: + parser = MatrixParser() + parser.feed(add_surrogates(html)) + return remove_surrogates(parser.text), parser.entities + except Exception: + log.exception("Failed to convert Matrix format:\nhtml=%s", html) + + +def matrix_reply_to_telegram(content, tg_space, room_id=None): + try: + reply = content["m.relates_to"]["m.in_reply_to"] + room_id = room_id or reply["room_id"] + event_id = reply["event_id"] + message = DBMessage.query.filter(DBMessage.mxid == event_id, + DBMessage.tg_space == tg_space, + DBMessage.mx_room == room_id).one_or_none() + if message: + return message.tgid + except KeyError: + pass + return None diff --git a/mautrix_telegram/formatter/from_telegram.py b/mautrix_telegram/formatter/from_telegram.py new file mode 100644 index 00000000..aa74e3fa --- /dev/null +++ b/mautrix_telegram/formatter/from_telegram.py @@ -0,0 +1,199 @@ +# -*- coding: future_fstrings -*- +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from html import escape +import logging + +from telethon.tl.types import * +from mautrix_appservice import MatrixRequestError + +from .. import user as u, puppet as p +from ..db import Message as DBMessage +from .util import add_surrogates, remove_surrogates + +log = logging.getLogger("mau.fmt.tg") + + +def telegram_reply_to_matrix(evt, source): + if evt.reply_to_msg_id: + space = (evt.to_id.channel_id + if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel) + else source.tgid) + msg = DBMessage.query.get((evt.reply_to_msg_id, space)) + if msg: + return { + "m.in_reply_to": { + "event_id": msg.mxid, + "room_id": msg.mx_room, + } + } + return {} + + +async def telegram_to_matrix(evt, source, native_replies=False, message_link_in_reply=False, + main_intent=None, reply_text="Reply"): + text = add_surrogates(evt.message) + html = _telegram_entities_to_matrix_catch(text, evt.entities) if evt.entities else None + relates_to = {} + + if evt.fwd_from: + if not html: + html = escape(text) + from_id = evt.fwd_from.from_id + user = u.User.get_by_tgid(from_id) + if user: + fwd_from = f"{user.mxid}" + else: + puppet = p.Puppet.get(from_id, create=False) + if puppet and puppet.displayname: + fwd_from = f"{puppet.displayname}" + else: + user = await source.client.get_entity(from_id) + if user: + fwd_from = p.Puppet.get_displayname(user, format=False) + else: + fwd_from = None + if not fwd_from: + fwd_from = "Unknown user" + html = (f"Forwarded message from {fwd_from}
" + f"
{html}
") + + if evt.reply_to_msg_id: + space = (evt.to_id.channel_id + if isinstance(evt, Message) and isinstance(evt.to_id, PeerChannel) + else source.tgid) + msg = DBMessage.query.get((evt.reply_to_msg_id, space)) + if msg: + if native_replies: + relates_to["m.in_reply_to"] = { + "event_id": msg.mxid, + "room_id": msg.mx_room, + } + if reply_text == "Edit": + html = "Edit: " + (html or escape(text)) + else: + try: + event = await main_intent.get_event(msg.mx_room, msg.mxid) + content = event["content"] + body = (content["formatted_body"] + if "formatted_body" in content + else content["body"]) + sender = event['sender'] + puppet = p.Puppet.get_by_mxid(sender, create=False) + displayname = puppet.displayname if puppet else sender + reply_to_user = f"{displayname}" + reply_to_msg = (("{reply_text}") + if message_link_in_reply else "Reply") + quote = f"{reply_to_msg} to {reply_to_user}
{body}
" + except (ValueError, KeyError, MatrixRequestError): + quote = "{reply_text} to unknown user (Failed to fetch message):
" + if html: + html = quote + html + else: + html = quote + escape(text) + + if isinstance(evt, Message) and evt.post and evt.post_author: + if not html: + html = escape(text) + text += f"\n- {evt.post_author}" + html += f"
- {evt.post_author}" + + if html: + html = html.replace("\n", "
") + + return remove_surrogates(text), remove_surrogates(html), relates_to + + +def _telegram_entities_to_matrix_catch(text, entities): + try: + return _telegram_entities_to_matrix(text, entities) + except Exception: + log.exception("Failed to convert Telegram format:\n" + "message=%s\n" + "entities=%s", + text, entities) + + +def _telegram_entities_to_matrix(text, entities): + if not entities: + return text + html = [] + last_offset = 0 + for entity in entities: + if entity.offset > last_offset: + html.append(escape(text[last_offset:entity.offset])) + elif entity.offset < last_offset: + continue + + skip_entity = False + entity_text = escape(text[entity.offset:entity.offset + entity.length]) + entity_type = type(entity) + + if entity_type == MessageEntityBold: + html.append(f"{entity_text}") + elif entity_type == MessageEntityItalic: + html.append(f"{entity_text}") + elif entity_type == MessageEntityCode: + html.append(f"{entity_text}") + elif entity_type == MessageEntityPre: + if entity.language: + html.append("
"
+                            f"{entity_text}"
+                            "
") + else: + html.append(f"
{entity_text}
") + elif entity_type == MessageEntityMention: + username = entity_text[1:] + + user = u.User.find_by_username(username) + if user: + mxid = user.mxid + else: + puppet = p.Puppet.find_by_username(username) + mxid = puppet.mxid if puppet else None + if mxid: + html.append(f"{entity_text}") + else: + skip_entity = True + elif entity_type == MessageEntityMentionName: + user = u.User.get_by_tgid(entity.user_id) + if user: + mxid = user.mxid + else: + puppet = p.Puppet.get(entity.user_id, create=False) + mxid = puppet.mxid if puppet else None + if mxid: + html.append(f"{entity_text}") + else: + skip_entity = True + elif entity_type == MessageEntityEmail: + html.append(f"{entity_text}") + elif entity_type in {MessageEntityTextUrl, MessageEntityUrl}: + url = escape(entity.url) if entity_type == MessageEntityTextUrl else entity_text + if not url.startswith(("https://", "http://", "ftp://", "magnet://")): + url = "http://" + url + html.append(f"{entity_text}") + elif entity_type == MessageEntityBotCommand: + html.append(f"!{entity_text[1:]}") + elif entity_type == MessageEntityHashtag: + html.append(f"{entity_text}") + else: + skip_entity = True + last_offset = entity.offset + (0 if skip_entity else entity.length) + html.append(text[last_offset:]) + + return "".join(html) diff --git a/mautrix_telegram/formatter/util.py b/mautrix_telegram/formatter/util.py new file mode 100644 index 00000000..ff35519d --- /dev/null +++ b/mautrix_telegram/formatter/util.py @@ -0,0 +1,16 @@ +# Unicode surrogate handling +# From https://github.com/LonamiWebs/Telethon/blob/master/telethon/extensions/markdown.py +import struct + + +def add_surrogates(text): + if text is None: + return None + return "".join("".join(chr(y) for y in struct.unpack(" 1: await puppet.intent.error_and_leave(room, text=None, html=( f"Please invite " - + f"the bridge bot " - + f"first if you want to create a Telegram chat.")) + f"the bridge bot " + f"first if you want to create a Telegram chat.")) return await puppet.intent.join_room(room) @@ -71,9 +71,9 @@ class MatrixHandler: await puppet.intent.invite(portal.mxid, inviter.mxid) await puppet.intent.send_notice(room, text=None, html=( "You already have a private chat with me: " - + f"" - + "Link to room" - + "")) + f"" + "Link to room" + "")) await puppet.intent.leave_room(room) return except MatrixRequestError: @@ -88,7 +88,7 @@ class MatrixHandler: "Telegram chat is created for this room.") async def handle_invite(self, room, user, inviter): - inviter = User.get_by_mxid(inviter) + inviter = await User.get_by_mxid(inviter).ensure_started() if not inviter.whitelisted: return elif user == self.az.bot_mxid: @@ -101,6 +101,9 @@ class MatrixHandler: return user = User.get_by_mxid(user, create=False) + if not user: + return + await user.ensure_started() portal = Portal.get_by_mxid(room) if user and user.has_full_access and portal: await portal.invite_telegram(inviter, user) @@ -110,7 +113,7 @@ class MatrixHandler: self.log.debug(f"{inviter} invited {user} to {room}") async def handle_join(self, room, user): - user = User.get_by_mxid(user) + user = await User.get_by_mxid(user).ensure_started() portal = Portal.get_by_mxid(room) if not portal: @@ -120,19 +123,23 @@ class MatrixHandler: await portal.main_intent.kick(room, user.mxid, "You are not whitelisted on this Telegram bridge.") return - elif not user.logged_in: - # TODO[waiting-for-bots] once we have bot support, this won't be needed. + elif not user.logged_in and not portal.has_bot: await portal.main_intent.kick(room, user.mxid, - "You are not logged into this Telegram bridge.") + "This chat does not have a bot relaying " + "messages for unauthenticated users.") return self.log.debug(f"{user} joined {room}") - # TODO join Telegram chat if applicable + if user.logged_in: + await portal.join_matrix(user) async def handle_part(self, room, user, sender): self.log.debug(f"{user} left {room}") sender = User.get_by_mxid(sender, create=False) + if not sender: + return + await sender.ensure_started() portal = Portal.get_by_mxid(room) if not portal: @@ -143,7 +150,10 @@ class MatrixHandler: await portal.leave_matrix(puppet, sender) user = User.get_by_mxid(user, create=False) - if user and user.logged_in: + if not user: + return + await user.ensure_started() + if user.logged_in: await portal.leave_matrix(user, sender) def is_command(self, message): @@ -158,10 +168,12 @@ class MatrixHandler: self.log.debug(f"{sender} sent {message} to ${room}") is_command, text = self.is_command(message) - sender = User.get_by_mxid(sender) + sender = await User.get_by_mxid(sender).ensure_started() + if not sender.whitelisted: + return portal = Portal.get_by_mxid(room) - if sender.has_full_access and portal and not is_command: + if not is_command and portal and (sender.logged_in or portal.has_bot): await portal.handle_matrix_message(sender, message, event_id) return @@ -187,19 +199,19 @@ class MatrixHandler: async def handle_redaction(self, room, sender, event_id): portal = Portal.get_by_mxid(room) - sender = User.get_by_mxid(sender) + sender = await User.get_by_mxid(sender).ensure_started() if sender.has_full_access and portal: await portal.handle_matrix_deletion(sender, event_id) async def handle_power_levels(self, room, sender, new, old): portal = Portal.get_by_mxid(room) - sender = User.get_by_mxid(sender) + sender = await User.get_by_mxid(sender).ensure_started() if sender.has_full_access and portal: await portal.handle_matrix_power_levels(sender, new["users"], old["users"]) async def handle_room_meta(self, type, room, sender, content): portal = Portal.get_by_mxid(room) - sender = User.get_by_mxid(sender) + sender = await User.get_by_mxid(sender).ensure_started() if sender.has_full_access and portal: handler, content_key = { "m.room.name": (portal.handle_matrix_title, "name"), @@ -209,7 +221,7 @@ class MatrixHandler: if content_key not in content: # FIXME handle pass - await handler(sender, content[content_key]) + await handler(sender, content[content_key]) def filter_matrix_event(self, event): return (event["sender"] == self.az.bot_mxid @@ -236,5 +248,5 @@ class MatrixHandler: elif type == "m.room.power_levels": await self.handle_power_levels(evt["room_id"], evt["sender"], evt["content"], evt["prev_content"]) - elif type == "m.room.name" or type == "m.room.avatar" or type == "m.room.topic": + elif type in ("m.room.name", "m.room.avatar", "m.room.topic"): await self.handle_room_meta(type, evt["room_id"], evt["sender"], evt["content"]) diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index f7290287..858c3a9a 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -14,7 +14,6 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from io import BytesIO from collections import deque from datetime import datetime import asyncio @@ -23,7 +22,6 @@ import mimetypes import hashlib import logging -from PIL import Image import magic from telethon.tl.functions.messages import * @@ -33,7 +31,7 @@ from telethon.tl.types import * from mautrix_appservice import MatrixRequestError, IntentError from .db import Portal as DBPortal, Message as DBMessage -from . import puppet as p, user as u, formatter +from . import puppet as p, user as u, formatter, util mimetypes.init() @@ -44,11 +42,13 @@ class Portal: log = logging.getLogger("mau.portal") db = None az = None + bot = None + bridge_notices = False by_mxid = {} by_tgid = {} def __init__(self, tgid, peer_type, tg_receiver=None, mxid=None, username=None, title=None, - about=None, photo_id=None, save_to_cache=True): + about=None, photo_id=None, db_instance=None): self.mxid = mxid self.tgid = tgid self.tg_receiver = tg_receiver or tgid @@ -57,17 +57,21 @@ class Portal: self.title = title self.about = about self.photo_id = photo_id + self._db_instance = db_instance + self._main_intent = None self._room_create_lock = asyncio.Lock() self._dedup = deque() self._dedup_mxid = {} + self._dedup_action = deque() - if save_to_cache: - if tgid: - self.by_tgid[self.tgid_full] = self - if mxid: - self.by_mxid[mxid] = self + if tgid: + self.by_tgid[self.tgid_full] = self + if mxid: + self.by_mxid[mxid] = self + + # region Propegrties @property def tgid_full(self): @@ -88,36 +92,61 @@ class Portal: elif self.peer_type == "channel": return PeerChannel(channel_id=self.tgid) - def _hash_event(self, event): - if self.peer_type == "channel": - # Message IDs are unique per-channel - return event.id + @property + def has_bot(self): + return self.bot and self.bot.is_in_chat(self.tgid) + @property + def main_intent(self): + if not self._main_intent: + direct = self.peer_type == "user" + puppet = p.Puppet.get(self.tgid) if direct else None + self._main_intent = puppet.intent if direct else self.az.intent + return self._main_intent + + # endregion + # region Deduplication + + @staticmethod + def _hash_event(event): # Non-channel messages are unique per-user (wtf telegram), so we have no other choice than # to deduplicate based on a hash of the message content. - # The timestamp is only accurate to the second, so we can't rely on solely that either. - hash_content = [event.date.timestamp(), event.message] - if event.fwd_from: - hash_content += [event.fwd_from.from_id, event.fwd_from.channel_id] - elif isinstance(event, Message) and event.media: - try: - hash_content += { - MessageMediaContact: lambda media: [media.user_id], - MessageMediaDocument: lambda media: [media.document.id, media.caption], - MessageMediaPhoto: lambda media: [media.photo.id, media.caption], - MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat], - }[type(event.media)](event.media) - except KeyError: - pass - + # The timestamp is only accurate to the second, so we can't rely solely on that either. + if isinstance(event, MessageService): + hash_content = [event.date.timestamp(), event.from_id, event.action] + else: + hash_content = [event.date.timestamp(), event.message] + if event.fwd_from: + hash_content += [event.fwd_from.from_id, event.fwd_from.channel_id] + elif isinstance(event, Message) and event.media: + try: + hash_content += { + MessageMediaContact: lambda media: [media.user_id], + MessageMediaDocument: lambda media: [media.document.id, media.caption], + MessageMediaPhoto: lambda media: [media.photo.id, media.caption], + MessageMediaGeo: lambda media: [media.geo.long, media.geo.lat], + }[type(event.media)](event.media) + except KeyError: + pass return hashlib.md5("-" .join(str(a) for a in hash_content) .encode("utf-8") ).hexdigest() - def is_duplicate(self, event, mxid=None): - hash = self._hash_event(event) + def is_duplicate_action(self, event): + hash = self._hash_event(event) if self.peer_type != "channel" else event.id + if hash in self._dedup_action: + return True + + self._dedup_action.append(hash) + + if len(self._dedup_action) > 20: + self._dedup_action.popleft() + return False + + def is_duplicate(self, event, mxid=None, force_hash=False): + hash = self._hash_event(event) if self.peer_type != "channel" or force_hash else event.id if hash in self._dedup: return self._dedup_mxid[hash] @@ -131,17 +160,10 @@ class Portal: def get_input_entity(self, user): return user.client.get_input_entity(self.peer) + # endregion # region Matrix room info updating - @property - def main_intent(self): - if not self._main_intent: - direct = self.peer_type == "user" - puppet = p.Puppet.get(self.tgid) if direct else None - self._main_intent = puppet.intent if direct else self.az.intent - return self._main_intent - - async def invite_matrix(self, users): + async def invite_to_matrix(self, users): if isinstance(users, str): await self.main_intent.invite(self.mxid, users, check_cache=True) elif isinstance(users, list): @@ -150,12 +172,12 @@ class Portal: else: raise ValueError("Invalid invite identifier given to invite_matrix()") - async def update_after_create(self, user, entity, direct, puppet=None, - levels=None, users=None, participants=None): + async def update_matrix_room(self, user, entity, direct, puppet=None, + levels=None, users=None, participants=None): if not direct: await self.update_info(user, entity) if not users or not participants: - users, participants = await self.get_users(user, entity) + users, participants = await self._get_users(user, entity) await self.sync_telegram_users(user, users) await self.update_telegram_participants(participants, levels) else: @@ -169,8 +191,8 @@ class Portal: if update_if_exists: if not entity: entity = await user.client.get_entity(self.peer) - await self.update_after_create(user, entity, self.peer_type == "user") - await self.invite_matrix(invites or []) + await self.update_matrix_room(user, entity, self.peer_type == "user") + await self.invite_to_matrix(invites or []) return self.mxid async with self._room_create_lock: return await self._create_matrix_room(user, entity, invites) @@ -196,8 +218,7 @@ class Portal: self._main_intent = puppet.intent if direct else self.az.intent if self.peer_type == "channel" and entity.username: - # TODO make public once safe - public = False + public = True alias = self._get_room_alias(entity.username) self.username = entity.username else: @@ -206,13 +227,13 @@ class Portal: alias = None if alias: - # TODO properly handle existing room aliases + # TODO? properly handle existing room aliases await self.main_intent.remove_room_alias(alias) power_levels = self._get_base_power_levels({}, entity) users = participants = None if not direct: - users, participants = await self.get_users(user, entity) + users, participants = await self._get_users(user, entity) self._participants_to_power_levels(participants, power_levels) initial_state = [{ "type": "m.room.power_levels", @@ -230,8 +251,8 @@ class Portal: self.save() self.az.state_store.set_power_levels(self.mxid, power_levels) user.register_portal(self) - await self.update_after_create(user, entity, direct, puppet, - levels=power_levels, users=users, participants=participants) + await self.update_matrix_room(user, entity, direct, puppet, + levels=power_levels, users=users, participants=participants) def _get_base_power_levels(self, levels=None, entity=None): levels = levels or {} @@ -246,6 +267,8 @@ class Portal: levels["events"]["m.room.topic"] = 50 if self.peer_type == "channel" else 99 levels["events"]["m.room.power_levels"] = 75 levels["events"]["m.room.history_visibility"] = 75 + levels["state_default"] = 50 + levels["users_default"] = 0 levels["events_default"] = (50 if self.peer_type == "channel" and not entity.megagroup else 0) if "users" not in levels: @@ -260,11 +283,32 @@ class Portal: groupname=username) async def sync_telegram_users(self, source, users): + allowed_tgids = set() for entity in users: puppet = p.Puppet.get(entity.id) + if self.bot and puppet.tgid == self.bot.tgid: + self.bot.add_chat(self.tgid, self.peer_type) + allowed_tgids.add(entity.id) await puppet.intent.ensure_joined(self.mxid) await puppet.update_info(source, entity) + joined_mxids = await self.main_intent.get_room_members(self.mxid) + for user in joined_mxids: + if user == self.az.intent.mxid: + continue + puppet_id = p.Puppet.get_id_from_mxid(user) + if puppet_id and puppet_id not in allowed_tgids: + if self.bot and puppet_id == self.bot.tgid: + self.bot.remove_chat(self.tgid) + await self.main_intent.kick(self.mxid, user, + "User had left this Telegram chat.") + continue + mx_user = u.User.get_by_mxid(user, create=False) + if mx_user and not self.has_bot and mx_user.tgid not in allowed_tgids: + await self.main_intent.kick(self.mxid, mx_user.mxid, + "You had left this Telegram chat.") + continue + async def add_telegram_user(self, user_id, source=None): puppet = p.Puppet.get(user_id) if source: @@ -319,6 +363,9 @@ class Portal: self.username = username or None if self.username: await self.main_intent.add_room_alias(self.mxid, self._get_room_alias()) + await self.main_intent.set_join_rule(self.mxid, "public") + else: + await self.main_intent.set_join_rule(self.mxid, "invite") return True return False @@ -344,26 +391,32 @@ class Portal: async def update_avatar(self, user, photo): photo_id = f"{photo.volume_id}-{photo.local_id}" if self.photo_id != photo_id: - try: - file = await user.client.download_file_bytes(photo) - except LocationInvalidError: - return False - uploaded = await self.main_intent.upload_file(file) - await self.main_intent.set_room_avatar(self.mxid, uploaded["content_uri"]) - self.photo_id = photo_id - return True + file = await util.transfer_file_to_matrix(self.db, user.client, self.main_intent, + photo) + if file: + await self.main_intent.set_avatar(file.mxc) + self.photo_id = photo_id + return True return False - async def get_users(self, user, entity): + async def _get_users(self, user, entity): if self.peer_type == "chat": chat = await user.client(GetFullChatRequest(chat_id=self.tgid)) return chat.users, chat.full_chat.participants.participants elif self.peer_type == "channel": try: - participants = await user.client(GetParticipantsRequest( - entity, ChannelParticipantsRecent(), offset=0, limit=100, hash=0 - )) - return participants.users, participants.participants + users, participants = [], [] + offset = 0 + while True: + response = await user.client(GetParticipantsRequest( + entity, ChannelParticipantsSearch(""), offset=offset, limit=100, hash=0 + )) + if not response.users: + break + participants += response.participants + users += response.users + offset += len(response.users) + return users, participants except ChatAdminRequiredError: return [], [] elif self.peer_type == "user": @@ -396,7 +449,7 @@ class Portal: for member in members: if p.Puppet.get_id_from_mxid(member) or member == self.main_intent.mxid: continue - user = u.User.get_by_mxid(member) + user = await u.User.get_by_mxid(member).ensure_started() if user.has_full_access: authenticated.append(user) return authenticated @@ -455,22 +508,42 @@ class Portal: channel = await self.get_input_entity(user) await user.client(LeaveChannelRequest(channel=channel)) + async def join_matrix(self, user): + if self.peer_type == "channel": + await user.client(JoinChannelRequest(channel=await self.get_input_entity(user))) + else: + # We'll just assume the user is already in the chat. + pass + async def handle_matrix_message(self, sender, message, event_id): type = message["msgtype"] - space = self.tgid if self.peer_type == "channel" else sender.tgid + if sender.logged_in: + client = sender.client + space = self.tgid if self.peer_type == "channel" else sender.tgid + else: + client = self.bot.client + space = self.tgid if self.peer_type == "channel" else self.bot.tgid reply_to = formatter.matrix_reply_to_telegram(message, space, room_id=self.mxid) - if type in {"m.text", "m.emote"}: + + if type == "m.emote": + if "formatted_body" in message: + message["formatted_body"] = f"* {sender.displayname} {message['formatted_body']}" + message["body"] = f"* {sender.displayname} {message['body']}" + type = "m.text" + elif not sender.logged_in: + if "formatted_body" in message: + message["formatted_body"] = (f"<{sender.displayname}> " + f"{message['formatted_body']}") + message["body"] = f"<{sender.displayname}> {message['body']}" + + if type == "m.text" or (self.bridge_notices and type == "m.notice"): if "format" in message and message["format"] == "org.matrix.custom.html": message, entities = formatter.matrix_to_telegram(message["formatted_body"]) - if type == "m.emote": - message = "/me " + message - response = await sender.client.send_message(self.peer, message, entities=entities, - reply_to=reply_to) + response = await client.send_message(self.peer, message, entities=entities, + reply_to=reply_to) else: - if type == "m.emote": - message["body"] = "/me " + message["body"] - response = await sender.client.send_message(self.peer, message["body"], - reply_to=reply_to) + response = await client.send_message(self.peer, message["body"], + reply_to=reply_to) elif type in {"m.image", "m.file", "m.audio", "m.video"}: file = await self.main_intent.download_file(message["url"]) @@ -483,8 +556,8 @@ class Portal: if "w" in info and "h" in info: attributes.append(DocumentAttributeImageSize(w=info["w"], h=info["h"])) - response = await sender.client.send_file(self.peer, file, mime, caption, attributes, - file_name, reply_to=reply_to) + response = await client.send_file(self.peer, file, mime, caption, attributes, + file_name, reply_to=reply_to) else: self.log.debug("Unhandled Matrix event: %s", message) return @@ -498,8 +571,8 @@ class Portal: async def handle_matrix_deletion(self, deleter, event_id): space = self.tgid if self.peer_type == "channel" else deleter.tgid - message = DBMessage.query.filter(DBMessage.mxid == event_id and - DBMessage.tg_space == space and + message = DBMessage.query.filter(DBMessage.mxid == event_id, + DBMessage.tg_space == space, DBMessage.mx_room == self.mxid).one_or_none() if not message: return @@ -545,10 +618,11 @@ class Portal: return if self.peer_type == "chat": - await sender.client(EditChatTitleRequest(chat_id=self.tgid, title=title)) + response = await sender.client(EditChatTitleRequest(chat_id=self.tgid, title=title)) else: channel = await self.get_input_entity(sender) - await sender.client(EditTitleRequest(channel=channel, title=title)) + response = await sender.client(EditTitleRequest(channel=channel, title=title)) + self._register_outgoing_actions_for_dedup(response) self.title = title self.save() @@ -564,11 +638,12 @@ class Portal: photo = InputChatUploadedPhoto(file=uploaded) if self.peer_type == "chat": - updates = await sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo)) + response = await sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo)) else: channel = await self.get_input_entity(sender) - updates = await sender.client(EditPhotoRequest(channel=channel, photo=photo)) - for update in updates.updates: + response = await sender.client(EditPhotoRequest(channel=channel, photo=photo)) + self._register_outgoing_actions_for_dedup(response) + for update in response.updates: is_photo_update = (isinstance(update, UpdateNewMessage) and isinstance(update.message, MessageService) and isinstance(update.message.action, MessageActionChatEditPhoto)) @@ -578,6 +653,13 @@ class Portal: self.save() break + def _register_outgoing_actions_for_dedup(self, response): + for update in response.updates: + check_dedup = (isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage)) + and isinstance(update.message, MessageService)) + if check_dedup: + self.is_duplicate_action(update.message) + # endregion # region Telegram chat info updating @@ -627,7 +709,6 @@ class Portal: invites = await self._get_telegram_users_in_matrix_room() if len(invites) < 2: - # TODO[waiting-for-bots] This won't happen when the bot is enabled raise ValueError("Not enough Telegram users to create a chat") if self.peer_type == "chat": @@ -650,6 +731,9 @@ class Portal: await self.update_info(source, entity) self.save() + if self.bot and self.bot.mxid in invites: + self.bot.add_chat(self.tgid, self.peer_type) + levels = await self.main_intent.get_power_levels(self.mxid) levels = self._get_base_power_levels(levels, entity) already_saved = await self.handle_matrix_power_levels(source, levels["users"], {}) @@ -674,50 +758,37 @@ class Portal: async def handle_telegram_photo(self, source, intent, media, relates_to=None): largest_size = self._get_largest_photo_size(media.photo) - file = await source.client.download_file_bytes(largest_size.location) - mime_type = magic.from_buffer(file, mime=True) - uploaded = await intent.upload_file(file, mime_type) + file = await util.transfer_file_to_matrix(self.db, source.client, intent, + largest_size.location) + if not file: + return None info = { "h": largest_size.h, "w": largest_size.w, "size": len(largest_size.bytes) if ( isinstance(largest_size, PhotoCachedSize)) else largest_size.size, "orientation": 0, - "mimetype": mime_type, + "mimetype": file.mime_type, } name = media.caption await intent.set_typing(self.mxid, is_typing=False) - return await intent.send_image(self.mxid, uploaded["content_uri"], info=info, - text=name, relates_to=relates_to) - - def convert_webp(self, file, to="png"): - try: - image = Image.open(BytesIO(file)).convert("RGBA") - new_file = BytesIO() - image.save(new_file, to) - return f"image/{to}", new_file.getvalue() - except Exception: - self.log.exception(f"Failed to convert webp to {to}") - return "image/webp", file + return await intent.send_image(self.mxid, file.mxc, info=info, text=name, + relates_to=relates_to) async def handle_telegram_document(self, source, intent, media, relates_to=None): - file = await source.client.download_file_bytes(media.document) - mime_type = magic.from_buffer(file, mime=True) - dont_change_mime = False - if mime_type == "image/webp": - mime_type, file = self.convert_webp(file, to="png") - dont_change_mime = True - uploaded = await intent.upload_file(file, mime_type) + file = await util.transfer_file_to_matrix(self.db, source.client, intent, media.document) + if not file: + return None name = media.caption for attr in media.document.attributes: if not name and isinstance(attr, DocumentAttributeFilename): name = attr.file_name - if not dont_change_mime: + if not file.was_converted: (mime_from_name, _) = mimetypes.guess_type(name) - mime_type = mime_from_name or mime_type + file.mime_type = mime_from_name or file.mime_type elif isinstance(attr, DocumentAttributeSticker): name = f"Sticker for {attr.alt}" - mime_type = media.document.mime_type or mime_type + mime_type = media.document.mime_type or file.mime_type info = { "size": media.document.size, "mimetype": mime_type, @@ -730,8 +801,8 @@ class Portal: elif mime_type.startswith("image/"): type = "m.image" await intent.set_typing(self.mxid, is_typing=False) - return await intent.send_file(self.mxid, uploaded["content_uri"], info=info, - text=name, file_type=type, relates_to=relates_to) + return await intent.send_file(self.mxid, file.mxc, info=info, text=name, file_type=type, + relates_to=relates_to) def handle_telegram_location(self, source, intent, location, relates_to=None): long = location.long @@ -761,7 +832,7 @@ class Portal: async def handle_telegram_text(self, source, intent, evt): self.log.debug(f"Sending {evt.message} to {self.mxid} by {intent.mxid}") - text, html, relates_to = await formatter.telegram_event_to_matrix( + text, html, relates_to = await formatter.telegram_to_matrix( evt, source, config["bridge.native_replies"], config["bridge.link_in_reply"], @@ -778,7 +849,7 @@ class Portal: tg_space = self.tgid if self.peer_type == "channel" else source.tgid temporary_identifier = f"${random.randint(1000000000000,9999999999999)}TGBRIDGEDITEMP" - duplicate_found = self.is_duplicate(evt, (temporary_identifier, tg_space)) + duplicate_found = self.is_duplicate(evt, (temporary_identifier, tg_space), force_hash=True) if duplicate_found: mxid, other_tg_space = duplicate_found if tg_space != other_tg_space: @@ -789,7 +860,7 @@ class Portal: return evt.reply_to_msg_id = evt.id - text, html, relates_to = await formatter.telegram_event_to_matrix( + text, html, relates_to = await formatter.telegram_to_matrix( evt, source, config["bridge.native_replies"], config["bridge.link_in_reply"], @@ -848,6 +919,9 @@ class Portal: self.log.debug("Unhandled Telegram message: %s", evt) return + if not response: + return + mxid = response["event_id"] DBMessage.query \ .filter(DBMessage.mx_room == self.mxid, @@ -856,7 +930,8 @@ class Portal: self.db.add(DBMessage(tgid=evt.id, mx_room=self.mxid, mxid=mxid, tg_space=tg_space)) self.db.commit() - async def handle_telegram_action(self, source, sender, action): + async def handle_telegram_action(self, source, sender, update): + action = update.action if not self.mxid: create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate) create_and_continue = (MessageActionChatAddUser, MessageActionChatJoinedByLink) @@ -866,6 +941,9 @@ class Portal: if not isinstance(action, create_and_continue): return + if self.is_duplicate_action(update): + return + # TODO figure out how to see changes to about text / channel username if isinstance(action, MessageActionChatEditTitle): if await self.update_title(action.title): @@ -890,7 +968,10 @@ class Portal: else: self.log.debug("Unhandled Telegram action in %s: %s", self.title, action) - async def set_telegram_admin(self, puppet, user): + async def set_telegram_admin(self, user_id): + puppet = p.Puppet.get(user_id) + user = await u.User.get_by_tgid(user_id) + levels = await self.main_intent.get_power_levels(self.mxid) if user: levels["users"][user.mxid] = 50 @@ -960,13 +1041,16 @@ class Portal: # endregion # region Database conversion - def to_db(self, merge=True): - portal = DBPortal(tgid=self.tgid, tg_receiver=self.tg_receiver, peer_type=self.peer_type, - mxid=self.mxid, username=self.username, title=self.title, - about=self.about, photo_id=self.photo_id) - if merge: - return self.db.merge(portal) - return portal + @property + def db_instance(self): + if not self._db_instance: + self._db_instance = self.new_db_instance() + return self._db_instance + + def new_db_instance(self): + return DBPortal(tgid=self.tgid, tg_receiver=self.tg_receiver, peer_type=self.peer_type, + mxid=self.mxid, username=self.username, title=self.title, + about=self.about, photo_id=self.photo_id) def migrate_and_save(self, new_id): existing = DBPortal.query.get(self.tgid_full) @@ -982,11 +1066,20 @@ class Portal: self.save() def save(self): - self.to_db() + self.db_instance.mxid = self.mxid + self.db_instance.username = self.username + self.db_instance.title = self.title + self.db_instance.about = self.about + self.db_instance.photo_id = self.photo_id self.db.commit() def delete(self): - self.db.delete(self.to_db()) + try: + del self.by_tgid[self.tgid_full] + del self.by_mxid[self.mxid] + except KeyError: + pass + self.db.delete(self.db_instance) self.db.commit() @classmethod @@ -994,7 +1087,8 @@ class Portal: return Portal(tgid=db_portal.tgid, tg_receiver=db_portal.tg_receiver, peer_type=db_portal.peer_type, mxid=db_portal.mxid, username=db_portal.username, title=db_portal.title, - about=db_portal.about, photo_id=db_portal.photo_id) + about=db_portal.about, photo_id=db_portal.photo_id, + db_instance=db_portal) # endregion # region Class instance lookup @@ -1026,11 +1120,9 @@ class Portal: return cls.from_db(portal) if peer_type: - portal = Portal(tgid, peer_type=peer_type, tg_receiver=tg_receiver, - save_to_cache=False) - cls.db.add(portal.to_db(merge=False)) + portal = Portal(tgid, peer_type=peer_type, tg_receiver=tg_receiver) + cls.db.add(portal.db_instance) cls.db.commit() - cls.by_tgid[portal.tgid_full] = portal return portal return None @@ -1065,4 +1157,5 @@ class Portal: def init(context): global config - Portal.az, Portal.db, config, _ = context + Portal.az, Portal.db, config, _, Portal.bot = context + Portal.bridge_notices = config["bridge.bridge_notices"] diff --git a/mautrix_telegram/puppet.py b/mautrix_telegram/puppet.py index 596bcd2e..a316a48d 100644 --- a/mautrix_telegram/puppet.py +++ b/mautrix_telegram/puppet.py @@ -18,10 +18,11 @@ from difflib import SequenceMatcher import re import logging -from telethon.tl.types import UserProfilePhoto, PeerUser +from telethon.tl.types import UserProfilePhoto from telethon.errors.rpc_error_list import LocationInvalidError from .db import Puppet as DBPuppet +from . import util config = None @@ -31,18 +32,19 @@ class Puppet: db = None az = None mxid_regex = None + username_template = None + hs_domain = None cache = {} - def __init__(self, id=None, username=None, displayname=None, photo_id=None): + def __init__(self, id=None, username=None, displayname=None, photo_id=None, db_instance=None): self.id = id + self.mxid = self.get_mxid_from_id(self.id) - self.localpart = config.get("bridge.username_template", "telegram_{userid}").format( - userid=self.id) - hs = config["homeserver"]["domain"] - self.mxid = f"@{self.localpart}:{hs}" self.username = username self.displayname = displayname self.photo_id = photo_id + self._db_instance = db_instance + self.intent = self.az.intent.user(self.mxid) self.cache[id] = self @@ -51,17 +53,25 @@ class Puppet: def tgid(self): return self.id - def to_db(self): - return self.db.merge( - DBPuppet(id=self.id, username=self.username, displayname=self.displayname, - photo_id=self.photo_id)) + @property + def db_instance(self): + if not self._db_instance: + self._db_instance = self.new_db_instance() + return self._db_instance + + def new_db_instance(self): + return DBPuppet(id=self.id, username=self.username, displayname=self.displayname, + photo_id=self.photo_id) @classmethod def from_db(cls, db_puppet): - return Puppet(db_puppet.id, db_puppet.username, db_puppet.displayname, db_puppet.photo_id) + return Puppet(db_puppet.id, db_puppet.username, db_puppet.displayname, db_puppet.photo_id, + db_instance=db_puppet) def save(self): - self.to_db() + self.db_instance.username = self.username + self.db_instance.displayname = self.displayname + self.db_instance.photo_id = self.photo_id self.db.commit() def similarity(self, query): @@ -120,14 +130,11 @@ class Puppet: async def update_avatar(self, source, photo): photo_id = f"{photo.volume_id}-{photo.local_id}" if self.photo_id != photo_id: - try: - file = await source.client.download_file_bytes(photo) - except LocationInvalidError: - return False - uploaded = await self.intent.upload_file(file) - await self.intent.set_avatar(uploaded["content_uri"]) - self.photo_id = photo_id - return True + file = await util.transfer_file_to_matrix(self.db, source.client, self.intent, photo) + if file: + await self.intent.set_avatar(file.mxc) + self.photo_id = photo_id + return True return False @classmethod @@ -143,7 +150,7 @@ class Puppet: if create: puppet = cls(id) - cls.db.add(puppet.to_db()) + cls.db.add(puppet.db_instance) cls.db.commit() return puppet @@ -161,6 +168,10 @@ class Puppet: return int(match.group(1)) return None + @classmethod + def get_mxid_from_id(cls, id): + return f"@{cls.username_template.format(userid=id)}:{cls.hs_domain}" + @classmethod def find_by_username(cls, username): for _, puppet in cls.cache.items(): @@ -176,7 +187,8 @@ class Puppet: def init(context): global config - Puppet.az, Puppet.db, config, _ = context - localpart = config.get("bridge.username_template", "telegram_{userid}").format(userid="(.+)") - hs = config["homeserver"]["domain"] - Puppet.mxid_regex = re.compile(f"@{localpart}:{hs}") + Puppet.az, Puppet.db, config, _, _ = context + Puppet.username_template = config.get("bridge.username_template", "telegram_{userid}") + Puppet.hs_domain = config["homeserver"]["domain"] + localpart = Puppet.username_template.format(userid="(.+)") + Puppet.mxid_regex = re.compile(f"@{localpart}:{Puppet.hs_domain}") diff --git a/mautrix_telegram/user.py b/mautrix_telegram/user.py index b1cc5aea..dd77893b 100644 --- a/mautrix_telegram/user.py +++ b/mautrix_telegram/user.py @@ -16,31 +16,28 @@ # along with this program. If not, see . import logging import asyncio -import platform +import re from telethon.tl.types import * from telethon.tl.types.contacts import ContactsNotModified -from telethon.tl.types import User as TLUser from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest from mautrix_appservice import MatrixRequestError -from .db import User as DBUser, Message as DBMessage, Contact as DBContact -from .tgclient import MautrixTelegramClient -from . import portal as po, puppet as pu, __version__ +from .db import User as DBUser, Contact as DBContact +from .abstract_user import AbstractUser +from . import portal as po, puppet as pu config = None -class User: - loop = None +class User(AbstractUser): log = logging.getLogger("mau.user") - db = None - az = None by_mxid = {} by_tgid = {} def __init__(self, mxid, tgid=None, username=None, db_contacts=None, saved_contacts=0, - db_portals=None): + db_portals=None, db_instance=None): + super().__init__() self.mxid = mxid self.tgid = tgid self.username = username @@ -49,11 +46,9 @@ class User: self.db_contacts = db_contacts self.portals = {} self.db_portals = db_portals + self._db_instance = db_instance self.command_status = None - self.connected = False - self.client = None - self._init_client() self.is_admin = self.mxid in config.get("bridge.admins", []) @@ -67,13 +62,17 @@ class User: if tgid: self.by_tgid[tgid] = self - @property - def logged_in(self): - return self.client.is_user_authorized() + self._init_client() @property - def has_full_access(self): - return self.logged_in and self.whitelisted + def name(self): + return self.mxid + + @property + def displayname(self): + # TODO show better username + match = re.compile("@(.+):(.+)").match(self.mxid) + return match.group(1) @property def db_contacts(self): @@ -89,7 +88,7 @@ class User: @property def db_portals(self): - return [portal.to_db(merge=False) for _, portal in self.portals.items()] + return [portal.db_instance for portal in self.portals.values()] @db_portals.setter def db_portals(self, portals): @@ -102,14 +101,23 @@ class User: # region Database conversion - def to_db(self): - return self.db.merge( - DBUser(mxid=self.mxid, tgid=self.tgid, tg_username=self.username, - contacts=self.db_contacts, saved_contacts=self.saved_contacts, - portals=self.db_portals)) + @property + def db_instance(self): + if not self._db_instance: + self._db_instance = self.new_db_instance() + return self._db_instance + + def new_db_instance(self): + return DBUser(mxid=self.mxid, tgid=self.tgid, tg_username=self.username, + contacts=self.db_contacts, saved_contacts=self.saved_contacts, + portals=self.db_portals) def save(self): - self.to_db() + self.db_instance.tgid = self.tgid + self.db_instance.username = self.username + self.db_instance.contacts = self.db_contacts + self.db_instance.saved_contacts = self.saved_contacts + self.db_instance.portals = self.db_portals self.db.commit() def delete(self): @@ -118,34 +126,24 @@ class User: del self.by_tgid[self.tgid] except KeyError: pass - self.db.delete(self.to_db()) + self.db.delete(self.db_instance) self.db.commit() @classmethod def from_db(cls, db_user): return User(db_user.mxid, db_user.tgid, db_user.tg_username, db_user.contacts, - db_user.saved_contacts, db_user.portals) + db_user.saved_contacts, db_user.portals, db_instance=db_user) # endregion # region Telegram connection management - def _init_client(self): - device = f"{platform.system()} {platform.release()}" - sysversion = MautrixTelegramClient.__version__ - self.client = MautrixTelegramClient(self.mxid, - config["telegram.api_id"], - config["telegram.api_hash"], - loop=self.loop, - app_version=__version__, - system_version=sysversion, - device_model=device) - self.client.add_update_handler(self.update_catch) - async def start(self, delete_unless_authenticated=False): - self.connected = await self.client.connect() + await super().start() if self.logged_in: + self.log.debug(f"Ensuring post_login() for {self.name}") asyncio.ensure_future(self.post_login(), loop=self.loop) elif delete_unless_authenticated: + self.log.debug(f"Unauthenticated user {self.name} start()ed, deleting...") # User not logged in -> forget user self.client.disconnect() self.client.session.delete() @@ -160,11 +158,6 @@ class User: except Exception: self.log.exception("Failed to run post-login functions") - def stop(self): - self.client.disconnect() - self.client = None - self.connected = False - # endregion # region Telegram actions that need custom methods @@ -234,14 +227,8 @@ class User: return await self._search_remote(query), True async def sync_dialogs(self): - dialogs = await self.client.get_dialogs(limit=30) creators = [] - for dialog in dialogs: - entity = dialog.entity - invalid = (isinstance(entity, (TLUser, ChatForbidden, ChannelForbidden)) - or (isinstance(entity, Chat) and (entity.deactivated or entity.left))) - if invalid: - continue + for entity in await self._get_dialogs(limit=30): portal = po.Portal.get_by_entity(entity) self.portals[portal.tgid_full] = portal creators.append(portal.create_matrix_room(self, entity, invites=[self.mxid])) @@ -283,135 +270,6 @@ class User: self.contacts.append(puppet) self.save() - # endregion - # region Telegram update handling - - async def update_catch(self, update): - try: - await self.update(update) - except Exception: - self.log.exception("Failed to handle Telegram update") - - async def update(self, update): - if isinstance(update, (UpdateShortChatMessage, UpdateShortMessage, UpdateNewChannelMessage, - UpdateNewMessage, UpdateEditMessage, UpdateEditChannelMessage)): - await self.update_message(update) - elif isinstance(update, (UpdateChatUserTyping, UpdateUserTyping)): - await self.update_typing(update) - elif isinstance(update, UpdateUserStatus): - await self.update_status(update) - elif isinstance(update, (UpdateChatAdmins, UpdateChatParticipantAdmin)): - await self.update_admin(update) - elif isinstance(update, UpdateChatParticipants): - portal = po.Portal.get_by_tgid(update.participants.chat_id) - if portal and portal.mxid: - await portal.update_telegram_participants(update.participants.participants) - elif isinstance(update, UpdateChannelPinnedMessage): - portal = po.Portal.get_by_tgid(update.channel_id) - if portal and portal.mxid: - await portal.update_telegram_pin(self, update.id) - elif isinstance(update, (UpdateUserName, UpdateUserPhoto)): - await self.update_others_info(update) - elif isinstance(update, UpdateReadHistoryOutbox): - await self.update_read_receipt(update) - else: - self.log.debug("Unhandled update: %s", update) - - async def update_read_receipt(self, update): - if not isinstance(update.peer, PeerUser): - self.log.debug("Unexpected read receipt peer: %s", update.peer) - return - - portal = po.Portal.get_by_tgid(update.peer.user_id, self.tgid) - if not portal or not portal.mxid: - return - - # We check that these are user read receipts, so tg_space is always the user ID. - message = DBMessage.query.get((update.max_id, self.tgid)) - if not message: - return - - puppet = pu.Puppet.get(update.peer.user_id) - await puppet.intent.mark_read(portal.mxid, message.mxid) - - async def update_admin(self, update): - portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") - if isinstance(update, UpdateChatAdmins): - await portal.set_telegram_admins_enabled(update.enabled) - elif isinstance(update, UpdateChatParticipantAdmin): - puppet = pu.Puppet.get(update.user_id) - user = User.get_by_tgid(update.user_id) - await portal.set_telegram_admin(puppet, user) - - async def update_typing(self, update): - if isinstance(update, UpdateUserTyping): - portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user") - else: - portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") - sender = pu.Puppet.get(update.user_id) - await portal.handle_telegram_typing(sender, update) - - async def update_others_info(self, update): - puppet = pu.Puppet.get(update.user_id) - if isinstance(update, UpdateUserName): - if await puppet.update_displayname(self, update): - puppet.save() - elif isinstance(update, UpdateUserPhoto): - if await puppet.update_avatar(self, update.photo.photo_big): - puppet.save() - - async def update_status(self, update): - puppet = pu.Puppet.get(update.user_id) - if isinstance(update.status, UserStatusOnline): - await puppet.intent.set_presence("online") - elif isinstance(update.status, UserStatusOffline): - await puppet.intent.set_presence("offline") - else: - self.log.warning("Unexpected user status update: %s", update) - return - - def get_message_details(self, update): - if isinstance(update, UpdateShortChatMessage): - portal = po.Portal.get_by_tgid(update.chat_id, peer_type="chat") - sender = pu.Puppet.get(update.from_id) - elif isinstance(update, UpdateShortMessage): - portal = po.Portal.get_by_tgid(update.user_id, self.tgid, "user") - sender = pu.Puppet.get(self.tgid if update.out else update.user_id) - elif isinstance(update, (UpdateNewMessage, UpdateNewChannelMessage, - UpdateEditMessage, UpdateEditChannelMessage)): - update = update.message - if isinstance(update.to_id, PeerUser) and not update.out: - portal = po.Portal.get_by_tgid(update.from_id, peer_type="user", - tg_receiver=self.tgid) - else: - portal = po.Portal.get_by_entity(update.to_id, receiver_id=self.tgid) - sender = pu.Puppet.get(update.from_id) if update.from_id else None - else: - self.log.warning( - f"Unexpected message type in User#get_message_details: {type(update)}") - return update, None, None - return update, sender, portal - - def update_message(self, original_update): - update, sender, portal = self.get_message_details(original_update) - - if isinstance(update, MessageService): - if isinstance(update.action, MessageActionChannelMigrateFrom): - self.log.debug(f"Ignoring action %s to %s by %d", update.action, portal.tgid_log, - sender.id) - return - self.log.debug("Handling action %s to %s by %d", update.action, portal.tgid_log, - sender.id) - return portal.handle_telegram_action(self, sender, update.action) - - user = sender.tgid if sender else "admin" - if isinstance(original_update, (UpdateEditMessage, UpdateEditChannelMessage)): - self.log.debug("Handling edit %s to %s by %s", update, portal.tgid_log, user) - return portal.handle_telegram_edit(self, sender, update) - - self.log.debug("Handling message %s to %s by %s", update, portal.tgid_log, user) - return portal.handle_telegram_message(self, sender, update) - # endregion # region Class instance lookup @@ -425,14 +283,12 @@ class User: user = DBUser.query.get(mxid) if user: user = cls.from_db(user) - asyncio.ensure_future(user.start(), loop=cls.loop) return user if create: user = cls(mxid) - cls.db.add(user.to_db()) + cls.db.add(user.db_instance) cls.db.commit() - asyncio.ensure_future(user.start(), loop=cls.loop) return user return None @@ -447,7 +303,6 @@ class User: user = DBUser.query.filter(DBUser.tgid == tgid).one_or_none() if user: user = cls.from_db(user) - asyncio.ensure_future(user.start(), loop=cls.loop) return user return None @@ -468,7 +323,7 @@ class User: def init(context): global config - User.az, User.db, config, User.loop = context + config = context.config users = [User.from_db(user) for user in DBUser.query.all()] return [user.start(delete_unless_authenticated=True) for user in users] diff --git a/mautrix_telegram/util/__init__.py b/mautrix_telegram/util/__init__.py new file mode 100644 index 00000000..53d6aabd --- /dev/null +++ b/mautrix_telegram/util/__init__.py @@ -0,0 +1 @@ +from .file_transfer import transfer_file_to_matrix diff --git a/mautrix_telegram/util/file_transfer.py b/mautrix_telegram/util/file_transfer.py new file mode 100644 index 00000000..698cb874 --- /dev/null +++ b/mautrix_telegram/util/file_transfer.py @@ -0,0 +1,75 @@ +# -*- coding: future_fstrings -*- +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +from io import BytesIO +import time +import logging + +import magic +from PIL import Image + +from telethon.tl.types import (Document, FileLocation, InputFileLocation, + InputDocumentFileLocation) +from telethon.errors import LocationInvalidError + +from ..db import TelegramFile as DBTelegramFile + +log = logging.getLogger("mau.util") + + +def _convert_webp(file, to="png"): + try: + image = Image.open(BytesIO(file)).convert("RGBA") + new_file = BytesIO() + image.save(new_file, to) + return f"image/{to}", new_file.getvalue() + except Exception: + log.exception(f"Failed to convert webp to {to}") + return "image/webp", file + + +async def transfer_file_to_matrix(db, client, intent, location): + if isinstance(location, (Document, InputDocumentFileLocation)): + id = f"{location.id}-{location.version}" + elif not isinstance(location, (FileLocation, InputFileLocation)): + id = f"{location.volume_id}-{location.local_id}" + else: + return None + + db_file = DBTelegramFile.query.get(id) + if db_file: + return db_file + + try: + file = await client.download_file_bytes(location) + except LocationInvalidError: + return None + mime_type = magic.from_buffer(file, mime=True) + + image_converted = False + if mime_type == "image/webp": + mime_type, file = _convert_webp(file, to="png") + image_converted = True + + uploaded = await intent.upload_file(file, mime_type) + + db_file = DBTelegramFile(id=id, mxc=uploaded["content_uri"], + mime_type=mime_type, was_converted=image_converted, + timestamp=int(time.time())) + db.add(db_file) + db.commit() + + return db_file diff --git a/setup.py b/setup.py index c7874097..ac7f42b0 100644 --- a/setup.py +++ b/setup.py @@ -17,12 +17,12 @@ setuptools.setup( install_requires=[ "aiohttp>=2.3.10,<3", - "SQLAlchemy>=1.2.2,<2", + "SQLAlchemy>=1.2.3,<2", "alembic>=0.9.7", "Markdown>=2.6.11,<3", "ruamel.yaml>=0.15.35,<0.16", "Pillow>=5.0.0,<6", - "future-fstrings>=0.4.1", + "future-fstrings>=0.4.2", "python-magic>=0.4.15,<0.5", ], dependency_links=[