Files
T
2018-02-16 17:39:24 +02:00

183 lines
5.9 KiB
Python

# -*- coding: future_fstrings -*-
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from difflib import SequenceMatcher
import re
import logging
from telethon.tl.types import UserProfilePhoto, PeerUser
from telethon.errors.rpc_error_list import LocationInvalidError
from .db import Puppet as DBPuppet
config = None
class Puppet:
log = logging.getLogger("mau.puppet")
db = None
az = None
mxid_regex = None
cache = {}
def __init__(self, id=None, username=None, displayname=None, photo_id=None):
self.id = id
self.localpart = config.get("bridge.username_template", "telegram_{userid}").format(
userid=self.id)
hs = config["homeserver"]["domain"]
self.mxid = f"@{self.localpart}:{hs}"
self.username = username
self.displayname = displayname
self.photo_id = photo_id
self.intent = self.az.intent.user(self.mxid)
self.cache[id] = self
@property
def tgid(self):
return self.id
def to_db(self):
return self.db.merge(
DBPuppet(id=self.id, username=self.username, displayname=self.displayname,
photo_id=self.photo_id))
@classmethod
def from_db(cls, db_puppet):
return Puppet(db_puppet.id, db_puppet.username, db_puppet.displayname, db_puppet.photo_id)
def save(self):
self.to_db()
self.db.commit()
def similarity(self, query):
username_similarity = (SequenceMatcher(None, self.username, query).ratio()
if self.username else 0)
displayname_similarity = (SequenceMatcher(None, self.displayname, query).ratio()
if self.displayname else 0)
similarity = max(username_similarity, displayname_similarity)
return round(similarity * 1000) / 10
@staticmethod
def get_displayname(info, format=True):
data = {
"phone number": info.phone if hasattr(info, "phone") else None,
"username": info.username,
"full name": " ".join([info.first_name or "", info.last_name or ""]).strip(),
"full name reversed": " ".join([info.first_name or "", info.last_name or ""]).strip(),
"first name": info.first_name,
"last name": info.last_name,
}
preferences = config.get("bridge.displayname_preference",
["full name", "username", "phone"])
name = None
for preference in preferences:
name = data[preference]
if name:
break
if not name:
name = info.id
if not format:
return name
return config.get("bridge.displayname_template", "{displayname} (Telegram)").format(
displayname=name)
async def update_info(self, source, info):
changed = False
if self.username != info.username:
self.username = info.username
changed = True
changed = await self.update_displayname(source, info) or changed
if isinstance(info.photo, UserProfilePhoto):
changed = await self.update_avatar(source, info.photo.photo_big) or changed
if changed:
self.save()
async def update_displayname(self, source, info):
displayname = self.get_displayname(info)
if displayname != self.displayname:
await self.intent.set_display_name(displayname)
self.displayname = displayname
return True
async def update_avatar(self, source, photo):
photo_id = f"{photo.volume_id}-{photo.local_id}"
if self.photo_id != photo_id:
try:
file = await source.client.download_file_bytes(photo)
except LocationInvalidError:
return False
uploaded = await self.intent.upload_file(file)
await self.intent.set_avatar(uploaded["content_uri"])
self.photo_id = photo_id
return True
return False
@classmethod
def get(cls, id, create=True):
try:
return cls.cache[id]
except KeyError:
pass
puppet = DBPuppet.query.get(id)
if puppet:
return cls.from_db(puppet)
if create:
puppet = cls(id)
cls.db.add(puppet.to_db())
cls.db.commit()
return puppet
return None
@classmethod
def get_by_mxid(cls, mxid, create=True):
tgid = cls.get_id_from_mxid(mxid)
return cls.get(tgid, create) if tgid else None
@classmethod
def get_id_from_mxid(cls, mxid):
match = cls.mxid_regex.match(mxid)
if match:
return int(match.group(1))
return None
@classmethod
def find_by_username(cls, username):
for _, puppet in cls.cache.items():
if puppet.username == username:
return puppet
puppet = DBPuppet.query.filter(DBPuppet.username == username).one_or_none()
if puppet:
return cls.from_db(puppet)
return None
def init(context):
global config
Puppet.az, Puppet.db, config, _ = context
localpart = config.get("bridge.username_template", "telegram_{userid}").format(userid="(.+)")
hs = config["homeserver"]["domain"]
Puppet.mxid_regex = re.compile(f"@{localpart}:{hs}")