media: implement streaming for direct downloads

Signed-off-by: Sumner Evans <sumner.evans@automattic.com>
This commit is contained in:
Sumner Evans
2024-10-22 11:25:57 -06:00
parent a573740b9a
commit 13f21a7c70
5 changed files with 67 additions and 50 deletions
+1 -1
View File
@@ -59,4 +59,4 @@ require (
rsc.io/qr v0.2.0 // indirect
)
replace github.com/gotd/td => github.com/beeper/td v0.107.1-0.20241016153132-ce975d53c414
replace github.com/gotd/td => github.com/beeper/td v0.107.1-0.20241022172048-68acd408d05a
+2 -2
View File
@@ -2,8 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
github.com/beeper/td v0.107.1-0.20241016153132-ce975d53c414 h1:fN29TN7xu5GaMMEc6VDIReoSbAvDs8mEvUpi/MBew2c=
github.com/beeper/td v0.107.1-0.20241016153132-ce975d53c414/go.mod h1:zzgUtTDJD4TVaCpKfCD0rxazQxPhSlPzx/CVBpqsx1g=
github.com/beeper/td v0.107.1-0.20241022172048-68acd408d05a h1:M+Ntmuj1TTZRdis4w/WpaOs5adpMqSNCEsZylQFO+Sw=
github.com/beeper/td v0.107.1-0.20241022172048-68acd408d05a/go.mod h1:zzgUtTDJD4TVaCpKfCD0rxazQxPhSlPzx/CVBpqsx1g=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
+6 -7
View File
@@ -1,7 +1,6 @@
package connector
import (
"bytes"
"context"
"fmt"
"io"
@@ -133,21 +132,21 @@ func (tc *TelegramConnector) Download(ctx context.Context, mediaID networkid.Med
return nil, fmt.Errorf("unhandled media type %T", msgMedia)
}
data, fileInfo, err := readyTransferer.Download(ctx)
r, mimeType, size, err := readyTransferer.Stream(ctx)
if err != nil {
log.Err(err).Msg("failed to download media")
return nil, err
}
log.Debug().
Str("mime_type", fileInfo.MimeType).
Int("size", fileInfo.Size).
Str("mime_type", mimeType).
Int("size", size).
Msg("Downloaded media successfully")
return &mediaproxy.GetMediaResponseData{
Reader: io.NopCloser(bytes.NewBuffer(data)),
ContentType: fileInfo.MimeType,
ContentLength: int64(fileInfo.Size),
Reader: io.NopCloser(r),
ContentType: mimeType,
ContentLength: int64(size),
}, nil
}
+16 -14
View File
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
@@ -25,45 +26,43 @@ type AnimatedStickerConfig struct {
}
type ConvertedSticker struct {
Data []byte
DataWriter io.Reader
MIMEType string
ThumbnailData []byte
ThumbnailMIMEType string
Width int
Height int
Size int
}
func (c AnimatedStickerConfig) convert(ctx context.Context, data []byte) ConvertedSticker {
input := bytes.NewBuffer(data)
if c.Target == "disable" {
return ConvertedSticker{Data: data, MIMEType: "application/x-tgsticker"}
return ConvertedSticker{DataWriter: input, MIMEType: "application/x-tgsticker"}
}
log := zerolog.Ctx(ctx).With().Str("animated_sticker_target", c.Target).Logger()
if !lottie.Supported() {
log.Warn().Msg("lottie not supported, cannot convert animated stickers")
return ConvertedSticker{Data: data, MIMEType: "application/x-tgsticker"}
return ConvertedSticker{DataWriter: input, MIMEType: "application/x-tgsticker"}
} else if (c.Target == "webp" || c.Target == "webm") && !ffmpeg.Supported() {
log.Warn().Msg("ffmpeg not supported, cannot convert animated stickers")
return ConvertedSticker{Data: data, MIMEType: "application/x-tgsticker"}
return ConvertedSticker{DataWriter: input, MIMEType: "application/x-tgsticker"}
}
input := bytes.NewBuffer(data)
var convertedData, thumbnailData []byte
dataWriter := new(bytes.Buffer)
var thumbnailData []byte
var mimeType, thumbnailMIMEType string
var err error
switch c.Target {
case "png":
mimeType = "image/png"
outputWriter := new(bytes.Buffer)
err = lottie.Convert(ctx, input, "", outputWriter, c.Target, c.Args.Width, c.Args.Height, "1")
convertedData = outputWriter.Bytes()
err = lottie.Convert(ctx, input, "", dataWriter, c.Target, c.Args.Width, c.Args.Height, "1")
case "gif":
mimeType = "image/gif"
outputWriter := new(bytes.Buffer)
err = lottie.Convert(ctx, input, "", outputWriter, c.Target, c.Args.Width, c.Args.Height, strconv.Itoa(c.Args.FPS))
convertedData = outputWriter.Bytes()
err = lottie.Convert(ctx, input, "", dataWriter, c.Target, c.Args.Width, c.Args.Height, strconv.Itoa(c.Args.FPS))
case "webm", "webp":
tmpFile := filepath.Join(os.TempDir(), fmt.Sprintf("mautrix-telegram-lottieconverter-%s.%s", random.String(10), c.Target))
defer func() {
@@ -75,7 +74,9 @@ func (c AnimatedStickerConfig) convert(ctx context.Context, data []byte) Convert
if err != nil {
break
}
var convertedData []byte
convertedData, err = os.ReadFile(tmpFile)
dataWriter = bytes.NewBuffer(convertedData)
default:
err = fmt.Errorf("unsupported target format %s", c.Target)
}
@@ -85,16 +86,17 @@ func (c AnimatedStickerConfig) convert(ctx context.Context, data []byte) Convert
Msg("failed to convert animated sticker to target format")
// Fallback to original data
return ConvertedSticker{Data: data, MIMEType: "application/x-tgsticker"}
return ConvertedSticker{DataWriter: input, MIMEType: "application/x-tgsticker"}
}
return ConvertedSticker{
Data: convertedData,
DataWriter: dataWriter,
MIMEType: mimeType,
ThumbnailData: thumbnailData,
ThumbnailMIMEType: thumbnailMIMEType,
Width: c.Args.Width,
Height: c.Args.Height,
Size: dataWriter.Len(),
}
}
+42 -26
View File
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"github.com/gotd/td/telegram/downloader"
@@ -24,30 +25,32 @@ type dimensionable interface {
GetH() int
}
func getLargestPhotoSize(sizes []tg.PhotoSizeClass) (width, height int, largest tg.PhotoSizeClass) {
func getLargestPhotoSize(sizes []tg.PhotoSizeClass) (width, height, fileSize int, largest tg.PhotoSizeClass) {
if len(sizes) == 0 {
panic("cannot get largest size from empty list of sizes")
}
// FIXME this max size seems to be confusing bytes and dimensions.
var maxSize int
for _, s := range sizes {
var currentSize int
switch size := s.(type) {
case *tg.PhotoSize:
currentSize = size.GetSize()
case *tg.PhotoCachedSize:
currentSize = max(size.GetW(), size.GetH())
currentSize = max(size.W, size.H, len(size.Bytes))
case *tg.PhotoSizeProgressive:
currentSize = max(size.GetW(), size.GetH())
currentSize = max(size.W, size.H)
for _, sz := range size.Sizes {
currentSize = max(currentSize, sz)
}
case *tg.PhotoPathSize:
currentSize = len(size.GetBytes())
case *tg.PhotoStrippedSize:
currentSize = len(size.GetBytes())
}
if currentSize > maxSize {
maxSize = currentSize
if currentSize > fileSize {
fileSize = currentSize
largest = s
if d, ok := s.(dimensionable); ok {
width = d.GetW()
@@ -161,7 +164,7 @@ func (t *Transferer) WithDocument(doc tg.DocumentClass, thumbnail bool) *ReadyTr
FileReference: document.GetFileReference(),
}
if thumbnail {
_, _, largestThumbnail := getLargestPhotoSize(document.Thumbs)
_, _, _, largestThumbnail := getLargestPhotoSize(document.Thumbs)
documentFileLocation.ThumbSize = largestThumbnail.GetType()
} else {
t.fileInfo.Size = int(document.Size)
@@ -178,7 +181,7 @@ func (t *Transferer) WithDocument(doc tg.DocumentClass, thumbnail bool) *ReadyTr
func (t *Transferer) WithPhoto(pc tg.PhotoClass) *ReadyTransferer {
photo := pc.(*tg.Photo)
var largest tg.PhotoSizeClass
t.fileInfo.Width, t.fileInfo.Height, largest = getLargestPhotoSize(photo.GetSizes())
t.fileInfo.Width, t.fileInfo.Height, t.fileInfo.Size, largest = getLargestPhotoSize(photo.GetSizes())
return &ReadyTransferer{
inner: t,
loc: &tg.InputPhotoFileLocation{
@@ -246,18 +249,23 @@ func (t *ReadyTransferer) Transfer(ctx context.Context, store *store.Container,
return file.MXC, nil, &t.inner.fileInfo, nil
}
data, err := t.DownloadBytes(ctx)
var reader io.Reader
reader, t.inner.fileInfo.MimeType, t.inner.fileInfo.Size, err = t.Stream(ctx)
if err != nil {
return "", nil, nil, fmt.Errorf("downloading file failed: %w", err)
}
if t.inner.animatedStickerConfig != nil && t.inner.fileInfo.MimeType == "application/x-tgsticker" {
data, err := io.ReadAll(reader)
if err != nil {
return "", nil, nil, fmt.Errorf("reading sticker data failed: %w", err)
}
converted := t.inner.animatedStickerConfig.convert(ctx, data)
data = converted.Data
reader = converted.DataWriter
t.inner.fileInfo.MimeType = converted.MIMEType
t.inner.fileInfo.Width = converted.Width
t.inner.fileInfo.Height = converted.Height
t.inner.fileInfo.Size = len(data)
t.inner.fileInfo.Size = converted.Size
if len(converted.ThumbnailData) > 0 {
thumbnailMXC, thumbnailFileInfo, err := intent.UploadMedia(ctx, t.inner.roomID, converted.ThumbnailData, t.inner.filename, converted.ThumbnailMIMEType)
@@ -274,7 +282,13 @@ func (t *ReadyTransferer) Transfer(ctx context.Context, store *store.Container,
}
}
mxc, encryptedFileInfo, err = intent.UploadMedia(ctx, t.inner.roomID, data, t.inner.filename, t.inner.fileInfo.MimeType)
mxc, encryptedFileInfo, err = intent.UploadMediaStream(ctx, t.inner.roomID, int64(t.inner.fileInfo.Size), false, func(file io.Writer) (*bridgev2.FileStreamResult, error) {
_, err := io.Copy(file, reader)
return &bridgev2.FileStreamResult{
FileName: t.inner.filename,
MimeType: t.inner.fileInfo.MimeType,
}, err
})
if err != nil {
return "", nil, nil, fmt.Errorf("failed to upload media to Matrix: %w", err)
}
@@ -294,13 +308,12 @@ func (t *ReadyTransferer) Transfer(ctx context.Context, store *store.Container,
return mxc, encryptedFileInfo, &t.inner.fileInfo, nil
}
// Download downloads the media from Telegram.
func (t *ReadyTransferer) Download(ctx context.Context) ([]byte, *event.FileInfo, error) {
// TODO convert entire function to streaming? Maybe at least stream to file?
var buf bytes.Buffer
storageFileTypeClass, err := downloader.NewDownloader().Download(t.inner.client, t.loc).Stream(ctx, &buf)
// Stream streams the media from Telegram to an [io.Reader].
func (t *ReadyTransferer) Stream(ctx context.Context) (r io.Reader, mimeType string, fileSize int, err error) {
var storageFileTypeClass tg.StorageFileTypeClass
storageFileTypeClass, r, err = downloader.NewDownloader().Download(t.inner.client, t.loc).StreamToReader(ctx)
if err != nil {
return nil, nil, err
return nil, "", 0, err
}
if t.inner.fileInfo.MimeType == "" {
switch storageFileTypeClass.(type) {
@@ -321,26 +334,29 @@ func (t *ReadyTransferer) Download(ctx context.Context) ([]byte, *event.FileInfo
case *tg.StorageFileWebp:
t.inner.fileInfo.MimeType = "image/webp"
default:
t.inner.fileInfo.MimeType = http.DetectContentType(buf.Bytes())
// Just guess it's a JPEG. All documents should have specified the
// MIME type, and all photos are JPEG.
t.inner.fileInfo.MimeType = "image/jpeg"
}
}
t.inner.fileInfo.Size = buf.Len()
if t.inner.animatedStickerConfig != nil {
detected := http.DetectContentType(buf.Bytes())
if detected == "application/x-tgsticker" || detected == "application/x-gzip" {
if unzipped, err := gnuzip.MaybeGUnzip(buf.Bytes()); err != nil {
data, err := io.ReadAll(r)
if err != nil {
return nil, "", 0, fmt.Errorf("failed to read animated sticker data: %w", err)
} else if detected := http.DetectContentType(data); detected == "application/x-tgsticker" || detected == "application/x-gzip" {
if unzipped, err := gnuzip.MaybeGUnzip(data); err != nil {
zerolog.Ctx(ctx).Err(err).Msg("failed to unzip animated sticker")
} else {
converted := t.inner.animatedStickerConfig.convert(ctx, unzipped)
t.inner.fileInfo.MimeType = converted.MIMEType
t.inner.fileInfo.Size = len(converted.Data)
return converted.Data, &t.inner.fileInfo, nil
t.inner.fileInfo.Size = converted.Size
return converted.DataWriter, t.inner.fileInfo.MimeType, t.inner.fileInfo.Size, nil
}
}
}
return buf.Bytes(), &t.inner.fileInfo, nil
return r, t.inner.fileInfo.MimeType, t.inner.fileInfo.Size, nil
}
// DownloadBytes downloads the media from Telegram to a byte buffer.