# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2022 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from typing import Any, Literal, TypedDict
from pathlib import Path
import argparse
import asyncio
import io
import json
import logging
import math
import mimetypes
import pickle
import random
import string

from lottie.exporters import export_tgs
from lottie.exporters.cairo import export_png
from lottie.exporters.tgs_validator import Severity, TgsValidator
from lottie.importers.svg import import_svg
from lottie.objects import Animation
from lottie.utils.stripper import float_strip
from PIL import Image
from telethon import TelegramClient
from telethon.custom import Conversation, Message
from telethon.tl.functions.messages import GetStickerSetRequest
from telethon.tl.types import (
    Document,
    DocumentAttributeCustomEmoji,
    DocumentAttributeFilename,
    DocumentAttributeImageSize,
    InputMediaUploadedDocument,
    InputStickerSetShortName,
)
import aiohttp

mimetypes.add_type("image/webp", ".webp")

# NOTE: arguments are parsed at import time because the functions below read the
# module-level ``font_dir`` / ``media_dir`` values.
parser = argparse.ArgumentParser(description="mautrix-telegram unicode emoji packer")
parser.add_argument(
    "-i", "--api-id", type=int, required=True, metavar="<api id>", help="Telegram API ID"
)
parser.add_argument(
    "-a", "--api-hash", type=str, required=True, metavar="<api hash>", help="Telegram API hash"
)
parser.add_argument(
    "-s",
    "--session",
    type=str,
    default="unicodemojipacker.session",
    metavar="<file name>",
    help="Telethon session name",
)
parser.add_argument(
    "-o",
    "--output",
    type=str,
    default="mautrix_telegram/unicodemojipack.json",
    metavar="<file name>",
    help="Path to save created emoji pack document IDs",
)
parser.add_argument(
    "-f",
    "--font-directory",
    type=Path,
    required=True,
    metavar="<path>",
    help="Path to the Noto color emoji files",
)
parser.add_argument(
    "-m",
    "--media-directory",
    type=Path,
    required=True,
    metavar="<path>",
    help="Path to save converted tgs and webp emoji files",
)
args = parser.parse_args()
font_dir: Path = args.font_directory
media_dir: Path = args.media_directory

EMOJI_DATA_URL = "https://raw.githubusercontent.com/iamcal/emoji-data/master/emoji.json"

# Number of emojis placed in a single pack before a new pack is started.
PACK_SIZE = 200
# Region flags whose exported tgs is larger than this many bytes are rasterised to webp instead.
FLAG_TGS_MAX_BYTES = 32768
# Edge length passed to ``lot.scale`` and declared as the image size when uploading webp files.
STATIC_EMOJI_SIZE = 100

# Hard-coded resume point, presumably left over from continuing an interrupted upload:
# the pack with this short name is not re-created and its first RESUME_SKIP_COUNT emojis
# are skipped. Freshly generated short names are random, so this only matters when the
# split cache already contains this exact pack.
RESUME_PACK_SHORT_NAME = "mxtg_unicodemoji_xvzs6743"
RESUME_SKIP_COUNT = 87


def unified_to_unicode(unified: str) -> str:
    """Convert a dash-separated list of hex code points (e.g. ``1F468-200D-1F469``) to a string."""
    return "".join(chr(int(chunk, 16)) for chunk in unified.split("-"))


def tag_to_str(unified: str) -> str:
    """Decode dash-separated hex code points after stripping a leading ``E00`` from each one."""
    return "".join(chr(int(x.removeprefix("E00"), 16)) for x in unified.split("-"))


EmojiType = Literal["webp", "tgs"]
PackType = Literal["Animated emoji", "Static emoji"]


class Emoji(TypedDict):
    """A single converted emoji as stored in the conversion cache."""

    # Dash-separated hex code points with ``-FE0F`` removed.
    hex: str
    # The emoji itself as a unicode string.
    emoji: str
    # Format the emoji was converted to.
    type: EmojiType
    # File name (without directory) of the converted file inside ``media_dir``.
    filename: str


class EmojiData(TypedDict):
    """All converted emojis, grouped by output format."""

    tgs: list[Emoji]
    webp: list[Emoji]


def parse_emoji_data(tone: dict[str, Any], emoji: dict[str, Any]) -> Emoji:
    """Convert the SVG of one emoji (or skin tone variation) to tgs, falling back to webp.

    The SVG is looked up in ``font_dir`` and the converted file is written to ``media_dir``.

    :param tone: The emoji-data entry being converted; either ``emoji`` itself or one of its
        skin variations. Its ``non_qualified`` and ``unified`` fields are read.
    :param emoji: The base emoji-data entry; its ``category`` and ``subcategory`` decide
        which source directory the SVG is read from.
    :return: Metadata describing the converted file.
    """
    hex_code = (tone["non_qualified"] or tone["unified"]).replace("-FE0F", "")
    filename_hex = hex_code.replace("-", "_").lower()
    filename = f"svg/emoji_u{filename_hex}.svg"
    if emoji["category"] == "Flags" and emoji["subcategory"] in (
        "country-flag",
        "subdivision-flag",
    ):
        filename = f"third_party/region-flags/waved-svg/emoji_u{filename_hex}.svg"

    with (font_dir / filename).open() as f:
        lot: Animation = import_svg(f)
    float_strip(lot)
    lot.tgs_sanitize()
    output = io.BytesIO()
    export_tgs(lot, output)

    validator = TgsValidator()
    validator(lot)
    validator.check_size(len(output.getvalue()))
    # Notes are informational only; anything more severe means the tgs is not usable.
    errors = [err for err in validator.errors if err.severity != Severity.Note]

    media_type: EmojiType
    if errors or ("region-flags" in filename and len(output.getvalue()) > FLAG_TGS_MAX_BYTES):
        # Fall back to a static image: render to PNG and re-encode as webp.
        lot.scale(STATIC_EMOJI_SIZE, STATIC_EMOJI_SIZE)
        png_out = io.BytesIO()
        export_png(lot, png_out)
        img = Image.open(png_out)
        output = io.BytesIO()
        output.name = "image.webp"
        img.save(output, "webp")
        media_type = "webp"
    else:
        media_type = "tgs"

    path = media_dir / f"{filename_hex}.{media_type}"
    with path.open("wb") as f:
        f.write(output.getvalue())
    print(
        "Converted", filename, "->", path.name, "//" if errors else "", "\n".join(map(str, errors))
    )
    return {
        "hex": hex_code,
        "emoji": unified_to_unicode(tone["unified"]),
        "type": media_type,
        "filename": path.name,
    }


async def load_emoji_data() -> EmojiData:
    """Return the converted emoji list, converting everything on the first run.

    If ``conversion-cache.json`` exists in ``media_dir`` it is returned as-is. Otherwise the
    emoji list is downloaded from ``EMOJI_DATA_URL``, every emoji and skin variation is
    converted with :func:`parse_emoji_data`, and the result is written to the cache.
    """
    cache_path = media_dir / "conversion-cache.json"
    try:
        # The cache contains raw emoji, so don't depend on the locale encoding.
        with cache_path.open(encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        pass

    async with aiohttp.ClientSession() as sess, sess.get(EMOJI_DATA_URL) as resp:
        raw_emoji_data = sorted(
            # content_type=None disables aiohttp's content type check for the response.
            await resp.json(content_type=None),
            key=lambda dat: dat["sort_order"],
        )

    tgs_emoji: list[Emoji] = []
    webp_emoji: list[Emoji] = []
    for emoji in raw_emoji_data:
        # The base emoji is converted first, followed by each of its skin variations.
        for tone in (emoji, *emoji.get("skin_variations", {}).values()):
            parsed_emoji = parse_emoji_data(tone, emoji)
            if parsed_emoji["type"] == "tgs":
                tgs_emoji.append(parsed_emoji)
            else:
                webp_emoji.append(parsed_emoji)

    full_data: EmojiData = {"tgs": tgs_emoji, "webp": webp_emoji}
    with cache_path.open("w", encoding="utf-8") as f:
        json.dump(full_data, f, ensure_ascii=False)
    return full_data


class UnexpectedReplyError(AssertionError):
    """The bot replied with something other than the expected prompt.

    Subclasses :class:`AssertionError` because these checks used to be plain ``assert``
    statements; unlike those, this check is not removed when running with ``python -O``.
    """


def _expect_reply(resp: Message, *fragments: str) -> None:
    """Raise :class:`UnexpectedReplyError` unless every fragment occurs in the reply text."""
    for fragment in fragments:
        if fragment not in resp.raw_text:
            raise UnexpectedReplyError(
                f"Expected a reply containing {fragment!r}, got {resp.raw_text!r}"
            )


async def create_pack(conv: Conversation, name: str, pack_type: str) -> None:
    """Start a new emoji pack in the bot conversation and walk it up to the first upload.

    :param conv: Open conversation the commands are sent to.
    :param name: Display name of the new pack.
    :param pack_type: Pack type answer to send; ``"Animated emoji"`` selects the animated
        prompts, anything else the static ones.
    :raises UnexpectedReplyError: If any reply does not contain the expected prompt.
    """
    animated = pack_type == "Animated emoji"

    await conv.send_message("/newemojipack")
    resp: Message = await conv.get_response()
    _expect_reply(resp, "A new set of custom emoji", "Please choose the type")

    await conv.send_message(pack_type)
    resp = await conv.get_response()
    if animated:
        _expect_reply(resp, "When ready to upload, tell me the name of your set.")
    else:
        _expect_reply(resp, "Now choose a name for your set.")

    await conv.send_message(name)
    resp = await conv.get_response()
    if animated:
        _expect_reply(resp, "Now send me the first animated emoji")
    else:
        _expect_reply(resp, "Now send me the custom emoji")


async def publish_pack(conv: Conversation, shortname: str) -> None:
    """Publish the pack currently being edited in the conversation under ``shortname``.

    The optional icon step is skipped with ``/skip``.

    :raises UnexpectedReplyError: If any reply does not contain the expected prompt.
    """
    await conv.send_message("/publish")
    resp: Message = await conv.get_response()
    _expect_reply(resp, "You can send me a custom emoji from your emoji set")

    await conv.send_message("/skip")
    resp = await conv.get_response()
    _expect_reply(resp, "Please provide a short name for your emoji set")

    await conv.send_message(shortname)
    resp = await conv.get_response()
    _expect_reply(resp, "I've just published your emoji set")


async def send_emoji(
    conv: Conversation, file: bytes | Path | InputMediaUploadedDocument, emoji: str
) -> None:
    """Upload one custom emoji file and tell the bot which unicode emoji it replaces.

    This is interactive: if the bot reports too many attempts the script waits for the
    operator to press enter and retries, and if the bot rejects the replacement emoji the
    operator is asked on stdin for an alternative until one is accepted.

    :param conv: Open conversation the file is sent to.
    :param file: The emoji file to send.
    :param emoji: The unicode emoji to offer as the replacement.
    :raises UnexpectedReplyError: If a reply does not contain the expected prompt.
    """
    await conv.send_file(file)
    resp: Message = await conv.get_response()
    _expect_reply(resp, "Send me a replacement emoji that corresponds to your custom emoji")

    await conv.send_message(emoji)
    resp = await conv.get_response()
    if "Sorry, too many attempts" in resp.raw_text:
        print(resp.raw_text)
        input("Press enter to continue")
        await conv.send_message(emoji)
        resp = await conv.get_response()
    while "Please send an emoji that best describes your custom emoji." in resp.raw_text:
        emoji = input(f"{emoji} was rejected, provide replacement: ")
        await conv.send_message(emoji)
        resp = await conv.get_response()
    _expect_reply(resp, "Congratulations")


class CachedPack(TypedDict):
    """One emoji pack and its upload progress, as stored in the split cache."""

    # Display name of the pack.
    name: str
    # Short name the pack is published under.
    short_name: str
    # 1-based index of this pack across all packs.
    part: int
    type: PackType
    # Set once the pack has been created, filled and published.
    published: bool
    # Set once the pack's document IDs have been recorded.
    collected: bool
    emojis: list[Emoji]


class CachedData(TypedDict):
    packs: list[CachedPack]


def _split_packs_int(
    emoji_list: list[Emoji], pack_type: PackType, current_part: int, total_parts: int
) -> tuple[list[CachedPack], int]:
    """Split ``emoji_list`` into packs of at most ``PACK_SIZE`` emojis.

    :param emoji_list: Emojis to distribute, in order.
    :param pack_type: Type recorded on every created pack.
    :param current_part: Part number of the last pack created so far (0 if none).
    :param total_parts: Total number of packs, used only in the pack display names.
    :return: The created packs and the part number of the last one.
    """
    packs: list[CachedPack] = []
    current_pack: CachedPack | None = None
    for i, emoji in enumerate(emoji_list):
        if i % PACK_SIZE == 0:
            # Start a new pack with a random short name.
            current_part += 1
            random_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
            short_name = f"mxtg_unicodemoji_{random_id}"
            name = f"mautrix-telegram unicodemoji ({current_part}/{total_parts})"
            current_pack = {
                "type": pack_type,
                "short_name": short_name,
                "part": current_part,
                "name": name,
                "published": False,
                "collected": False,
                "emojis": [],
            }
            packs.append(current_pack)
        current_pack["emojis"].append(emoji)
    return packs, current_part


def split_packs(emoji_data: EmojiData) -> list[CachedPack]:
    """Split all converted emojis into packs: animated (tgs) packs first, then static (webp)."""
    total_parts = math.ceil(len(emoji_data["tgs"]) / PACK_SIZE) + math.ceil(
        len(emoji_data["webp"]) / PACK_SIZE
    )
    current_part = 0
    animated_packs, current_part = _split_packs_int(
        emoji_data["tgs"], "Animated emoji", current_part, total_parts
    )
    static_packs, current_part = _split_packs_int(
        emoji_data["webp"], "Static emoji", current_part, total_parts
    )
    return animated_packs + static_packs


async def create_and_fill_pack(
    client: TelegramClient, conv: Conversation, pack: CachedPack
) -> None:
    """Create ``pack`` through the bot conversation, upload all of its emojis and publish it.

    The pack named ``RESUME_PACK_SHORT_NAME`` is treated as partially uploaded: it is not
    created again and its first ``RESUME_SKIP_COUNT`` emojis are skipped.

    :param client: Client used to upload webp files.
    :param conv: Open conversation with the bot.
    :param pack: The pack to upload; its emoji files are read from ``media_dir``.
    """
    resuming = pack["short_name"] == RESUME_PACK_SHORT_NAME
    if resuming:
        print("Continuing pack", pack["name"])
    else:
        print("Creating pack", pack["name"])
        await create_pack(conv, pack["name"], pack["type"])

    total = len(pack["emojis"])
    for i, emoji in enumerate(pack["emojis"]):
        if resuming and i < RESUME_SKIP_COUNT:
            continue
        print(f"Adding emoji {i+1}/{total}", emoji["hex"], emoji["emoji"])
        emoji_file = media_dir / emoji["filename"]
        if emoji["type"] == "webp":
            # webp files are uploaded manually as documents with an explicit mime type and
            # image size; tgs files are handed to send_file as a plain path.
            attrs = [
                DocumentAttributeImageSize(w=STATIC_EMOJI_SIZE, h=STATIC_EMOJI_SIZE),
                DocumentAttributeFilename(file_name="image.webp"),
            ]
            with emoji_file.open("rb") as f:
                file_handle = await client.upload_file(f, file_name="emoji.webp")
            emoji_file = InputMediaUploadedDocument(
                file_handle, mime_type="image/webp", attributes=attrs
            )
        await send_emoji(conv, emoji_file, emoji["emoji"])
        await asyncio.sleep(2)

    print("Publishing pack", pack["short_name"])
    await publish_pack(conv, pack["short_name"])


async def main() -> None:
    """Convert all emojis, upload them as packs and save the emoji -> document ID mapping.

    Progress is persisted in ``split-cache.json`` inside ``media_dir`` after every pack, so
    an interrupted run continues with the first unpublished pack. The mapping is written as
    JSON to ``args.output`` and as a pickle next to it.
    """
    logging.basicConfig(level=logging.INFO)
    emoji_data = await load_emoji_data()

    split_cache = media_dir / "split-cache.json"
    try:
        with split_cache.open(encoding="utf-8") as f:
            packs: list[CachedPack] = json.load(f)
    except FileNotFoundError:
        packs = split_packs(emoji_data)
        with split_cache.open("w", encoding="utf-8") as f:
            json.dump(packs, f, ensure_ascii=False)

    doc_id_file = Path(args.output)
    try:
        with doc_id_file.open(encoding="utf-8") as f:
            doc_ids = json.load(f)
    except FileNotFoundError:
        doc_ids = {}

    client = TelegramClient(args.session, args.api_id, args.api_hash, flood_sleep_threshold=3600)
    await client.start()

    async with client.conversation("Stickers", max_messages=20000) as conv:
        for pack in packs:
            if not pack["published"]:
                await create_and_fill_pack(client, conv, pack)
                pack["published"] = True
                with split_cache.open("w", encoding="utf-8") as f:
                    json.dump(packs, f, ensure_ascii=False)

            # Document IDs are deliberately re-collected on every run, even for packs that
            # are already marked as collected (this used to be spelled
            # `if not pack["collected"] or True`, which is always true).
            print("Collecting document IDs from pack", pack["short_name"])
            stickers = await client(
                GetStickerSetRequest(InputStickerSetShortName(pack["short_name"]), 0)
            )
            doc: Document
            for i, doc in enumerate(stickers.documents):
                custom_attr = next(
                    (a for a in doc.attributes if isinstance(a, DocumentAttributeCustomEmoji)),
                    None,
                )
                if custom_attr is None:
                    raise RuntimeError(
                        f"Document {doc.id} in pack {pack['short_name']} "
                        "has no custom emoji attribute"
                    )
                base_emoji = custom_attr.alt.replace("\ufe0f", "")
                # Documents are matched to the emojis of the pack by position.
                emoji = pack["emojis"][i]["emoji"].replace("\ufe0f", "")
                doc_ids[emoji] = doc.id
                print(f"Mapped {emoji} (fallback: {base_emoji}) -> {doc_ids[emoji]}")
            pack["collected"] = True
            with split_cache.open("w", encoding="utf-8") as f:
                json.dump(packs, f, ensure_ascii=False)
            with doc_id_file.open("w", encoding="utf-8") as f:
                json.dump(doc_ids, f, ensure_ascii=False)
            print("Pack completed")
            await asyncio.sleep(5)

    # with_suffix guarantees a distinct path; a plain str.replace(".json", ".pickle") would
    # leave the path untouched (and overwrite the JSON output) if the name has no ".json".
    with doc_id_file.with_suffix(".pickle").open("wb") as f:
        pickle.dump(doc_ids, f)
    print("Wrote pickle")


asyncio.run(main())