diff --git a/pkg/connector/client.go b/pkg/connector/client.go index f6a7dd36..e65fe919 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -537,7 +537,9 @@ func (t *TelegramClient) Connect(_ context.Context) { runTelegramClient(ctx, t.client, t.clientInitialized, t.clientDone, func(ctx context.Context) error { log.Info().Msg("Client running starting updates") - return t.updatesManager.Run(ctx, t.client.API(), t.telegramUserID, updates.AuthOptions{}) + return t.updatesManager.Run(ctx, t.client.API(), t.telegramUserID, updates.AuthOptions{ + IsBot: t.metadata.IsBot, + }) }) } diff --git a/pkg/gotd/telegram/updates/manager.go b/pkg/gotd/telegram/updates/manager.go index 8a8c0d09..8e56b525 100644 --- a/pkg/gotd/telegram/updates/manager.go +++ b/pkg/gotd/telegram/updates/manager.go @@ -84,8 +84,8 @@ func (m *Manager) Run(ctx context.Context, api API, userID int64, opt AuthOption zap.Bool("is_bot", opt.IsBot), zap.Bool("forget", opt.Forget), ) - lg.Debug("Run") - defer lg.Debug("Done") + lg.Info("Starting update manager") + defer lg.Info("Update manager exiting") wg, ctx := errgroup.WithContext(ctx) @@ -157,7 +157,7 @@ func (m *Manager) Run(ctx context.Context, api API, userID int64, opt AuthOption wg.Go(func() error { return m.state.Run(ctx) }) - lg.Debug("Wait") + lg.Debug("Waiting for manager workgroup to exit") return wg.Wait() } diff --git a/pkg/gotd/telegram/updates/state.go b/pkg/gotd/telegram/updates/state.go index fab29585..f9268762 100644 --- a/pkg/gotd/telegram/updates/state.go +++ b/pkg/gotd/telegram/updates/state.go @@ -397,7 +397,20 @@ func (s *internalState) RemoveChannel(channelID int64, reason error) { } func (s *internalState) createAndRunChannelState(ctx context.Context, channelID, accessHash int64, initialPts int) (state *channelState) { - state = s.newChannelState(channelID, accessHash, initialPts) + state = newChannelState(ctx, channelStateConfig{ + Out: s.internalQueue, + InitialPts: initialPts, + ChannelID: channelID, + AccessHash: accessHash, + SelfID: s.selfID, + Storage: s.storage, + DiffLimit: s.diffLim, + RawClient: s.client, + Handler: s.handler, + OnChannelTooLong: s.onTooLong, + Logger: s.log.Named("channel").With(zap.Int64("channel_id", channelID)), + Tracer: s.tracer, + }) s.channelsLock.Lock() s.channels[channelID] = state s.channelsLock.Unlock() @@ -415,23 +428,6 @@ func (s *internalState) createAndRunChannelState(ctx context.Context, channelID, return state } -func (s *internalState) newChannelState(channelID, accessHash int64, initialPts int) *channelState { - return newChannelState(channelStateConfig{ - Out: s.internalQueue, - InitialPts: initialPts, - ChannelID: channelID, - AccessHash: accessHash, - SelfID: s.selfID, - Storage: s.storage, - DiffLimit: s.diffLim, - RawClient: s.client, - Handler: s.handler, - OnChannelTooLong: s.onTooLong, - Logger: s.log.Named("channel").With(zap.Int64("channel_id", channelID)), - Tracer: s.tracer, - }) -} - func (s *internalState) getDifference(ctx context.Context) error { ctx, span := s.tracer.Start(ctx, "getDifference") defer span.End() diff --git a/pkg/gotd/telegram/updates/state_channel.go b/pkg/gotd/telegram/updates/state_channel.go index 776223c6..eb025aa5 100644 --- a/pkg/gotd/telegram/updates/state_channel.go +++ b/pkg/gotd/telegram/updates/state_channel.go @@ -62,7 +62,7 @@ type channelStateConfig struct { Tracer trace.Tracer } -func newChannelState(cfg channelStateConfig) *channelState { +func newChannelState(ctx context.Context, cfg channelStateConfig) *channelState { state := &channelState{ updates: make(chan channelUpdate, 10), out: cfg.Out, @@ -88,6 +88,8 @@ func newChannelState(cfg channelStateConfig) *channelState { Tracer: cfg.Tracer, }) + state.runCtx, state.stop = context.WithCancelCause(ctx) + return state } @@ -105,7 +107,6 @@ func (s *channelState) Push(ctx context.Context, u channelUpdate) error { var ErrRemoveChannelState = errors.New("remove channel state") func (s *channelState) Run(ctx context.Context) error { - s.runCtx, s.stop = context.WithCancelCause(ctx) defer s.stop(nil) // Subscribe to channel updates. if err := s.getDifference(s.runCtx); err != nil {