Migrated to rlottie utility
This commit is contained in:
+6
-11
@@ -163,18 +163,13 @@ bridge:
|
||||
image_as_file_size: 10
|
||||
# Maximum size of Telegram documents in megabytes to bridge.
|
||||
max_document_size: 100
|
||||
# Format, animated sticker convert to (unstable).
|
||||
# Format, animated sticker convert to.
|
||||
# Supported values:
|
||||
# image - converts to png (fastest and preferred),
|
||||
# gif - converts to gif animation (requires gifski binary and takes a lot of time),
|
||||
# video - video in mp4 container (requires gifski and ffmpeg binary and takes a lot of time, but less than gif)
|
||||
# To install library:
|
||||
# source venv/bin/activate
|
||||
# pip install nodeenv
|
||||
# nodeenv -p
|
||||
# source venv/bin/activate
|
||||
# npm install -g puppeteer-lottie-cli
|
||||
animated_sticker_target_type: image
|
||||
# png - converts to png (fastest and preferred),
|
||||
# gif - converts to gif animation, requires PIL, slow, sometimes loses transparency,
|
||||
# gifc - uses same utility as png, faster but without transparency at all
|
||||
# mp4 - video in mp4 container (ffmpeg binary and takes a lot of time, but less than gif)
|
||||
animated_sticker_target_type: gif
|
||||
|
||||
# Whether to bridge Telegram bot messages as m.notices or m.texts.
|
||||
bot_messages_as_notices: true
|
||||
|
||||
@@ -167,9 +167,9 @@ async def transfer_file_to_matrix(client: MautrixTelegramClient, intent: IntentA
|
||||
if not location_id:
|
||||
return None
|
||||
|
||||
db_file = DBTelegramFile.get(location_id)
|
||||
if db_file:
|
||||
return db_file
|
||||
#db_file = DBTelegramFile.get(location_id)
|
||||
#if db_file:
|
||||
# return db_file
|
||||
|
||||
try:
|
||||
lock = transfer_locks[location_id]
|
||||
@@ -186,9 +186,9 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
|
||||
thumbnail: TypeThumbnail, is_sticker: bool,
|
||||
tgs_convert_type: str
|
||||
) -> Optional[DBTelegramFile]:
|
||||
db_file = DBTelegramFile.get(loc_id)
|
||||
if db_file:
|
||||
return db_file
|
||||
#db_file = DBTelegramFile.get(loc_id)
|
||||
#if db_file:
|
||||
# return db_file
|
||||
|
||||
try:
|
||||
file = await client.download_file(location)
|
||||
@@ -203,9 +203,8 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
|
||||
|
||||
image_converted = False
|
||||
if mime_type == "application/gzip" and is_sticker:
|
||||
mime_type, file, width, height = convert_tgs_to(file, tgs_convert_type, 256, 256)
|
||||
mime_type, file, width, height, thumbnail = convert_tgs_to(file, tgs_convert_type, 256, 256)
|
||||
image_converted = width is not None
|
||||
thumbnail = None
|
||||
|
||||
if mime_type == "image/webp":
|
||||
new_mime_type, file, width, height = convert_image(
|
||||
|
||||
@@ -1,38 +1,137 @@
|
||||
# Generated by Netbeans
|
||||
# Author: Eramde
|
||||
# Date: 09.2019
|
||||
import tempfile
|
||||
import logging
|
||||
import gzip
|
||||
import subprocess
|
||||
import os
|
||||
from io import BytesIO
|
||||
from typing import Optional, Tuple
|
||||
|
||||
|
||||
LOG: logging.Logger = logging.getLogger("mau.util.tgs")
|
||||
|
||||
TYPE_TO_MIME = {"image": "image/png", "gif": "image/gif", "video": "video/mp4"}
|
||||
TYPE_TO_FORMAT = {"image": ".png", "gif": ".gif", "video": ".mp4"}
|
||||
try:
|
||||
import gzip
|
||||
import subprocess
|
||||
|
||||
proc = subprocess.Popen(["lottieconverter"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
_, err = proc.communicate()
|
||||
if err is not None and not err.decode("utf-8").startswith("Usage"):
|
||||
raise ImportError(err)
|
||||
|
||||
|
||||
def _tgs_to_png(file: bytes, width: int,
|
||||
height: int, frame: int = None) -> Tuple[bytes, Optional[bytes]]:
|
||||
if not frame:
|
||||
frame = 1
|
||||
p = subprocess.run(["lottieconverter", "-", "-", "png",
|
||||
str.format(f"{width}x{height}"), str(frame)], stdout=subprocess.PIPE,
|
||||
input=file, universal_newlines=False)
|
||||
return p.stdout, None
|
||||
|
||||
|
||||
TGS_CONVERTERS = {"png": _tgs_to_png}
|
||||
|
||||
def _tgs_to_gif(file: bytes, width: int, height: int) -> Tuple[bytes, Optional[bytes]]:
|
||||
p = subprocess.run(["lottieconverter", "-", "-", "gif",
|
||||
str.format(f"{width}x{height}"), "0", "0x202020"],
|
||||
stdout=subprocess.PIPE,
|
||||
input=file, universal_newlines=False)
|
||||
return p.stdout, None
|
||||
|
||||
TGS_CONVERTERS.update({"gifc": _tgs_to_gif})
|
||||
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
def _tgs_to_gif(file: bytes, width: int, height: int) \
|
||||
-> Tuple[bytes, Optional[bytes]]:
|
||||
frames = []
|
||||
first_frame = None
|
||||
for i in range(1, 100):
|
||||
frame, _ = _tgs_to_png(file, width, height, i)
|
||||
if not first_frame:
|
||||
first_frame = frame
|
||||
image = Image.open(BytesIO(frame))
|
||||
if image.mode not in ["RGBA", "RGBa"]:
|
||||
image = image.convert("RGBA")
|
||||
alpha = image.getchannel("A")
|
||||
image = image.convert('P', palette=Image.ADAPTIVE, colors=255)
|
||||
mask = Image.eval(alpha, lambda a: 255 if a <= 128 else 0)
|
||||
image.paste(255, mask)
|
||||
frames.append(image)
|
||||
|
||||
duration = 100
|
||||
fo = BytesIO()
|
||||
frames[0].save(
|
||||
fo,
|
||||
format='GIF',
|
||||
append_images=frames[1:],
|
||||
save_all=True,
|
||||
duration=duration,
|
||||
loop=0,
|
||||
transparency=255,
|
||||
disposal=2,
|
||||
)
|
||||
return fo.getvalue(), first_frame
|
||||
|
||||
TGS_CONVERTERS.update({"gif": _tgs_to_gif})
|
||||
except ImportError:
|
||||
LOG.warn("Unable to create tgs to gif converter, install PIL")
|
||||
|
||||
try:
|
||||
import cv2
|
||||
import numpy
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
def _tgs_to_video(file: bytes, width: int, height: int) \
|
||||
-> Tuple[bytes, Optional[bytes]]:
|
||||
with tempfile.NamedTemporaryFile(mode="r+b", suffix=".mp4") as tmp:
|
||||
video_tmp_file = tmp.name
|
||||
video = None
|
||||
first_frame = None
|
||||
try:
|
||||
video = cv2.VideoWriter(filename=video_tmp_file, apiPreference=cv2.CAP_ANY,
|
||||
fourcc=cv2.VideoWriter_fourcc(*'vp09'),
|
||||
fps=10,
|
||||
frameSize=(width, height))
|
||||
|
||||
for i in range(1, 100):
|
||||
frame, _ = _tgs_to_png(file, width, height, i)
|
||||
if not first_frame:
|
||||
first_frame = frame
|
||||
video.write(cv2.cvtColor(numpy.array(Image.open(BytesIO(frame))),
|
||||
cv2.COLOR_RGB2BGR))
|
||||
|
||||
finally:
|
||||
if video:
|
||||
video.release()
|
||||
with open(video_tmp_file, "rb") as video_file:
|
||||
out = video_file.read()
|
||||
os.remove(video_tmp_file)
|
||||
return out, first_frame
|
||||
"""
|
||||
It seems, that riot don't wont to play converted videos...
|
||||
"""
|
||||
TGS_CONVERTERS.update({"mp4": _tgs_to_video})
|
||||
except ImportError:
|
||||
LOG.warn("Unable to create tgs to video converter, "
|
||||
"install PIL, numpy and opencv-python-headless")
|
||||
|
||||
except (ImportError, OSError):
|
||||
LOG.exception("Unable to init tgs converters, possibly missing lottieconverter")
|
||||
TGS_CONVERTERS = {}
|
||||
|
||||
|
||||
TYPE_TO_MIME = {"png": "image/png", "gif": "image/gif", "gifc": "image/gif", "mp4": "video/mp4"}
|
||||
|
||||
|
||||
def convert_tgs_to(file: bytes, convert_to: str, width: int = 200, height: int = 200) \
|
||||
-> Tuple[str, bytes, Optional[int], Optional[int]]:
|
||||
if convert_to in TYPE_TO_FORMAT:
|
||||
file_ext = TYPE_TO_FORMAT[convert_to]
|
||||
-> Tuple[str, bytes, Optional[int], Optional[int], Optional[bytes]]:
|
||||
if convert_to in TGS_CONVERTERS:
|
||||
mime = TYPE_TO_MIME[convert_to]
|
||||
lottie = gzip.open(BytesIO(file))
|
||||
with tempfile.NamedTemporaryFile(mode="w+b", suffix=".json") as json_out:
|
||||
with tempfile.NamedTemporaryFile(mode="r+b", suffix=file_ext) as tmp:
|
||||
tmp_output_file = tmp.name
|
||||
json_out.write(lottie.read())
|
||||
json_out.flush()
|
||||
subprocess.run(["puppeteer-lottie", "-q", "-i", json_out.name, "-o", tmp_output_file,
|
||||
"-w", str(width), "-h", str(height)], capture_output=True)
|
||||
with open(tmp_output_file, mode="r+b") as out_file:
|
||||
out = out_file.read()
|
||||
os.remove(tmp_output_file)
|
||||
return mime, out, width, height
|
||||
converter = TGS_CONVERTERS[convert_to]
|
||||
out, preview = converter(file, width, height)
|
||||
return mime, out, width, height, preview
|
||||
else:
|
||||
LOG.warning(f"Unable to convert animated sticker, type {convert_to} not supported")
|
||||
return "application/gzip", file, None, None
|
||||
return "application/gzip", file, None, None, None
|
||||
|
||||
Reference in New Issue
Block a user