diff --git a/pkg/gotd/telegram/updates/state.go b/pkg/gotd/telegram/updates/state.go index 3d8ce3d3..91d6f348 100644 --- a/pkg/gotd/telegram/updates/state.go +++ b/pkg/gotd/telegram/updates/state.go @@ -7,6 +7,7 @@ import ( "time" "github.com/go-faster/errors" + "go.mau.fi/util/exsync" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -45,8 +46,9 @@ type internalState struct { idleTimeout *time.Timer // Channel states. - channels map[int64]*channelState - channelsLock sync.Mutex + channels map[int64]*channelState + channelsLock sync.Mutex + recentlyLeftChannels *exsync.Set[int64] // Immutable fields. client API @@ -84,7 +86,8 @@ func newState(ctx context.Context, cfg stateConfig) *internalState { date: cfg.State.Date, idleTimeout: time.NewTimer(idleTimeout), - channels: make(map[int64]*channelState), + channels: make(map[int64]*channelState), + recentlyLeftChannels: exsync.NewSet[int64](), client: cfg.RawClient, log: cfg.Logger, @@ -341,14 +344,17 @@ func (s *internalState) handleChannel(ctx context.Context, channelID int64, date s.log.Error("Pts validation failed", zap.Error(err), zap.Any("update", cu.update)) return nil } + found := false for _, ent := range cu.entities.Chats { if ent.GetID() == channelID { + found = true switch te := ent.(type) { case *tg.Channel: if te.Left { s.log.Info("Not adding new channel state for left channel", zap.Int64("channel_id", channelID)) return nil } + s.recentlyLeftChannels.Remove(channelID) case *tg.ChannelForbidden: s.log.Info("Not adding new channel state for forbidden channel", zap.Int64("channel_id", channelID)) return nil @@ -356,6 +362,10 @@ func (s *internalState) handleChannel(ctx context.Context, channelID int64, date break } } + if !found && s.recentlyLeftChannels.Has(channelID) { + s.log.Info("Not adding new channel state for recently left channel", zap.Int64("channel_id", channelID)) + return nil + } s.channelsLock.Lock() state, ok := s.channels[channelID] diff --git a/pkg/gotd/telegram/updates/state_apply.go b/pkg/gotd/telegram/updates/state_apply.go index 67011d9c..ebea3f60 100644 --- a/pkg/gotd/telegram/updates/state_apply.go +++ b/pkg/gotd/telegram/updates/state_apply.go @@ -51,9 +51,11 @@ func (s *internalState) applyCombined(ctx context.Context, comb *tg.UpdatesCombi ptsChanged = true continue case *tg.UpdateChannelTooLong: + s.channelsLock.Lock() st, ok := s.channels[u.ChannelID] + s.channelsLock.Unlock() if !ok { - s.log.Debug("ChannelTooLong for channel that is not in the internalState, update ignored", zap.Int64("channel_id", u.ChannelID)) + s.log.Info("ChannelTooLong for channel that is not in the internalState, update ignored", zap.Int64("channel_id", u.ChannelID)) continue } if err := st.Push(ctx, channelUpdate{