diff --git a/example-config.yaml b/example-config.yaml index 507d0d76..26a4e9e9 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -165,9 +165,9 @@ bridge: max_document_size: 100 # Format, animated sticker convert to (unstable). # Supported values: - # image - converts to png (preferred), - # gif - converts to gif animation (sometimes loses alpha), - # video - VP9 video in mp4 container + # image - converts to png (fastest and preferred), + # gif - converts to gif animation (requires gifski binary and takes a lot of time), + # video - video in mp4 container (requires gifski and ffmpeg binary and takes a lot of time, but less than gif) animated_sticker_target_type: image # Whether to bridge Telegram bot messages as m.notices or m.texts. diff --git a/mautrix_telegram/util/file_transfer.py b/mautrix_telegram/util/file_transfer.py index b7dfe002..29e6cf15 100644 --- a/mautrix_telegram/util/file_transfer.py +++ b/mautrix_telegram/util/file_transfer.py @@ -49,7 +49,7 @@ try: except ImportError: VideoFileClip = random = string = os = mimetypes = None -from .tgs_converter import convert_tgs +from .tgs_converter import convert_tgs_to log: logging.Logger = logging.getLogger("mau.util") @@ -203,8 +203,9 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten image_converted = False if mime_type == "application/gzip" and is_sticker: - mime_type, file, width, height, thumbnail = convert_tgs(file, tgs_convert_type, 256, 256) + mime_type, file, width, height = convert_tgs_to(file, tgs_convert_type, 256, 256) image_converted = width is not None + thumbnail = None if mime_type == "image/webp": new_mime_type, file, width, height = convert_image( diff --git a/mautrix_telegram/util/tgs_converter.py b/mautrix_telegram/util/tgs_converter.py index edacadbf..c62bea5b 100644 --- a/mautrix_telegram/util/tgs_converter.py +++ b/mautrix_telegram/util/tgs_converter.py @@ -1,144 +1,38 @@ # Generated by Netbeans # Author: Eramde # Date: 09.2019 - -from typing import Optional, Tuple, Union, Dict - - +import tempfile import logging +import gzip +import subprocess +import os +from io import BytesIO +from typing import Optional, Tuple + LOG: logging.Logger = logging.getLogger("mau.util.tgs") -try: - - import cairosvg - from io import BytesIO, StringIO - from tgs.objects import Animation - from tgs.parsers.tgs import parse_tgs as tgs_importer - from tgs.exporters import svg as tgs_svg_exporter - - def _tgs_to_png(animation: Animation, width: int = None, - height: int = None, frame: int = None) -> Tuple[bytes, Optional[bytes]]: - if not frame: - frame = int(animation.out_point * 0.3) - if not (width and height): - width = animation.width - height = animation.height - svg = StringIO() - tgs_svg_exporter.export_svg(animation, svg, frame=frame) - svg.seek(0) - fo = BytesIO() - cairosvg.svg2png(file_obj=svg, write_to=fo, output_width=width, output_height=height) - return fo.getvalue(), None - - TGS_CONVERTERS = {"image": _tgs_to_png} - - try: - from PIL import Image - - def _tgs_to_gif(animation: Animation, width: int = None, height: int = None) \ - -> Tuple[bytes, Optional[bytes]]: - """ - FIXME: copy-pasted from tgs.exporters.gif, because it's method don't resize images - """ - start = int(animation.in_point) - end = int(animation.out_point) - skip_frames = 5 - frames = [] - first_frame = None - for i in range(start, end + 1, skip_frames): - frame, _ = _tgs_to_png(animation, width, height, i) - if not first_frame: - first_frame = frame - image = Image.open(BytesIO(frame)) - if image.mode not in ["RGBA", "RGBa"]: - image = image.convert("RGBA") - alpha = image.getchannel("A") - image = image.convert('P', palette=Image.ADAPTIVE, colors=255) - mask = Image.eval(alpha, lambda a: 255 if a <= 128 else 0) - image.paste(255, mask) - frames.append(image) - - duration = 1000 / animation.frame_rate - fo = BytesIO() - frames[0].save( - fo, - format='GIF', - append_images=frames[1:], - save_all=True, - duration=duration, - loop=0, - transparency=255, - disposal=2, - ) - return fo.getvalue(), first_frame - - TGS_CONVERTERS.update({"gif": _tgs_to_gif}) - except ImportError: - LOG.warn("Unable to create tgs to gif converter, install PIL") - - try: - import cv2 - import numpy - import tempfile - import os - - def _tgs_to_video(animation: Animation, width: int = None, height: int = None) \ - -> Tuple[bytes, Optional[bytes]]: - start = int(animation.in_point) - end = int(animation.out_point) - with tempfile.NamedTemporaryFile(mode="r+b", suffix=".mp4") as tmp: - video_tmp_file = tmp.name - video = None - first_frame = None - try: - video = cv2.VideoWriter(filename=video_tmp_file, apiPreference=cv2.CAP_ANY, - fourcc=cv2.VideoWriter_fourcc(*'vp09'), - fps=int(animation.frame_rate), - frameSize=(width or animation.width, - height or animation.height)) - - for i in range(start, end + 1): - frame, _ = _tgs_to_png(animation, width, height, i) - if not first_frame: - first_frame = frame - video.write(cv2.cvtColor(numpy.array(Image.open(BytesIO(frame))), - cv2.COLOR_RGB2BGR)) - - finally: - if video: - video.release() - with open(video_tmp_file, "rb") as video_file: - out = video_file.read() - os.remove(video_tmp_file) - return out, first_frame - """ - It seems, that riot don't wont to play converted videos... - """ - TGS_CONVERTERS.update({"video": _tgs_to_video}) - except ImportError: - LOG.warn("Unable to create tgs to video converter, " - "install PIL, numpy and opencv-python-headless") - -except (ImportError, OSError): - LOG.exception("Unable to init tgs converters, possibly missing tgs and/or cairo libraries") - TGS_CONVERTERS = {} - TYPE_TO_MIME = {"image": "image/png", "gif": "image/gif", "video": "video/mp4"} +TYPE_TO_FORMAT = {"image": ".png", "gif": ".gif", "video": ".mp4"} -def convert_tgs(file: bytes, convert_to: str, width: int = None, height: int = None) \ - -> Tuple[str, bytes, Optional[int], Optional[int], Optional[bytes]]: - if convert_to in TGS_CONVERTERS: - converter = TGS_CONVERTERS[convert_to] +def convert_tgs_to(file: bytes, convert_to: str, width: int = 200, height: int = 200) \ + -> Tuple[str, bytes, Optional[int], Optional[int]]: + if convert_to in TYPE_TO_FORMAT: + file_ext = TYPE_TO_FORMAT[convert_to] mime = TYPE_TO_MIME[convert_to] - try: - animation = tgs_importer(BytesIO(file)) - out, preview = converter(animation, width, height) - return mime, out, width or animation.width, height or animation.height, preview - # Yep... some animations crash library... - except AttributeError: - LOG.exception("Error occurred while converting animated sticker") + lottie = gzip.open(BytesIO(file)) + with tempfile.NamedTemporaryFile(mode="w+b", suffix=".json") as json_out: + with tempfile.NamedTemporaryFile(mode="r+b", suffix=file_ext) as tmp: + tmp_output_file = tmp.name + json_out.write(lottie.read()) + json_out.flush() + subprocess.run(["puppeteer-lottie", "-q", "-i", json_out.name, "-o", tmp_output_file, + "-w", str(width), "-h", str(height)], capture_output=True) + with open(tmp_output_file, mode="r+b") as out_file: + out = out_file.read() + os.remove(tmp_output_file) + return mime, out, width, height else: - LOG.warning(f"Unable to convert animated sticker, no converter for type {convert_to}") - return "application/gzip", file, None, None, None + LOG.warning(f"Unable to convert animated sticker, type {convert_to} not supported") + return "application/gzip", file, None, None