Even more migrations to mautrix-python
This commit is contained in:
+88
-222
@@ -13,19 +13,21 @@
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Awaitable, Any, Dict, List, Iterable, Optional, Pattern, Union, TYPE_CHECKING
|
||||
from typing import Awaitable, Any, Dict, Iterable, Optional, Union, TYPE_CHECKING
|
||||
from difflib import SequenceMatcher
|
||||
from enum import Enum
|
||||
from aiohttp import ServerDisconnectedError
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
|
||||
from telethon.tl.types import (UserProfilePhoto, User, UpdateUserName, PeerUser, TypeInputPeer,
|
||||
InputPeerPhotoFileLocation, UserProfilePhotoEmpty)
|
||||
from mautrix_appservice import AppService, IntentAPI, IntentError, MatrixRequestError
|
||||
|
||||
from .types import MatrixUserID, TelegramID
|
||||
from mautrix.appservice import AppService, IntentAPI
|
||||
from mautrix.errors import MatrixRequestError
|
||||
from mautrix.bridge import CustomPuppetMixin
|
||||
from mautrix.types import UserID
|
||||
|
||||
from .types import TelegramID
|
||||
from .db import Puppet as DBPuppet
|
||||
from . import util
|
||||
|
||||
@@ -35,26 +37,48 @@ if TYPE_CHECKING:
|
||||
from .context import Context
|
||||
from .abstract_user import AbstractUser
|
||||
|
||||
PuppetError = Enum('PuppetError', 'Success OnlyLoginSelf InvalidAccessToken')
|
||||
|
||||
config = None # type: Config
|
||||
config: Optional['Config'] = None
|
||||
|
||||
|
||||
class Puppet:
|
||||
log = logging.getLogger("mau.puppet") # type: logging.Logger
|
||||
az = None # type: AppService
|
||||
mx = None # type: MatrixHandler
|
||||
loop = None # type: asyncio.AbstractEventLoop
|
||||
mxid_regex = None # type: Pattern
|
||||
username_template = None # type: str
|
||||
hs_domain = None # type: str
|
||||
cache = {} # type: Dict[TelegramID, Puppet]
|
||||
by_custom_mxid = {} # type: Dict[str, Puppet]
|
||||
class Puppet(CustomPuppetMixin):
|
||||
log: logging.Logger = logging.getLogger("mau.puppet")
|
||||
az: AppService
|
||||
mx: 'MatrixHandler'
|
||||
loop: asyncio.AbstractEventLoop
|
||||
username_template: str
|
||||
hs_domain: str
|
||||
_mxid_prefix: str
|
||||
_mxid_suffix: str
|
||||
_displayname_prefix: str
|
||||
_displayname_suffix: str
|
||||
|
||||
cache: Dict[TelegramID, 'Puppet'] = {}
|
||||
by_custom_mxid: Dict[UserID, 'Puppet'] = {}
|
||||
|
||||
id: TelegramID
|
||||
access_token: Optional[str]
|
||||
custom_mxid: Optional[UserID]
|
||||
default_mxid: UserID
|
||||
|
||||
username: Optional[str]
|
||||
displayname: Optional[str]
|
||||
displayname_source: Optional[TelegramID]
|
||||
photo_id: Optional[str]
|
||||
is_bot: bool
|
||||
is_registered: bool
|
||||
disable_updates: bool
|
||||
|
||||
default_mxid_intent: IntentAPI
|
||||
intent: IntentAPI
|
||||
|
||||
sync_task: Optional[asyncio.Future]
|
||||
|
||||
_db_instance: Optional[DBPuppet]
|
||||
|
||||
def __init__(self,
|
||||
id: TelegramID,
|
||||
access_token: Optional[str] = None,
|
||||
custom_mxid: Optional[MatrixUserID] = None,
|
||||
custom_mxid: Optional[UserID] = None,
|
||||
username: Optional[str] = None,
|
||||
displayname: Optional[str] = None,
|
||||
displayname_source: Optional[TelegramID] = None,
|
||||
@@ -63,41 +87,32 @@ class Puppet:
|
||||
is_registered: bool = False,
|
||||
disable_updates: bool = False,
|
||||
db_instance: Optional[DBPuppet] = None) -> None:
|
||||
self.id = id # type: TelegramID
|
||||
self.access_token = access_token # type: Optional[str]
|
||||
self.custom_mxid = custom_mxid # type: Optional[MatrixUserID]
|
||||
self.default_mxid = self.get_mxid_from_id(self.id) # type: MatrixUserID
|
||||
self.id = id
|
||||
self.access_token = access_token
|
||||
self.custom_mxid = custom_mxid
|
||||
self.default_mxid = self.get_mxid_from_id(self.id)
|
||||
|
||||
self.username = username # type: Optional[str]
|
||||
self.displayname = displayname # type: Optional[str]
|
||||
self.displayname_source = displayname_source # type: Optional[TelegramID]
|
||||
self.photo_id = photo_id # type: Optional[str]
|
||||
self.is_bot = is_bot # type: bool
|
||||
self.is_registered = is_registered # type: bool
|
||||
self.disable_updates = disable_updates # type: bool
|
||||
self._db_instance = db_instance # type: Optional[DBPuppet]
|
||||
self.username = username
|
||||
self.displayname = displayname
|
||||
self.displayname_source = displayname_source
|
||||
self.photo_id = photo_id
|
||||
self.is_bot = is_bot
|
||||
self.is_registered = is_registered
|
||||
self.disable_updates = disable_updates
|
||||
self._db_instance = db_instance
|
||||
|
||||
self.default_mxid_intent = self.az.intent.user(self.default_mxid)
|
||||
self.intent = self._fresh_intent() # type: IntentAPI
|
||||
self.sync_task = None # type: Optional[asyncio.Future]
|
||||
self.intent = self._fresh_intent()
|
||||
self.sync_task = None
|
||||
|
||||
self.cache[id] = self
|
||||
if self.custom_mxid:
|
||||
self.by_custom_mxid[self.custom_mxid] = self
|
||||
|
||||
@property
|
||||
def mxid(self) -> MatrixUserID:
|
||||
return self.custom_mxid or self.default_mxid
|
||||
|
||||
@property
|
||||
def tgid(self) -> TelegramID:
|
||||
return self.id
|
||||
|
||||
@property
|
||||
def is_real_user(self) -> bool:
|
||||
""" Is True when the puppet is a real Matrix user. """
|
||||
return bool(self.custom_mxid and self.access_token)
|
||||
|
||||
@staticmethod
|
||||
async def is_logged_in() -> bool:
|
||||
""" Is True if the puppet is logged in. """
|
||||
@@ -105,175 +120,15 @@ class Puppet:
|
||||
|
||||
@property
|
||||
def plain_displayname(self) -> str:
|
||||
tpl = config["bridge.displayname_template"]
|
||||
if tpl == "{displayname}":
|
||||
# Template has no extra stuff, no need to parse.
|
||||
return self.displayname
|
||||
regex = re.compile("^" + re.escape(tpl).replace(re.escape("{displayname}"), "(.+?)") + "$")
|
||||
match = regex.match(self.displayname)
|
||||
return match.group(1) or self.displayname
|
||||
prefix = self._mxid_prefix
|
||||
suffix = self._mxid_suffix
|
||||
if self.displayname[:len(prefix)] == prefix and self.displayname[-len(suffix):] == suffix:
|
||||
return self.displayname[len(prefix):-len(suffix)]
|
||||
return self.displayname
|
||||
|
||||
def get_input_entity(self, user: 'AbstractUser') -> Awaitable[TypeInputPeer]:
|
||||
return user.client.get_input_entity(PeerUser(user_id=self.tgid))
|
||||
|
||||
# region Custom puppet management
|
||||
def _fresh_intent(self) -> IntentAPI:
|
||||
return (self.az.intent.user(self.custom_mxid, self.access_token)
|
||||
if self.is_real_user else self.default_mxid_intent)
|
||||
|
||||
async def switch_mxid(self, access_token: Optional[str],
|
||||
mxid: Optional[MatrixUserID]) -> PuppetError:
|
||||
prev_mxid = self.custom_mxid
|
||||
self.custom_mxid = mxid
|
||||
self.access_token = access_token
|
||||
self.intent = self._fresh_intent()
|
||||
|
||||
err = await self.init_custom_mxid()
|
||||
if err != PuppetError.Success:
|
||||
return err
|
||||
|
||||
try:
|
||||
del self.by_custom_mxid[prev_mxid] # type: ignore
|
||||
except KeyError:
|
||||
pass
|
||||
if self.mxid != self.default_mxid:
|
||||
self.by_custom_mxid[self.mxid] = self
|
||||
await self.leave_rooms_with_default_user()
|
||||
self.save()
|
||||
return PuppetError.Success
|
||||
|
||||
async def init_custom_mxid(self) -> PuppetError:
|
||||
if not self.is_real_user:
|
||||
return PuppetError.Success
|
||||
|
||||
mxid = await self.intent.whoami()
|
||||
if not mxid or mxid != self.custom_mxid:
|
||||
self.custom_mxid = None
|
||||
self.access_token = None
|
||||
self.intent = self._fresh_intent()
|
||||
if mxid != self.custom_mxid:
|
||||
return PuppetError.OnlyLoginSelf
|
||||
return PuppetError.InvalidAccessToken
|
||||
if config["bridge.sync_with_custom_puppets"]:
|
||||
self.sync_task = asyncio.ensure_future(self.sync(), loop=self.loop)
|
||||
return PuppetError.Success
|
||||
|
||||
async def leave_rooms_with_default_user(self) -> None:
|
||||
for room_id in await self.default_mxid_intent.get_joined_rooms():
|
||||
try:
|
||||
await self.default_mxid_intent.leave_room(room_id)
|
||||
await self.intent.ensure_joined(room_id)
|
||||
except (IntentError, MatrixRequestError):
|
||||
pass
|
||||
|
||||
def create_sync_filter(self) -> Awaitable[str]:
|
||||
return self.intent.client.create_filter(self.custom_mxid, {
|
||||
"room": {
|
||||
"include_leave": False,
|
||||
"state": {
|
||||
"types": []
|
||||
},
|
||||
"timeline": {
|
||||
"types": [],
|
||||
},
|
||||
"ephemeral": {
|
||||
"types": ["m.typing", "m.receipt"],
|
||||
},
|
||||
"account_data": {
|
||||
"types": []
|
||||
}
|
||||
},
|
||||
"account_data": {
|
||||
"types": [],
|
||||
},
|
||||
"presence": {
|
||||
"types": ["m.presence"],
|
||||
"senders": [self.custom_mxid],
|
||||
},
|
||||
})
|
||||
|
||||
def filter_events(self, events: List[Dict]) -> List:
|
||||
new_events = []
|
||||
for event in events:
|
||||
evt_type = event.get("type", None)
|
||||
event.setdefault("content", {})
|
||||
if evt_type == "m.typing":
|
||||
is_typing = self.custom_mxid in event["content"].get("user_ids", [])
|
||||
event["content"]["user_ids"] = [self.custom_mxid] if is_typing else []
|
||||
elif evt_type == "m.receipt":
|
||||
val = None
|
||||
evt = None
|
||||
for event_id in event["content"]:
|
||||
try:
|
||||
val = event["content"][event_id]["m.read"][self.custom_mxid]
|
||||
evt = event_id
|
||||
break
|
||||
except KeyError:
|
||||
pass
|
||||
if val and evt:
|
||||
event["content"] = {evt: {"m.read": {
|
||||
self.custom_mxid: val
|
||||
}}}
|
||||
else:
|
||||
continue
|
||||
new_events.append(event)
|
||||
return new_events
|
||||
|
||||
def handle_sync(self, presence: List, ephemeral: Dict) -> None:
|
||||
presence_events = [self.mx.try_handle_ephemeral_event(event) for event in presence]
|
||||
|
||||
for room_id, events in ephemeral.items():
|
||||
for event in events:
|
||||
event["room_id"] = room_id
|
||||
|
||||
ephemeral_events = [self.mx.try_handle_ephemeral_event(event)
|
||||
for events in ephemeral.values()
|
||||
for event in self.filter_events(events)]
|
||||
|
||||
events = ephemeral_events + presence_events # List[Callable[[int], Awaitable[None]]]
|
||||
coro = asyncio.gather(*events, loop=self.loop)
|
||||
asyncio.ensure_future(coro, loop=self.loop)
|
||||
|
||||
async def sync(self) -> None:
|
||||
try:
|
||||
await self._sync()
|
||||
except asyncio.CancelledError:
|
||||
self.log.info("Syncing cancelled")
|
||||
except Exception:
|
||||
self.log.exception("Fatal error syncing")
|
||||
|
||||
async def _sync(self) -> None:
|
||||
if not self.is_real_user:
|
||||
self.log.warning("Called sync() for non-custom puppet.")
|
||||
return
|
||||
custom_mxid = self.custom_mxid
|
||||
access_token_at_start = self.access_token
|
||||
errors = 0
|
||||
next_batch = None
|
||||
filter_id = await self.create_sync_filter()
|
||||
self.log.debug(f"Starting syncer for {custom_mxid} with sync filter {filter_id}.")
|
||||
while access_token_at_start == self.access_token:
|
||||
try:
|
||||
sync_resp = await self.intent.client.sync(filter=filter_id, since=next_batch,
|
||||
set_presence="offline") # type: Dict
|
||||
errors = 0
|
||||
if next_batch is not None:
|
||||
presence = sync_resp.get("presence", {}).get("events", []) # type: List
|
||||
ephemeral = {room: data.get("ephemeral", {}).get("events", [])
|
||||
for room, data
|
||||
in sync_resp.get("rooms", {}).get("join", {}).items()
|
||||
} # type: Dict
|
||||
self.handle_sync(presence, ephemeral)
|
||||
next_batch = sync_resp.get("next_batch", None)
|
||||
except (MatrixRequestError, ServerDisconnectedError) as e:
|
||||
wait = min(errors, 11) ** 2
|
||||
self.log.warning(f"Syncer for {custom_mxid} errored: {e}. "
|
||||
f"Waiting for {wait} seconds...")
|
||||
errors += 1
|
||||
await asyncio.sleep(wait)
|
||||
self.log.debug(f"Syncer for custom puppet {custom_mxid} stopped.")
|
||||
|
||||
# endregion
|
||||
# region DB conversion
|
||||
|
||||
@property
|
||||
@@ -378,7 +233,7 @@ class Puppet:
|
||||
self.displayname = displayname
|
||||
self.displayname_source = source.tgid
|
||||
try:
|
||||
await self.default_mxid_intent.set_display_name(displayname[:100])
|
||||
await self.default_mxid_intent.set_displayname(displayname[:100])
|
||||
except MatrixRequestError:
|
||||
self.log.exception("Failed to set displayname")
|
||||
self.displayname = ""
|
||||
@@ -402,7 +257,7 @@ class Puppet:
|
||||
if not photo_id:
|
||||
self.photo_id = ""
|
||||
try:
|
||||
await self.default_mxid_intent.set_avatar("")
|
||||
await self.default_mxid_intent.set_avatar_url("")
|
||||
except MatrixRequestError:
|
||||
self.log.exception("Failed to set avatar")
|
||||
self.photo_id = ""
|
||||
@@ -418,7 +273,7 @@ class Puppet:
|
||||
if file:
|
||||
self.photo_id = photo_id
|
||||
try:
|
||||
await self.default_mxid_intent.set_avatar(file.mxc)
|
||||
await self.default_mxid_intent.set_avatar_url(file.mxc)
|
||||
except MatrixRequestError:
|
||||
self.log.exception("Failed to set avatar")
|
||||
self.photo_id = ""
|
||||
@@ -447,7 +302,7 @@ class Puppet:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_by_mxid(cls, mxid: MatrixUserID, create: bool = True) -> Optional['Puppet']:
|
||||
def get_by_mxid(cls, mxid: UserID, create: bool = True) -> Optional['Puppet']:
|
||||
tgid = cls.get_id_from_mxid(mxid)
|
||||
if tgid:
|
||||
return cls.get(tgid, create)
|
||||
@@ -455,7 +310,7 @@ class Puppet:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_by_custom_mxid(cls, mxid: MatrixUserID) -> Optional['Puppet']:
|
||||
def get_by_custom_mxid(cls, mxid: UserID) -> Optional['Puppet']:
|
||||
if not mxid:
|
||||
raise ValueError("Matrix ID can't be empty")
|
||||
|
||||
@@ -479,15 +334,16 @@ class Puppet:
|
||||
for puppet in DBPuppet.all_with_custom_mxid())
|
||||
|
||||
@classmethod
|
||||
def get_id_from_mxid(cls, mxid: MatrixUserID) -> Optional[TelegramID]:
|
||||
match = cls.mxid_regex.match(mxid)
|
||||
if match:
|
||||
return TelegramID(int(match.group(1)))
|
||||
def get_id_from_mxid(cls, mxid: UserID) -> Optional[TelegramID]:
|
||||
prefix = cls._mxid_prefix
|
||||
suffix = cls._mxid_suffix
|
||||
if mxid[:len(prefix)] == prefix and mxid[-len(suffix):] == suffix:
|
||||
return TelegramID(int(mxid[len(prefix):-len(suffix)]))
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_mxid_from_id(cls, tgid: TelegramID) -> MatrixUserID:
|
||||
return MatrixUserID(f"@{cls.username_template.format(userid=tgid)}:{cls.hs_domain}")
|
||||
def get_mxid_from_id(cls, tgid: TelegramID) -> UserID:
|
||||
return UserID(f"@{cls.username_template.format(userid=tgid)}:{cls.hs_domain}")
|
||||
|
||||
@classmethod
|
||||
def find_by_username(cls, username: str) -> Optional['Puppet']:
|
||||
@@ -525,8 +381,18 @@ def init(context: 'Context') -> Iterable[Awaitable[Any]]:
|
||||
global config
|
||||
Puppet.az, config, Puppet.loop, _ = context.core
|
||||
Puppet.mx = context.mx
|
||||
Puppet.username_template = config.get("bridge.username_template", "telegram_{userid}")
|
||||
Puppet.hs_domain = config["homeserver"]["domain"]
|
||||
Puppet.mxid_regex = re.compile(
|
||||
f"@{Puppet.username_template.format(userid='([0-9]+)')}:{Puppet.hs_domain}")
|
||||
return (puppet.init_custom_mxid() for puppet in Puppet.all_with_custom_mxid())
|
||||
|
||||
Puppet.username_template = config["bridge.username_template"]
|
||||
index = Puppet.username_template.index("{userid}")
|
||||
length = len("{userid}")
|
||||
Puppet._mxid_prefix = f"@{Puppet.username_template[:index]}"
|
||||
Puppet._mxid_suffix = f"{Puppet.username_template[index + length:]}:{Puppet.hs_domain}"
|
||||
|
||||
displayname_template = config["bridge.displayname_template"]
|
||||
index = displayname_template.index("{displayname}")
|
||||
length = len("{displayname}")
|
||||
Puppet._displayname_prefix = displayname_template[:index]
|
||||
Puppet._displayname_suffix = displayname_template[index+length:]
|
||||
|
||||
return (puppet.start() for puppet in Puppet.all_with_custom_mxid())
|
||||
|
||||
Reference in New Issue
Block a user