7a04f298d2
- update to latest telegram layer - remove some references to fields in tg.Entities that don't exist in the schema - originally added here: https://github.com/beeper/td/commit/820929062a2ba0104397bc01235ab58a9cff780e - referenced here - https://github.com/mautrix/telegramgo/commit/124f0967ed195b5a380c9bd02e170ada9710dde3 - https://github.com/mautrix/telegramgo/commit/4205047aab2e0639217148b5d125bfaab668bd8e
190 lines
4.1 KiB
Go
190 lines
4.1 KiB
Go
package updates
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"time"
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type sequenceBox struct {
|
|
state int
|
|
gaps gapBuffer
|
|
gapTimeout *time.Timer
|
|
pending []update
|
|
|
|
apply func(ctx context.Context, state int, updates []update) error
|
|
log *zap.Logger
|
|
tracer trace.Tracer
|
|
}
|
|
|
|
type sequenceConfig struct {
|
|
InitialState int
|
|
Apply func(ctx context.Context, state int, updates []update) error
|
|
Logger *zap.Logger
|
|
Tracer trace.Tracer
|
|
}
|
|
|
|
func newSequenceBox(cfg sequenceConfig) *sequenceBox {
|
|
if cfg.Apply == nil {
|
|
panic("Apply func nil")
|
|
}
|
|
if cfg.Logger == nil {
|
|
cfg.Logger = zap.NewNop()
|
|
}
|
|
if cfg.Tracer == nil {
|
|
cfg.Tracer = trace.NewNoopTracerProvider().Tracer("")
|
|
}
|
|
|
|
cfg.Logger.Debug("Initialized", zap.Int("internalState", cfg.InitialState))
|
|
|
|
t := time.NewTimer(fastgapTimeout)
|
|
_ = t.Stop()
|
|
return &sequenceBox{
|
|
state: cfg.InitialState,
|
|
gapTimeout: t,
|
|
apply: cfg.Apply,
|
|
log: cfg.Logger,
|
|
tracer: cfg.Tracer,
|
|
}
|
|
}
|
|
|
|
func (s *sequenceBox) Handle(ctx context.Context, u update) error {
|
|
ctx, span := s.tracer.Start(ctx, "sequenceBox.Handle")
|
|
defer span.End()
|
|
|
|
log := s.log.With(zap.Int("upd_from", u.start()), zap.Int("upd_to", u.end()))
|
|
if checkGap(s.state, u.State, u.Count) == gapIgnore {
|
|
log.Debug("Outdated update, skipping", zap.Int("internalState", s.state))
|
|
return nil
|
|
}
|
|
|
|
if s.gaps.Has() {
|
|
s.pending = append(s.pending, u)
|
|
if accepted := s.gaps.Consume(u); !accepted {
|
|
log.Debug("Out of gap range, postponed", zap.Array("gaps", s.gaps))
|
|
return nil
|
|
}
|
|
|
|
log.Debug("Gap accepted", zap.Array("gaps", s.gaps))
|
|
if !s.gaps.Has() {
|
|
_ = s.gapTimeout.Stop()
|
|
s.log.Debug("Gap was resolved by waiting")
|
|
return s.applyPending(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
switch checkGap(s.state, u.State, u.Count) {
|
|
case gapApply:
|
|
if len(s.pending) > 0 {
|
|
s.pending = append(s.pending, u)
|
|
return s.applyPending(ctx)
|
|
}
|
|
|
|
if err := s.apply(ctx, u.State, []update{u}); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Debug("Accepted")
|
|
s.setState(u.State, "update")
|
|
return nil
|
|
case gapRefetch:
|
|
s.pending = append(s.pending, u)
|
|
s.gaps.Enable(s.state, u.start())
|
|
|
|
// Check if we already have acceptable updates in buffer.
|
|
for _, u := range s.pending {
|
|
_ = s.gaps.Consume(u)
|
|
}
|
|
|
|
if !s.gaps.Has() {
|
|
log.Debug("Gap was resolved by pending updates")
|
|
return s.applyPending(ctx)
|
|
}
|
|
|
|
_ = s.gapTimeout.Reset(fastgapTimeout)
|
|
s.log.Debug("Gap detected", zap.Array("gap", s.gaps))
|
|
return nil
|
|
default:
|
|
panic("unreachable")
|
|
}
|
|
}
|
|
|
|
func (s *sequenceBox) applyPending(ctx context.Context) error {
|
|
ctx, span := s.tracer.Start(ctx, "sequenceBox.applyPending")
|
|
defer span.End()
|
|
|
|
sort.SliceStable(s.pending, func(i, j int) bool {
|
|
return s.pending[i].start() < s.pending[j].start()
|
|
})
|
|
|
|
var (
|
|
cursor = 0
|
|
state = s.state
|
|
accepted []update
|
|
)
|
|
|
|
loop:
|
|
for i, update := range s.pending {
|
|
switch checkGap(state, update.State, update.Count) {
|
|
case gapApply:
|
|
accepted = append(accepted, update)
|
|
state = update.State
|
|
cursor = i + 1
|
|
continue
|
|
|
|
case gapIgnore:
|
|
cursor = i + 1
|
|
continue
|
|
|
|
case gapRefetch:
|
|
break loop
|
|
}
|
|
}
|
|
|
|
// Trim processed updates. Setting zero values for the rest
|
|
// of the slice lets GC collect referenced objects.
|
|
end := len(s.pending)
|
|
trim := end - cursor
|
|
copy(s.pending, s.pending[cursor:])
|
|
for i := trim; i < end; i++ {
|
|
s.pending[i] = update{}
|
|
}
|
|
s.pending = s.pending[:trim]
|
|
if len(accepted) == 0 {
|
|
s.log.Warn("Empty buffer", zap.Any("pending", s.pending), zap.Int("internalState", s.state))
|
|
return nil
|
|
}
|
|
|
|
if err := s.apply(ctx, state, accepted); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.log.Debug("Pending updates applied",
|
|
zap.Int("prev_state", s.state),
|
|
zap.Int("new_state", state),
|
|
zap.Int("accepted_count", len(accepted)),
|
|
)
|
|
|
|
s.setState(state, "pending updates")
|
|
return nil
|
|
}
|
|
|
|
func (s *sequenceBox) State() int { return s.state }
|
|
|
|
func (s *sequenceBox) SetState(state int, reason string) {
|
|
s.setState(state, reason)
|
|
}
|
|
|
|
func (s *sequenceBox) setState(state int, reason string) {
|
|
old := s.state
|
|
s.state = state
|
|
s.log.Debug("State changed",
|
|
zap.Int("old", old),
|
|
zap.Int("new", state),
|
|
zap.String("reason", reason),
|
|
)
|
|
}
|