From 50c6f2b0093b69bc2a834ea2c1c43c9439d74f99 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 21 Dec 2021 19:51:00 +0200 Subject: [PATCH] Add support for sponsored messages. Fixes #699 --- mautrix_telegram/db/portal.py | 21 ++- mautrix_telegram/db/upgrade/__init__.py | 2 +- .../db/upgrade/v02_sponsored_events.py | 25 ++++ mautrix_telegram/formatter/from_telegram.py | 27 ++-- mautrix_telegram/matrix.py | 31 ++--- mautrix_telegram/portal.py | 130 ++++++++++++++++-- mautrix_telegram/portal_util/__init__.py | 1 + .../portal_util/sponsored_message.py | 95 +++++++++++++ requirements.txt | 2 +- 9 files changed, 285 insertions(+), 49 deletions(-) create mode 100644 mautrix_telegram/db/upgrade/v02_sponsored_events.py create mode 100644 mautrix_telegram/portal_util/sponsored_message.py diff --git a/mautrix_telegram/db/portal.py b/mautrix_telegram/db/portal.py index be8a295d..622a8a66 100644 --- a/mautrix_telegram/db/portal.py +++ b/mautrix_telegram/db/portal.py @@ -22,7 +22,7 @@ from asyncpg import Record from attr import dataclass import attr -from mautrix.types import ContentURI, RoomID +from mautrix.types import ContentURI, EventID, RoomID from mautrix.util.async_db import Database from ..types import TelegramID @@ -45,6 +45,10 @@ class Portal: avatar_url: ContentURI | None encrypted: bool + sponsored_event_id: EventID | None + sponsored_event_ts: int | None + sponsored_msg_random_id: bytes | None + # Telegram chat metadata username: str | None title: str | None @@ -62,8 +66,8 @@ class Portal: return cls(**data) columns: ClassVar[str] = ( - "tgid, tg_receiver, peer_type, megagroup, mxid, avatar_url, encrypted, config, " - "username, title, about, photo_id" + "tgid, tg_receiver, peer_type, megagroup, mxid, avatar_url, encrypted, sponsored_event_id," + "sponsored_event_ts, sponsored_msg_random_id, username, title, about, photo_id, config" ) @classmethod @@ -100,6 +104,9 @@ class Portal: self.mxid, self.avatar_url, self.encrypted, + self.sponsored_event_id, + self.sponsored_event_ts, + self.sponsored_msg_random_id, self.username, self.title, self.about, @@ -110,8 +117,9 @@ class Portal: async def save(self) -> None: q = ( - "UPDATE portal SET mxid=$4, avatar_url=$5, encrypted=$6, username=$7, title=$8," - " about=$9, photo_id=$10, megagroup=$11, config=$12 " + "UPDATE portal SET mxid=$4, avatar_url=$5, encrypted=$6, sponsored_event_id=$7," + " sponsored_event_ts=$8, sponsored_msg_random_id=$9, username=$10," + " title=$11, about=$12, photo_id=$13, megagroup=$14, config=$15 " "WHERE tgid=$1 AND tg_receiver=$2 AND (peer_type=$3 OR true)" ) await self.db.execute(q, *self._values) @@ -129,8 +137,9 @@ class Portal: async def insert(self) -> None: q = ( "INSERT INTO portal (tgid, tg_receiver, peer_type, mxid, avatar_url, encrypted," + " sponsored_event_id, sponsored_event_ts, sponsored_msg_random_id," " username, title, about, photo_id, megagroup, config) " - "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)" + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)" ) await self.db.execute(q, *self._values) diff --git a/mautrix_telegram/db/upgrade/__init__.py b/mautrix_telegram/db/upgrade/__init__.py index 146e7134..2146830c 100644 --- a/mautrix_telegram/db/upgrade/__init__.py +++ b/mautrix_telegram/db/upgrade/__init__.py @@ -2,4 +2,4 @@ from mautrix.util.async_db import UpgradeTable upgrade_table = UpgradeTable() -from . import v01_initial_revision +from . import v01_initial_revision, v02_sponsored_events diff --git a/mautrix_telegram/db/upgrade/v02_sponsored_events.py b/mautrix_telegram/db/upgrade/v02_sponsored_events.py new file mode 100644 index 00000000..273319d1 --- /dev/null +++ b/mautrix_telegram/db/upgrade/v02_sponsored_events.py @@ -0,0 +1,25 @@ +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2021 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from asyncpg import Connection + +from . import upgrade_table + + +@upgrade_table.register(description="Add column to store sponsored message event ID in channels") +async def upgrade_v2(conn: Connection) -> None: + await conn.execute("ALTER TABLE portal ADD COLUMN sponsored_event_id TEXT") + await conn.execute("ALTER TABLE portal ADD COLUMN sponsored_event_ts BIGINT") + await conn.execute("ALTER TABLE portal ADD COLUMN sponsored_msg_random_id bytea") diff --git a/mautrix_telegram/formatter/from_telegram.py b/mautrix_telegram/formatter/from_telegram.py index 00765b1c..e650f3c7 100644 --- a/mautrix_telegram/formatter/from_telegram.py +++ b/mautrix_telegram/formatter/from_telegram.py @@ -43,6 +43,7 @@ from telethon.tl.types import ( PeerChannel, PeerChat, PeerUser, + SponsoredMessage, TypeMessageEntity, ) @@ -175,7 +176,7 @@ async def _add_reply_header( async def telegram_to_matrix( - evt: Message, + evt: Message | SponsoredMessage, source: au.AbstractUser, main_intent: IntentAPI | None = None, prefix_text: str | None = None, @@ -183,6 +184,7 @@ async def telegram_to_matrix( override_text: str = None, override_entities: list[TypeMessageEntity] = None, no_reply_fallback: bool = False, + require_html: bool = False, ) -> TextMessageEventContent: content = TextMessageEventContent( msgtype=MessageType.TEXT, @@ -191,33 +193,34 @@ async def telegram_to_matrix( entities = override_entities or evt.entities if entities: content.format = Format.HTML - content.formatted_body = await _telegram_entities_to_matrix_catch(content.body, entities) + html = await _telegram_entities_to_matrix_catch(add_surrogate(content.body), entities) + content.formatted_body = del_surrogate(html).replace("\n", "
") - if prefix_html: + def force_html(): if not content.formatted_body: content.format = Format.HTML content.formatted_body = escape(content.body) + + if require_html: + force_html() + + if prefix_html: + force_html() content.formatted_body = prefix_html + content.formatted_body if prefix_text: content.body = prefix_text + content.body - if evt.fwd_from: + if getattr(evt, "fwd_from", None): await _add_forward_header(source, content, evt.fwd_from) - if evt.reply_to and not no_reply_fallback: + if getattr(evt, "reply_to", None) and not no_reply_fallback: await _add_reply_header(source, content, evt, main_intent) if isinstance(evt, Message) and evt.post and evt.post_author: - if not content.formatted_body: - content.formatted_body = escape(content.body) + force_html() content.body += f"\n- {evt.post_author}" content.formatted_body += f"
- {evt.post_author}" - content.body = del_surrogate(content.body) - - if content.formatted_body: - content.formatted_body = del_surrogate(content.formatted_body.replace("\n", "
")) - return content diff --git a/mautrix_telegram/matrix.py b/mautrix_telegram/matrix.py index dd9208db..7ff88c59 100644 --- a/mautrix_telegram/matrix.py +++ b/mautrix_telegram/matrix.py @@ -17,6 +17,7 @@ from __future__ import annotations from typing import TYPE_CHECKING, Iterable +from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY from mautrix.bridge import BaseMatrixHandler from mautrix.errors import MatrixError from mautrix.types import ( @@ -35,6 +36,7 @@ from mautrix.types import ( RoomID, RoomNameStateEventContent as NameContent, RoomTopicStateEventContent as TopicContent, + SingleReceiptEventContent, StateEvent, TextMessageEventContent, TypingEvent, @@ -128,8 +130,9 @@ class MatrixHandler(BaseMatrixHandler): EventType.ROOM_MESSAGE, TextMessageEventContent( msgtype=MessageType.NOTICE, - body="Portal to private chat created and end-to-bridge" - " encryption enabled.", + body=( + "Portal to private chat created and end-to-bridge encryption enabled." + ), ), ) await intent.send_message_event(room_id, evt_type, content) @@ -352,26 +355,12 @@ class MatrixHandler(BaseMatrixHandler): user, profile.displayname, prev_profile.displayname, event_id ) - @staticmethod - def parse_read_receipts(content: ReceiptEventContent) -> Iterable[tuple[UserID, EventID]]: - return ( - (user_id, event_id) - for event_id, receipts in content.items() - for user_id in receipts.get(ReceiptType.READ, {}) - ) - - @staticmethod - async def handle_read_receipts( - room_id: RoomID, receipts: Iterable[tuple[UserID, EventID]] + async def handle_read_receipt( + self, user: u.User, portal: po.Portal, event_id: EventID, data: SingleReceiptEventContent ) -> None: - portal = await po.Portal.get_by_mxid(room_id) - if not portal or not portal.allow_bridging: + if not portal.allow_bridging: return - - for user_id, event_id in receipts: - user = await u.User.get_by_mxid(user_id, check_db=False, create=False) - if user and await user.is_logged_in(): - await portal.mark_read(user, event_id) + await portal.mark_read(user, event_id, data.get("ts", 0)) @staticmethod async def handle_presence(user_id: UserID, presence: PresenceState) -> None: @@ -402,7 +391,7 @@ class MatrixHandler(BaseMatrixHandler): self, evt: ReceiptEvent | PresenceEvent | TypingEvent ) -> None: if evt.type == EventType.RECEIPT: - await self.handle_read_receipts(evt.room_id, self.parse_read_receipts(evt.content)) + await self.handle_receipt(evt) elif evt.type == EventType.PRESENCE: await self.handle_presence(evt.sender, evt.content.presence) elif evt.type == EventType.TYPING: diff --git a/mautrix_telegram/portal.py b/mautrix_telegram/portal.py index 7c4eaa0e..96932023 100644 --- a/mautrix_telegram/portal.py +++ b/mautrix_telegram/portal.py @@ -35,6 +35,7 @@ import base64 import codecs import mimetypes import random +import time import unicodedata from asyncpg import UniqueViolationError @@ -53,6 +54,7 @@ from telethon.tl.functions.channels import ( InviteToChannelRequest, JoinChannelRequest, UpdateUsernameRequest, + ViewSponsoredMessageRequest, ) from telethon.tl.functions.messages import ( AddChatUserRequest, @@ -122,6 +124,7 @@ from telethon.tl.types import ( Poll, SendMessageCancelAction, SendMessageTypingAction, + SponsoredMessage, TypeChannelParticipant, TypeChat, TypeChatParticipant, @@ -250,6 +253,14 @@ class Portal(DBPortal, BasePortal): _main_intent: IntentAPI | None _room_create_lock: asyncio.Lock + _sponsored_msg: SponsoredMessage | None + _sponsored_entity: User | Channel | None + _sponsored_msg_ts: float + _sponsored_msg_lock: asyncio.Lock + _sponsored_evt_id: EventID | None + _sponsored_seen: dict[UserID, bool] + _new_messages_after_sponsored: bool + def __init__( self, tgid: TelegramID, @@ -259,6 +270,9 @@ class Portal(DBPortal, BasePortal): mxid: RoomID | None = None, avatar_url: ContentURI | None = None, encrypted: bool = False, + sponsored_event_id: EventID | None = None, + sponsored_event_ts: int | None = None, + sponsored_msg_random_id: bytes | None = None, username: str | None = None, title: str | None = None, about: str | None = None, @@ -273,6 +287,9 @@ class Portal(DBPortal, BasePortal): mxid=mxid, avatar_url=avatar_url, encrypted=encrypted, + sponsored_event_id=sponsored_event_id, + sponsored_event_ts=sponsored_event_ts, + sponsored_msg_random_id=sponsored_msg_random_id, username=username, title=title, about=about, @@ -293,6 +310,12 @@ class Portal(DBPortal, BasePortal): self._pin_lock = asyncio.Lock() self._room_create_lock = asyncio.Lock() + self._sponsored_msg = None + self._sponsored_msg_ts = 0 + self._sponsored_msg_lock = asyncio.Lock() + self._sponsored_seen = {} + self._new_messages_after_sponsored = True + # region Properties @property @@ -1190,7 +1213,91 @@ class Portal(DBPortal, BasePortal): SetTypingRequest(self.peer, action() if typing else SendMessageCancelAction()) ) - async def mark_read(self, user: u.User, event_id: EventID) -> None: + async def _get_sponsored_message( + self, user: u.User + ) -> tuple[SponsoredMessage | None, Channel | User | None]: + if user.is_bot: + return None, None + elif self._sponsored_msg_ts + 5 * 60 > time.monotonic(): + return self._sponsored_msg, self._sponsored_entity + + self._sponsored_msg, t_id, self._sponsored_entity = await putil.get_sponsored_message( + user, await self.get_input_entity(user) + ) + self._sponsored_msg_ts = time.monotonic() + if self._sponsored_entity is None: + self.log.warning(f"GetSponsoredMessages didn't return entity for {t_id}") + return self._sponsored_msg, self._sponsored_entity + + async def _send_sponsored_msg(self, user: u.User) -> None: + self.log.trace(f"Getting a new sponsored message through {user.mxid}") + msg, entity = await self._get_sponsored_message(user) + if msg is None: + self.log.trace("Didn't get a sponsored message") + return + if self.sponsored_event_id is not None: + self.log.debug( + f"Redacting old sponsored {self.sponsored_event_id}" + " in preparation for sending new one" + ) + await self.main_intent.redact(self.mxid, self.sponsored_event_id) + content = await putil.make_sponsored_message_content(user, msg, entity) + self.log.trace("Sending sponsored message") + self.sponsored_event_id = await self._send_message(self.main_intent, content) + self.sponsored_event_ts = int(time.time()) + self.sponsored_msg_random_id = msg.random_id + self._new_messages_after_sponsored = False + self._sponsored_seen = {} + await self.save() + self.log.debug( + f"Sent sponsored message {base64.b64encode(self.sponsored_msg_random_id)} " + f"to Matrix {self.sponsored_event_id} / {self.sponsored_event_ts}" + ) + + @property + def _sponsored_is_expired(self) -> bool: + return ( + self.sponsored_event_id is None + or self.sponsored_event_ts + 24 * 60 * 60 < int(time.time()) + ) and self._new_messages_after_sponsored + + async def _try_handle_read_for_sponsored_msg( + self, user: u.User, event_id: EventID, timestamp: int + ) -> None: + try: + await self._handle_read_for_sponsored_msg(user, event_id, timestamp) + except Exception: + self.log.warning( + "Error handling read receipt for sponsored message processing", exc_info=True + ) + + async def _handle_read_for_sponsored_msg( + self, user: u.User, event_id: EventID, timestamp: int + ) -> None: + if user.is_bot: + return + if self._sponsored_is_expired: + self.log.trace("Sponsored message is expired, sending new one") + async with self._sponsored_msg_lock: + if self._sponsored_is_expired: + await self._send_sponsored_msg(user) + return + + if ( + self.sponsored_event_id == event_id or self.sponsored_event_ts <= timestamp + ) and not self._sponsored_seen.get(user.mxid, False): + self._sponsored_seen[user.mxid] = True + self.log.debug( + f"Marking sponsored message {self.sponsored_event_id} as seen by {user.mxid}" + ) + await user.client( + ViewSponsoredMessageRequest( + channel=await self.get_input_entity(user), + random_id=self.sponsored_msg_random_id, + ) + ) + + async def mark_read(self, user: u.User, event_id: EventID, timestamp: int) -> None: if user.is_bot: return space = self.tgid if self.peer_type == "channel" else user.tgid @@ -1200,8 +1307,7 @@ class Portal(DBPortal, BasePortal): if not message: self.log.debug( f"Dropping Matrix read receipt from {user.mxid}: " - f"target message {event_id} not known and last message" - " in chat not found" + f"target message {event_id} not known and last message in chat not found" ) return else: @@ -1218,6 +1324,8 @@ class Portal(DBPortal, BasePortal): await user.client.send_read_acknowledge( self.peer, max_id=message.tgid, clear_mentions=True ) + if self.peer_type == "channel" and not self.megagroup: + asyncio.create_task(self._try_handle_read_for_sponsored_msg(user, event_id, timestamp)) async def _preproc_kick_ban( self, user: u.User | p.Puppet, source: u.User @@ -2195,11 +2303,15 @@ class Portal(DBPortal, BasePortal): content = TextMessageEventContent( msgtype=MessageType.TEXT, format=Format.HTML, - body=f"Poll: {poll.question}\n{text_answers}\n" - f"Vote with !tg vote {poll_id} ", - formatted_body=f"Poll: {poll.question}
\n" - f"
    {html_answers}
\n" - f"Vote with !tg vote {poll_id} <choice number>", + body=( + f"Poll: {poll.question}\n{text_answers}\n" + f"Vote with !tg vote {poll_id} " + ), + formatted_body=( + f"Poll: {poll.question}
\n" + f"
    {html_answers}
\n" + f"Vote with !tg vote {poll_id} <choice number>" + ), relates_to=relates_to, external_url=self._get_external_url(evt), ) @@ -2588,6 +2700,8 @@ class Portal(DBPortal, BasePortal): if not event_id: return + self._new_messages_after_sponsored = True + prev_id = self.dedup.update(evt, (event_id, tg_space), (temporary_identifier, tg_space)) if prev_id: self.log.debug( diff --git a/mautrix_telegram/portal_util/__init__.py b/mautrix_telegram/portal_util/__init__.py index 841a46c8..20e0ec41 100644 --- a/mautrix_telegram/portal_util/__init__.py +++ b/mautrix_telegram/portal_util/__init__.py @@ -3,3 +3,4 @@ from .media_fallback import make_contact_event_content, make_dice_event_content from .participants import get_users from .power_levels import get_base_power_levels, participants_to_power_levels from .send_lock import PortalSendLock +from .sponsored_message import get_sponsored_message, make_sponsored_message_content diff --git a/mautrix_telegram/portal_util/sponsored_message.py b/mautrix_telegram/portal_util/sponsored_message.py new file mode 100644 index 00000000..3440e63b --- /dev/null +++ b/mautrix_telegram/portal_util/sponsored_message.py @@ -0,0 +1,95 @@ +# mautrix-telegram - A Matrix-Telegram puppeting bridge +# Copyright (C) 2021 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from __future__ import annotations + +import base64 +import html + +from telethon.tl.functions.channels import GetSponsoredMessagesRequest +from telethon.tl.types import Channel, InputChannel, PeerChannel, PeerUser, SponsoredMessage, User + +from mautrix.types import MessageType, TextMessageEventContent + +from .. import user as u +from ..formatter import telegram_to_matrix + + +async def get_sponsored_message( + user: u.User, + entity: InputChannel, +) -> tuple[SponsoredMessage | None, int | None, Channel | User | None]: + resp = await user.client(GetSponsoredMessagesRequest(entity)) + if len(resp.messages) == 0: + return None, None, None + msg = resp.messages[0] + if isinstance(msg.from_id, PeerUser): + entities = resp.users + target_id = msg.from_id.user_id + else: + entities = resp.chats + target_id = msg.from_id.channel_id + try: + entity = next(ent for ent in entities if ent.id == target_id) + except StopIteration: + entity = None + return msg, target_id, entity + + +async def make_sponsored_message_content( + source: u.User, msg: SponsoredMessage, entity: Channel | User +) -> TextMessageEventContent | None: + content = await telegram_to_matrix(msg, source, require_html=True) + content.external_url = f"https://t.me/{entity.username}" + content.msgtype = MessageType.NOTICE + sponsored_meta = { + "random_id": base64.b64encode(msg.random_id).decode("utf-8"), + } + if isinstance(msg.from_id, PeerChannel): + sponsored_meta["channel_id"] = msg.from_id.channel_id + if getattr(msg, "channel_post", None) is not None: + sponsored_meta["channel_post"] = msg.channel_post + content.external_url += f"/{msg.channel_post}" + action = "View Post" + else: + action = "View Channel" + elif isinstance(msg.from_id, PeerUser): + sponsored_meta["bot_id"] = msg.from_id.user_id + if msg.start_param: + content.external_url += f"?start={msg.start_param}" + action = "View Bot" + else: + return None + + if isinstance(entity, User): + name_parts = [entity.first_name, entity.last_name] + sponsor_name = " ".join(x for x in name_parts if x) + sponsor_name_html = f"{html.escape(sponsor_name)}" + elif isinstance(entity, Channel): + sponsor_name = entity.title + sponsor_name_html = f"{html.escape(sponsor_name)}" + else: + sponsor_name = sponsor_name_html = "unknown entity" + + content["net.maunium.telegram.sponsored"] = sponsored_meta + content.formatted_body += ( + f"

Sponsored message from {sponsor_name_html} " + f"- {action}" + ) + content.body += ( + f"\n\nSponsored message from {sponsor_name} - {action} at {content.external_url}" + ) + + return content diff --git a/requirements.txt b/requirements.txt index b7974c74..17d14e42 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ python-magic>=0.4,<0.5 commonmark>=0.8,<0.10 aiohttp>=3,<4 yarl>=1,<2 -mautrix==0.14.0rc1 +mautrix==0.14.0rc2 #telethon>=1.24,<1.25 # Fork to make session storage async tulir-telethon==1.25.0a1