Add command to set caption for telegram files

This commit is contained in:
Tulir Asokan
2019-10-26 19:27:03 +03:00
parent 623b802d56
commit 22f6a12842
2 changed files with 36 additions and 29 deletions
+23 -8
View File
@@ -13,7 +13,7 @@
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import List, Optional, Tuple
from typing import List, Optional, Tuple, cast
import logging
import codecs
import base64
@@ -23,7 +23,7 @@ from telethon.errors import (InviteHashInvalidError, InviteHashExpiredError, Opt
UserAlreadyParticipantError, ChatIdInvalidError)
from telethon.tl.patched import Message
from telethon.tl.types import (User as TLUser, TypeUpdates, MessageMediaGame, MessageMediaPoll,
TypePeer)
TypeInputPeer)
from telethon.tl.types.messages import BotCallbackAnswer
from telethon.tl.functions.messages import (ImportChatInviteRequest, CheckChatInviteRequest,
GetBotCallbackAnswerRequest, SendVoteRequest)
@@ -38,6 +38,21 @@ from ...types import TelegramID
from ...commands import command_handler, CommandEvent, SECTION_MISC, SECTION_CREATING_PORTALS
@command_handler(needs_auth=False,
help_section=SECTION_MISC, help_args="<_caption_>",
help_text="Set a caption for the next image you send")
async def caption(evt: CommandEvent) -> EventID:
if len(evt.args) == 0:
return await evt.reply("**Usage:** `$cmdprefix+sp caption <caption>`")
text = " ".join(evt.args)
evt.sender.command_status = {"caption": text}
quoted_text = "\n".join(f"> {row}" for row in text.split("\n"))
return await evt.reply("Your next image will be captioned with\n\n"
f"{quoted_text}\n\n"
"Use `$cmdprefix+sp cancel` to cancel the caption.")
@command_handler(help_section=SECTION_MISC,
help_args="[_-r|--remote_] <_query_>",
help_text="Search your contacts or the Telegram servers for users.")
@@ -76,8 +91,7 @@ async def search(evt: CommandEvent) -> EventID:
return await evt.reply("\n".join(reply))
@command_handler(help_section=SECTION_CREATING_PORTALS,
help_args="<_identifier_>",
@command_handler(help_section=SECTION_CREATING_PORTALS, help_args="<_identifier_>",
help_text="Open a private chat with the given Telegram user. The identifier is "
"either the internal user ID, the username or the phone number. "
"**N.B.** The phone numbers you start chats with must already be in "
@@ -183,7 +197,7 @@ class MessageIDError(ValueError):
async def _parse_encoded_msgid(user: AbstractUser, enc_id: str, type_name: str
) -> Tuple[TypePeer, Message]:
) -> Tuple[TypeInputPeer, Message]:
try:
enc_id += (4 - len(enc_id) % 4) * "="
enc_id = base64.b64decode(enc_id)
@@ -212,7 +226,7 @@ async def _parse_encoded_msgid(user: AbstractUser, enc_id: str, type_name: str
msg = await user.client.get_messages(entity=peer, ids=msg_id)
if not msg:
raise MessageIDError(f"Invalid {type_name} ID (message not found)")
return peer, msg
return peer, cast(Message, msg)
@command_handler(help_section=SECTION_MISC,
@@ -234,12 +248,13 @@ async def play(evt: CommandEvent) -> EventID:
if not isinstance(msg.media, MessageMediaGame):
return await evt.reply("Invalid play ID (message doesn't look like a game)")
game = await evt.sender.client(GetBotCallbackAnswerRequest(peer=peer, msg_id=msg.id, game=True))
game = await evt.sender.client(
GetBotCallbackAnswerRequest(peer=peer, msg_id=msg.id, game=True))
if not isinstance(game, BotCallbackAnswer):
return await evt.reply("Game request response invalid")
return await evt.reply(f"Click [here]({game.url}) to play {msg.media.game.title}:\n\n"
f"{msg.media.game.description}")
f"{msg.media.game.description}")
@command_handler(help_section=SECTION_MISC,
+13 -21
View File
@@ -57,19 +57,6 @@ config: Optional['Config'] = None
class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
@staticmethod
def _get_file_meta(body: str, mime: str) -> str:
try:
current_extension = body[body.rindex("."):].lower()
body = body[:body.rindex(".")]
if mimetypes.types_map[current_extension] == mime:
return body + current_extension
except (ValueError, KeyError):
pass
if mime:
return f"matrix_upload{sane_mimetypes.guess_extension(mime)}"
return ""
async def _get_state_change_message(self, event: str, user: 'u.User', **kwargs: Any
) -> Optional[str]:
tpl = self.get_config(f"state_event_formats.{event}")
@@ -266,23 +253,21 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
file = await self.main_intent.download_media(content.url)
mime = content.info.mimetype
w, h = content.info.width, content.info.height
file_name = content["net.maunium.telegram.internal.filename"]
if content.msgtype == MessageType.STICKER:
if mime != "image/gif":
mime, file, w, h = util.convert_image(file, source_mime=mime, target_type="webp")
else:
# Remove sticker description
content["net.maunium.telegram.internal.filename"] = "sticker.gif"
content.body = ""
file_name = "sticker.gif"
file_name = self._get_file_meta(content["net.maunium.telegram.internal.filename"], mime)
attributes = [DocumentAttributeFilename(file_name=file_name)]
if w and h:
attributes.append(DocumentAttributeImageSize(w, h))
caption = content.body if content.body.lower() != file_name.lower() else None
caption = content.body if content.body != file_name else None
media = await client.upload_file_direct(
file, mime, attributes, file_name,
@@ -364,7 +349,15 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
else (sender.tgid if logged_in else self.bot.tgid))
reply_to = formatter.matrix_reply_to_telegram(content, space, room_id=self.mxid)
content["net.maunium.telegram.internal.filename"] = content.body
media = (MessageType.STICKER, MessageType.IMAGE, MessageType.FILE, MessageType.AUDIO,
MessageType.VIDEO)
if content.msgtype in media:
content["net.maunium.telegram.internal.filename"] = content.body
try:
content.body = sender.command_status["caption"]
sender.command_status = None
except (KeyError, TypeError):
pass
await self._pre_process_matrix_message(sender, not logged_in, content)
if content.msgtype == MessageType.NOTICE:
@@ -378,8 +371,7 @@ class PortalMatrix(BasePortal, MautrixBasePortal, ABC):
elif content.msgtype == MessageType.LOCATION:
await self._handle_matrix_location(sender_id, event_id, space, client, content,
reply_to)
elif content.msgtype in (MessageType.STICKER, MessageType.IMAGE, MessageType.FILE,
MessageType.AUDIO, MessageType.VIDEO):
elif content.msgtype in media:
await self._handle_matrix_file(sender_id, event_id, space, client, content, reply_to)
else:
self.log.debug(f"Unhandled Matrix event: {content}")