Add support for sponsored messages. Fixes #699

This commit is contained in:
Tulir Asokan
2021-12-21 19:51:00 +02:00
parent 190064bfff
commit 50c6f2b009
9 changed files with 285 additions and 49 deletions
+15 -6
View File
@@ -22,7 +22,7 @@ from asyncpg import Record
from attr import dataclass
import attr
from mautrix.types import ContentURI, RoomID
from mautrix.types import ContentURI, EventID, RoomID
from mautrix.util.async_db import Database
from ..types import TelegramID
@@ -45,6 +45,10 @@ class Portal:
avatar_url: ContentURI | None
encrypted: bool
sponsored_event_id: EventID | None
sponsored_event_ts: int | None
sponsored_msg_random_id: bytes | None
# Telegram chat metadata
username: str | None
title: str | None
@@ -62,8 +66,8 @@ class Portal:
return cls(**data)
columns: ClassVar[str] = (
"tgid, tg_receiver, peer_type, megagroup, mxid, avatar_url, encrypted, config, "
"username, title, about, photo_id"
"tgid, tg_receiver, peer_type, megagroup, mxid, avatar_url, encrypted, sponsored_event_id,"
"sponsored_event_ts, sponsored_msg_random_id, username, title, about, photo_id, config"
)
@classmethod
@@ -100,6 +104,9 @@ class Portal:
self.mxid,
self.avatar_url,
self.encrypted,
self.sponsored_event_id,
self.sponsored_event_ts,
self.sponsored_msg_random_id,
self.username,
self.title,
self.about,
@@ -110,8 +117,9 @@ class Portal:
async def save(self) -> None:
q = (
"UPDATE portal SET mxid=$4, avatar_url=$5, encrypted=$6, username=$7, title=$8,"
" about=$9, photo_id=$10, megagroup=$11, config=$12 "
"UPDATE portal SET mxid=$4, avatar_url=$5, encrypted=$6, sponsored_event_id=$7,"
" sponsored_event_ts=$8, sponsored_msg_random_id=$9, username=$10,"
" title=$11, about=$12, photo_id=$13, megagroup=$14, config=$15 "
"WHERE tgid=$1 AND tg_receiver=$2 AND (peer_type=$3 OR true)"
)
await self.db.execute(q, *self._values)
@@ -129,8 +137,9 @@ class Portal:
async def insert(self) -> None:
q = (
"INSERT INTO portal (tgid, tg_receiver, peer_type, mxid, avatar_url, encrypted,"
" sponsored_event_id, sponsored_event_ts, sponsored_msg_random_id,"
" username, title, about, photo_id, megagroup, config) "
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)"
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)"
)
await self.db.execute(q, *self._values)
+1 -1
View File
@@ -2,4 +2,4 @@ from mautrix.util.async_db import UpgradeTable
upgrade_table = UpgradeTable()
from . import v01_initial_revision
from . import v01_initial_revision, v02_sponsored_events
@@ -0,0 +1,25 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from asyncpg import Connection
from . import upgrade_table
@upgrade_table.register(description="Add column to store sponsored message event ID in channels")
async def upgrade_v2(conn: Connection) -> None:
await conn.execute("ALTER TABLE portal ADD COLUMN sponsored_event_id TEXT")
await conn.execute("ALTER TABLE portal ADD COLUMN sponsored_event_ts BIGINT")
await conn.execute("ALTER TABLE portal ADD COLUMN sponsored_msg_random_id bytea")
+15 -12
View File
@@ -43,6 +43,7 @@ from telethon.tl.types import (
PeerChannel,
PeerChat,
PeerUser,
SponsoredMessage,
TypeMessageEntity,
)
@@ -175,7 +176,7 @@ async def _add_reply_header(
async def telegram_to_matrix(
evt: Message,
evt: Message | SponsoredMessage,
source: au.AbstractUser,
main_intent: IntentAPI | None = None,
prefix_text: str | None = None,
@@ -183,6 +184,7 @@ async def telegram_to_matrix(
override_text: str = None,
override_entities: list[TypeMessageEntity] = None,
no_reply_fallback: bool = False,
require_html: bool = False,
) -> TextMessageEventContent:
content = TextMessageEventContent(
msgtype=MessageType.TEXT,
@@ -191,33 +193,34 @@ async def telegram_to_matrix(
entities = override_entities or evt.entities
if entities:
content.format = Format.HTML
content.formatted_body = await _telegram_entities_to_matrix_catch(content.body, entities)
html = await _telegram_entities_to_matrix_catch(add_surrogate(content.body), entities)
content.formatted_body = del_surrogate(html).replace("\n", "<br/>")
if prefix_html:
def force_html():
if not content.formatted_body:
content.format = Format.HTML
content.formatted_body = escape(content.body)
if require_html:
force_html()
if prefix_html:
force_html()
content.formatted_body = prefix_html + content.formatted_body
if prefix_text:
content.body = prefix_text + content.body
if evt.fwd_from:
if getattr(evt, "fwd_from", None):
await _add_forward_header(source, content, evt.fwd_from)
if evt.reply_to and not no_reply_fallback:
if getattr(evt, "reply_to", None) and not no_reply_fallback:
await _add_reply_header(source, content, evt, main_intent)
if isinstance(evt, Message) and evt.post and evt.post_author:
if not content.formatted_body:
content.formatted_body = escape(content.body)
force_html()
content.body += f"\n- {evt.post_author}"
content.formatted_body += f"<br/><i>- <u>{evt.post_author}</u></i>"
content.body = del_surrogate(content.body)
if content.formatted_body:
content.formatted_body = del_surrogate(content.formatted_body.replace("\n", "<br/>"))
return content
+10 -21
View File
@@ -17,6 +17,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Iterable
from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY
from mautrix.bridge import BaseMatrixHandler
from mautrix.errors import MatrixError
from mautrix.types import (
@@ -35,6 +36,7 @@ from mautrix.types import (
RoomID,
RoomNameStateEventContent as NameContent,
RoomTopicStateEventContent as TopicContent,
SingleReceiptEventContent,
StateEvent,
TextMessageEventContent,
TypingEvent,
@@ -128,8 +130,9 @@ class MatrixHandler(BaseMatrixHandler):
EventType.ROOM_MESSAGE,
TextMessageEventContent(
msgtype=MessageType.NOTICE,
body="Portal to private chat created and end-to-bridge"
" encryption enabled.",
body=(
"Portal to private chat created and end-to-bridge encryption enabled."
),
),
)
await intent.send_message_event(room_id, evt_type, content)
@@ -352,26 +355,12 @@ class MatrixHandler(BaseMatrixHandler):
user, profile.displayname, prev_profile.displayname, event_id
)
@staticmethod
def parse_read_receipts(content: ReceiptEventContent) -> Iterable[tuple[UserID, EventID]]:
return (
(user_id, event_id)
for event_id, receipts in content.items()
for user_id in receipts.get(ReceiptType.READ, {})
)
@staticmethod
async def handle_read_receipts(
room_id: RoomID, receipts: Iterable[tuple[UserID, EventID]]
async def handle_read_receipt(
self, user: u.User, portal: po.Portal, event_id: EventID, data: SingleReceiptEventContent
) -> None:
portal = await po.Portal.get_by_mxid(room_id)
if not portal or not portal.allow_bridging:
if not portal.allow_bridging:
return
for user_id, event_id in receipts:
user = await u.User.get_by_mxid(user_id, check_db=False, create=False)
if user and await user.is_logged_in():
await portal.mark_read(user, event_id)
await portal.mark_read(user, event_id, data.get("ts", 0))
@staticmethod
async def handle_presence(user_id: UserID, presence: PresenceState) -> None:
@@ -402,7 +391,7 @@ class MatrixHandler(BaseMatrixHandler):
self, evt: ReceiptEvent | PresenceEvent | TypingEvent
) -> None:
if evt.type == EventType.RECEIPT:
await self.handle_read_receipts(evt.room_id, self.parse_read_receipts(evt.content))
await self.handle_receipt(evt)
elif evt.type == EventType.PRESENCE:
await self.handle_presence(evt.sender, evt.content.presence)
elif evt.type == EventType.TYPING:
+122 -8
View File
@@ -35,6 +35,7 @@ import base64
import codecs
import mimetypes
import random
import time
import unicodedata
from asyncpg import UniqueViolationError
@@ -53,6 +54,7 @@ from telethon.tl.functions.channels import (
InviteToChannelRequest,
JoinChannelRequest,
UpdateUsernameRequest,
ViewSponsoredMessageRequest,
)
from telethon.tl.functions.messages import (
AddChatUserRequest,
@@ -122,6 +124,7 @@ from telethon.tl.types import (
Poll,
SendMessageCancelAction,
SendMessageTypingAction,
SponsoredMessage,
TypeChannelParticipant,
TypeChat,
TypeChatParticipant,
@@ -250,6 +253,14 @@ class Portal(DBPortal, BasePortal):
_main_intent: IntentAPI | None
_room_create_lock: asyncio.Lock
_sponsored_msg: SponsoredMessage | None
_sponsored_entity: User | Channel | None
_sponsored_msg_ts: float
_sponsored_msg_lock: asyncio.Lock
_sponsored_evt_id: EventID | None
_sponsored_seen: dict[UserID, bool]
_new_messages_after_sponsored: bool
def __init__(
self,
tgid: TelegramID,
@@ -259,6 +270,9 @@ class Portal(DBPortal, BasePortal):
mxid: RoomID | None = None,
avatar_url: ContentURI | None = None,
encrypted: bool = False,
sponsored_event_id: EventID | None = None,
sponsored_event_ts: int | None = None,
sponsored_msg_random_id: bytes | None = None,
username: str | None = None,
title: str | None = None,
about: str | None = None,
@@ -273,6 +287,9 @@ class Portal(DBPortal, BasePortal):
mxid=mxid,
avatar_url=avatar_url,
encrypted=encrypted,
sponsored_event_id=sponsored_event_id,
sponsored_event_ts=sponsored_event_ts,
sponsored_msg_random_id=sponsored_msg_random_id,
username=username,
title=title,
about=about,
@@ -293,6 +310,12 @@ class Portal(DBPortal, BasePortal):
self._pin_lock = asyncio.Lock()
self._room_create_lock = asyncio.Lock()
self._sponsored_msg = None
self._sponsored_msg_ts = 0
self._sponsored_msg_lock = asyncio.Lock()
self._sponsored_seen = {}
self._new_messages_after_sponsored = True
# region Properties
@property
@@ -1190,7 +1213,91 @@ class Portal(DBPortal, BasePortal):
SetTypingRequest(self.peer, action() if typing else SendMessageCancelAction())
)
async def mark_read(self, user: u.User, event_id: EventID) -> None:
async def _get_sponsored_message(
self, user: u.User
) -> tuple[SponsoredMessage | None, Channel | User | None]:
if user.is_bot:
return None, None
elif self._sponsored_msg_ts + 5 * 60 > time.monotonic():
return self._sponsored_msg, self._sponsored_entity
self._sponsored_msg, t_id, self._sponsored_entity = await putil.get_sponsored_message(
user, await self.get_input_entity(user)
)
self._sponsored_msg_ts = time.monotonic()
if self._sponsored_entity is None:
self.log.warning(f"GetSponsoredMessages didn't return entity for {t_id}")
return self._sponsored_msg, self._sponsored_entity
async def _send_sponsored_msg(self, user: u.User) -> None:
self.log.trace(f"Getting a new sponsored message through {user.mxid}")
msg, entity = await self._get_sponsored_message(user)
if msg is None:
self.log.trace("Didn't get a sponsored message")
return
if self.sponsored_event_id is not None:
self.log.debug(
f"Redacting old sponsored {self.sponsored_event_id}"
" in preparation for sending new one"
)
await self.main_intent.redact(self.mxid, self.sponsored_event_id)
content = await putil.make_sponsored_message_content(user, msg, entity)
self.log.trace("Sending sponsored message")
self.sponsored_event_id = await self._send_message(self.main_intent, content)
self.sponsored_event_ts = int(time.time())
self.sponsored_msg_random_id = msg.random_id
self._new_messages_after_sponsored = False
self._sponsored_seen = {}
await self.save()
self.log.debug(
f"Sent sponsored message {base64.b64encode(self.sponsored_msg_random_id)} "
f"to Matrix {self.sponsored_event_id} / {self.sponsored_event_ts}"
)
@property
def _sponsored_is_expired(self) -> bool:
return (
self.sponsored_event_id is None
or self.sponsored_event_ts + 24 * 60 * 60 < int(time.time())
) and self._new_messages_after_sponsored
async def _try_handle_read_for_sponsored_msg(
self, user: u.User, event_id: EventID, timestamp: int
) -> None:
try:
await self._handle_read_for_sponsored_msg(user, event_id, timestamp)
except Exception:
self.log.warning(
"Error handling read receipt for sponsored message processing", exc_info=True
)
async def _handle_read_for_sponsored_msg(
self, user: u.User, event_id: EventID, timestamp: int
) -> None:
if user.is_bot:
return
if self._sponsored_is_expired:
self.log.trace("Sponsored message is expired, sending new one")
async with self._sponsored_msg_lock:
if self._sponsored_is_expired:
await self._send_sponsored_msg(user)
return
if (
self.sponsored_event_id == event_id or self.sponsored_event_ts <= timestamp
) and not self._sponsored_seen.get(user.mxid, False):
self._sponsored_seen[user.mxid] = True
self.log.debug(
f"Marking sponsored message {self.sponsored_event_id} as seen by {user.mxid}"
)
await user.client(
ViewSponsoredMessageRequest(
channel=await self.get_input_entity(user),
random_id=self.sponsored_msg_random_id,
)
)
async def mark_read(self, user: u.User, event_id: EventID, timestamp: int) -> None:
if user.is_bot:
return
space = self.tgid if self.peer_type == "channel" else user.tgid
@@ -1200,8 +1307,7 @@ class Portal(DBPortal, BasePortal):
if not message:
self.log.debug(
f"Dropping Matrix read receipt from {user.mxid}: "
f"target message {event_id} not known and last message"
" in chat not found"
f"target message {event_id} not known and last message in chat not found"
)
return
else:
@@ -1218,6 +1324,8 @@ class Portal(DBPortal, BasePortal):
await user.client.send_read_acknowledge(
self.peer, max_id=message.tgid, clear_mentions=True
)
if self.peer_type == "channel" and not self.megagroup:
asyncio.create_task(self._try_handle_read_for_sponsored_msg(user, event_id, timestamp))
async def _preproc_kick_ban(
self, user: u.User | p.Puppet, source: u.User
@@ -2195,11 +2303,15 @@ class Portal(DBPortal, BasePortal):
content = TextMessageEventContent(
msgtype=MessageType.TEXT,
format=Format.HTML,
body=f"Poll: {poll.question}\n{text_answers}\n"
f"Vote with !tg vote {poll_id} <choice number>",
formatted_body=f"<strong>Poll</strong>: {poll.question}<br/>\n"
f"<ol>{html_answers}</ol>\n"
f"Vote with <code>!tg vote {poll_id} &lt;choice number&gt;</code>",
body=(
f"Poll: {poll.question}\n{text_answers}\n"
f"Vote with !tg vote {poll_id} <choice number>"
),
formatted_body=(
f"<strong>Poll</strong>: {poll.question}<br/>\n"
f"<ol>{html_answers}</ol>\n"
f"Vote with <code>!tg vote {poll_id} &lt;choice number&gt;</code>"
),
relates_to=relates_to,
external_url=self._get_external_url(evt),
)
@@ -2588,6 +2700,8 @@ class Portal(DBPortal, BasePortal):
if not event_id:
return
self._new_messages_after_sponsored = True
prev_id = self.dedup.update(evt, (event_id, tg_space), (temporary_identifier, tg_space))
if prev_id:
self.log.debug(
+1
View File
@@ -3,3 +3,4 @@ from .media_fallback import make_contact_event_content, make_dice_event_content
from .participants import get_users
from .power_levels import get_base_power_levels, participants_to_power_levels
from .send_lock import PortalSendLock
from .sponsored_message import get_sponsored_message, make_sponsored_message_content
@@ -0,0 +1,95 @@
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import base64
import html
from telethon.tl.functions.channels import GetSponsoredMessagesRequest
from telethon.tl.types import Channel, InputChannel, PeerChannel, PeerUser, SponsoredMessage, User
from mautrix.types import MessageType, TextMessageEventContent
from .. import user as u
from ..formatter import telegram_to_matrix
async def get_sponsored_message(
user: u.User,
entity: InputChannel,
) -> tuple[SponsoredMessage | None, int | None, Channel | User | None]:
resp = await user.client(GetSponsoredMessagesRequest(entity))
if len(resp.messages) == 0:
return None, None, None
msg = resp.messages[0]
if isinstance(msg.from_id, PeerUser):
entities = resp.users
target_id = msg.from_id.user_id
else:
entities = resp.chats
target_id = msg.from_id.channel_id
try:
entity = next(ent for ent in entities if ent.id == target_id)
except StopIteration:
entity = None
return msg, target_id, entity
async def make_sponsored_message_content(
source: u.User, msg: SponsoredMessage, entity: Channel | User
) -> TextMessageEventContent | None:
content = await telegram_to_matrix(msg, source, require_html=True)
content.external_url = f"https://t.me/{entity.username}"
content.msgtype = MessageType.NOTICE
sponsored_meta = {
"random_id": base64.b64encode(msg.random_id).decode("utf-8"),
}
if isinstance(msg.from_id, PeerChannel):
sponsored_meta["channel_id"] = msg.from_id.channel_id
if getattr(msg, "channel_post", None) is not None:
sponsored_meta["channel_post"] = msg.channel_post
content.external_url += f"/{msg.channel_post}"
action = "View Post"
else:
action = "View Channel"
elif isinstance(msg.from_id, PeerUser):
sponsored_meta["bot_id"] = msg.from_id.user_id
if msg.start_param:
content.external_url += f"?start={msg.start_param}"
action = "View Bot"
else:
return None
if isinstance(entity, User):
name_parts = [entity.first_name, entity.last_name]
sponsor_name = " ".join(x for x in name_parts if x)
sponsor_name_html = f"<strong>{html.escape(sponsor_name)}</strong>"
elif isinstance(entity, Channel):
sponsor_name = entity.title
sponsor_name_html = f"<strong>{html.escape(sponsor_name)}</strong>"
else:
sponsor_name = sponsor_name_html = "unknown entity"
content["net.maunium.telegram.sponsored"] = sponsored_meta
content.formatted_body += (
f"<br/><br/>Sponsored message from {sponsor_name_html} "
f"- <a href='{content.external_url}'>{action}</a>"
)
content.body += (
f"\n\nSponsored message from {sponsor_name} - {action} at {content.external_url}"
)
return content
+1 -1
View File
@@ -3,7 +3,7 @@ python-magic>=0.4,<0.5
commonmark>=0.8,<0.10
aiohttp>=3,<4
yarl>=1,<2
mautrix==0.14.0rc1
mautrix==0.14.0rc2
#telethon>=1.24,<1.25
# Fork to make session storage async
tulir-telethon==1.25.0a1