From 332bbb8de1a7c10cb44f20694d213a645eb87d0d Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 26 Sep 2024 11:50:11 -0600 Subject: [PATCH] client: handle channel updates Signed-off-by: Sumner Evans --- pkg/connector/api.go | 63 +++++++++++++++++++++++++-------------- pkg/connector/client.go | 3 ++ pkg/connector/telegram.go | 49 ++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 22 deletions(-) diff --git a/pkg/connector/api.go b/pkg/connector/api.go index d23ee48b..3caa029c 100644 --- a/pkg/connector/api.go +++ b/pkg/connector/api.go @@ -11,9 +11,38 @@ type hasUserUpdates interface { GetUsers() []tg.UserClass } +type hasChatUpdates interface { + GetChats() []tg.ChatClass +} + type hasUpdates interface { hasUserUpdates - GetChats() []tg.ChatClass + hasChatUpdates +} + +func handleUserUpdates[U hasUserUpdates](ctx context.Context, t *TelegramClient, resp hasUserUpdates) error { + for _, user := range resp.GetUsers() { + user, ok := user.(*tg.User) + if !ok { + return fmt.Errorf("user is %T not *tg.User", user) + } + _, err := t.updateGhost(ctx, user.ID, user) + if err != nil { + return err + } + } + return nil +} + +func handleChatUpdates[U hasChatUpdates](ctx context.Context, t *TelegramClient, resp hasChatUpdates) error { + for _, c := range resp.GetChats() { + if channel, ok := c.(*tg.Channel); ok { + if err := t.updateChannel(ctx, channel); err != nil { + return err + } + } + } + return nil } func APICallWithOnlyUserUpdates[U hasUserUpdates](ctx context.Context, t *TelegramClient, fn func() (U, error)) (U, error) { @@ -21,35 +50,25 @@ func APICallWithOnlyUserUpdates[U hasUserUpdates](ctx context.Context, t *Telegr if err != nil { return *new(U), err } + return resp, handleUserUpdates[U](ctx, t, resp) +} - for _, user := range resp.GetUsers() { - user, ok := user.(*tg.User) - if !ok { - return *new(U), fmt.Errorf("user is %T not *tg.User", user) - } - _, err := t.updateGhost(ctx, user.ID, user) - if err != nil { - return *new(U), err - } +func APICallWithOnlyChatUpdates[U hasChatUpdates](ctx context.Context, t *TelegramClient, fn func() (U, error)) (U, error) { + resp, err := fn() + if err != nil { + return *new(U), err } - - return resp, nil + return resp, handleChatUpdates[U](ctx, t, resp) } // Wrapper for API calls that return a response with updates. func APICallWithUpdates[U hasUpdates](ctx context.Context, t *TelegramClient, fn func() (U, error)) (U, error) { - resp, err := APICallWithOnlyUserUpdates(ctx, t, fn) + resp, err := fn() if err != nil { return *new(U), err } - - for _, c := range resp.GetChats() { - if channel, ok := c.(*tg.Channel); ok { - if err := t.updateChannel(ctx, channel); err != nil { - return *new(U), err - } - } + if err = handleUserUpdates[U](ctx, t, resp); err != nil { + return *new(U), err } - - return resp, nil + return resp, handleChatUpdates[U](ctx, t, resp) } diff --git a/pkg/connector/client.go b/pkg/connector/client.go index e14d4b33..930fe58f 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -137,6 +137,9 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge dispatcher.OnNewMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateNewMessage) error { return client.onUpdateNewMessage(ctx, update) }) + dispatcher.OnChannel(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannel) error { + return client.onUpdateChannel(ctx, update) + }) dispatcher.OnNewChannelMessage(func(ctx context.Context, e tg.Entities, update *tg.UpdateNewChannelMessage) error { return client.onUpdateNewMessage(ctx, update) }) diff --git a/pkg/connector/telegram.go b/pkg/connector/telegram.go index 5515348b..bd08b62c 100644 --- a/pkg/connector/telegram.go +++ b/pkg/connector/telegram.go @@ -9,6 +9,7 @@ import ( "time" "github.com/gotd/td/tg" + "github.com/gotd/td/tgerr" "github.com/rs/zerolog" "maunium.net/go/mautrix/bridge/status" "maunium.net/go/mautrix/bridgev2" @@ -32,6 +33,54 @@ type IGetMessages interface { GetMessages() []int } +func (t *TelegramClient) onUpdateChannel(ctx context.Context, update *tg.UpdateChannel) error { + log := zerolog.Ctx(ctx).With(). + Str("handler", "on_update_channel"). + Int64("channel_id", update.ChannelID). + Logger() + log.Debug().Msg("Fetching channel due to UpdateChannel event") + + leave := func() { + t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatDelete{ + EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatDelete, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Int64("channel_id", update.ChannelID) + }, + PortalKey: t.makePortalKeyFromID(ids.PeerTypeChannel, update.ChannelID), + Sender: t.mySender(), + }, + OnlyForMe: true, + }) + } + + chats, err := APICallWithOnlyChatUpdates(ctx, t, func() (tg.MessagesChatsClass, error) { + if accessHash, err := t.ScopedStore.GetAccessHash(ctx, ids.PeerTypeChannel, update.ChannelID); err != nil { + return nil, err + } else { + return t.client.API().ChannelsGetChannels(ctx, []tg.InputChannelClass{ + &tg.InputChannel{ChannelID: update.ChannelID, AccessHash: accessHash}, + }) + } + }) + if err != nil { + if tgerr.Is(err, tg.ErrChannelInvalid, tg.ErrChannelPrivate) { + leave() + return nil + } + return fmt.Errorf("failed to get channel: %w", err) + } else if len(chats.GetChats()) != 1 { + return fmt.Errorf("expected 1 chat, got %d", len(chats.GetChats())) + } else if channel, ok := chats.GetChats()[0].(*tg.Channel); !ok { + log.Error().Type("chat_type", chats.GetChats()[0]).Msg("Expected channel, got something else. Leaving the channel.") + leave() + } else if channel.Left { + log.Error().Msg("Update was for a left channel. Leaving the channel.") + leave() + } + return nil +} + func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, update IGetMessage) error { log := zerolog.Ctx(ctx) switch msg := update.GetMessage().(type) {