656 lines
26 KiB
Python
656 lines
26 KiB
Python
# -*- coding: future_fstrings -*-
|
|
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
|
# Copyright (C) 2018 Tulir Asokan
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
from telethon.tl.functions.messages import (GetFullChatRequest, EditChatAdminRequest,
|
|
CreateChatRequest, AddChatUserRequest)
|
|
from telethon.tl.functions.channels import (GetParticipantsRequest, CreateChannelRequest,
|
|
InviteToChannelRequest)
|
|
from telethon.errors.rpc_error_list import ChatAdminRequiredError, LocationInvalidError
|
|
from telethon.tl.types import *
|
|
from PIL import Image
|
|
from io import BytesIO
|
|
import mimetypes
|
|
import magic
|
|
from .db import Portal as DBPortal, Message as DBMessage
|
|
from . import puppet as p, user as u, formatter
|
|
|
|
# Initialise the mimetypes tables once at import time; the Portal class below relies on
# mimetypes.types_map, guess_extension() and guess_type() for file name handling.
mimetypes.init()
|
|
|
|
# Module-wide configuration object; stays None until init() unpacks it from its context.
config = None
|
|
|
|
|
|
class Portal:
    """A bridged conversation: one Telegram peer paired with (at most) one Matrix room.

    Instances register themselves in the class-level ``by_tgid`` / ``by_mxid`` caches when
    they have the corresponding ID; the ``get_by_*`` lookups consult those caches before
    querying the database.
    """

    # Filled in by the module-level init(): child logger, database session, appservice.
    log = None
    db = None
    az = None
    # Instance caches, keyed by Matrix room ID and by the (tgid, tg_receiver) tuple.
    by_mxid = {}
    by_tgid = {}

    def __init__(self, tgid, peer_type, tg_receiver=None, mxid=None, username=None, title=None,
                 photo_id=None):
        """Create a portal and add it to the lookup caches.

        :param tgid: Telegram ID of the user/chat/channel.
        :param peer_type: One of ``"user"``, ``"chat"`` or ``"channel"``.
        :param tg_receiver: Receiving Telegram ID; falls back to ``tgid`` when falsy.
        :param mxid: Matrix room ID, if the room already exists.
        :param username: Stored Telegram username of the peer.
        :param title: Stored title of the chat.
        :param photo_id: Identifier of the currently bridged avatar.
        """
        self.mxid = mxid
        self.tgid = tgid
        self.tg_receiver = tg_receiver or tgid
        self.peer_type = peer_type
        self.username = username
        self.title = title
        self.photo_id = photo_id
        self._main_intent = None

        if tgid:
            self.by_tgid[self.tgid_full] = self
        if mxid:
            self.by_mxid[mxid] = self

    @property
    def tgid_full(self):
        """The ``(tgid, tg_receiver)`` tuple used as cache and database key."""
        return self.tgid, self.tg_receiver

    @property
    def tgid_log(self):
        """Telegram ID for log messages; includes the receiver when it differs from tgid."""
        if self.tgid == self.tg_receiver:
            return self.tgid
        return f"{self.tg_receiver}<->{self.tgid}"

    @property
    def peer(self):
        """Telethon peer object matching ``peer_type``, or None for an unknown type."""
        if self.peer_type == "user":
            return PeerUser(user_id=self.tgid)
        elif self.peer_type == "chat":
            return PeerChat(chat_id=self.tgid)
        elif self.peer_type == "channel":
            return PeerChannel(channel_id=self.tgid)
        return None

    # region Matrix room info updating

    @property
    def main_intent(self):
        """Intent used for room management: the puppet's for direct chats, else the bot's.

        The value is resolved on first access and cached in ``_main_intent``.
        """
        if not self._main_intent:
            direct = self.peer_type == "user"
            puppet = p.Puppet.get(self.tgid) if direct else None
            self._main_intent = puppet.intent if direct else self.az.intent
        return self._main_intent

    def invite_matrix(self, users=None):
        """Invite one Matrix user ID (str) or an iterable of them to the portal room."""
        if not users:
            return
        if isinstance(users, str):
            users = [users]
        for user in users:
            self.main_intent.invite(self.mxid, user)

    def update_after_create(self, user, entity, direct, puppet=None):
        """Synchronise room state with Telegram.

        For group portals this updates room info, members and power levels; for direct
        chats it updates the puppet's info and makes the puppet join the room.
        """
        if not direct:
            self.update_info(user, entity)
            users, participants = self.get_users(user, entity)
            self.sync_telegram_users(user, users)
            self.update_telegram_participants(participants)
        else:
            if not puppet:
                puppet = p.Puppet.get(self.tgid)
            puppet.update_info(user, entity)
            puppet.intent.join_room(self.mxid)

    def create_matrix_room(self, user, entity=None, invites=None, update_if_exists=True):
        """Create the Matrix room for this portal, or refresh the existing one.

        :param user: Bridge user whose Telegram client is used to fetch data.
        :param entity: Telegram entity of the chat; fetched via ``user.client`` if omitted.
        :param invites: Matrix user IDs to invite.
        :param update_if_exists: Whether to resync an already existing room.
        :return: The Matrix room ID.
        :raises Exception: If the room creation call returns a falsy response.
        """
        if invites is None:
            invites = []
        if not entity:
            entity = user.client.get_entity(self.peer)
            self.log.debug("Fetched data: %s", entity)
        direct = self.peer_type == "user"

        if self.mxid:
            # NOTE(review): the original indentation was lost; it is assumed that invites
            # are only sent together with the update — confirm against upstream.
            if update_if_exists:
                self.update_after_create(user, entity, direct)
                self.invite_matrix(invites)
            return self.mxid

        self.log.debug(f"Creating room for {self.tgid_log}")

        # Users have no title attribute; the room is then created without a name.
        title = getattr(entity, "title", None)

        puppet = p.Puppet.get(self.tgid) if direct else None
        intent = puppet.intent if direct else self.az.intent

        # TODO set room alias if public channel.
        room = intent.create_room(invitees=invites, name=title, is_direct=direct)
        if not room:
            raise Exception(f"Failed to create room for {self.tgid_log}")

        self.mxid = room["room_id"]
        self.by_mxid[self.mxid] = self
        self.save()

        power_level_requirement = 0 if self.peer_type == "chat" else 50
        levels = self.main_intent.get_power_levels(self.mxid)
        levels["ban"] = 100
        levels["invite"] = 50
        levels["events"]["m.room.name"] = power_level_requirement
        levels["events"]["m.room.avatar"] = power_level_requirement
        levels["events"]["m.room.topic"] = 50 if self.peer_type == "channel" else 100
        levels["events"]["m.room.power_levels"] = 95
        self.main_intent.set_power_levels(self.mxid, levels)
        self.update_after_create(user, entity, direct, puppet)
        # Return the room ID on the creation path too, like the "already exists" path.
        return self.mxid

    def sync_telegram_users(self, source, users=None):
        """Update the puppet of every given Telegram user entity and join it to the room."""
        for entity in users or ():
            puppet = p.Puppet.get(entity.id)
            puppet.update_info(source, entity)
            puppet.intent.join_room(self.mxid)

    def add_telegram_user(self, user_id, source=None):
        """Join the puppet of ``user_id`` to the room and invite the matching Matrix user.

        When ``source`` is given, its Telegram client is used to refresh the puppet's info.
        """
        puppet = p.Puppet.get(user_id)
        if source:
            entity = source.client.get_entity(user_id)
            puppet.update_info(source, entity)
        puppet.intent.join_room(self.mxid)

        user = u.User.get_by_tgid(user_id)
        if user:
            self.main_intent.invite(self.mxid, user.mxid)

    def delete_telegram_user(self, user_id, kick_message=None):
        """Remove a Telegram user from the room.

        With a ``kick_message`` the puppet is kicked, otherwise it leaves on its own. A
        Matrix user linked to the same Telegram ID is kicked in both cases.
        """
        puppet = p.Puppet.get(user_id)
        user = u.User.get_by_tgid(user_id)
        if kick_message:
            self.main_intent.kick(self.mxid, puppet.mxid, kick_message)
        else:
            puppet.intent.leave_room(self.mxid)
        if user:
            self.main_intent.kick(self.mxid, user.mxid, kick_message or "Left Telegram chat")

    def update_info(self, user, entity=None):
        """Sync username (channels only), title and avatar from Telegram; save on change."""
        if self.peer_type == "user":
            self.log.warning(f"Called update_info() for direct chat portal {self.tgid_log}")
            return

        self.log.debug(f"Updating info of {self.tgid_log}")
        if not entity:
            entity = user.client.get_entity(self.peer)
            self.log.debug("Fetched data: %s", entity)
        changed = False

        if self.peer_type == "channel":
            if self.username != entity.username:
                # TODO update room alias
                self.username = entity.username
                changed = True

        changed = self.update_title(entity.title, self.main_intent) or changed

        if isinstance(entity.photo, ChatPhoto):
            changed = self.update_avatar(user, entity.photo.photo_big, self.main_intent) or changed

        if changed:
            self.save()

    def update_title(self, title, intent=None):
        """Set the room name if ``title`` differs from the stored one.

        ``intent`` is accepted for interface compatibility but is not used; the room name
        is always set through ``main_intent``.

        :return: True if the title changed, else False. The caller is expected to save.
        """
        if self.title != title:
            self.title = title
            self.main_intent.set_room_name(self.mxid, self.title)
            return True
        return False

    def get_largest_photo_size(self, photo):
        """Return the entry of ``photo.sizes`` with the most bytes."""
        return max(photo.sizes, key=(lambda size: (
            len(size.bytes) if isinstance(size, PhotoCachedSize) else size.size)))

    def update_avatar(self, user, photo, intent=None):
        """Download ``photo`` and set it as the room avatar if it is not the current one.

        ``intent`` is accepted for interface compatibility but is not used.

        :return: True if the avatar changed; False if it was unchanged or the download
                 raised LocationInvalidError. The caller is expected to save.
        """
        photo_id = f"{photo.volume_id}-{photo.local_id}"
        if self.photo_id != photo_id:
            try:
                file = user.download_file(photo)
            except LocationInvalidError:
                return False
            uploaded = self.main_intent.upload_file(file)
            self.main_intent.set_room_avatar(self.mxid, uploaded["content_uri"])
            self.photo_id = photo_id
            return True
        return False

    def get_users(self, user, entity):
        """Fetch ``(users, participants)`` of the Telegram chat through ``user.client``.

        Channels are queried for at most 100 recent participants; a
        ChatAdminRequiredError yields two empty lists. Direct chats return the entity
        itself and no participants. Unknown peer types implicitly return None.
        """
        if self.peer_type == "chat":
            chat = user.client(GetFullChatRequest(chat_id=self.tgid))
            return chat.users, chat.full_chat.participants.participants
        elif self.peer_type == "channel":
            try:
                participants = user.client(GetParticipantsRequest(
                    entity, ChannelParticipantsRecent(), offset=0, limit=100, hash=0
                ))
                return participants.users, participants.participants
            except ChatAdminRequiredError:
                return [], []
        elif self.peer_type == "user":
            return [entity], []

    # endregion
    # region Matrix event handling

    def _get_file_meta(self, body, mime):
        """Derive the Telegram file name and caption for a Matrix file message.

        If ``body`` ends in an extension registered for ``mime`` it is used as the file
        name and there is no caption. Otherwise a generic ``matrix_upload<ext>`` name is
        generated and ``body`` becomes the caption.

        :return: ``(file_name, caption)``; ``caption`` is None when body is the file name.
        """
        # guess_extension() returns None for unknown types; avoid "matrix_uploadNone".
        fallback_name = f"matrix_upload{mimetypes.guess_extension(mime) or ''}"
        try:
            # types_map keys are lowercase, so normalise e.g. ".PNG" before the lookup.
            current_extension = body[body.rindex("."):].lower()
            if mimetypes.types_map[current_extension] == mime:
                file_name = body
            else:
                file_name = fallback_name
        except (ValueError, KeyError):
            # No "." in the body, or an extension mimetypes does not know.
            file_name = fallback_name
        return file_name, None if file_name == body else body

    def handle_matrix_message(self, sender, message, event_id):
        """Relay a Matrix message event content to Telegram and record the ID mapping.

        Handles m.text/m.emote (plain or HTML formatted) and m.image/m.file/m.audio/
        m.video; other message types are logged and ignored.
        """
        msgtype = message["msgtype"]
        if msgtype in {"m.text", "m.emote"}:
            if message.get("format") == "org.matrix.custom.html":
                text, entities = formatter.matrix_to_telegram(message["formatted_body"],
                                                              sender.tgid)
                if msgtype == "m.emote":
                    # NOTE(review): the prefix is added after the entities were computed,
                    # so entity offsets may be off by four characters — confirm.
                    text = "/me " + text
                reply_to = None
                if entities and isinstance(entities[0], formatter.MessageEntityReply):
                    reply_to = entities.pop(0).msg_id
                response = sender.send_message(self.peer, text, entities=entities,
                                               reply_to=reply_to)
            else:
                # Build the text locally instead of mutating the caller's event content.
                text = message["body"]
                if msgtype == "m.emote":
                    text = "/me " + text
                response = sender.send_message(self.peer, text)
        elif msgtype in {"m.image", "m.file", "m.audio", "m.video"}:
            file = self.main_intent.download_file(message["url"])

            info = message["info"]
            mime = info["mimetype"]

            file_name, caption = self._get_file_meta(message["body"], mime)

            attributes = [DocumentAttributeFilename(file_name=file_name)]
            if "w" in info and "h" in info:
                attributes.append(DocumentAttributeImageSize(w=info["w"], h=info["h"]))

            response = sender.send_file(self.peer, file, mime, caption, attributes, file_name)
        else:
            self.log.debug("Unhandled Matrix event: %s", message)
            return
        self.db.add(
            DBMessage(tgid=response.id, mx_room=self.mxid, mxid=event_id, user=sender.tgid))
        self.db.commit()

    def handle_matrix_deletion(self, deleter, event_id):
        """Delete the Telegram message that corresponds to a redacted Matrix event.

        Nothing happens unless a stored message matches the event ID, the deleter's
        Telegram ID and this room.
        """
        # Pass the criteria separately so they are AND-ed in SQL. Python's ``and`` cannot
        # combine SQLAlchemy expressions and would silently drop conditions.
        message = DBMessage.query.filter(DBMessage.mxid == event_id,
                                         DBMessage.user == deleter.tgid,
                                         DBMessage.mx_room == self.mxid).one_or_none()
        if not message:
            return
        deleter.client.delete_messages(self.peer, [message.tgid])

    def handle_matrix_power_levels(self, sender, new_users, old_users):
        """Propagate changed Matrix power levels to Telegram chat admin status.

        Users whose level is new or changed become admins when the level is >= 50.
        Matrix users without a linked Telegram ID are skipped.
        """
        for user, level in new_users.items():
            puppet_match = p.Puppet.mxid_regex.search(user)
            if puppet_match:
                user_id = int(puppet_match.group(1))
            else:
                mx_user = u.User.get_by_mxid(user, create=False)
                if not mx_user or not mx_user.tgid:
                    continue
                user_id = mx_user.tgid
            if user not in old_users or level != old_users[user]:
                sender.client(
                    EditChatAdminRequest(chat_id=self.tgid, user_id=user_id, is_admin=level >= 50))

    # endregion
    # region Telegram chat info updating

    def _get_telegram_users_in_matrix_room(self):
        """Collect Telegram IDs of joined/invited room members (linked users and puppets)."""
        user_tgids = set()
        user_mxids = self.main_intent.get_room_members(self.mxid, ("join", "invite"))
        for user in user_mxids:
            if user == self.az.intent.mxid:
                continue
            mx_user = u.User.get_by_mxid(user, create=False)
            if mx_user and mx_user.tgid:
                user_tgids.add(mx_user.tgid)
            puppet_match = p.Puppet.mxid_regex.match(user)
            if puppet_match:
                user_tgids.add(int(puppet_match.group(1)))
        return user_tgids

    def create_telegram_chat(self, source, supergroup=False):
        """Create a Telegram chat or channel for an existing Matrix room.

        :param source: Bridge user whose Telegram client creates the chat.
        :param supergroup: Passed as ``megagroup`` when creating a channel.
        :raises ValueError: If there is no Matrix room, a Telegram chat already exists,
                            fewer than two Telegram users are in the room, or the peer
                            type is neither "chat" nor "channel".
        """
        if not self.mxid:
            raise ValueError("Can't create Telegram chat for portal without Matrix room.")
        elif self.tgid:
            raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.")

        invites = self._get_telegram_users_in_matrix_room()
        if len(invites) < 2:
            # TODO when we get the option for a bot, this won't happen when the bot is activated.
            raise ValueError("Not enough Telegram users to create a chat")

        invites = [source.client.get_input_entity(user_id) for user_id in invites]

        if self.peer_type == "chat":
            updates = source.client(CreateChatRequest(title=self.title, users=invites))
        elif self.peer_type == "channel":
            updates = source.client(CreateChannelRequest(title=self.title, megagroup=supergroup))
            # TODO invite people
        else:
            raise ValueError("Invalid peer type for Telegram chat creation")

        entity = updates.chats[0]
        self.tgid = entity.id
        self.tg_receiver = self.tgid
        # __init__ could not cache this portal by tgid (it had none yet); do it now so
        # get_by_tgid() returns this instance.
        self.by_tgid[self.tgid_full] = self
        self.update_info(source, entity)
        self.save()

    def invite_telegram(self, source, puppet):
        """Add the Telegram user behind ``puppet`` to the chat using ``source``'s client.

        :raises ValueError: If the portal is neither a chat nor a channel.
        """
        if self.peer_type == "chat":
            source.client(AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0))
        elif self.peer_type == "channel":
            source.client(InviteToChannelRequest(channel=self.peer,
                                                 users=[InputUser(user_id=puppet.tgid)],
                                                 fwd_limit=0))
        else:
            raise ValueError("Invalid peer type for Telegram user invite")

    # endregion
    # region Telegram event handling

    def handle_telegram_typing(self, user, event):
        """Show ``user`` as typing in the Matrix room, if the room exists."""
        if self.mxid:
            user.intent.set_typing(self.mxid, is_typing=True)

    def handle_telegram_photo(self, source, sender, media):
        """Reupload the largest size of a Telegram photo and send it as a Matrix image."""
        largest_size = self.get_largest_photo_size(media.photo)
        file = source.download_file(largest_size.location)
        mime_type = magic.from_buffer(file, mime=True)
        uploaded = sender.intent.upload_file(file, mime_type)
        info = {
            "h": largest_size.h,
            "w": largest_size.w,
            "size": len(largest_size.bytes) if (
                isinstance(largest_size, PhotoCachedSize)) else largest_size.size,
            "orientation": 0,
            "mimetype": mime_type,
        }
        name = media.caption
        sender.intent.set_typing(self.mxid, is_typing=False)
        return sender.intent.send_image(self.mxid, uploaded["content_uri"], info=info, text=name)

    @staticmethod
    def convert_webp(file, to="png"):
        """Convert image bytes to another format with PIL.

        :param file: Source image bytes.
        :param to: Target format name; also used as the MIME subtype.
        :return: ``(mime_type, data)``. If the conversion fails, the input is returned
                 unchanged with the MIME type ``image/webp``.
        """
        try:
            image = Image.open(BytesIO(file)).convert("RGBA")
            new_file = BytesIO()
            image.save(new_file, to)
            return f"image/{to}", new_file.getvalue()
        except Exception:
            # Conversion is best-effort, but don't swallow KeyboardInterrupt/SystemExit.
            return "image/webp", file

    def handle_telegram_document(self, source, sender, media):
        """Reupload a Telegram document and send it as a Matrix file/image/audio/video.

        WebP documents are converted to PNG first. The Matrix message type is chosen
        from the final MIME type.
        """
        file = source.download_file(media.document)
        mime_type = magic.from_buffer(file, mime=True)
        dont_change_mime = False
        if mime_type == "image/webp":
            mime_type, file = self.convert_webp(file, to="png")
            dont_change_mime = True
        uploaded = sender.intent.upload_file(file, mime_type)
        name = media.caption
        for attr in media.document.attributes:
            if not name and isinstance(attr, DocumentAttributeFilename):
                name = attr.file_name
                if not dont_change_mime:
                    (mime_from_name, _) = mimetypes.guess_type(name)
                    mime_type = mime_from_name or mime_type
            elif isinstance(attr, DocumentAttributeSticker):
                name = f"Sticker for {attr.alt}"
        # NOTE(review): the original indentation was lost; this override is assumed to run
        # once after the loop — confirm. It is skipped for converted WebP files so the
        # advertised MIME type matches the data that was actually uploaded.
        if not dont_change_mime:
            mime_type = media.document.mime_type or mime_type
        info = {
            "size": media.document.size,
            "mimetype": mime_type,
        }
        msgtype = "m.file"
        if mime_type.startswith("video/"):
            msgtype = "m.video"
        elif mime_type.startswith("audio/"):
            msgtype = "m.audio"
        elif mime_type.startswith("image/"):
            msgtype = "m.image"
        sender.intent.set_typing(self.mxid, is_typing=False)
        return sender.intent.send_file(self.mxid, uploaded["content_uri"], info=info, text=name,
                                       type=msgtype)

    def handle_telegram_location(self, source, sender, location):
        """Send a Telegram location as an m.location message with a Google Maps link."""
        long = location.long
        lat = location.lat
        long_char = "E" if long > 0 else "W"
        lat_char = "N" if lat > 0 else "S"
        # Display coordinates are unsigned and rounded to five decimals.
        rounded_long = abs(round(long * 100000) / 100000)
        rounded_lat = abs(round(lat * 100000) / 100000)

        body = f"{rounded_lat}° {lat_char}, {rounded_long}° {long_char}"

        url = f"https://maps.google.com/?q={lat},{long}"

        formatted_body = f"Location: <a href='{url}'>{body}</a>"
        # At least Riot ignores formatting in m.location messages, so we'll add a plaintext link.
        body = f"Location: {body}\n{url}"

        return sender.intent.send_message(self.mxid, {
            "msgtype": "m.location",
            "geo_uri": f"geo:{lat},{long}",
            "body": body,
            "format": "org.matrix.custom.html",
            "formatted_body": formatted_body,
        })

    def handle_telegram_text(self, source, sender, evt):
        """Convert a Telegram text message with the formatter and send it to the room."""
        self.log.debug(f"Sending {evt.message} to {self.mxid} by {sender.id}")
        text, html = formatter.telegram_event_to_matrix(evt, source)
        sender.intent.set_typing(self.mxid, is_typing=False)
        return sender.intent.send_text(self.mxid, text, html=html)

    def handle_telegram_message(self, source, sender, evt):
        """Relay a Telegram message to Matrix and record the message ID mapping.

        The Matrix room is created first if it does not exist yet. Unsupported media and
        events with neither text nor media are logged and ignored.
        """
        if not self.mxid:
            self.create_matrix_room(source, invites=[source.mxid])

        if evt.message:
            response = self.handle_telegram_text(source, sender, evt)
        elif evt.media:
            if isinstance(evt.media, MessageMediaPhoto):
                response = self.handle_telegram_photo(source, sender, evt.media)
            elif isinstance(evt.media, MessageMediaDocument):
                response = self.handle_telegram_document(source, sender, evt.media)
            elif isinstance(evt.media, MessageMediaGeo):
                response = self.handle_telegram_location(source, sender, evt.media.geo)
            else:
                self.log.debug("Unhandled Telegram media: %s", evt.media)
                return
        else:
            self.log.debug("Unhandled Telegram message: %s", evt)
            return

        self.db.add(DBMessage(tgid=evt.id, mx_room=self.mxid, mxid=response["event_id"],
                              user=source.tgid))
        self.db.commit()

    def handle_telegram_action(self, source, sender, action):
        """Apply a Telegram service message (title/photo change, join, leave, migration)."""
        if not self.mxid:
            create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate)
            create_and_continue = (MessageActionChatAddUser, MessageActionChatJoinedByLink)
            if isinstance(action, create_and_exit + create_and_continue):
                self.create_matrix_room(source, invites=[source.mxid])
            # NOTE(review): the original indentation was lost; it is assumed that without
            # a room only the join-type actions are processed further — confirm.
            if not isinstance(action, create_and_continue):
                return

        if isinstance(action, MessageActionChatEditTitle):
            if self.update_title(action.title, self.main_intent):
                self.save()
        elif isinstance(action, MessageActionChatEditPhoto):
            largest_size = self.get_largest_photo_size(action.photo)
            if self.update_avatar(source, largest_size.location, self.main_intent):
                self.save()
        elif isinstance(action, MessageActionChatAddUser):
            for user_id in action.users:
                self.add_telegram_user(user_id, source)
        elif isinstance(action, MessageActionChatJoinedByLink):
            self.add_telegram_user(sender.id, source)
        elif isinstance(action, MessageActionChatDeleteUser):
            kick_message = None
            if sender.id != action.user_id:
                # Removed by someone else, so show it as a kick on the Matrix side.
                kick_message = f"Kicked by {sender.displayname}"
            self.delete_telegram_user(action.user_id, kick_message)
        elif isinstance(action, MessageActionChatMigrateTo):
            self.peer_type = "channel"
            self.migrate_and_save(action.channel_id)
            sender.intent.send_emote(self.mxid, "upgraded this group to a supergroup.")
        else:
            self.log.debug("Unhandled Telegram action in %s: %s", self.title, action)

    def set_telegram_admin(self, puppet, user):
        """Give power level 50 to the given puppet and/or Matrix user."""
        levels = self.main_intent.get_power_levels(self.mxid)
        if user:
            levels["users"][user.mxid] = 50
        if puppet:
            levels["users"][puppet.mxid] = 50
        self.main_intent.set_power_levels(self.mxid, levels)

    def update_telegram_participants(self, participants):
        """Mirror Telegram participant roles into Matrix power levels.

        Admins get level 50, the creator 95 and everyone else 0. A level of 0 is only
        written for users that already have an entry in the power level map.
        """
        levels = self.main_intent.get_power_levels(self.mxid)
        levels["events"]["m.room.power_levels"] = 50
        for participant in participants:
            puppet = p.Puppet.get(participant.user_id)
            user = u.User.get_by_tgid(participant.user_id)
            new_level = 0
            if isinstance(participant, ChatParticipantAdmin):
                new_level = 50
            elif isinstance(participant, ChatParticipantCreator):
                new_level = 95
            if user and (user.mxid in levels["users"] or new_level > 0):
                levels["users"][user.mxid] = new_level
            if puppet and (puppet.mxid in levels["users"] or new_level > 0):
                levels["users"][puppet.mxid] = new_level
        self.main_intent.set_power_levels(self.mxid, levels)

    def set_telegram_admins_enabled(self, enabled):
        """Set the level required to invite and to change room name/avatar (50 or 10)."""
        level = 50 if enabled else 10
        levels = self.main_intent.get_power_levels(self.mxid)
        self.log.debug("Current power levels of %s: %s", self.mxid, levels)
        levels["invite"] = level
        levels["events"]["m.room.name"] = level
        levels["events"]["m.room.avatar"] = level
        self.main_intent.set_power_levels(self.mxid, levels)

    # endregion
    # region Database conversion

    def to_db(self):
        """Merge this portal's fields into the database session and return the DB object."""
        return self.db.merge(
            DBPortal(tgid=self.tgid, tg_receiver=self.tg_receiver, peer_type=self.peer_type,
                     mxid=self.mxid, username=self.username, title=self.title,
                     photo_id=self.photo_id))

    def migrate_and_save(self, new_id):
        """Move the portal to a new Telegram ID, updating the database row and the cache."""
        existing = DBPortal.query.get(self.tgid_full)
        if existing:
            self.db.object_session(existing).delete(existing)
        # Remove the stale cache entry outright: a leftover None value would make
        # get_by_tgid() return None for the old ID without ever consulting the database.
        self.by_tgid.pop(self.tgid_full, None)
        # For non-direct portals the receiver mirrors the chat ID and has to follow it,
        # otherwise the (new_id, new_id) key used by get_by_entity() never matches.
        if self.tg_receiver == self.tgid:
            self.tg_receiver = new_id
        self.tgid = new_id
        self.by_tgid[self.tgid_full] = self
        self.save()

    def save(self):
        """Merge the portal into the session and commit."""
        self.to_db()
        self.db.commit()

    def delete(self):
        """Mark the portal's database row for deletion (no commit is issued here)."""
        self.db.delete(self.to_db())

    @classmethod
    def from_db(cls, db_portal):
        """Build a portal (and thereby cache it) from a database row object."""
        return cls(tgid=db_portal.tgid, tg_receiver=db_portal.tg_receiver,
                   peer_type=db_portal.peer_type, mxid=db_portal.mxid,
                   username=db_portal.username, title=db_portal.title,
                   photo_id=db_portal.photo_id)

    # endregion
    # region Class instance lookup

    @classmethod
    def get_by_mxid(cls, mxid):
        """Find a portal by Matrix room ID in the cache, then the database; else None."""
        try:
            return cls.by_mxid[mxid]
        except KeyError:
            pass

        portal = DBPortal.query.filter(DBPortal.mxid == mxid).one_or_none()
        if portal:
            return cls.from_db(portal)

        return None

    @classmethod
    def get_by_tgid(cls, tgid, tg_receiver=None, peer_type=None):
        """Find a portal by Telegram ID in the cache, then the database.

        :param tgid: Telegram ID of the peer.
        :param tg_receiver: Receiver ID; falls back to ``tgid`` when falsy.
        :param peer_type: If given and nothing is found, a new portal of this type is
                          created and saved.
        :return: The portal, or None if it was not found and not created.
        """
        tg_receiver = tg_receiver or tgid
        tgid_full = (tgid, tg_receiver)
        try:
            return cls.by_tgid[tgid_full]
        except KeyError:
            pass

        portal = DBPortal.query.get(tgid_full)
        if portal:
            return cls.from_db(portal)

        if peer_type:
            portal = cls(tgid, peer_type=peer_type, tg_receiver=tg_receiver)
            cls.db.add(portal.to_db())
            portal.save()
            return portal

        return None

    @classmethod
    def get_by_entity(cls, entity, receiver_id=None):
        """Find or create the portal for a Telethon entity or peer object.

        The exact type of ``entity`` selects the peer type and the attribute holding the
        ID. ``receiver_id`` is only used for user entities.

        :raises ValueError: If the entity type is not recognised.
        """
        entity_type = type(entity)
        if entity_type in {Chat, ChatFull}:
            type_name = "chat"
            tgid = entity.id
        elif entity_type in {PeerChat, InputPeerChat}:
            type_name = "chat"
            tgid = entity.chat_id
        elif entity_type in {Channel, ChannelFull}:
            type_name = "channel"
            tgid = entity.id
        elif entity_type in {PeerChannel, InputPeerChannel, InputChannel}:
            type_name = "channel"
            tgid = entity.channel_id
        elif entity_type in {User, UserFull}:
            type_name = "user"
            tgid = entity.id
        elif entity_type in {PeerUser, InputPeerUser, InputUser}:
            type_name = "user"
            tgid = entity.user_id
        else:
            raise ValueError(f"Unknown entity type {entity_type.__name__}")
        return cls.get_by_tgid(tgid, receiver_id if type_name == "user" else tgid, type_name)

    # endregion
|
|
|
|
|
|
def init(context):
    """Wire the module and the Portal class up to the running bridge.

    ``context`` is unpacked into four values, in order: the appservice handle, the
    database session, the parent logger and the configuration. The first two and a
    "portal" child logger are stored on Portal; the configuration goes into the
    module-level ``config``.
    """
    global config
    appservice, database, parent_log, config = context
    Portal.az = appservice
    Portal.db = database
    Portal.log = parent_log.getChild("portal")
|