diff --git a/pkg/connector/handletelegram.go b/pkg/connector/handletelegram.go index 4c70763b..9ac48927 100644 --- a/pkg/connector/handletelegram.go +++ b/pkg/connector/handletelegram.go @@ -54,12 +54,13 @@ type IGetMessages interface { GetMessages() []int } -func (t *TelegramClient) selfLeaveChat(ctx context.Context, portalKey networkid.PortalKey) error { +func (t *TelegramClient) selfLeaveChat(ctx context.Context, portalKey networkid.PortalKey, reason error) error { peerType, id, _, err := ids.ParsePortalID(portalKey.ID) if err != nil { return err } if peerType == ids.PeerTypeChannel { + t.updatesManager.RemoveChannel(id, reason) topics, err := t.main.Store.Topic.GetAll(ctx, id) if err != nil { return err @@ -127,17 +128,17 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd }) if err != nil { if tgerr.Is(err, tg.ErrChannelInvalid, tg.ErrChannelPrivate) { - return t.selfLeaveChat(ctx, portalKey) + return t.selfLeaveChat(ctx, portalKey, fmt.Errorf("error fetching after UpdateChannel: %w", err)) } log.Err(err).Msg("Failed to get channel info after UpdateChannel event") } else if len(chats.GetChats()) != 1 { log.Warn().Int("chat_count", len(chats.GetChats())).Msg("Got more than 1 chat in GetChannels response") } else if channel, ok := chats.GetChats()[0].(*tg.Channel); !ok { log.Error().Type("chat_type", chats.GetChats()[0]).Msg("Expected channel, got something else. Leaving the channel.") - return t.selfLeaveChat(ctx, portalKey) + return t.selfLeaveChat(ctx, portalKey, fmt.Errorf("channel not returned in getChannels after UpdateChannel")) } else if channel.Left { log.Error().Msg("Update was for a left channel. Leaving the channel.") - return t.selfLeaveChat(ctx, portalKey) + return t.selfLeaveChat(ctx, portalKey, fmt.Errorf("channel has left=true in getChannels after UpdateChannel")) } else { res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ EventMeta: simplevent.EventMeta{ @@ -340,7 +341,7 @@ func (t *TelegramClient) handleServiceMessage(ctx context.Context, msg *tg.Messa return resultToError(res) case *tg.MessageActionChatDeleteUser: if action.UserID == t.telegramUserID { - return t.selfLeaveChat(ctx, eventMeta.PortalKey) + return t.selfLeaveChat(ctx, eventMeta.PortalKey, fmt.Errorf("delete user event for chat")) } res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatInfoChange{ EventMeta: eventMeta.WithType(bridgev2.RemoteEventChatInfoChange), @@ -824,11 +825,14 @@ func (t *TelegramClient) onEntityUpdate(ctx context.Context, e tg.Entities) erro } for chatID, chat := range e.Chats { if chat.GetLeft() { - t.selfLeaveChat(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, chatID, 0)) + // TODO don't ignore errors + t.selfLeaveChat(ctx, t.makePortalKeyFromID(ids.PeerTypeChat, chatID, 0), fmt.Errorf("left flag in entity update")) } } for _, channel := range e.Channels { - if _, err := t.updateChannel(ctx, channel); err != nil { + if channel.GetLeft() { + t.selfLeaveChat(ctx, t.makePortalKeyFromID(ids.PeerTypeChannel, channel.ID, 0), fmt.Errorf("left flag in entity update")) + } else if _, err := t.updateChannel(ctx, channel); err != nil { return err } } diff --git a/pkg/gotd/telegram/updates/manager.go b/pkg/gotd/telegram/updates/manager.go index 64616ac4..8a8c0d09 100644 --- a/pkg/gotd/telegram/updates/manager.go +++ b/pkg/gotd/telegram/updates/manager.go @@ -161,6 +161,13 @@ func (m *Manager) Run(ctx context.Context, api API, userID int64, opt AuthOption return wg.Wait() } +func (m *Manager) RemoveChannel(channelID int64, reason error) { + if m == nil { + return + } + m.state.RemoveChannel(channelID, reason) +} + func (m *Manager) loadState(ctx context.Context, api API, userID int64, forget bool) (State, error) { onNotFound: var state State diff --git a/pkg/gotd/telegram/updates/state.go b/pkg/gotd/telegram/updates/state.go index 0e23caa9..fab29585 100644 --- a/pkg/gotd/telegram/updates/state.go +++ b/pkg/gotd/telegram/updates/state.go @@ -383,6 +383,19 @@ func (s *internalState) handleChannel(ctx context.Context, channelID int64, date return state.Push(ctx, cu) } +func (s *internalState) RemoveChannel(channelID int64, reason error) { + if s == nil { + return + } + s.channelsLock.Lock() + state, ok := s.channels[channelID] + s.channelsLock.Unlock() + if !ok { + return + } + state.stop(fmt.Errorf("%w: %w", ErrRemoveChannelState, reason)) +} + func (s *internalState) createAndRunChannelState(ctx context.Context, channelID, accessHash int64, initialPts int) (state *channelState) { state = s.newChannelState(channelID, accessHash, initialPts) s.channelsLock.Lock()