diff --git a/pkg/connector/client.go b/pkg/connector/client.go index e65fe919..6259ac2f 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -81,7 +81,7 @@ type TelegramClient struct { updatesManager *updates.Manager clientCtx context.Context clientCancel context.CancelFunc - clientDone *Future[error] + clientDone *exsync.Event clientInitialized *exsync.Event mu sync.Mutex @@ -166,6 +166,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge prevReactionPoll: map[networkid.PortalKey]time.Time{}, clientInitialized: exsync.NewEvent(), + clientDone: exsync.NewEvent(), } if !login.Metadata.(*UserLoginMetadata).Session.HasAuthKey() { @@ -411,6 +412,9 @@ func (t *TelegramClient) sendBadCredentialsOrUnknownError(err error) { StateEvent: status.StateUnknownError, Error: "tg-unknown-error", Message: humanise.Error(err), + Info: map[string]any{ + "go_error": err.Error(), + }, }) } } @@ -508,15 +512,11 @@ func (t *TelegramClient) onAuthError(err error) { }() } -func (t *TelegramClient) Connect(_ context.Context) { +func (t *TelegramClient) Connect(ctx context.Context) { t.mu.Lock() defer t.mu.Unlock() - ctx := context.Background() - - log := zerolog.Ctx(context.Background()).With().Int64("user_id", t.telegramUserID).Logger() - ctx = log.WithContext(ctx) - + log := zerolog.Ctx(ctx) if !t.metadata.Session.HasAuthKey() { log.Warn().Msg("user does not have an auth key, sending bad credentials state") t.sendBadCredentialsOrUnknownError(ErrNoAuthKey) @@ -532,39 +532,38 @@ func (t *TelegramClient) Connect(_ context.Context) { ctx, cancel := context.WithCancel(ctx) t.clientCtx = ctx t.clientCancel = cancel - t.clientDone = NewFuture[error]() + t.clientDone.Clear() t.clientInitialized.Clear() + go t.runInBackground(ctx) +} - runTelegramClient(ctx, t.client, t.clientInitialized, t.clientDone, func(ctx context.Context) error { +func (t *TelegramClient) runInBackground(ctx context.Context) { + log := zerolog.Ctx(ctx) + err := t.client.Run(ctx, func(ctx context.Context) error { + t.clientInitialized.Set() log.Info().Msg("Client running starting updates") return t.updatesManager.Run(ctx, t.client.API(), t.telegramUserID, updates.AuthOptions{ IsBot: t.metadata.IsBot, }) }) -} - -func runTelegramClient(ctx context.Context, client *telegram.Client, initialized *exsync.Event, done *Future[error], callback func(ctx context.Context) error) { - go func() { - err := client.Run(ctx, func(ctx context.Context) error { - initialized.Set() - return callback(ctx) - }) - initialized.Set() - done.Set(err) - }() + t.clientDone.Set() + t.clientInitialized.Set() + if err != nil { + log.Err(err).Msg("Client exited with error") + t.sendBadCredentialsOrUnknownError(err) + } } func (t *TelegramClient) Disconnect() { t.mu.Lock() defer t.mu.Unlock() - t.userLogin.Log.Info().Msg("Disconnecting client") + t.userLogin.Log.Debug().Msg("Disconnecting client") if t.clientCancel != nil { t.clientCancel() - t.userLogin.Log.Info().Msg("Waiting for client") - err, _ := t.clientDone.Get(context.Background()) - t.userLogin.Log.Info().Err(err).Msg("Client done") + t.userLogin.Log.Debug().Msg("Waiting for client disconnection") + <-t.clientDone.GetChan() } t.userLogin.Log.Info().Msg("Disconnect complete") @@ -614,7 +613,8 @@ func (t *TelegramClient) getSingleChannel(ctx context.Context, id int64) (*tg.Ch func (t *TelegramClient) IsLoggedIn() bool { // TODO use less hacky check than context cancellation - return t != nil && t.clientCtx != nil && t.client != nil && t.clientCtx.Err() == nil && + return t != nil && t.client != nil && + t.clientInitialized.IsSet() && !t.clientDone.IsSet() && t.metadata.Session.HasAuthKey() } diff --git a/pkg/connector/login.go b/pkg/connector/login.go index 9cf116ba..e157e5a2 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -17,6 +17,7 @@ package connector import ( + "cmp" "context" "errors" "fmt" @@ -25,7 +26,6 @@ import ( "time" "github.com/rs/zerolog" - "go.mau.fi/util/exsync" "go.mau.fi/zerozap" "go.uber.org/zap" "maunium.net/go/mautrix/bridgev2" @@ -136,13 +136,22 @@ func (bl *baseLogin) makeClient(ctx context.Context, dispatcher *tg.UpdateDispat }) bl.ctx, bl.cancel = context.WithTimeoutCause(log.WithContext(bl.main.Bridge.BackgroundCtx), LoginTimeout, ErrLoginTimeout) - initialized := exsync.NewEvent() - done := NewFuture[error]() - runTelegramClient(bl.ctx, bl.client, initialized, done, waitContextDone) + connectResult := NewFuture[error]() + go func() { + err := bl.client.Run(bl.ctx, func(ctx context.Context) error { + connectResult.Set(nil) + <-ctx.Done() + return ctx.Err() + }) + connectResult.Set(err) + if err != nil && !errors.Is(err, bl.ctx.Err()) { + log.Err(err).Msg("Login client exited with error") + } + }() log.Debug().Msg("Waiting for client to connect") - err := initialized.Wait(ctx) - if err != nil { + connErr, ctxErr := connectResult.Get(ctx) + if err := cmp.Or(connErr, ctxErr); err != nil { bl.Cancel() return err } diff --git a/pkg/gotd/telegram/connect.go b/pkg/gotd/telegram/connect.go index e5d114e2..db7e2fd1 100644 --- a/pkg/gotd/telegram/connect.go +++ b/pkg/gotd/telegram/connect.go @@ -139,8 +139,8 @@ func (c *Client) Run(ctx context.Context, f func(ctx context.Context) error) (er // handling or pool creation. c.ctx, c.cancel = context.WithCancel(ctx) - c.log.Info("Starting") - defer c.log.Info("Closed") + c.log.Info("Client starting") + defer c.log.Info("Client closed") // Cancel client on exit. defer c.cancel() defer func() {