Files
mautrix-telegram/mautrix_telegram/puppet.py
T
Tulir Asokan 1f5261ff8f Initial solution and database update for #11
The database now contains a displayname_source field which is the
telegram user ID of the user whose point of view the displayname
is from.

Updates from the relaybot user always take precedence, but currently
the relaybot will never automatically fetch displaynames.
2018-05-19 17:22:16 +03:00

236 lines
7.8 KiB
Python

# -*- coding: future_fstrings -*-
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from difflib import SequenceMatcher
import re
import logging
from telethon.tl.types import UserProfilePhoto
from telethon.errors.rpc_error_list import LocationInvalidError
from .db import Puppet as DBPuppet
from . import util
# Bridge configuration object. Stays None until init() unpacks it from the
# startup context; get_displayname() reads it at call time.
config = None
class Puppet:
    """In-memory representation of the Matrix puppet of one Telegram user.

    Each instance wraps a ``DBPuppet`` row (created lazily when needed) and
    the Matrix intent obtained from ``az.intent.user(mxid)``. Instances
    register themselves in the class-wide ``cache`` keyed by Telegram ID.

    The class attributes ``db``, ``az``, ``mxid_regex``, ``username_template``
    and ``hs_domain`` are placeholders that the module-level ``init()`` fills
    in; the class is not usable before that has run.
    """
    log = logging.getLogger("mau.puppet")
    # Set by init(): database session (used for add()/commit()).
    db = None
    # Set by init(): object exposing ``intent.user(mxid)``
    # (presumably the application service handle -- confirm against init's caller).
    az = None
    # Set by init(): compiled pattern whose group 1 is the Telegram user ID.
    mxid_regex = None
    # Set by init(): localpart template containing a ``{userid}`` placeholder.
    username_template = None
    # Set by init(): homeserver domain used as the MXID server part.
    hs_domain = None
    # Deliberately shared between all instances: Telegram ID -> Puppet.
    cache = {}

    def __init__(self, id=None, username=None, displayname=None, displayname_source=None,
                 photo_id=None, is_bot=None, db_instance=None):
        """Create a puppet and register it in the class-wide cache.

        :param id: Telegram user ID; also used to derive the Matrix ID.
        :param username: Telegram username, if known.
        :param displayname: Displayname currently set for the puppet.
        :param displayname_source: Telegram ID of the user from whose point
            of view ``displayname`` was taken.
        :param photo_id: Identifier of the avatar currently set.
        :param is_bot: Whether the Telegram user is a bot.
        :param db_instance: Existing ``DBPuppet`` row backing this puppet;
            when omitted one is created on first access of ``db_instance``.
        """
        self.id = id
        self.mxid = self.get_mxid_from_id(self.id)

        self.username = username
        self.displayname = displayname
        self.displayname_source = displayname_source
        self.photo_id = photo_id
        self.is_bot = is_bot
        self._db_instance = db_instance

        self.intent = self.az.intent.user(self.mxid)
        self.logged_in = True

        # Mutates the shared class-level dict (no rebinding on the instance).
        self.cache[id] = self

    @property
    def tgid(self):
        """Telegram user ID of this puppet (alias of ``id``)."""
        return self.id

    @property
    def db_instance(self):
        """The backing ``DBPuppet`` row, created on first access if missing."""
        if not self._db_instance:
            self._db_instance = self.new_db_instance()
        return self._db_instance

    def new_db_instance(self):
        """Build a fresh ``DBPuppet`` row from the current field values.

        The row is only constructed here; adding it to the session is left
        to the caller (see ``get``).
        """
        return DBPuppet(id=self.id, username=self.username, displayname=self.displayname,
                        displayname_source=self.displayname_source, photo_id=self.photo_id,
                        is_bot=self.is_bot)

    @classmethod
    def from_db(cls, db_puppet):
        """Alternate constructor: wrap an existing ``DBPuppet`` row."""
        # Use cls rather than the hard-coded class name so subclasses get
        # instances of their own type.
        return cls(db_puppet.id, db_puppet.username, db_puppet.displayname,
                   db_puppet.displayname_source, db_puppet.photo_id, db_puppet.is_bot,
                   db_instance=db_puppet)

    def save(self):
        """Copy the mutable fields onto the database row and commit."""
        row = self.db_instance
        row.username = self.username
        row.displayname = self.displayname
        row.displayname_source = self.displayname_source
        row.photo_id = self.photo_id
        row.is_bot = self.is_bot
        self.db.commit()

    def similarity(self, query):
        """Score how closely ``query`` matches this puppet's names.

        :param query: String to compare against username and displayname.
        :return: The better of the two ``SequenceMatcher`` ratios as a
            percentage rounded to one decimal place (0 when neither name
            is set).
        """
        username_similarity = (SequenceMatcher(None, self.username, query).ratio()
                               if self.username else 0)
        displayname_similarity = (SequenceMatcher(None, self.displayname, query).ratio()
                                  if self.displayname else 0)
        similarity = max(username_similarity, displayname_similarity)
        return round(similarity * 1000) / 10

    @staticmethod
    def get_displayname(info, format=True):
        """Pick a displayname for a Telegram user according to the config.

        The ``bridge.displayname_preference`` config list is walked in order
        and the first non-empty candidate wins. Unknown preference keys are
        skipped instead of raising.

        :param info: Object with ``id``, ``username``, ``first_name`` and
            ``last_name`` attributes; ``phone`` and ``deleted`` are optional.
        :param format: When true, wrap the name in the
            ``bridge.displayname_template`` config template.
        :return: The chosen name as a string (templated if ``format``).
        """
        first_name = info.first_name or ""
        last_name = info.last_name or ""
        phone = getattr(info, "phone", None)
        data = {
            "phone number": phone,
            # Alias: the built-in default preference list below says "phone",
            # which previously raised KeyError when it was reached.
            "phone": phone,
            "username": info.username,
            "full name": " ".join([first_name, last_name]).strip(),
            # Last name first; this used to be a copy of "full name".
            "full name reversed": " ".join([last_name, first_name]).strip(),
            "first name": info.first_name,
            "last name": info.last_name,
        }
        preferences = config.get("bridge.displayname_preference",
                                 ["full name", "username", "phone"])
        name = None
        for preference in preferences:
            name = data.get(preference)
            if name:
                break

        # Not every info object necessarily carries a "deleted" flag, just
        # as "phone" is treated as optional above.
        if getattr(info, "deleted", False):
            name = f"Deleted account {info.id}"
        elif not name:
            # Always hand back a string, also when format is false.
            name = str(info.id)

        if not format:
            return name
        return config.get("bridge.displayname_template", "{displayname} (Telegram)").format(
            displayname=name)

    async def update_info(self, source, info):
        """Synchronise username, displayname, avatar and bot flag from ``info``.

        :param source: The user whose view of ``info`` this is; passed on to
            ``update_displayname`` and ``update_avatar``.
        :param info: Telegram user object (reads ``username``, ``photo``,
            ``bot`` plus whatever ``get_displayname`` needs).

        The database row is saved only when something actually changed.
        """
        changed = False
        if self.username != info.username:
            self.username = info.username
            changed = True

        changed = await self.update_displayname(source, info) or changed
        if isinstance(info.photo, UserProfilePhoto):
            changed = await self.update_avatar(source, info.photo.photo_big) or changed

        # Track the bot flag like every other field; previously a change
        # here was only persisted if something else happened to change too.
        if self.is_bot != info.bot:
            self.is_bot = info.bot
            changed = True

        if changed:
            self.save()

    async def update_displayname(self, source, info):
        """Update the Matrix displayname if ``source`` is allowed to set it.

        A non-relaybot source is ignored when the stored displayname came
        from a different user (``displayname_source``); the relaybot always
        takes precedence.

        :return: True when the displayname or its source was updated,
            False otherwise.
        """
        ignore_source = (not source.is_relaybot
                         and self.displayname_source is not None
                         and self.displayname_source != source.tgid)
        if ignore_source:
            return False

        displayname = self.get_displayname(info)
        if displayname != self.displayname:
            await self.intent.set_display_name(displayname)
            self.displayname = displayname
            self.displayname_source = source.tgid
            return True
        elif source.is_relaybot or self.displayname_source is None:
            # Same name, but take over ownership of the displayname.
            self.displayname_source = source.tgid
            return True
        return False

    async def update_avatar(self, source, photo):
        """Re-upload and set the avatar when the photo location changed.

        :param source: User whose ``client`` is used to fetch the file.
        :param photo: Photo location with ``volume_id`` and ``local_id``.
        :return: True when a new avatar was set, False otherwise (including
            when the file transfer yielded nothing).
        """
        photo_id = f"{photo.volume_id}-{photo.local_id}"
        if self.photo_id != photo_id:
            file = await util.transfer_file_to_matrix(self.db, source.client, self.intent, photo)
            if file:
                await self.intent.set_avatar(file.mxc)
                self.photo_id = photo_id
                return True
        return False

    @classmethod
    def get(cls, id, create=True):
        """Look a puppet up by Telegram ID: cache first, then database.

        :param id: Telegram user ID.
        :param create: When true and nothing is found, create the puppet,
            add its row to the session and commit.
        :return: The puppet, or None when missing and ``create`` is false.
        """
        try:
            return cls.cache[id]
        except KeyError:
            pass

        puppet = DBPuppet.query.get(id)
        if puppet:
            return cls.from_db(puppet)

        if create:
            puppet = cls(id)
            cls.db.add(puppet.db_instance)
            cls.db.commit()
            return puppet

        return None

    @classmethod
    def get_by_mxid(cls, mxid, create=True):
        """Look a puppet up by Matrix ID; None if the MXID is not a puppet's."""
        tgid = cls.get_id_from_mxid(mxid)
        return cls.get(tgid, create) if tgid else None

    @classmethod
    def get_id_from_mxid(cls, mxid):
        """Extract the Telegram user ID from a puppet Matrix ID.

        :return: The integer ID, or None when ``mxid`` does not match the
            puppet pattern exactly or its ID part is not a number.
        """
        # fullmatch: reject MXIDs that merely start with a valid puppet MXID
        # (e.g. extra text after the homeserver domain).
        match = cls.mxid_regex.fullmatch(mxid)
        if not match:
            return None
        try:
            return int(match.group(1))
        except ValueError:
            # The pattern captures ".+", so the localpart may match the
            # template without containing a numeric ID.
            return None

    @classmethod
    def get_mxid_from_id(cls, id):
        """Build the puppet Matrix ID for a Telegram user ID."""
        return f"@{cls.username_template.format(userid=id)}:{cls.hs_domain}"

    @classmethod
    def find_by_username(cls, username):
        """Find a puppet by Telegram username (cache, then database).

        The cache comparison ignores case.

        :return: The puppet, or None when ``username`` is empty or unknown.
        """
        if not username:
            return None

        wanted = username.lower()
        for puppet in cls.cache.values():
            if puppet.username and puppet.username.lower() == wanted:
                return puppet

        # NOTE(review): unlike the cache scan above, this comparison is
        # case-sensitive on most databases, and one_or_none() raises if
        # several rows match -- confirm whether that is intended.
        puppet = DBPuppet.query.filter(DBPuppet.username == username).one_or_none()
        if puppet:
            return cls.from_db(puppet)

        return None

    @classmethod
    def find_by_displayname(cls, displayname):
        """Find a puppet by exact displayname (cache, then database).

        :return: The puppet, or None when ``displayname`` is empty or unknown.
        """
        if not displayname:
            return None

        for puppet in cls.cache.values():
            if puppet.displayname and puppet.displayname == displayname:
                return puppet

        # NOTE(review): one_or_none() raises if several rows share this
        # displayname -- confirm whether callers expect that.
        puppet = DBPuppet.query.filter(DBPuppet.displayname == displayname).one_or_none()
        if puppet:
            return cls.from_db(puppet)

        return None
def init(context):
    """Wire this module and the Puppet class up from the startup context.

    :param context: 5-item sequence; the first three items are stored as
        ``Puppet.az``, ``Puppet.db`` and the module-level ``config``. The
        remaining two are unused here.

    Also derives ``Puppet.username_template``, ``Puppet.hs_domain`` and the
    compiled ``Puppet.mxid_regex`` from the config.
    """
    global config
    Puppet.az, Puppet.db, config, _, _ = context
    Puppet.username_template = config.get("bridge.username_template", "telegram_{userid}")
    Puppet.hs_domain = config["homeserver"]["domain"]
    # The {userid} placeholder becomes the capture group of the pattern.
    localpart = Puppet.username_template.format(userid="(.+)")
    # The domain is literal text: escape it so that "." only matches a dot,
    # and anchor the end so that a longer server name sharing this prefix is
    # not mistaken for the bridge's homeserver.
    domain = re.escape(Puppet.hs_domain)
    Puppet.mxid_regex = re.compile("@" + localpart + ":" + domain + r"\Z")