diff --git a/pkg/connector/handletelegram.go b/pkg/connector/handletelegram.go index bccf7cc6..0eaab578 100644 --- a/pkg/connector/handletelegram.go +++ b/pkg/connector/handletelegram.go @@ -70,6 +70,9 @@ func (t *TelegramClient) selfLeaveChat(ctx context.Context, portalKey networkid. Type: bridgev2.RemoteEventChatDelete, PortalKey: t.makePortalKeyFromID(peerType, id, topicID), Sender: t.mySender(), + LogContext: func(c zerolog.Context) zerolog.Context { + return c.AnErr("self_leave_reason", reason) + }, }, OnlyForMe: true, }) @@ -83,6 +86,9 @@ func (t *TelegramClient) selfLeaveChat(ctx context.Context, portalKey networkid. Type: bridgev2.RemoteEventChatDelete, PortalKey: portalKey, Sender: t.mySender(), + LogContext: func(c zerolog.Context) zerolog.Context { + return c.AnErr("self_leave_reason", reason) + }, }, OnlyForMe: true, }) @@ -96,6 +102,9 @@ func (t *TelegramClient) selfLeaveChat(ctx context.Context, portalKey networkid. Type: bridgev2.RemoteEventChatDelete, PortalKey: t.makePortalKeyFromID(peerType, id, ids.TopicIDSpaceRoom), Sender: t.mySender(), + LogContext: func(c zerolog.Context) zerolog.Context { + return c.AnErr("self_leave_reason", reason) + }, }, OnlyForMe: true, }) @@ -145,8 +154,12 @@ func (t *TelegramClient) onUpdateChannel(ctx context.Context, e tg.Entities, upd } else { res := t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.ChatResync{ EventMeta: simplevent.EventMeta{ + Type: bridgev2.RemoteEventChatResync, PortalKey: portalKey, CreatePortal: true, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("tg_event", "updateChannel") + }, }, GetChatInfoFunc: func(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.ChatInfo, error) { chatInfo, mfm, err := t.wrapChatInfo(portal.ID, channel) @@ -972,6 +985,9 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, sender bridgev Type: bridgev2.RemoteEventTyping, PortalKey: portal, Sender: sender, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("tg_event", "update*Typing") + }, }, Timeout: timeout, Type: typingType, @@ -994,6 +1010,9 @@ func (t *TelegramClient) updateReadReceipt(ctx context.Context, e tg.Entities, u SenderLogin: ids.MakeUserLoginID(user.UserID), Sender: ids.MakeUserID(user.UserID), }, + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("tg_event", "updateReadHistoryOutbox") + }, }, LastTarget: ids.MakeMessageID(update.Peer, update.MaxID), ReadUpToStreamOrder: int64(update.MaxID), @@ -1007,6 +1026,9 @@ func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID i Type: bridgev2.RemoteEventReadReceipt, PortalKey: portalKey, Sender: t.mySender(), + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("tg_event", "updateRead*Inbox") + }, }, LastTarget: ids.MakeMessageID(portalKey, maxID), ReadUpToStreamOrder: int64(maxID), @@ -1208,6 +1230,11 @@ func (t *TelegramClient) onNotifySettings(ctx context.Context, e tg.Entities, up EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatInfoChange, PortalKey: portalKey, + LogContext: func(c zerolog.Context) zerolog.Context { + return c. + Str("tg_event", "updateNotifySettings"). + Time("muted_until", *mutedUntil) + }, }, }) return resultToError(res) @@ -1244,6 +1271,11 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatInfoChange, PortalKey: portalKey, + LogContext: func(c zerolog.Context) zerolog.Context { + return c. + Str("tg_event", "updatePinnedDialogs"). + Bool("pinned", true) + }, }, }) if err := resultToError(res); err != nil { @@ -1264,6 +1296,11 @@ func (t *TelegramClient) onPinnedDialogs(ctx context.Context, e tg.Entities, msg EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatInfoChange, PortalKey: portalKey, + LogContext: func(c zerolog.Context) zerolog.Context { + return c. + Str("tg_event", "updatePinnedDialogs"). + Bool("pinned", false) + }, }, }) if err := resultToError(res); err != nil { @@ -1287,6 +1324,9 @@ func (t *TelegramClient) onChatDefaultBannedRights(ctx context.Context, entities EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatInfoChange, PortalKey: t.makePortalKeyFromPeer(update.Peer, 0), + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("tg_event", "updateChatDefaultBannedRights") + }, }, }) return resultToError(res) @@ -1322,6 +1362,9 @@ func (t *TelegramClient) onPeerBlocked(ctx context.Context, e tg.Entities, updat EventMeta: simplevent.EventMeta{ Type: bridgev2.RemoteEventChatResync, PortalKey: t.makePortalKeyFromPeer(update.PeerID, 0), + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("tg_event", "updatePeerBlocked") + }, }, }) return resultToError(res) @@ -1358,6 +1401,9 @@ func (t *TelegramClient) onPhoneCall(ctx context.Context, e tg.Entities, update PortalKey: t.makePortalKeyFromID(ids.PeerTypeUser, call.AdminID, 0), CreatePortal: true, Sender: t.senderForUserID(call.AdminID), + LogContext: func(c zerolog.Context) zerolog.Context { + return c.Str("tg_event", "updatePhoneCall") + }, }, ID: networkid.MessageID(fmt.Sprintf("requested-%d", call.ID)), ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data any) (*bridgev2.ConvertedMessage, error) { diff --git a/pkg/gotd/mtproto/conn.go b/pkg/gotd/mtproto/conn.go index 2229bfd9..645a57ae 100644 --- a/pkg/gotd/mtproto/conn.go +++ b/pkg/gotd/mtproto/conn.go @@ -175,7 +175,7 @@ func New(dialer Dialer, opt Options) *Conn { // handleClose closes rpc engine and underlying connection on context done. func (c *Conn) handleClose(ctx context.Context) error { <-ctx.Done() - c.log.Info("Connection context done, closing") + c.log.Info("Connection context done, closing", zap.NamedError("ctx_err", context.Cause(ctx))) // Close RPC Engine. c.rpc.ForceClose() @@ -187,6 +187,8 @@ func (c *Conn) handleClose(ctx context.Context) error { return nil } +var errRunReturned = errors.New("Conn.Run() returned") + // Run initializes MTProto connection to server and blocks until disconnection. // // When connection is ready, Handler.OnSession is called. @@ -199,8 +201,8 @@ func (c *Conn) Run(ctx context.Context, f func(ctx context.Context) error) error return errors.New("do Run on closed connection") } - ctx, cancel := context.WithCancel(ctx) - defer cancel() + ctx, cancel := context.WithCancelCause(ctx) + defer cancel(errRunReturned) c.log.Info("Run: start") defer c.log.Info("Run: end") diff --git a/pkg/gotd/telegram/client.go b/pkg/gotd/telegram/client.go index 2b2f7cb2..5232020e 100644 --- a/pkg/gotd/telegram/client.go +++ b/pkg/gotd/telegram/client.go @@ -114,7 +114,7 @@ type Client struct { // Client context. Will be canceled by Run on exit. ctx context.Context - cancel context.CancelFunc + cancel context.CancelCauseFunc // Client config. appID int // immutable diff --git a/pkg/gotd/telegram/client_test.go b/pkg/gotd/telegram/client_test.go index b3104040..5876d1ed 100644 --- a/pkg/gotd/telegram/client_test.go +++ b/pkg/gotd/telegram/client_test.go @@ -84,7 +84,7 @@ func newTestClient(h testHandler) *Client { appHash: TestAppHash, conn: &testConn{engine: engine, ready: ready}, ctx: context.Background(), - cancel: func() {}, + cancel: func(error) {}, updateHandler: UpdateHandlerFunc(func(ctx context.Context, u tg.UpdatesClass) error { return nil }), onTransfer: noopOnTransfer, } diff --git a/pkg/gotd/telegram/connect.go b/pkg/gotd/telegram/connect.go index db7e2fd1..5dffc7e5 100644 --- a/pkg/gotd/telegram/connect.go +++ b/pkg/gotd/telegram/connect.go @@ -2,6 +2,7 @@ package telegram import ( "context" + "fmt" "time" "github.com/cenkalti/backoff/v4" @@ -117,6 +118,8 @@ func (c *Client) resetReady() { c.ready.Reset() } +var errRunReturned = errors.New("Client.Run() returned") + // Run starts client session and blocks until connection close. // The f callback is called on successful session initialization and Run // will return on f() result. @@ -137,12 +140,12 @@ func (c *Client) Run(ctx context.Context, f func(ctx context.Context) error) (er // Setting up client context for background operations like updates // handling or pool creation. - c.ctx, c.cancel = context.WithCancel(ctx) + c.ctx, c.cancel = context.WithCancelCause(ctx) c.log.Info("Client starting") defer c.log.Info("Client closed") // Cancel client on exit. - defer c.cancel() + defer c.cancel(errRunReturned) defer func() { c.subConnsMux.Lock() defer c.subConnsMux.Unlock() @@ -164,7 +167,7 @@ func (c *Client) Run(ctx context.Context, f func(ctx context.Context) error) (er g.Go(func(ctx context.Context) error { select { case <-ctx.Done(): - c.cancel() + c.cancel(fmt.Errorf("Client.Run group context done: %w", context.Cause(ctx))) return ctx.Err() case <-c.ctx.Done(): return c.ctx.Err() diff --git a/pkg/gotd/telegram/migrate_to_dc_test.go b/pkg/gotd/telegram/migrate_to_dc_test.go index 6310d787..02dbf785 100644 --- a/pkg/gotd/telegram/migrate_to_dc_test.go +++ b/pkg/gotd/telegram/migrate_to_dc_test.go @@ -112,7 +112,7 @@ func newMigrationClient(t *testing.T, h migrationTestHandler) *Client { }), newConnBackoff: defaultBackoff(clock.System), ctx: context.Background(), - cancel: func() {}, + cancel: func(error) {}, migrationTimeout: 10 * time.Second, } client.init()