Update Telethon and add support for invite link customization
This commit is contained in:
@@ -13,6 +13,10 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Optional, List, Tuple
|
||||
from datetime import timedelta, datetime
|
||||
import re
|
||||
|
||||
from telethon.tl.functions.channels import GetFullChannelRequest
|
||||
from telethon.tl.functions.messages import GetFullChatRequest
|
||||
from telethon.errors import (ChatAdminRequiredError, UsernameInvalidError,
|
||||
@@ -80,9 +84,81 @@ async def get_id(evt: CommandEvent) -> EventID:
|
||||
await evt.reply(f"This room is bridged to Telegram chat ID `{tgid}`.")
|
||||
|
||||
|
||||
invite_link_usage = ("**Usage:** `$cmdprefix+sp invite-link [--uses=<amount>] [--expire=<delta>]`"
|
||||
"\n\n"
|
||||
"* `--uses`: the number of times the invite link can be used."
|
||||
" Defaults to unlimited.\n"
|
||||
"* `--expire`: the duration after which the link will expire."
|
||||
" A number suffixed with d(ay), h(our), m(inute) or s(econd)")
|
||||
|
||||
|
||||
def _parse_flag(args: List[str]) -> Tuple[str, str]:
|
||||
arg = args.pop(0).lower()
|
||||
if arg.startswith("--"):
|
||||
value_start = arg.index("=")
|
||||
if value_start:
|
||||
flag = arg[2:value_start]
|
||||
value = arg[value_start+1:]
|
||||
else:
|
||||
flag = arg[2:]
|
||||
value = args.pop(0).lower()
|
||||
elif arg.startswith("-"):
|
||||
flag = arg[1]
|
||||
if len(arg) > 3 and arg[2] == "=":
|
||||
value = arg[3:]
|
||||
else:
|
||||
value = args.pop(0).lower()
|
||||
else:
|
||||
raise ValueError("invalid flag")
|
||||
return flag, value
|
||||
|
||||
|
||||
delta_regex = re.compile("([0-9]+)(w(?:eek)?|d(?:ay)?|h(?:our)?|m(?:in(?:ute)?)?|s(?:ec(?:ond)?)?)")
|
||||
|
||||
|
||||
def _parse_delta(value: str) -> Optional[timedelta]:
|
||||
match = delta_regex.fullmatch(value)
|
||||
if not match:
|
||||
return None
|
||||
number = int(match.group(1))
|
||||
unit = match.group(2)[0]
|
||||
if unit == "w":
|
||||
return timedelta(weeks=number)
|
||||
elif unit == "d":
|
||||
return timedelta(days=number)
|
||||
elif unit == "h":
|
||||
return timedelta(hours=number)
|
||||
elif unit == "m":
|
||||
return timedelta(minutes=number)
|
||||
elif unit == "s":
|
||||
return timedelta(seconds=number)
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
@command_handler(help_section=SECTION_PORTAL_MANAGEMENT,
|
||||
help_text="Get a Telegram invite link to the current chat.")
|
||||
help_text="Get a Telegram invite link to the current chat.",
|
||||
help_args="[--uses=<amount>] [--expire=<time delta, e.g. 1d>]")
|
||||
async def invite_link(evt: CommandEvent) -> EventID:
|
||||
# TODO once we switch to Python 3.9 minimum, use argparse with exit_on_error=False
|
||||
uses = None
|
||||
expire = None
|
||||
while evt.args:
|
||||
try:
|
||||
flag, value = _parse_flag(evt.args)
|
||||
except (ValueError, IndexError):
|
||||
return await evt.reply(invite_link_usage)
|
||||
if flag in ("uses", "u"):
|
||||
try:
|
||||
uses = int(value)
|
||||
except ValueError:
|
||||
await evt.reply("The number of uses must be an integer")
|
||||
elif flag in ("expire", "e"):
|
||||
expire_delta = _parse_delta(value)
|
||||
if not expire_delta:
|
||||
await evt.reply("Invalid format for expiry time delta")
|
||||
expire = datetime.now() + expire_delta
|
||||
|
||||
portal = po.Portal.get_by_mxid(evt.room_id)
|
||||
if not portal:
|
||||
return await evt.reply("This is not a portal room.")
|
||||
@@ -91,7 +167,7 @@ async def invite_link(evt: CommandEvent) -> EventID:
|
||||
return await evt.reply("You can't invite users to private chats.")
|
||||
|
||||
try:
|
||||
link = await portal.get_invite_link(evt.sender)
|
||||
link = await portal.get_invite_link(evt.sender, uses=uses, expire=expire)
|
||||
return await evt.reply(f"Invite link to {portal.title}: {link}")
|
||||
except ValueError as e:
|
||||
return await evt.reply(e.args[0])
|
||||
|
||||
@@ -15,12 +15,13 @@
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Awaitable, Dict, List, Optional, Tuple, Union, Any, Set, Iterable, TYPE_CHECKING
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
import asyncio
|
||||
import logging
|
||||
import json
|
||||
|
||||
from telethon.tl.functions.messages import ExportChatInviteRequest
|
||||
from telethon.tl.types import (Channel, ChannelFull, Chat, ChatFull, ChatInviteEmpty, InputChannel,
|
||||
from telethon.tl.types import (Channel, ChannelFull, Chat, ChatFull, InputChannel,
|
||||
InputPeerChannel, InputPeerChat, InputPeerUser, InputUser,
|
||||
PeerChannel, PeerChat, PeerUser, TypeChat, TypeInputPeer, TypePeer,
|
||||
TypeUser, TypeUserFull, User, UserFull, TypeInputChannel, Photo,
|
||||
@@ -30,7 +31,7 @@ from telethon.tl.types import (Channel, ChannelFull, Chat, ChatFull, ChatInviteE
|
||||
|
||||
from mautrix.errors import MatrixRequestError, IntentError
|
||||
from mautrix.appservice import AppService, IntentAPI
|
||||
from mautrix.types import (RoomID, RoomAlias, UserID, EventID, EventType, MessageEventContent,
|
||||
from mautrix.types import (RoomID, RoomAlias, UserID, EventID, EventType,
|
||||
PowerLevelStateEventContent, ContentURI)
|
||||
from mautrix.util.simple_template import SimpleTemplate
|
||||
from mautrix.util.simple_lock import SimpleLock
|
||||
@@ -270,14 +271,14 @@ class BasePortal(MautrixBasePortal, ABC):
|
||||
return dialog.entity
|
||||
raise
|
||||
|
||||
async def get_invite_link(self, user: 'u.User') -> str:
|
||||
async def get_invite_link(self, user: 'u.User', uses: Optional[int] = None,
|
||||
expire: Optional[datetime] = None) -> str:
|
||||
if self.peer_type == "user":
|
||||
raise ValueError("You can't invite users to private chats.")
|
||||
if self.username:
|
||||
return f"https://t.me/{self.username}"
|
||||
link = await user.client(ExportChatInviteRequest(peer=await self.get_input_entity(user)))
|
||||
if isinstance(link, ChatInviteEmpty):
|
||||
raise ValueError("Failed to get invite link.")
|
||||
link = await user.client(ExportChatInviteRequest(peer=await self.get_input_entity(user),
|
||||
expire_date=expire, usage_limit=uses))
|
||||
return link.link
|
||||
|
||||
# endregion
|
||||
|
||||
+1
-1
@@ -6,5 +6,5 @@ commonmark>=0.8,<0.10
|
||||
aiohttp>=3,<4
|
||||
yarl>=1,<2
|
||||
mautrix>=0.8.11,<0.9
|
||||
telethon>=1.18,<1.20,!=1.19.0,!=1.19.1,!=1.19.2,!=1.19.3
|
||||
telethon>=1.20,<1.21
|
||||
telethon-session-sqlalchemy>=0.2.14,<0.3
|
||||
|
||||
Reference in New Issue
Block a user