Files
mautrix-telegram/pkg/gotd/telegram/updates/sequence_box.go
T
2025-06-27 20:03:37 -07:00

190 lines
4.1 KiB
Go

package updates
import (
"context"
"sort"
"time"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
type sequenceBox struct {
state int
gaps gapBuffer
gapTimeout *time.Timer
pending []update
apply func(ctx context.Context, state int, updates []update) error
log *zap.Logger
tracer trace.Tracer
}
type sequenceConfig struct {
InitialState int
Apply func(ctx context.Context, state int, updates []update) error
Logger *zap.Logger
Tracer trace.Tracer
}
func newSequenceBox(cfg sequenceConfig) *sequenceBox {
if cfg.Apply == nil {
panic("Apply func nil")
}
if cfg.Logger == nil {
cfg.Logger = zap.NewNop()
}
if cfg.Tracer == nil {
cfg.Tracer = trace.NewNoopTracerProvider().Tracer("")
}
cfg.Logger.Debug("Initialized", zap.Int("internalState", cfg.InitialState))
t := time.NewTimer(fastgapTimeout)
_ = t.Stop()
return &sequenceBox{
state: cfg.InitialState,
gapTimeout: t,
apply: cfg.Apply,
log: cfg.Logger,
tracer: cfg.Tracer,
}
}
func (s *sequenceBox) Handle(ctx context.Context, u update) error {
ctx, span := s.tracer.Start(ctx, "sequenceBox.Handle")
defer span.End()
log := s.log.With(zap.Int("upd_from", u.start()), zap.Int("upd_to", u.end()))
if checkGap(s.state, u.State, u.Count) == gapIgnore {
log.Debug("Outdated update, skipping", zap.Int("internalState", s.state))
return nil
}
if s.gaps.Has() {
s.pending = append(s.pending, u)
if accepted := s.gaps.Consume(u); !accepted {
log.Debug("Out of gap range, postponed", zap.Array("gaps", s.gaps))
return nil
}
log.Debug("Gap accepted", zap.Array("gaps", s.gaps))
if !s.gaps.Has() {
_ = s.gapTimeout.Stop()
s.log.Debug("Gap was resolved by waiting")
return s.applyPending(ctx)
}
return nil
}
switch checkGap(s.state, u.State, u.Count) {
case gapApply:
if len(s.pending) > 0 {
s.pending = append(s.pending, u)
return s.applyPending(ctx)
}
if err := s.apply(ctx, u.State, []update{u}); err != nil {
return err
}
log.Debug("Accepted")
s.setState(u.State, "update")
return nil
case gapRefetch:
s.pending = append(s.pending, u)
s.gaps.Enable(s.state, u.start())
// Check if we already have acceptable updates in buffer.
for _, u := range s.pending {
_ = s.gaps.Consume(u)
}
if !s.gaps.Has() {
log.Debug("Gap was resolved by pending updates")
return s.applyPending(ctx)
}
_ = s.gapTimeout.Reset(fastgapTimeout)
s.log.Debug("Gap detected", zap.Array("gap", s.gaps))
return nil
default:
panic("unreachable")
}
}
func (s *sequenceBox) applyPending(ctx context.Context) error {
ctx, span := s.tracer.Start(ctx, "sequenceBox.applyPending")
defer span.End()
sort.SliceStable(s.pending, func(i, j int) bool {
return s.pending[i].start() < s.pending[j].start()
})
var (
cursor = 0
state = s.state
accepted []update
)
loop:
for i, update := range s.pending {
switch checkGap(state, update.State, update.Count) {
case gapApply:
accepted = append(accepted, update)
state = update.State
cursor = i + 1
continue
case gapIgnore:
cursor = i + 1
continue
case gapRefetch:
break loop
}
}
// Trim processed updates. Setting zero values for the rest
// of the slice lets GC collect referenced objects.
end := len(s.pending)
trim := end - cursor
copy(s.pending, s.pending[cursor:])
for i := trim; i < end; i++ {
s.pending[i] = update{}
}
s.pending = s.pending[:trim]
if len(accepted) == 0 {
s.log.Warn("Empty buffer", zap.Any("pending", s.pending), zap.Int("internalState", s.state))
return nil
}
if err := s.apply(ctx, state, accepted); err != nil {
return err
}
s.log.Debug("Pending updates applied",
zap.Int("prev_state", s.state),
zap.Int("new_state", state),
zap.Int("accepted_count", len(accepted)),
)
s.setState(state, "pending updates")
return nil
}
func (s *sequenceBox) State() int { return s.state }
func (s *sequenceBox) SetState(state int, reason string) {
s.setState(state, reason)
}
func (s *sequenceBox) setState(state int, reason string) {
old := s.state
s.state = state
s.log.Debug("State changed",
zap.Int("old", old),
zap.Int("new", state),
zap.String("reason", reason),
)
}