gotd: remove redundant closures and improve logs on disconnect

This commit is contained in:
Tulir Asokan
2025-12-10 18:34:07 +02:00
parent 4268ee9909
commit 7f13284b59
2 changed files with 39 additions and 45 deletions
+14 -15
View File
@@ -172,7 +172,7 @@ func New(dialer Dialer, opt Options) *Conn {
// handleClose closes rpc engine and underlying connection on context done.
func (c *Conn) handleClose(ctx context.Context) error {
<-ctx.Done()
c.log.Debug("Closing")
c.log.Info("Connection context done, closing")
// Close RPC Engine.
c.rpc.ForceClose()
@@ -180,6 +180,7 @@ func (c *Conn) handleClose(ctx context.Context) error {
if err := c.conn.Close(); err != nil {
c.log.Debug("Failed to cleanup connection", zap.Error(err))
}
c.log.Info("Connection closed")
return nil
}
@@ -198,24 +199,22 @@ func (c *Conn) Run(ctx context.Context, f func(ctx context.Context) error) error
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c.log.Debug("Run: start")
defer c.log.Debug("Run: end")
c.log.Info("Run: start")
defer c.log.Info("Run: end")
if err := c.connect(ctx); err != nil {
return errors.Wrap(err, "start")
}
{
// All goroutines are bound to current call.
g := tdsync.NewLogGroup(ctx, c.log.Named("group"))
g.Go("handleClose", c.handleClose)
g.Go("pingLoop", c.pingLoop)
g.Go("ackLoop", c.ackLoop)
g.Go("saltsLoop", c.saltLoop)
g.Go("userCallback", f)
g.Go("readLoop", c.readLoop)
// All goroutines are bound to current call.
g := tdsync.NewLogGroup(ctx, c.log.Named("group"))
g.Go("handleClose", c.handleClose)
g.Go("pingLoop", c.pingLoop)
g.Go("ackLoop", c.ackLoop)
g.Go("saltsLoop", c.saltLoop)
g.Go("userCallback", f)
g.Go("readLoop", c.readLoop)
if err := g.Wait(); err != nil {
return errors.Wrap(err, "group")
}
if err := g.Wait(); err != nil {
return errors.Wrap(err, "group")
}
return nil
}
+25 -30
View File
@@ -193,43 +193,39 @@ func (e *Engine) retryUntilAck(ctx context.Context, req Request) (sent bool, err
return false, errors.Wrap(err, "send")
}
loop := func() error {
timer := e.clock.Timer(e.retryInterval)
defer clock.StopTimer(timer)
timer := e.clock.Timer(e.retryInterval)
defer clock.StopTimer(timer)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-e.reqCtx.Done():
return errors.Wrap(e.reqCtx.Err(), "engine forcibly closed")
case <-ackChan:
return nil
case <-timer.C():
timer.Reset(e.retryInterval)
for {
select {
case <-ctx.Done():
return true, ctx.Err()
case <-e.reqCtx.Done():
return true, errors.Wrap(e.reqCtx.Err(), "engine forcibly closed")
case <-ackChan:
return true, nil
case <-timer.C():
timer.Reset(e.retryInterval)
log.Debug("Acknowledge timed out, performing retry")
if err := e.send(ctx, req.MsgID, req.SeqNo, req.Input); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
log.Error("Retry failed", zap.Error(err))
return err
log.Warn("Acknowledge timed out, performing retry")
if err := e.send(ctx, req.MsgID, req.SeqNo, req.Input); err != nil {
if errors.Is(err, context.Canceled) {
return true, nil
}
retries++
if retries >= e.maxRetries {
log.Error("Retry limit reached", zap.Int64("msg_id", req.MsgID))
return &RetryLimitReachedErr{
Retries: retries,
}
log.Error("Retry failed", zap.Error(err))
return true, errors.Wrap(err, "retry send")
}
retries++
if retries >= e.maxRetries {
log.Error("Retry limit reached", zap.Int64("msg_id", req.MsgID))
return true, &RetryLimitReachedErr{
Retries: retries,
}
}
}
}
return true, loop()
}
// NotifyResult notifies engine about received RPC response.
@@ -269,7 +265,6 @@ func (e *Engine) isClosed() bool {
// All Do method calls of closed engine will return ErrEngineClosed error.
func (e *Engine) Close() {
atomic.StoreUint32(&e.closed, 1)
e.log.Info("Close called")
e.wg.Wait()
}