Allow reacting with any unicode emoji using custom pack
This commit is contained in:
@@ -0,0 +1,397 @@
|
||||
# mautrix-telegram - A Matrix-Telegram puppeting bridge
|
||||
# Copyright (C) 2022 Tulir Asokan
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Any, Literal, TypedDict
|
||||
from pathlib import Path
|
||||
import argparse
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import mimetypes
|
||||
import pickle
|
||||
import random
|
||||
import string
|
||||
|
||||
from lottie.exporters import export_tgs
|
||||
from lottie.exporters.cairo import export_png
|
||||
from lottie.exporters.tgs_validator import Severity, TgsValidator
|
||||
from lottie.importers.svg import import_svg
|
||||
from lottie.objects import Animation
|
||||
from lottie.utils.stripper import float_strip
|
||||
from PIL import Image
|
||||
from telethon import TelegramClient
|
||||
from telethon.custom import Conversation, Message
|
||||
from telethon.tl.functions.messages import GetStickerSetRequest
|
||||
from telethon.tl.types import (
|
||||
Document,
|
||||
DocumentAttributeCustomEmoji,
|
||||
DocumentAttributeFilename,
|
||||
DocumentAttributeImageSize,
|
||||
InputMediaUploadedDocument,
|
||||
InputStickerSetShortName,
|
||||
)
|
||||
import aiohttp
|
||||
|
||||
mimetypes.add_type("image/webp", ".webp")
|
||||
|
||||
parser = argparse.ArgumentParser(description="mautrix-telegram unicode emoji packer")
|
||||
parser.add_argument(
|
||||
"-i", "--api-id", type=int, required=True, metavar="<api id>", help="Telegram API ID"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-a", "--api-hash", type=str, required=True, metavar="<api hash>", help="Telegram API hash"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s",
|
||||
"--session",
|
||||
type=str,
|
||||
default="unicodemojipacker.session",
|
||||
metavar="<file name>",
|
||||
help="Telethon session name",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o",
|
||||
"--output",
|
||||
type=str,
|
||||
default="mautrix_telegram/unicodemojipack.json",
|
||||
metavar="<file name>",
|
||||
help="Path to save created emoji pack document IDs",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-f",
|
||||
"--font-directory",
|
||||
type=Path,
|
||||
required=True,
|
||||
metavar="<directory path>",
|
||||
help="Path to the Noto color emoji files",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-m",
|
||||
"--media-directory",
|
||||
type=Path,
|
||||
required=True,
|
||||
metavar="<directory path>",
|
||||
help="Path to save converted tgs and webp emoji files",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
font_dir: Path = args.font_directory
|
||||
media_dir: Path = args.media_directory
|
||||
|
||||
EMOJI_DATA_URL = "https://raw.githubusercontent.com/iamcal/emoji-data/master/emoji.json"
|
||||
|
||||
|
||||
def unified_to_unicode(unified: str) -> str:
|
||||
return (
|
||||
"".join(rf"\U{chunk:0>8}" for chunk in unified.split("-"))
|
||||
.encode("ascii")
|
||||
.decode("unicode_escape")
|
||||
)
|
||||
|
||||
|
||||
def tag_to_str(unified: str) -> str:
|
||||
return "".join(chr(int(x.removeprefix("E00"), 16)) for x in unified.split("-"))
|
||||
|
||||
|
||||
EmojiType = Literal["webp", "tgs"]
|
||||
PackType = Literal["Animated emoji", "Static emoji"]
|
||||
|
||||
|
||||
class Emoji(TypedDict):
|
||||
hex: str
|
||||
emoji: str
|
||||
type: EmojiType
|
||||
filename: str
|
||||
|
||||
|
||||
class EmojiData(TypedDict):
|
||||
tgs: list[Emoji]
|
||||
webp: list[Emoji]
|
||||
|
||||
|
||||
def parse_emoji_data(tone: dict[str, Any], emoji: dict[str, Any]) -> Emoji:
|
||||
hex = (tone["non_qualified"] or tone["unified"]).replace("-FE0F", "")
|
||||
filename_hex = hex.replace("-", "_").lower()
|
||||
filename = f"svg/emoji_u{filename_hex}.svg"
|
||||
if emoji["category"] == "Flags" and emoji["subcategory"] in (
|
||||
"country-flag",
|
||||
"subdivision-flag",
|
||||
):
|
||||
filename = f"third_party/region-flags/waved-svg/emoji_u{filename_hex}.svg"
|
||||
|
||||
with (font_dir / filename).open() as f:
|
||||
lot: Animation = import_svg(f)
|
||||
float_strip(lot)
|
||||
lot.tgs_sanitize()
|
||||
|
||||
output = io.BytesIO()
|
||||
export_tgs(lot, output)
|
||||
|
||||
validator = TgsValidator()
|
||||
validator(lot)
|
||||
validator.check_size(len(output.getvalue()))
|
||||
errors = [err for err in validator.errors if err.severity != Severity.Note]
|
||||
if errors or ("region-flags" in filename and len(output.getvalue()) > 32768):
|
||||
lot.scale(100, 100)
|
||||
|
||||
png_out = io.BytesIO()
|
||||
export_png(lot, png_out)
|
||||
img = Image.open(png_out)
|
||||
output = io.BytesIO()
|
||||
output.name = "image.webp"
|
||||
img.save(output, "webp")
|
||||
|
||||
media_type: EmojiType = "webp"
|
||||
else:
|
||||
media_type: EmojiType = "tgs"
|
||||
path = media_dir / f"{filename_hex}.{media_type}"
|
||||
with path.open("wb") as f:
|
||||
f.write(output.getvalue())
|
||||
print(
|
||||
"Converted", filename, "->", path.name, "//" if errors else "", "\n".join(map(str, errors))
|
||||
)
|
||||
|
||||
return {
|
||||
"hex": hex,
|
||||
"emoji": unified_to_unicode(tone["unified"]),
|
||||
"type": media_type,
|
||||
"filename": path.name,
|
||||
}
|
||||
|
||||
|
||||
async def load_emoji_data() -> EmojiData:
|
||||
cache_path = media_dir / "conversion-cache.json"
|
||||
try:
|
||||
with cache_path.open() as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
async with aiohttp.ClientSession() as sess, sess.get(EMOJI_DATA_URL) as resp:
|
||||
raw_emoji_data = sorted(
|
||||
await resp.json(content_type=None),
|
||||
key=lambda dat: dat["sort_order"],
|
||||
)
|
||||
tgs_emoji = []
|
||||
webp_emoji = []
|
||||
for emoji in raw_emoji_data:
|
||||
for tone in (emoji, *emoji.get("skin_variations", {}).values()):
|
||||
parsed_emoji = parse_emoji_data(tone, emoji)
|
||||
if parsed_emoji["type"] == "tgs":
|
||||
tgs_emoji.append(parsed_emoji)
|
||||
else:
|
||||
webp_emoji.append(parsed_emoji)
|
||||
full_data = {"tgs": tgs_emoji, "webp": webp_emoji}
|
||||
with cache_path.open("w") as f:
|
||||
json.dump(full_data, f, ensure_ascii=False)
|
||||
return full_data
|
||||
|
||||
|
||||
async def create_pack(conv: Conversation, name: str, pack_type: str) -> None:
|
||||
await conv.send_message("/newemojipack")
|
||||
resp: Message = await conv.get_response()
|
||||
assert "A new set of custom emoji" in resp.raw_text
|
||||
assert "Please choose the type" in resp.raw_text
|
||||
await conv.send_message(pack_type)
|
||||
resp = await conv.get_response()
|
||||
if pack_type == "Animated emoji":
|
||||
assert "When ready to upload, tell me the name of your set." in resp.raw_text
|
||||
else:
|
||||
assert "Now choose a name for your set." in resp.raw_text
|
||||
await conv.send_message(name)
|
||||
resp = await conv.get_response()
|
||||
if pack_type == "Animated emoji":
|
||||
assert "Now send me the first animated emoji" in resp.raw_text
|
||||
else:
|
||||
assert "Now send me the custom emoji" in resp.raw_text
|
||||
|
||||
|
||||
async def publish_pack(conv: Conversation, shortname: str) -> None:
|
||||
await conv.send_message("/publish")
|
||||
|
||||
resp: Message = await conv.get_response()
|
||||
assert "You can send me a custom emoji from your emoji set" in resp.raw_text
|
||||
await conv.send_message("/skip")
|
||||
|
||||
resp = await conv.get_response()
|
||||
assert "Please provide a short name for your emoji set" in resp.raw_text
|
||||
await conv.send_message(shortname)
|
||||
|
||||
resp = await conv.get_response()
|
||||
assert "I've just published your emoji set" in resp.raw_text
|
||||
|
||||
|
||||
async def send_emoji(
|
||||
conv: Conversation, file: bytes | Path | InputMediaUploadedDocument, emoji: str
|
||||
) -> None:
|
||||
await conv.send_file(file)
|
||||
resp: Message = await conv.get_response()
|
||||
assert "Send me a replacement emoji that corresponds to your custom emoji" in resp.raw_text
|
||||
await conv.send_message(emoji)
|
||||
resp = await conv.get_response()
|
||||
if "Sorry, too many attempts" in resp.raw_text:
|
||||
print(resp.raw_text)
|
||||
input("Press enter to continue")
|
||||
await conv.send_message(emoji)
|
||||
resp = await conv.get_response()
|
||||
while "Please send an emoji that best describes your custom emoji." in resp.raw_text:
|
||||
emoji = input(f"{emoji} was rejected, provide replacement: ")
|
||||
await conv.send_message(emoji)
|
||||
resp = await conv.get_response()
|
||||
assert "Congratulations" in resp.raw_text
|
||||
|
||||
|
||||
class CachedPack(TypedDict):
|
||||
name: str
|
||||
short_name: str
|
||||
part: int
|
||||
type: PackType
|
||||
published: bool
|
||||
collected: bool
|
||||
emojis: list[Emoji]
|
||||
|
||||
|
||||
class CachedData(TypedDict):
|
||||
packs: list[CachedPack]
|
||||
|
||||
|
||||
def _split_packs_int(
|
||||
emoji_list: list[Emoji], pack_type: PackType, current_part: int, total_parts: int
|
||||
) -> tuple[list[CachedPack], int]:
|
||||
packs = []
|
||||
current_pack: CachedPack | None = None
|
||||
for i, emoji in enumerate(emoji_list):
|
||||
if i % 200 == 0:
|
||||
current_part += 1
|
||||
random_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
|
||||
short_name = f"mxtg_unicodemoji_{random_id}"
|
||||
name = f"mautrix-telegram unicodemoji ({current_part}/{total_parts})"
|
||||
current_pack = {
|
||||
"type": pack_type,
|
||||
"short_name": short_name,
|
||||
"part": current_part,
|
||||
"name": name,
|
||||
"published": False,
|
||||
"collected": False,
|
||||
"emojis": [],
|
||||
}
|
||||
packs.append(current_pack)
|
||||
current_pack["emojis"].append(emoji)
|
||||
return packs, current_part
|
||||
|
||||
|
||||
def split_packs(emoji_data: EmojiData) -> list[CachedPack]:
|
||||
total_parts = math.ceil(len(emoji_data["tgs"]) / 200) + math.ceil(
|
||||
len(emoji_data["webp"]) / 200
|
||||
)
|
||||
current_part = 0
|
||||
animated_packs, current_part = _split_packs_int(
|
||||
emoji_data["tgs"], "Animated emoji", current_part, total_parts
|
||||
)
|
||||
static_packs, current_part = _split_packs_int(
|
||||
emoji_data["webp"], "Static emoji", current_part, total_parts
|
||||
)
|
||||
return animated_packs + static_packs
|
||||
|
||||
|
||||
async def create_and_fill_pack(
|
||||
client: TelegramClient, conv: Conversation, pack: CachedPack
|
||||
) -> None:
|
||||
if pack["short_name"] == "mxtg_unicodemoji_xvzs6743":
|
||||
print("Continuing pack", pack["name"])
|
||||
else:
|
||||
print("Creating pack", pack["name"])
|
||||
await create_pack(conv, pack["name"], pack["type"])
|
||||
total = len(pack["emojis"])
|
||||
for i, emoji in enumerate(pack["emojis"]):
|
||||
if pack["short_name"] == "mxtg_unicodemoji_xvzs6743" and i < 87:
|
||||
continue
|
||||
print(f"Adding emoji {i+1}/{total}", emoji["hex"], emoji["emoji"])
|
||||
emoji_file = media_dir / emoji["filename"]
|
||||
if emoji["type"] == "webp":
|
||||
attrs = [
|
||||
DocumentAttributeImageSize(w=100, h=100),
|
||||
DocumentAttributeFilename(file_name="image.webp"),
|
||||
]
|
||||
with emoji_file.open("rb") as f:
|
||||
file_handle = await client.upload_file(f, file_name="emoji.webp")
|
||||
emoji_file = InputMediaUploadedDocument(
|
||||
file_handle, mime_type="image/webp", attributes=attrs
|
||||
)
|
||||
await send_emoji(conv, emoji_file, emoji["emoji"])
|
||||
await asyncio.sleep(2)
|
||||
print("Publishing pack", pack["short_name"])
|
||||
await publish_pack(conv, pack["short_name"])
|
||||
|
||||
|
||||
async def main():
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
emoji_data = await load_emoji_data()
|
||||
|
||||
split_cache = media_dir / "split-cache.json"
|
||||
try:
|
||||
with split_cache.open() as f:
|
||||
packs: list[CachedPack] = json.load(f)
|
||||
except FileNotFoundError:
|
||||
packs = split_packs(emoji_data)
|
||||
with split_cache.open("w") as f:
|
||||
json.dump(packs, f)
|
||||
|
||||
doc_id_file = Path(args.output)
|
||||
try:
|
||||
with doc_id_file.open() as f:
|
||||
doc_ids = json.load(f)
|
||||
except FileNotFoundError:
|
||||
doc_ids = {}
|
||||
|
||||
client = TelegramClient(args.session, args.api_id, args.api_hash, flood_sleep_threshold=3600)
|
||||
await client.start()
|
||||
async with client.conversation("Stickers", max_messages=20000) as conv:
|
||||
for pack in packs:
|
||||
if not pack["published"]:
|
||||
await create_and_fill_pack(client, conv, pack)
|
||||
pack["published"] = True
|
||||
with split_cache.open("w") as f:
|
||||
json.dump(packs, f, ensure_ascii=False)
|
||||
if not pack["collected"] or True:
|
||||
print("Collecting document IDs from pack", pack["short_name"])
|
||||
stickers = await client(
|
||||
GetStickerSetRequest(InputStickerSetShortName(pack["short_name"]), 0)
|
||||
)
|
||||
doc: Document
|
||||
for i, doc in enumerate(stickers.documents):
|
||||
attr = next(
|
||||
attr
|
||||
for attr in doc.attributes
|
||||
if isinstance(attr, DocumentAttributeCustomEmoji)
|
||||
)
|
||||
base_emoji = attr.alt.replace("\ufe0f", "")
|
||||
emoji = pack["emojis"][i]["emoji"].replace("\ufe0f", "")
|
||||
doc_ids[emoji] = doc.id
|
||||
print(f"Mapped {emoji} (fallback: {base_emoji}) -> {doc_ids[emoji]}")
|
||||
pack["collected"] = True
|
||||
with split_cache.open("w") as f:
|
||||
json.dump(packs, f, ensure_ascii=False)
|
||||
with doc_id_file.open("w") as f:
|
||||
json.dump(doc_ids, f, ensure_ascii=False)
|
||||
print("Pack completed")
|
||||
await asyncio.sleep(5)
|
||||
with open(args.output.replace(".json", ".pickle"), "wb") as f:
|
||||
pickle.dump(doc_ids, f)
|
||||
print("Wrote pickle")
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user