Rework telegram client lifecycle to hopefully fix not stopping issues (#112)
This commit is contained in:
+42
-114
@@ -60,20 +60,19 @@ var (
|
||||
)
|
||||
|
||||
type TelegramClient struct {
|
||||
main *TelegramConnector
|
||||
ScopedStore *store.ScopedStore
|
||||
telegramUserID int64
|
||||
loginID networkid.UserLoginID
|
||||
userID networkid.UserID
|
||||
userLogin *bridgev2.UserLogin
|
||||
client *telegram.Client
|
||||
updatesManager *updates.Manager
|
||||
updatesCloseC chan struct{}
|
||||
clientCtx context.Context
|
||||
clientCancel context.CancelFunc
|
||||
clientCloseC chan struct{}
|
||||
initialized chan struct{}
|
||||
mu sync.Mutex
|
||||
main *TelegramConnector
|
||||
ScopedStore *store.ScopedStore
|
||||
telegramUserID int64
|
||||
loginID networkid.UserLoginID
|
||||
userID networkid.UserID
|
||||
userLogin *bridgev2.UserLogin
|
||||
client *telegram.Client
|
||||
updatesManager *updates.Manager
|
||||
clientCtx context.Context
|
||||
clientCancel context.CancelFunc
|
||||
clientDone *Future[error]
|
||||
clientInitialized *exsync.Event
|
||||
mu sync.Mutex
|
||||
|
||||
appConfigLock sync.Mutex
|
||||
appConfig map[string]any
|
||||
@@ -182,7 +181,7 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
|
||||
|
||||
prevReactionPoll: map[networkid.PortalKey]time.Time{},
|
||||
|
||||
initialized: make(chan struct{}),
|
||||
clientInitialized: exsync.NewEvent(),
|
||||
}
|
||||
|
||||
if !login.Metadata.(*UserLoginMetadata).Session.HasAuthKey() {
|
||||
@@ -394,38 +393,6 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
|
||||
return &client, err
|
||||
}
|
||||
|
||||
// connectTelegramClient blocks until client is connected, calling Run
|
||||
// internally.
|
||||
// Technique from: https://github.com/gotd/contrib/blob/master/bg/connect.go
|
||||
func connectTelegramClient(ctx context.Context, cancel context.CancelFunc, client *telegram.Client) (<-chan struct{}, error) {
|
||||
errC := make(chan error, 1)
|
||||
initDone := make(chan struct{})
|
||||
closeC := make(chan struct{})
|
||||
go func() {
|
||||
defer close(errC)
|
||||
defer close(closeC)
|
||||
errC <- client.Run(ctx, func(ctx context.Context) error {
|
||||
close(initDone)
|
||||
<-ctx.Done()
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
return nil
|
||||
}
|
||||
return ctx.Err()
|
||||
})
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done(): // context canceled
|
||||
cancel()
|
||||
return nil, fmt.Errorf("context cancelled before init done: %w", ctx.Err())
|
||||
case err := <-errC: // startup timeout
|
||||
cancel()
|
||||
return nil, fmt.Errorf("client connection timeout: %w", err)
|
||||
case <-initDone: // init done
|
||||
}
|
||||
return closeC, nil
|
||||
}
|
||||
|
||||
func (t *TelegramClient) onDead() {
|
||||
prevState := t.userLogin.BridgeState.GetPrev().StateEvent
|
||||
if slices.Contains([]status.BridgeStateEvent{
|
||||
@@ -504,18 +471,20 @@ func (t *TelegramClient) onAuthError(err error) {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
t.userLogin.Metadata.(*UserLoginMetadata).ResetOnLogout()
|
||||
go func() {
|
||||
t.Disconnect()
|
||||
if err := t.userLogin.Save(context.Background()); err != nil {
|
||||
t.main.Bridge.Log.Err(err).Msg("failed to save user login")
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (t *TelegramClient) Connect(ctx context.Context) {
|
||||
func (t *TelegramClient) Connect(_ context.Context) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
||||
log := zerolog.Ctx(ctx).With().Int64("user_id", t.telegramUserID).Logger()
|
||||
ctx := context.Background()
|
||||
|
||||
log := zerolog.Ctx(context.Background()).With().Int64("user_id", t.telegramUserID).Logger()
|
||||
ctx = log.WithContext(ctx)
|
||||
|
||||
if !t.userLogin.Metadata.(*UserLoginMetadata).Session.HasAuthKey() {
|
||||
log.Warn().Msg("user does not have an auth key, sending bad credentials state")
|
||||
@@ -525,61 +494,28 @@ func (t *TelegramClient) Connect(ctx context.Context) {
|
||||
|
||||
log.Info().Msg("Connecting client")
|
||||
|
||||
t.clientCtx, t.clientCancel = context.WithCancel(ctx)
|
||||
t.clientCloseC = make(chan struct{})
|
||||
t.updatesCloseC = make(chan struct{})
|
||||
// Add a cancellation layer we can use for explicit Disconnect
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
t.clientCtx = ctx
|
||||
t.clientCancel = cancel
|
||||
t.clientDone = NewFuture[error]()
|
||||
t.clientInitialized.Clear()
|
||||
|
||||
runTelegramClient(ctx, t.client, t.clientInitialized, t.clientDone, func(ctx context.Context) error {
|
||||
log.Info().Msg("Client running starting updates")
|
||||
return t.updatesManager.Run(ctx, t.client.API(), t.telegramUserID, updates.AuthOptions{})
|
||||
})
|
||||
}
|
||||
|
||||
func runTelegramClient(ctx context.Context, client *telegram.Client, initialized *exsync.Event, done *Future[error], callback func(ctx context.Context) error) {
|
||||
go func() {
|
||||
defer close(t.initialized)
|
||||
connectClientCloseC, err := connectTelegramClient(t.clientCtx, t.clientCancel, t.client)
|
||||
if err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
close(t.updatesCloseC)
|
||||
return
|
||||
}
|
||||
|
||||
// awful hack to prevent assigning clientCloseC from racing Disconnect()
|
||||
go func() {
|
||||
<-connectClientCloseC
|
||||
close(t.clientCloseC)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer close(t.updatesCloseC)
|
||||
for {
|
||||
err = t.updatesManager.Run(t.clientCtx, t.client.API(), t.telegramUserID, updates.AuthOptions{})
|
||||
if err == nil || errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
zerolog.Ctx(t.clientCtx).Err(err).Msg("failed to run updates manager, retrying")
|
||||
|
||||
select {
|
||||
case <-t.clientCtx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Update the logged-in user's ghost info (this also updates the user
|
||||
// login's remote name and profile).
|
||||
if me, err := t.client.Self(t.clientCtx); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
} else if _, err := t.updateGhost(t.clientCtx, t.telegramUserID, me); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
} else {
|
||||
t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
|
||||
}
|
||||
|
||||
// Fix the "Telegram Saved Messages" chat
|
||||
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
|
||||
ChatInfo: t.getDMChatInfo(t.telegramUserID),
|
||||
EventMeta: simplevent.EventMeta{
|
||||
Type: bridgev2.RemoteEventChatResync,
|
||||
PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, t.telegramUserID),
|
||||
CreatePortal: false, // Do not create the portal if it doesn't already exist
|
||||
},
|
||||
err := client.Run(ctx, func(ctx context.Context) error {
|
||||
initialized.Set()
|
||||
return callback(ctx)
|
||||
})
|
||||
initialized.Set()
|
||||
done.Set(err)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -591,17 +527,9 @@ func (t *TelegramClient) Disconnect() {
|
||||
|
||||
if t.clientCancel != nil {
|
||||
t.clientCancel()
|
||||
t.clientCancel = nil
|
||||
}
|
||||
if t.clientCloseC != nil {
|
||||
t.userLogin.Log.Debug().Msg("Waiting for client to finish")
|
||||
<-t.clientCloseC
|
||||
t.clientCloseC = nil
|
||||
}
|
||||
if t.updatesCloseC != nil {
|
||||
t.userLogin.Log.Debug().Msg("Waiting for updates to finish")
|
||||
<-t.updatesCloseC
|
||||
t.updatesCloseC = nil
|
||||
t.userLogin.Log.Info().Msg("Waiting for client")
|
||||
err, _ := t.clientDone.Get(context.Background())
|
||||
t.userLogin.Log.Info().Err(err).Msg("Client done")
|
||||
}
|
||||
|
||||
t.userLogin.Log.Info().Msg("Disconnect complete")
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
// mautrix-telegram - A Matrix-Telegram puppeting bridge.
|
||||
// Copyright (C) 2025 Automattic Inc.
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
package connector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Future[T any] struct {
|
||||
value T
|
||||
err error
|
||||
ready chan struct{}
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func NewFuture[T any]() *Future[T] {
|
||||
return &Future[T]{
|
||||
ready: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Future[T]) Set(value T) {
|
||||
f.once.Do(func() {
|
||||
f.value = value
|
||||
close(f.ready)
|
||||
})
|
||||
}
|
||||
|
||||
func (f *Future[T]) Get(ctx context.Context) (T, error) {
|
||||
select {
|
||||
case <-f.ready:
|
||||
return f.value, nil
|
||||
case <-ctx.Done():
|
||||
return f.value, ctx.Err()
|
||||
}
|
||||
}
|
||||
@@ -75,21 +75,22 @@ func finalizeLogin(ctx context.Context, user *bridgev2.User, authorization *tg.A
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to save new login: %w", err)
|
||||
}
|
||||
ul.Client.Connect(ul.Log.WithContext(context.Background()))
|
||||
ul.Client.Connect(ul.Log.WithContext(ctx))
|
||||
client := ul.Client.(*TelegramClient)
|
||||
|
||||
// Connecting is non-blocking so wait for gotd to initialize before doing anythign to avoid deadlocking
|
||||
select {
|
||||
case <-client.initialized:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
err = client.clientInitialized.Wait(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
me, err := client.client.Self(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
log := ul.Log.With().Str("component", "login_sync_chats").Logger()
|
||||
if err := client.SyncChats(log.WithContext(context.Background())); err != nil {
|
||||
if err := client.SyncChats(log.WithContext(client.clientCtx)); err != nil {
|
||||
log.Err(err).Msg("Failed to sync chats")
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"go.mau.fi/util/exsync"
|
||||
"go.mau.fi/zerozap"
|
||||
"go.uber.org/zap"
|
||||
"maunium.net/go/mautrix/bridgev2"
|
||||
@@ -45,7 +46,6 @@ type PhoneLogin struct {
|
||||
authClient *telegram.Client
|
||||
authClientCtx context.Context
|
||||
authClientCancel context.CancelFunc
|
||||
authClientCloseC <-chan struct{}
|
||||
|
||||
phone string
|
||||
hash string
|
||||
@@ -56,9 +56,7 @@ var _ bridgev2.LoginProcessUserInput = (*PhoneLogin)(nil)
|
||||
func (p *PhoneLogin) Cancel() {
|
||||
if p.authClientCancel != nil {
|
||||
p.authClientCancel()
|
||||
}
|
||||
if p.authClientCloseC != nil {
|
||||
<-p.authClientCloseC
|
||||
<-p.authClientCtx.Done()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,11 +86,21 @@ func (p *PhoneLogin) SubmitUserInput(ctx context.Context, input map[string]strin
|
||||
CustomSessionStorage: &p.authData,
|
||||
Logger: zap.New(zerozap.New(zerolog.Ctx(ctx).With().Str("component", "telegram_phone_login_client").Logger())),
|
||||
})
|
||||
var err error
|
||||
p.authClientCtx, p.authClientCancel = context.WithTimeoutCause(log.WithContext(context.Background()), time.Hour, errors.New("phone login took over one hour"))
|
||||
if p.authClientCloseC, err = connectTelegramClient(p.authClientCtx, p.authClientCancel, p.authClient); err != nil {
|
||||
|
||||
p.authClientCtx, p.authClientCancel = context.WithTimeoutCause(log.WithContext(ctx), time.Hour, errors.New("phone login took over one hour"))
|
||||
initialized := exsync.NewEvent()
|
||||
done := NewFuture[error]()
|
||||
runTelegramClient(p.authClientCtx, p.authClient, initialized, done, func(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
})
|
||||
|
||||
log.Info().Msg("Waiting for client to connect.")
|
||||
err := initialized.Wait(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sentCode, err := p.authClient.Auth().SendCode(p.authClientCtx, p.phone, auth.SendCodeOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"go.mau.fi/util/exsync"
|
||||
"go.mau.fi/zerozap"
|
||||
"go.uber.org/zap"
|
||||
"maunium.net/go/mautrix/bridgev2"
|
||||
@@ -48,7 +49,6 @@ type QRLogin struct {
|
||||
|
||||
authClientCtx context.Context
|
||||
authClientCancel context.CancelFunc
|
||||
authClientCloseC <-chan struct{}
|
||||
|
||||
auth chan qrAuthResult
|
||||
qrToken chan qrlogin.Token
|
||||
@@ -62,9 +62,7 @@ var _ bridgev2.LoginProcessUserInput = (*QRLogin)(nil) // For asking for pa
|
||||
func (q *QRLogin) Cancel() {
|
||||
if q.authClientCancel != nil {
|
||||
q.authClientCancel()
|
||||
}
|
||||
if q.authClientCloseC != nil {
|
||||
<-q.authClientCloseC
|
||||
<-q.authClientCtx.Done()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,9 +86,18 @@ func (q *QRLogin) Start(ctx context.Context) (*bridgev2.LoginStep, error) {
|
||||
Logger: zaplog,
|
||||
})
|
||||
|
||||
var err error
|
||||
q.authClientCtx, q.authClientCancel = context.WithTimeoutCause(log.WithContext(context.Background()), time.Hour, errors.New("phone login took over one hour"))
|
||||
if q.authClientCloseC, err = connectTelegramClient(q.authClientCtx, q.authClientCancel, q.authClient); err != nil {
|
||||
q.authClientCtx, q.authClientCancel = context.WithTimeoutCause(log.WithContext(ctx), time.Hour, errors.New("phone login took over one hour"))
|
||||
|
||||
initialized := exsync.NewEvent()
|
||||
done := NewFuture[error]()
|
||||
runTelegramClient(q.authClientCtx, q.authClient, initialized, done, func(ctx context.Context) error {
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
})
|
||||
|
||||
log.Info().Msg("Waiting for client to connect.")
|
||||
err := initialized.Wait(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
+3
-10
@@ -220,16 +220,9 @@ func parseRandomID(txnID networkid.RawTransactionID) int64 {
|
||||
|
||||
func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (resp *bridgev2.MatrixMessageResponse, err error) {
|
||||
// Handle Matrix events only after initial connection has been established to avoid deadlocking gotd
|
||||
select {
|
||||
case <-t.initialized:
|
||||
default:
|
||||
zerolog.Ctx(ctx).Warn().Msg("Got Matrix event before connected, blocking until done")
|
||||
|
||||
select {
|
||||
case <-t.initialized:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
err = t.clientInitialized.Wait(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
|
||||
|
||||
Reference in New Issue
Block a user