diff --git a/pkg/gotd/mtproto/conn.go b/pkg/gotd/mtproto/conn.go index 9a7fd356..70a6b46b 100644 --- a/pkg/gotd/mtproto/conn.go +++ b/pkg/gotd/mtproto/conn.go @@ -172,7 +172,7 @@ func New(dialer Dialer, opt Options) *Conn { // handleClose closes rpc engine and underlying connection on context done. func (c *Conn) handleClose(ctx context.Context) error { <-ctx.Done() - c.log.Debug("Closing") + c.log.Info("Connection context done, closing") // Close RPC Engine. c.rpc.ForceClose() @@ -180,6 +180,7 @@ func (c *Conn) handleClose(ctx context.Context) error { if err := c.conn.Close(); err != nil { c.log.Debug("Failed to cleanup connection", zap.Error(err)) } + c.log.Info("Connection closed") return nil } @@ -198,24 +199,22 @@ func (c *Conn) Run(ctx context.Context, f func(ctx context.Context) error) error ctx, cancel := context.WithCancel(ctx) defer cancel() - c.log.Debug("Run: start") - defer c.log.Debug("Run: end") + c.log.Info("Run: start") + defer c.log.Info("Run: end") if err := c.connect(ctx); err != nil { return errors.Wrap(err, "start") } - { - // All goroutines are bound to current call. - g := tdsync.NewLogGroup(ctx, c.log.Named("group")) - g.Go("handleClose", c.handleClose) - g.Go("pingLoop", c.pingLoop) - g.Go("ackLoop", c.ackLoop) - g.Go("saltsLoop", c.saltLoop) - g.Go("userCallback", f) - g.Go("readLoop", c.readLoop) + // All goroutines are bound to current call. + g := tdsync.NewLogGroup(ctx, c.log.Named("group")) + g.Go("handleClose", c.handleClose) + g.Go("pingLoop", c.pingLoop) + g.Go("ackLoop", c.ackLoop) + g.Go("saltsLoop", c.saltLoop) + g.Go("userCallback", f) + g.Go("readLoop", c.readLoop) - if err := g.Wait(); err != nil { - return errors.Wrap(err, "group") - } + if err := g.Wait(); err != nil { + return errors.Wrap(err, "group") } return nil } diff --git a/pkg/gotd/rpc/engine.go b/pkg/gotd/rpc/engine.go index ff1f5841..8cae945e 100644 --- a/pkg/gotd/rpc/engine.go +++ b/pkg/gotd/rpc/engine.go @@ -193,43 +193,39 @@ func (e *Engine) retryUntilAck(ctx context.Context, req Request) (sent bool, err return false, errors.Wrap(err, "send") } - loop := func() error { - timer := e.clock.Timer(e.retryInterval) - defer clock.StopTimer(timer) + timer := e.clock.Timer(e.retryInterval) + defer clock.StopTimer(timer) - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-e.reqCtx.Done(): - return errors.Wrap(e.reqCtx.Err(), "engine forcibly closed") - case <-ackChan: - return nil - case <-timer.C(): - timer.Reset(e.retryInterval) + for { + select { + case <-ctx.Done(): + return true, ctx.Err() + case <-e.reqCtx.Done(): + return true, errors.Wrap(e.reqCtx.Err(), "engine forcibly closed") + case <-ackChan: + return true, nil + case <-timer.C(): + timer.Reset(e.retryInterval) - log.Debug("Acknowledge timed out, performing retry") - if err := e.send(ctx, req.MsgID, req.SeqNo, req.Input); err != nil { - if errors.Is(err, context.Canceled) { - return nil - } - - log.Error("Retry failed", zap.Error(err)) - return err + log.Warn("Acknowledge timed out, performing retry") + if err := e.send(ctx, req.MsgID, req.SeqNo, req.Input); err != nil { + if errors.Is(err, context.Canceled) { + return true, nil } - retries++ - if retries >= e.maxRetries { - log.Error("Retry limit reached", zap.Int64("msg_id", req.MsgID)) - return &RetryLimitReachedErr{ - Retries: retries, - } + log.Error("Retry failed", zap.Error(err)) + return true, errors.Wrap(err, "retry send") + } + + retries++ + if retries >= e.maxRetries { + log.Error("Retry limit reached", zap.Int64("msg_id", req.MsgID)) + return true, &RetryLimitReachedErr{ + Retries: retries, } } } } - - return true, loop() } // NotifyResult notifies engine about received RPC response. @@ -269,7 +265,6 @@ func (e *Engine) isClosed() bool { // All Do method calls of closed engine will return ErrEngineClosed error. func (e *Engine) Close() { atomic.StoreUint32(&e.closed, 1) - e.log.Info("Close called") e.wg.Wait() }