Revert "Revert "client: unblock connect without network""
This reverts commit ea4626107c.
Adds waiting support for initial connection established to avoid locking
up gotd. This isn't extremely pretty but should do the job for now.
This commit is contained in:
+46
-38
@@ -67,6 +67,7 @@ type TelegramClient struct {
|
||||
clientCtx context.Context
|
||||
clientCancel context.CancelFunc
|
||||
clientCloseC <-chan struct{}
|
||||
initialized chan struct{}
|
||||
mu sync.Mutex
|
||||
|
||||
appConfigLock sync.Mutex
|
||||
@@ -164,6 +165,8 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
|
||||
takeoutAccepted: exsync.NewEvent(),
|
||||
|
||||
prevReactionPoll: map[networkid.PortalKey]time.Time{},
|
||||
|
||||
initialized: make(chan struct{}),
|
||||
}
|
||||
|
||||
if !login.Metadata.(*UserLoginMetadata).Session.HasAuthKey() {
|
||||
@@ -498,48 +501,53 @@ func (t *TelegramClient) Connect(ctx context.Context) {
|
||||
|
||||
var err error
|
||||
t.clientCtx, t.clientCancel = context.WithCancel(ctx)
|
||||
if t.clientCloseC, err = connectTelegramClient(t.clientCtx, t.clientCancel, t.client); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
return
|
||||
}
|
||||
t.updatesCloseC = make(chan struct{})
|
||||
go func() {
|
||||
defer close(t.updatesCloseC)
|
||||
for {
|
||||
err = t.updatesManager.Run(t.clientCtx, t.client.API(), t.telegramUserID, updates.AuthOptions{})
|
||||
if err == nil || errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
zerolog.Ctx(t.clientCtx).Err(err).Msg("failed to run updates manager, retrying")
|
||||
|
||||
select {
|
||||
case <-t.clientCtx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
defer close(t.initialized)
|
||||
if t.clientCloseC, err = connectTelegramClient(t.clientCtx, t.clientCancel, t.client); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
close(t.updatesCloseC)
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(t.updatesCloseC)
|
||||
for {
|
||||
err = t.updatesManager.Run(t.clientCtx, t.client.API(), t.telegramUserID, updates.AuthOptions{})
|
||||
if err == nil || errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
zerolog.Ctx(t.clientCtx).Err(err).Msg("failed to run updates manager, retrying")
|
||||
|
||||
select {
|
||||
case <-t.clientCtx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Update the logged-in user's ghost info (this also updates the user
|
||||
// login's remote name and profile).
|
||||
if me, err := t.client.Self(t.clientCtx); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
} else if _, err := t.updateGhost(t.clientCtx, t.telegramUserID, me); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
} else {
|
||||
t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
|
||||
}
|
||||
|
||||
// Fix the "Telegram Saved Messages" chat
|
||||
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
|
||||
ChatInfo: t.getDMChatInfo(t.telegramUserID),
|
||||
EventMeta: simplevent.EventMeta{
|
||||
Type: bridgev2.RemoteEventChatResync,
|
||||
PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, t.telegramUserID),
|
||||
CreatePortal: false, // Do not create the portal if it doesn't already exist
|
||||
},
|
||||
})
|
||||
}()
|
||||
|
||||
// Update the logged-in user's ghost info (this also updates the user
|
||||
// login's remote name and profile).
|
||||
if me, err := t.client.Self(t.clientCtx); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
} else if _, err := t.updateGhost(t.clientCtx, t.telegramUserID, me); err != nil {
|
||||
t.sendBadCredentialsOrUnknownError(err)
|
||||
} else {
|
||||
t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
|
||||
}
|
||||
|
||||
// Fix the "Telegram Saved Messages" chat
|
||||
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{
|
||||
ChatInfo: t.getDMChatInfo(t.telegramUserID),
|
||||
EventMeta: simplevent.EventMeta{
|
||||
Type: bridgev2.RemoteEventChatResync,
|
||||
PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, t.telegramUserID),
|
||||
CreatePortal: false, // Do not create the portal if it doesn't already exist
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (t *TelegramClient) Disconnect() {
|
||||
|
||||
@@ -76,6 +76,12 @@ func finalizeLogin(ctx context.Context, user *bridgev2.User, authorization *tg.A
|
||||
}
|
||||
ul.Client.Connect(ul.Log.WithContext(context.Background()))
|
||||
client := ul.Client.(*TelegramClient)
|
||||
// Connecting is non-blocking so wait for gotd to initialize before doing anythign to avoid deadlocking
|
||||
select {
|
||||
case <-client.initialized:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
me, err := client.client.Self(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -218,6 +218,19 @@ func parseRandomID(txnID networkid.RawTransactionID) int64 {
|
||||
}
|
||||
|
||||
func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (resp *bridgev2.MatrixMessageResponse, err error) {
|
||||
// Handle Matrix events only after initial connection has been established to avoid deadlocking gotd
|
||||
select {
|
||||
case <-t.initialized:
|
||||
default:
|
||||
zerolog.Ctx(ctx).Warn().Msg("Got Matrix event before connected, blocking until done")
|
||||
|
||||
select {
|
||||
case <-t.initialized:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
peer, err := t.inputPeerForPortalID(ctx, msg.Portal.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user