Add png thumbnails for webm animated stickers. Fixes #467
This commit is contained in:
@@ -108,8 +108,10 @@ def _location_to_id(location: TypeLocation) -> str:
|
||||
|
||||
|
||||
async def transfer_thumbnail_to_matrix(client: MautrixTelegramClient, intent: IntentAPI,
|
||||
thumbnail_loc: TypeLocation, video: bytes, mime: str,
|
||||
encrypt: bool) -> Optional[DBTelegramFile]:
|
||||
thumbnail_loc: TypeLocation, mime_type: str, encrypt: bool,
|
||||
video: Optional[bytes], custom_data: Optional[bytes] = None,
|
||||
width: Optional[int] = None, height: [int] = None
|
||||
) -> Optional[DBTelegramFile]:
|
||||
if not Image or not VideoFileClip:
|
||||
return None
|
||||
|
||||
@@ -117,12 +119,17 @@ async def transfer_thumbnail_to_matrix(client: MautrixTelegramClient, intent: In
|
||||
if not loc_id:
|
||||
return None
|
||||
|
||||
if custom_data:
|
||||
loc_id += "-mau_custom_thumbnail"
|
||||
|
||||
db_file = DBTelegramFile.get(loc_id)
|
||||
if db_file:
|
||||
return db_file
|
||||
|
||||
video_ext = sane_mimetypes.guess_extension(mime)
|
||||
if VideoFileClip and video_ext and video:
|
||||
video_ext = sane_mimetypes.guess_extension(mime_type)
|
||||
if custom_data:
|
||||
file = custom_data
|
||||
elif VideoFileClip and video_ext and video:
|
||||
try:
|
||||
file, width, height = _read_video_thumbnail(video, video_ext, frame_ext="png")
|
||||
except OSError:
|
||||
@@ -193,6 +200,8 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
|
||||
if db_file:
|
||||
return db_file
|
||||
|
||||
converted_anim = None
|
||||
|
||||
if parallel_id and isinstance(location, Document) and (not is_sticker or not tgs_convert):
|
||||
db_file = await parallel_transfer_to_matrix(client, intent, loc_id, location, filename,
|
||||
encrypt, parallel_id)
|
||||
@@ -212,13 +221,17 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
|
||||
|
||||
image_converted = False
|
||||
# A weird bug in alpine/magic makes it return application/octet-stream for gzips...
|
||||
if is_sticker and tgs_convert and (mime_type == "application/gzip" or (
|
||||
mime_type == "application/octet-stream"
|
||||
and magic.from_buffer(file).startswith("gzip"))):
|
||||
mime_type, file, width, height = await convert_tgs_to(
|
||||
file, tgs_convert["target"], **tgs_convert["args"])
|
||||
thumbnail = None
|
||||
is_tgs = (mime_type == "application/gzip" or (mime_type == "application/octet-stream"
|
||||
and magic.from_buffer(file).startswith(
|
||||
"gzip")))
|
||||
if is_sticker and tgs_convert and is_tgs:
|
||||
converted_anim = await convert_tgs_to(file, tgs_convert["target"],
|
||||
**tgs_convert["args"])
|
||||
mime_type = converted_anim.mime
|
||||
file = converted_anim.data
|
||||
width, height = converted_anim.width, converted_anim.height
|
||||
image_converted = mime_type != "application/gzip"
|
||||
thumbnail = None
|
||||
|
||||
if mime_type == "image/webp":
|
||||
new_mime_type, file, width, height = convert_image(
|
||||
@@ -245,10 +258,16 @@ async def _unlocked_transfer_file_to_matrix(client: MautrixTelegramClient, inten
|
||||
if isinstance(thumbnail, (PhotoSize, PhotoCachedSize)):
|
||||
thumbnail = thumbnail.location
|
||||
try:
|
||||
db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail, file,
|
||||
mime_type, encrypt)
|
||||
db_file.thumbnail = await transfer_thumbnail_to_matrix(client, intent, thumbnail,
|
||||
video=file, mime_type=mime_type,
|
||||
encrypt=encrypt)
|
||||
except FileIdInvalidError:
|
||||
log.warning(f"Failed to transfer thumbnail for {thumbnail!s}", exc_info=True)
|
||||
elif converted_anim and converted_anim.thumbnail_data:
|
||||
db_file.thumbnail = await transfer_thumbnail_to_matrix(
|
||||
client, intent, location, video=None, encrypt=encrypt,
|
||||
custom_data=converted_anim.thumbnail_data, mime_type=converted_anim.thumbnail_mime,
|
||||
width=converted_anim.width, height=converted_anim.height)
|
||||
|
||||
try:
|
||||
db_file.insert()
|
||||
|
||||
@@ -21,8 +21,23 @@ import shutil
|
||||
import os.path
|
||||
import tempfile
|
||||
|
||||
from attr import dataclass
|
||||
|
||||
log: logging.Logger = logging.getLogger("mau.util.tgs")
|
||||
converters: Dict[str, Callable[[bytes, int, int, Any], Awaitable[Tuple[str, bytes]]]] = {}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConvertedSticker:
|
||||
mime: str
|
||||
data: bytes
|
||||
thumbnail_mime: Optional[str] = None
|
||||
thumbnail_data: Optional[bytes] = None
|
||||
width: int = 0
|
||||
height: int = 0
|
||||
|
||||
|
||||
Converter = Callable[[bytes, int, int, Any], Awaitable[ConvertedSticker]]
|
||||
converters: Dict[str, Converter] = {}
|
||||
|
||||
|
||||
def abswhich(program: Optional[str]) -> Optional[str]:
|
||||
@@ -34,7 +49,7 @@ lottieconverter = abswhich("lottieconverter")
|
||||
ffmpeg = abswhich("ffmpeg")
|
||||
|
||||
if lottieconverter:
|
||||
async def tgs_to_png(file: bytes, width: int, height: int, **_: Any) -> Tuple[str, bytes]:
|
||||
async def tgs_to_png(file: bytes, width: int, height: int, **_: Any) -> ConvertedSticker:
|
||||
frame = 1
|
||||
proc = await asyncio.create_subprocess_exec(lottieconverter, "-", "-", "png",
|
||||
f"{width}x{height}", str(frame),
|
||||
@@ -42,26 +57,26 @@ if lottieconverter:
|
||||
stdin=asyncio.subprocess.PIPE)
|
||||
stdout, stderr = await proc.communicate(file)
|
||||
if proc.returncode == 0:
|
||||
return "image/png", stdout
|
||||
return ConvertedSticker("image/png", stdout)
|
||||
else:
|
||||
log.error("lottieconverter error: " + (stderr.decode("utf-8") if stderr is not None
|
||||
else f"unknown ({proc.returncode})"))
|
||||
return "application/gzip", file
|
||||
else f"unknown ({proc.returncode})"))
|
||||
return ConvertedSticker("application/gzip", file)
|
||||
|
||||
|
||||
async def tgs_to_gif(file: bytes, width: int, height: int, background: str = "202020",
|
||||
**_: Any) -> Tuple[str, bytes]:
|
||||
**_: Any) -> ConvertedSticker:
|
||||
proc = await asyncio.create_subprocess_exec(lottieconverter, "-", "-", "gif",
|
||||
f"{width}x{height}", f"0x{background}",
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stdin=asyncio.subprocess.PIPE)
|
||||
stdout, stderr = await proc.communicate(file)
|
||||
if proc.returncode == 0:
|
||||
return "image/gif", stdout
|
||||
return ConvertedSticker("image/gif", stdout)
|
||||
else:
|
||||
log.error("lottieconverter error: " + (stderr.decode("utf-8") if stderr is not None
|
||||
else f"unknown ({proc.returncode})"))
|
||||
return "application/gzip", file
|
||||
else f"unknown ({proc.returncode})"))
|
||||
return ConvertedSticker("application/gzip", file)
|
||||
|
||||
|
||||
converters["png"] = tgs_to_png
|
||||
@@ -69,7 +84,7 @@ if lottieconverter:
|
||||
|
||||
if lottieconverter and ffmpeg:
|
||||
async def tgs_to_webm(file: bytes, width: int, height: int, fps: int = 30,
|
||||
**_: Any) -> Tuple[str, bytes]:
|
||||
**_: Any) -> ConvertedSticker:
|
||||
with tempfile.TemporaryDirectory(prefix="tgs_") as tmpdir:
|
||||
file_template = tmpdir + "/out_"
|
||||
proc = await asyncio.create_subprocess_exec(lottieconverter, "-", file_template,
|
||||
@@ -78,6 +93,8 @@ if lottieconverter and ffmpeg:
|
||||
stdin=asyncio.subprocess.PIPE)
|
||||
_, stderr = await proc.communicate(file)
|
||||
if proc.returncode == 0:
|
||||
with open(f"{file_template}00.png", "rb") as first_frame_file:
|
||||
first_frame_data = first_frame_file.read()
|
||||
proc = await asyncio.create_subprocess_exec(ffmpeg, "-hide_banner", "-loglevel",
|
||||
"error", "-framerate", str(fps),
|
||||
"-pattern_type", "glob", "-i",
|
||||
@@ -88,25 +105,27 @@ if lottieconverter and ffmpeg:
|
||||
stdin=asyncio.subprocess.PIPE)
|
||||
stdout, stderr = await proc.communicate()
|
||||
if proc.returncode == 0:
|
||||
return "video/webm", stdout
|
||||
return ConvertedSticker("video/webm", stdout, "image/png", first_frame_data)
|
||||
else:
|
||||
log.error("ffmpeg error: " + (stderr.decode("utf-8") if stderr is not None
|
||||
else f"unknown ({proc.returncode})"))
|
||||
else f"unknown ({proc.returncode})"))
|
||||
else:
|
||||
log.error("lottieconverter error: " + (stderr.decode("utf-8") if stderr is not None
|
||||
else f"unknown ({proc.returncode})"))
|
||||
return "application/gzip", file
|
||||
else f"unknown ({proc.returncode})"))
|
||||
return ConvertedSticker("application/gzip", file)
|
||||
|
||||
|
||||
converters["webm"] = tgs_to_webm
|
||||
|
||||
|
||||
async def convert_tgs_to(file: bytes, convert_to: str, width: int, height: int, **kwargs: Any
|
||||
) -> Tuple[str, bytes, Optional[int], Optional[int]]:
|
||||
) -> ConvertedSticker:
|
||||
if convert_to in converters:
|
||||
converter = converters[convert_to]
|
||||
mime, out = await converter(file, width, height, **kwargs)
|
||||
return mime, out, width, height
|
||||
converted = await converter(file, width, height, **kwargs)
|
||||
converted.width = width
|
||||
converted.height = height
|
||||
return converted
|
||||
elif convert_to != "disable":
|
||||
log.warning(f"Unable to convert animated sticker, type {convert_to} not supported")
|
||||
return "application/gzip", file, None, None
|
||||
return ConvertedSticker("application/gzip", file)
|
||||
|
||||
Reference in New Issue
Block a user