Fix some bugs and add command to list invite links
This commit is contained in:
@@ -1,5 +1,8 @@
|
|||||||
# v0.11.3 (unreleased)
|
# v0.11.3 (unreleased)
|
||||||
|
|
||||||
|
### Added
|
||||||
|
* Added `list-invite-links` command to list invite links in a chat.
|
||||||
|
|
||||||
### Improved
|
### Improved
|
||||||
* Dropped Python 3.7 support.
|
* Dropped Python 3.7 support.
|
||||||
* Telegram->Matrix message formatter will now replace `t.me/c/chatid/messageid`
|
* Telegram->Matrix message formatter will now replace `t.me/c/chatid/messageid`
|
||||||
|
|||||||
@@ -25,12 +25,22 @@ from telethon.errors import (
|
|||||||
UsernameNotModifiedError,
|
UsernameNotModifiedError,
|
||||||
UsernameOccupiedError,
|
UsernameOccupiedError,
|
||||||
)
|
)
|
||||||
|
from telethon.helpers import add_surrogate
|
||||||
from telethon.tl.functions.channels import GetFullChannelRequest
|
from telethon.tl.functions.channels import GetFullChannelRequest
|
||||||
from telethon.tl.functions.messages import GetFullChatRequest
|
from telethon.tl.functions.messages import GetExportedChatInvitesRequest, GetFullChatRequest
|
||||||
|
from telethon.tl.types import (
|
||||||
|
ChatInviteExported,
|
||||||
|
InputMessageEntityMentionName,
|
||||||
|
InputUserSelf,
|
||||||
|
MessageEntityMention,
|
||||||
|
TypeInputPeer,
|
||||||
|
TypeInputUser,
|
||||||
|
)
|
||||||
|
from telethon.tl.types.messages import ExportedChatInvites
|
||||||
|
|
||||||
from mautrix.types import EventID
|
from mautrix.types import EventID
|
||||||
|
|
||||||
from ... import portal as po
|
from ... import formatter as fmt, portal as po, puppet as pu
|
||||||
from .. import SECTION_MISC, SECTION_PORTAL_MANAGEMENT, CommandEvent, command_handler
|
from .. import SECTION_MISC, SECTION_PORTAL_MANAGEMENT, CommandEvent, command_handler
|
||||||
from .util import user_has_power_level
|
from .util import user_has_power_level
|
||||||
|
|
||||||
@@ -101,30 +111,37 @@ async def get_id(evt: CommandEvent) -> EventID:
|
|||||||
|
|
||||||
|
|
||||||
invite_link_usage = (
|
invite_link_usage = (
|
||||||
"**Usage:** `$cmdprefix+sp invite-link [--uses=<amount>] [--expire=<delta>]`"
|
"**Usage:** `$cmdprefix+sp invite-link "
|
||||||
|
"[--uses=<amount>] [--expire=<delta>] [--request-needed] -- [title]`"
|
||||||
"\n\n"
|
"\n\n"
|
||||||
"* `--uses`: the number of times the invite link can be used."
|
"* `--uses`: the number of times the invite link can be used."
|
||||||
" Defaults to unlimited.\n"
|
" Defaults to unlimited.\n"
|
||||||
"* `--expire`: the duration after which the link will expire."
|
"* `--expire`: the duration after which the link will expire."
|
||||||
" A number suffixed with d(ay), h(our), m(inute) or s(econd)"
|
" A number suffixed with d(ay), h(our), m(inute) or s(econd)\n"
|
||||||
|
"* `--request-needed`: should the link require admins to approve joins?\n"
|
||||||
|
"* `title`: a description of the link (only shown to admins)."
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _parse_flag(args: list[str]) -> tuple[str, str]:
|
def _parse_flag(args: list[str]) -> tuple[str, str]:
|
||||||
arg = args.pop(0).lower()
|
arg = args.pop(0).lower()
|
||||||
|
if arg == "--":
|
||||||
|
return "", ""
|
||||||
|
value = ""
|
||||||
if arg.startswith("--"):
|
if arg.startswith("--"):
|
||||||
value_start = arg.index("=")
|
value_start = arg.find("=")
|
||||||
if value_start:
|
if value_start > 0:
|
||||||
flag = arg[2:value_start]
|
flag = arg[2:value_start]
|
||||||
value = arg[value_start + 1 :]
|
value = arg[value_start + 1 :]
|
||||||
else:
|
else:
|
||||||
flag = arg[2:]
|
flag = arg[2:]
|
||||||
value = args.pop(0).lower()
|
if arg not in ("request", "request-needed"):
|
||||||
|
value = args.pop(0).lower()
|
||||||
elif arg.startswith("-"):
|
elif arg.startswith("-"):
|
||||||
flag = arg[1]
|
flag = arg[1]
|
||||||
if len(arg) > 3 and arg[2] == "=":
|
if len(arg) > 3 and arg[2] == "=":
|
||||||
value = arg[3:]
|
value = arg[3:]
|
||||||
else:
|
elif arg != "r":
|
||||||
value = args.pop(0).lower()
|
value = args.pop(0).lower()
|
||||||
else:
|
else:
|
||||||
raise ValueError("invalid flag")
|
raise ValueError("invalid flag")
|
||||||
@@ -159,9 +176,12 @@ def _parse_delta(value: str) -> timedelta | None:
|
|||||||
@command_handler(
|
@command_handler(
|
||||||
help_section=SECTION_PORTAL_MANAGEMENT,
|
help_section=SECTION_PORTAL_MANAGEMENT,
|
||||||
help_text="Get a Telegram invite link to the current chat.",
|
help_text="Get a Telegram invite link to the current chat.",
|
||||||
help_args="[--uses=<amount>] [--expire=<time delta, e.g. 1d>] [--request-needed] [title]",
|
help_args="[--uses=<amount>] [--expire=<time delta, e.g. 1d>] [--request-needed] -- [title]",
|
||||||
)
|
)
|
||||||
async def invite_link(evt: CommandEvent) -> EventID:
|
async def invite_link(evt: CommandEvent) -> EventID:
|
||||||
|
if not evt.is_portal:
|
||||||
|
return await evt.reply("This is not a portal room.")
|
||||||
|
|
||||||
# TODO once we switch to Python 3.9 minimum, use argparse with exit_on_error=False
|
# TODO once we switch to Python 3.9 minimum, use argparse with exit_on_error=False
|
||||||
uses = None
|
uses = None
|
||||||
expire = None
|
expire = None
|
||||||
@@ -171,7 +191,9 @@ async def invite_link(evt: CommandEvent) -> EventID:
|
|||||||
flag, value = _parse_flag(evt.args)
|
flag, value = _parse_flag(evt.args)
|
||||||
except (ValueError, IndexError):
|
except (ValueError, IndexError):
|
||||||
return await evt.reply(invite_link_usage)
|
return await evt.reply(invite_link_usage)
|
||||||
if flag in ("uses", "u"):
|
if not flag:
|
||||||
|
break
|
||||||
|
elif flag in ("uses", "u"):
|
||||||
try:
|
try:
|
||||||
uses = int(value)
|
uses = int(value)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@@ -185,24 +207,86 @@ async def invite_link(evt: CommandEvent) -> EventID:
|
|||||||
request_needed = True
|
request_needed = True
|
||||||
title = " ".join(evt.args)
|
title = " ".join(evt.args)
|
||||||
|
|
||||||
portal = await po.Portal.get_by_mxid(evt.room_id)
|
if evt.portal.peer_type == "user":
|
||||||
if not portal:
|
|
||||||
return await evt.reply("This is not a portal room.")
|
|
||||||
|
|
||||||
if portal.peer_type == "user":
|
|
||||||
return await evt.reply("You can't invite users to private chats.")
|
return await evt.reply("You can't invite users to private chats.")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
link = await portal.get_invite_link(
|
link = await evt.portal.get_invite_link(
|
||||||
evt.sender, uses=uses, expire=expire, request_needed=request_needed, title=title
|
evt.sender, uses=uses, expire=expire, request_needed=request_needed, title=title
|
||||||
)
|
)
|
||||||
return await evt.reply(f"Invite link to {portal.title}: {link}")
|
return await evt.reply(f"Invite link to {evt.portal.title}: {link}")
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
return await evt.reply(e.args[0])
|
return await evt.reply(e.args[0])
|
||||||
except ChatAdminRequiredError:
|
except ChatAdminRequiredError:
|
||||||
return await evt.reply("You don't have the permission to create an invite link.")
|
return await evt.reply("You don't have the permission to create an invite link.")
|
||||||
|
|
||||||
|
|
||||||
|
async def _format_invite_link(link: ChatInviteExported) -> str:
|
||||||
|
desc = f"* {link.link}"
|
||||||
|
if link.title:
|
||||||
|
desc += f" - {link.title}"
|
||||||
|
if link.expire_date:
|
||||||
|
desc += f" \n Expires at {link.expire_date.isoformat()}"
|
||||||
|
if link.usage_limit:
|
||||||
|
desc += f" \n Used {link.usage or 0} out of {link.usage_limit} times"
|
||||||
|
elif link.usage:
|
||||||
|
desc += f" \n Used {link.usage} times"
|
||||||
|
else:
|
||||||
|
desc += " \n Never used"
|
||||||
|
if link.request_needed:
|
||||||
|
desc += " \n Join requests enabled - using link requires admin approval"
|
||||||
|
return desc
|
||||||
|
|
||||||
|
|
||||||
|
async def _hacky_find_mention(evt: CommandEvent) -> TypeInputUser | TypeInputPeer | None:
|
||||||
|
if len(evt.args) == 0:
|
||||||
|
return None
|
||||||
|
text, entities = await fmt.matrix_to_telegram(
|
||||||
|
evt.sender.client, text=evt.content.body, html=evt.content.formatted_body
|
||||||
|
)
|
||||||
|
for entity in entities:
|
||||||
|
if isinstance(entity, MessageEntityMention):
|
||||||
|
admin_username = add_surrogate(text)[entity.offset + 1 : entity.offset + entity.length]
|
||||||
|
return await evt.sender.client.get_input_entity(admin_username)
|
||||||
|
elif isinstance(entity, InputMessageEntityMentionName):
|
||||||
|
return entity.user_id
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@command_handler(
|
||||||
|
help_section=SECTION_PORTAL_MANAGEMENT,
|
||||||
|
help_text="List existing Telegram invite links to the current chat.",
|
||||||
|
help_args="[creator]",
|
||||||
|
)
|
||||||
|
async def list_invite_links(evt: CommandEvent) -> EventID:
|
||||||
|
admin_id = InputUserSelf()
|
||||||
|
try:
|
||||||
|
admin_id = await _hacky_find_mention(evt) or InputUserSelf()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
resp: ExportedChatInvites = await evt.sender.client(
|
||||||
|
GetExportedChatInvitesRequest(
|
||||||
|
peer=await evt.portal.get_input_entity(evt.sender),
|
||||||
|
admin_id=admin_id,
|
||||||
|
limit=100,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if resp.count == 0:
|
||||||
|
if isinstance(admin_id, InputUserSelf):
|
||||||
|
return await evt.reply("You haven't created any invite links to the current chat")
|
||||||
|
else:
|
||||||
|
return await evt.reply("That user hasn't created any invite links to the current chat")
|
||||||
|
formatted_links = "\n".join([await _format_invite_link(link) for link in resp.invites])
|
||||||
|
if isinstance(admin_id, InputUserSelf):
|
||||||
|
await evt.reply(f"Your links to this chat:\n\n{formatted_links}")
|
||||||
|
else:
|
||||||
|
puppet = await pu.Puppet.get_by_peer(admin_id)
|
||||||
|
await evt.reply(
|
||||||
|
f"[{puppet.displayname}](https://matrix.to/#/{puppet.mxid})'s links to this chat:\n\n"
|
||||||
|
f"{formatted_links}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@command_handler(
|
@command_handler(
|
||||||
help_section=SECTION_PORTAL_MANAGEMENT,
|
help_section=SECTION_PORTAL_MANAGEMENT,
|
||||||
help_text="Upgrade a normal Telegram group to a supergroup.",
|
help_text="Upgrade a normal Telegram group to a supergroup.",
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ from telethon.tl.types import (
|
|||||||
ChatPhoto,
|
ChatPhoto,
|
||||||
ChatPhotoEmpty,
|
ChatPhotoEmpty,
|
||||||
InputPeerPhotoFileLocation,
|
InputPeerPhotoFileLocation,
|
||||||
|
InputPeerUser,
|
||||||
PeerChannel,
|
PeerChannel,
|
||||||
PeerChat,
|
PeerChat,
|
||||||
PeerUser,
|
PeerUser,
|
||||||
@@ -414,7 +415,7 @@ class Puppet(DBPuppet, BasePuppet):
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_id_from_peer(peer: TypePeer | User | Channel) -> TelegramID:
|
def get_id_from_peer(peer: TypePeer | User | Channel) -> TelegramID:
|
||||||
if isinstance(peer, PeerUser):
|
if isinstance(peer, (PeerUser, InputPeerUser)):
|
||||||
return TelegramID(peer.user_id)
|
return TelegramID(peer.user_id)
|
||||||
elif isinstance(peer, PeerChannel):
|
elif isinstance(peer, PeerChannel):
|
||||||
return TelegramID(peer.channel_id)
|
return TelegramID(peer.channel_id)
|
||||||
|
|||||||
Reference in New Issue
Block a user