Reduce usage of regexes
This commit is contained in:
@@ -139,7 +139,7 @@ def plain_mention_to_text() -> Tuple[List[TypeMessageEntity], Callable[[Match],
|
||||
def init_mx(context: "Context") -> None:
|
||||
global plain_mention_regex, should_bridge_plaintext_highlights
|
||||
config = context.config
|
||||
dn_template = config.get("bridge.displayname_template", "{displayname} (Telegram)")
|
||||
dn_template = config["bridge.displayname_template"]
|
||||
dn_template = re.escape(dn_template).replace(re.escape("{displayname}"), "[^>]+")
|
||||
plain_mention_regex = re.compile(f"^({dn_template})")
|
||||
should_bridge_plaintext_highlights = config["bridge.plaintext_highlights"] or False
|
||||
should_bridge_plaintext_highlights = config["bridge.plaintext_highlights"]
|
||||
|
||||
@@ -31,6 +31,7 @@ from telethon.tl.types import (Channel, ChannelFull, Chat, ChatFull, ChatInviteE
|
||||
from mautrix.errors import MatrixRequestError, IntentError
|
||||
from mautrix.appservice import AppService, IntentAPI
|
||||
from mautrix.types import RoomID, RoomAlias, UserID, EventType, PowerLevelStateEventContent
|
||||
from mautrix.util.simple_template import SimpleTemplate
|
||||
|
||||
from ..types import TelegramID
|
||||
from ..context import Context
|
||||
@@ -67,10 +68,8 @@ class BasePortal(ABC):
|
||||
sync_matrix_state: bool = True
|
||||
public_portals: bool = False
|
||||
|
||||
alias_template: str = None
|
||||
_mx_alias_prefix: str = None
|
||||
_mx_alias_suffix: str = None
|
||||
hs_domain: str = None
|
||||
alias_template: SimpleTemplate[str]
|
||||
hs_domain: str
|
||||
|
||||
# Instance cache
|
||||
by_mxid: Dict[RoomID, 'Portal'] = {}
|
||||
@@ -346,11 +345,7 @@ class BasePortal(ABC):
|
||||
|
||||
@classmethod
|
||||
def get_username_from_mx_alias(cls, alias: str) -> Optional[str]:
|
||||
prefix = cls._mx_alias_prefix
|
||||
suffix = cls._mx_alias_suffix
|
||||
if alias[:len(prefix)] == prefix and alias[-len(suffix):] == suffix:
|
||||
return alias[len(prefix):-len(suffix)]
|
||||
return None
|
||||
return cls.alias_template.parse(alias)
|
||||
|
||||
@classmethod
|
||||
def find_by_username(cls, username: str) -> Optional['Portal']:
|
||||
@@ -475,9 +470,5 @@ def init(context: Context) -> None:
|
||||
BasePortal.filter_mode = config["bridge.filter.mode"]
|
||||
BasePortal.filter_list = config["bridge.filter.list"]
|
||||
BasePortal.hs_domain = config["homeserver.domain"]
|
||||
BasePortal.alias_template = config["bridge.alias_template"]
|
||||
index = BasePortal.alias_template.index("{groupname}")
|
||||
length = len("{groupname}")
|
||||
BasePortal._mx_alias_prefix = f"#{BasePortal.alias_template[:index]}"
|
||||
BasePortal._mx_alias_suffix = (f"{BasePortal.alias_template[index + length:]}"
|
||||
f":{BasePortal.hs_domain}")
|
||||
BasePortal.alias_template = SimpleTemplate(config["bridge.alias_template"], "groupname",
|
||||
prefix="#", suffix=f":{BasePortal.hs_domain}")
|
||||
|
||||
@@ -418,7 +418,7 @@ class PortalMetadata(BasePortal, ABC):
|
||||
username = username or self.username
|
||||
if not username:
|
||||
return None
|
||||
return self.alias_template.format(groupname=username)
|
||||
return self.alias_template.format(username)
|
||||
|
||||
def _add_bot_chat(self, bot: User) -> None:
|
||||
if self.bot and bot.id == self.bot.tgid:
|
||||
|
||||
+11
-29
@@ -25,6 +25,7 @@ from mautrix.appservice import AppService, IntentAPI
|
||||
from mautrix.errors import MatrixRequestError
|
||||
from mautrix.bridge import CustomPuppetMixin
|
||||
from mautrix.types import UserID, SyncToken
|
||||
from mautrix.util.simple_template import SimpleTemplate
|
||||
|
||||
from .types import TelegramID
|
||||
from .db import Puppet as DBPuppet
|
||||
@@ -44,12 +45,9 @@ class Puppet(CustomPuppetMixin):
|
||||
az: AppService
|
||||
mx: 'MatrixHandler'
|
||||
loop: asyncio.AbstractEventLoop
|
||||
username_template: str
|
||||
hs_domain: str
|
||||
_mxid_prefix: str
|
||||
_mxid_suffix: str
|
||||
_displayname_prefix: str
|
||||
_displayname_suffix: str
|
||||
mxid_template: SimpleTemplate[TelegramID]
|
||||
displayname_template: SimpleTemplate[str]
|
||||
|
||||
cache: Dict[TelegramID, 'Puppet'] = {}
|
||||
by_custom_mxid: Dict[UserID, 'Puppet'] = {}
|
||||
@@ -137,11 +135,7 @@ class Puppet(CustomPuppetMixin):
|
||||
|
||||
@property
|
||||
def plain_displayname(self) -> str:
|
||||
prefix = self._mxid_prefix
|
||||
suffix = self._mxid_suffix
|
||||
if self.displayname[:len(prefix)] == prefix and self.displayname[-len(suffix):] == suffix:
|
||||
return self.displayname[len(prefix):-len(suffix)]
|
||||
return self.displayname
|
||||
return self.displayname_template.parse(self.displayname) or self.displayname
|
||||
|
||||
def get_input_entity(self, user: 'AbstractUser'
|
||||
) -> Awaitable[Union[TypeInputPeer, TypeInputUser]]:
|
||||
@@ -229,8 +223,7 @@ class Puppet(CustomPuppetMixin):
|
||||
|
||||
if not enable_format:
|
||||
return name
|
||||
return config["bridge.displayname_template"].format(
|
||||
displayname=name)
|
||||
return cls.displayname_template.format_full(name)
|
||||
|
||||
async def update_info(self, source: 'AbstractUser', info: User) -> None:
|
||||
if self.disable_updates:
|
||||
@@ -372,15 +365,11 @@ class Puppet(CustomPuppetMixin):
|
||||
|
||||
@classmethod
|
||||
def get_id_from_mxid(cls, mxid: UserID) -> Optional[TelegramID]:
|
||||
prefix = cls._mxid_prefix
|
||||
suffix = cls._mxid_suffix
|
||||
if mxid[:len(prefix)] == prefix and mxid[-len(suffix):] == suffix:
|
||||
return TelegramID(int(mxid[len(prefix):-len(suffix)]))
|
||||
return None
|
||||
return cls.mxid_template.parse(mxid)
|
||||
|
||||
@classmethod
|
||||
def get_mxid_from_id(cls, tgid: TelegramID) -> UserID:
|
||||
return UserID(f"@{cls.username_template.format(userid=tgid)}:{cls.hs_domain}")
|
||||
return UserID(cls.mxid_template.format_full(tgid))
|
||||
|
||||
@classmethod
|
||||
def find_by_username(cls, username: str) -> Optional['Puppet']:
|
||||
@@ -420,16 +409,9 @@ def init(context: 'Context') -> Iterable[Awaitable[Any]]:
|
||||
Puppet.mx = context.mx
|
||||
Puppet.hs_domain = config["homeserver"]["domain"]
|
||||
|
||||
Puppet.username_template = config["bridge.username_template"]
|
||||
index = Puppet.username_template.index("{userid}")
|
||||
length = len("{userid}")
|
||||
Puppet._mxid_prefix = f"@{Puppet.username_template[:index]}"
|
||||
Puppet._mxid_suffix = f"{Puppet.username_template[index + length:]}:{Puppet.hs_domain}"
|
||||
|
||||
displayname_template = config["bridge.displayname_template"]
|
||||
index = displayname_template.index("{displayname}")
|
||||
length = len("{displayname}")
|
||||
Puppet._displayname_prefix = displayname_template[:index]
|
||||
Puppet._displayname_suffix = displayname_template[index + length:]
|
||||
Puppet.mxid_template = SimpleTemplate(config["bridge.username_template"], "userid",
|
||||
prefix="@", suffix=f":{Puppet.hs_domain}", type=int)
|
||||
Puppet.displayname_template = SimpleTemplate(config["bridge.displayname_template"],
|
||||
"displayname")
|
||||
|
||||
return (puppet.start() for puppet in Puppet.all_with_custom_mxid())
|
||||
|
||||
@@ -13,11 +13,10 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import (Awaitable, Dict, List, Iterable, Match, NewType, Optional, Tuple, Any, cast,
|
||||
from typing import (Awaitable, Dict, List, Iterable, NewType, Optional, Tuple, Any, cast,
|
||||
TYPE_CHECKING)
|
||||
import logging
|
||||
import asyncio
|
||||
import re
|
||||
|
||||
from telethon.tl.types import (TypeUpdate, UpdateNewMessage, UpdateNewChannelMessage, PeerUser,
|
||||
UpdateShortChatMessage, UpdateShortMessage, User as TLUser, Chat)
|
||||
@@ -25,6 +24,7 @@ from telethon.tl.types.contacts import ContactsNotModified
|
||||
from telethon.tl.functions.contacts import GetContactsRequest, SearchRequest
|
||||
from telethon.tl.functions.account import UpdateStatusRequest
|
||||
|
||||
from mautrix.client import Client
|
||||
from mautrix.errors import MatrixRequestError
|
||||
from mautrix.types import UserID
|
||||
|
||||
@@ -97,8 +97,8 @@ class User(AbstractUser):
|
||||
|
||||
@property
|
||||
def mxid_localpart(self) -> str:
|
||||
match: Match = re.compile("@(.+):(.+)").match(self.mxid)
|
||||
return match.group(1)
|
||||
localpart, server = Client.parse_user_id(self.mxid)
|
||||
return localpart
|
||||
|
||||
@property
|
||||
def human_tg_id(self) -> str:
|
||||
|
||||
Reference in New Issue
Block a user