From a4aedec0445a9992ab45d3dc7519ab45af0c6f69 Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Thu, 6 Jun 2024 12:26:56 -0600 Subject: [PATCH] dms: implement basic text message handling Signed-off-by: Sumner Evans --- pkg/connector/client.go | 210 ++++++++++++++++++++++++++++++- pkg/connector/connector.go | 28 +---- pkg/connector/ids.go | 71 +++++++++++ pkg/connector/login.go | 10 +- pkg/connector/msgconv/from-tg.go | 12 +- 5 files changed, 287 insertions(+), 44 deletions(-) create mode 100644 pkg/connector/ids.go diff --git a/pkg/connector/client.go b/pkg/connector/client.go index a72797de..c3317190 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -3,21 +3,72 @@ package connector import ( "context" "errors" + "fmt" + "strconv" + "strings" + "time" "github.com/gotd/td/telegram" + "github.com/gotd/td/telegram/message" + "github.com/gotd/td/telegram/updates" + "github.com/gotd/td/telegram/updates/hook" + "github.com/gotd/td/tg" "github.com/rs/zerolog" + "go.mau.fi/zerozap" + "go.uber.org/zap" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" "maunium.net/go/mautrix/bridgev2/networkid" + "maunium.net/go/mautrix/event" ) type TelegramClient struct { main *TelegramConnector + loginID int64 userLogin *bridgev2.UserLogin client *telegram.Client clientCancel context.CancelFunc } +func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridgev2.UserLogin) (*TelegramClient, error) { + loginID, err := strconv.ParseInt(string(login.ID), 10, 64) + if err != nil { + return nil, err + } + + logger := zerolog.Ctx(ctx).With(). + Str("component", "telegram_client"). + Int64("login_id", loginID). + Logger() + + zaplog := zap.New(zerozap.New(logger)) + + client := TelegramClient{ + main: tc, + loginID: loginID, + userLogin: login, + } + + dispatcher := tg.NewUpdateDispatcher() + dispatcher.OnNewMessage(client.onUpdateNewMessage) + + updatesManager := updates.New(updates.Config{ + Handler: dispatcher, + Logger: zaplog.Named("gaps"), + }) + + client.client = telegram.NewClient(tc.Config.AppID, tc.Config.AppHash, telegram.Options{ + SessionStorage: tc.store.GetSessionStore(loginID), + Logger: zaplog, + UpdateHandler: updatesManager, + Middlewares: []telegram.Middleware{ + hook.UpdateHook(updatesManager.Handle), + }, + }) + client.clientCancel, err = connectTelegramClient(ctx, client.client) + return &client, err +} + var _ bridgev2.NetworkAPI = (*TelegramClient)(nil) // connectTelegramClient blocks until client is connected, calling Run @@ -53,25 +104,176 @@ func connectTelegramClient(ctx context.Context, client *telegram.Client) (contex return cancel, nil } +func (t *TelegramClient) onUpdateNewMessage(ctx context.Context, e tg.Entities, update *tg.UpdateNewMessage) error { + log := zerolog.Ctx(ctx) + msg, ok := update.GetMessage().(*tg.Message) + if !ok { + log.Error().Type("message", update.GetMessage()).Msg("unknown message type") + return nil + } + + var sender bridgev2.EventSender + if msg.Out { + sender.IsFromMe = true + sender.SenderLogin = makeUserLoginID(t.loginID) + sender.Sender = makeUserID(t.loginID) + } else if msg.FromID != nil { + switch from := msg.FromID.(type) { + case *tg.PeerUser: + sender.SenderLogin = makeUserLoginID(from.UserID) + sender.Sender = makeUserID(from.UserID) + default: + fmt.Printf("%+v\n", msg.FromID) + fmt.Printf("%T\n", msg.FromID) + panic("unimplemented FromID") + } + } else if peer, ok := msg.PeerID.(*tg.PeerUser); ok { + sender.SenderLogin = makeUserLoginID(peer.UserID) + sender.Sender = makeUserID(peer.UserID) + } else { + panic("not from anyone") + } + + t.main.Bridge.QueueRemoteEvent(t.userLogin, &bridgev2.SimpleRemoteEvent[*tg.Message]{ + Type: bridgev2.RemoteEventMessage, + LogContext: func(c zerolog.Context) zerolog.Context { + return c. + Int("message_id", update.Message.GetID()) + }, + ID: makeMessageID(msg.ID), + Sender: sender, + PortalID: makePortalID(msg.PeerID), + Data: msg, + CreatePortal: true, + + ConvertMessageFunc: func(ctx context.Context, portal *bridgev2.Portal, intent bridgev2.MatrixAPI, data *tg.Message) (*bridgev2.ConvertedMessage, error) { + cm := &bridgev2.ConvertedMessage{ + Timestamp: time.Unix(int64(data.Date), 0), + } + if data.Message != "" { + converted := bridgev2.ConvertedMessagePart{ + Type: event.EventMessage, + Content: &event.MessageEventContent{MsgType: event.MsgText, Body: data.Message}, + } + cm.Parts = append(cm.Parts, &converted) + } + return cm, nil + }, + }) + return nil +} + func (t *TelegramClient) Connect(ctx context.Context) (err error) { t.clientCancel, err = connectTelegramClient(ctx, t.client) return } +func getFullName(user *tg.User) string { + return strings.TrimSpace(fmt.Sprintf("%s %s", user.FirstName, user.LastName)) +} + +func getFullNamePtr(user *tg.User) *string { + fullName := getFullName(user) + return &fullName +} + func (t *TelegramClient) GetChatInfo(ctx context.Context, portal *bridgev2.Portal) (*bridgev2.PortalInfo, error) { + fmt.Printf("%+v\n", portal) + peerType, id, err := parsePortalID(portal.ID) + if err != nil { + return nil, err + } + isSpace := false + + switch peerType { + case peerTypeUser: + users, err := t.client.API().UsersGetUsers(ctx, []tg.InputUserClass{&tg.InputUser{UserID: id}}) + if err != nil { + return nil, err + } + if len(users) == 0 { + return nil, fmt.Errorf("failed to get user info for user %d", id) + } + if user, ok := users[0].(*tg.User); !ok { + return nil, fmt.Errorf("returned user is not *tg.User") + } else { + isDM := true + return &bridgev2.PortalInfo{ + Name: getFullNamePtr(user), + // Topic *string + // Avatar *Avatar + + Members: []networkid.UserID{makeUserID(id), makeUserID(t.loginID)}, + IsDirectChat: &isDM, + IsSpace: &isSpace, + }, nil + } + } + fmt.Printf("%s %d\n", peerType, id) panic("unimplemented getchatinfo") } func (t *TelegramClient) GetUserInfo(ctx context.Context, ghost *bridgev2.Ghost) (*bridgev2.UserInfo, error) { - panic("unimplemented getuserinfo") + id, err := parseUserID(ghost.ID) + if err != nil { + return nil, err + } + users, err := t.client.API().UsersGetUsers(ctx, []tg.InputUserClass{&tg.InputUser{UserID: id}}) + if err != nil { + return nil, err + } + if len(users) == 0 { + return nil, fmt.Errorf("failed to get user info for user %d", id) + } + if user, ok := users[0].(*tg.User); !ok { + return nil, fmt.Errorf("returned user is not *tg.User") + } else { + var identifiers []string + + if username, ok := user.GetUsername(); ok { + identifiers = append(identifiers, fmt.Sprintf("telegram:%s", username)) + } + if phone, ok := user.GetPhone(); ok { + identifiers = append(identifiers, fmt.Sprintf("tel:+%s", strings.TrimPrefix(phone, "+"))) + } + + return &bridgev2.UserInfo{ + IsBot: &user.Bot, + Name: getFullNamePtr(user), + // Avatar *Avatar + Identifiers: identifiers, + }, nil + } } func (t *TelegramClient) HandleMatrixEdit(ctx context.Context, msg *bridgev2.MatrixEdit) error { panic("unimplemented edit") } -func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (message *database.Message, err error) { - panic("unimplemented message") +func (t *TelegramClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (dbMessage *database.Message, err error) { + sender := message.NewSender(t.client.API()) + peer, err := inputPeerForPortalID(msg.Portal.ID) + if err != nil { + return nil, err + } + + updates, err := sender.To(peer).Text(ctx, msg.Content.Body) + if err != nil { + return nil, err + } + sentMessage, ok := updates.(*tg.UpdateShortSentMessage) + if !ok { + return nil, fmt.Errorf("unknown update from message response %T", updates) + } + + dbMessage = &database.Message{ + ID: makeMessageID(sentMessage.ID), + MXID: msg.Event.ID, + RoomID: msg.Portal.ID, + SenderID: makeUserID(t.loginID), + Timestamp: time.Unix(int64(sentMessage.Date), 0), + } + return } func (t *TelegramClient) HandleMatrixMessageRemove(ctx context.Context, msg *bridgev2.MatrixMessageRemove) error { @@ -92,7 +294,7 @@ func (t *TelegramClient) IsLoggedIn() bool { } func (t *TelegramClient) IsThisUser(ctx context.Context, userID networkid.UserID) bool { - panic("unimplemented istheiruser") + return userID == networkid.UserID(t.userLogin.ID) } func (t *TelegramClient) LogoutRemote(ctx context.Context) { diff --git a/pkg/connector/connector.go b/pkg/connector/connector.go index d8c623da..2a001a94 100644 --- a/pkg/connector/connector.go +++ b/pkg/connector/connector.go @@ -18,13 +18,8 @@ package connector import ( "context" - "strconv" - "github.com/gotd/td/telegram" - "github.com/rs/zerolog" "go.mau.fi/util/dbutil" - "go.mau.fi/zerozap" - "go.uber.org/zap" "maunium.net/go/mautrix/bridgev2" "go.mau.fi/mautrix-telegram/pkg/store" @@ -58,24 +53,7 @@ func (tg *TelegramConnector) Start(ctx context.Context) error { return tg.store.Upgrade(ctx) } -func (tg *TelegramConnector) LoadUserLogin(ctx context.Context, login *bridgev2.UserLogin) error { - loginID, err := strconv.ParseInt(string(login.ID), 10, 64) - if err != nil { - return err - } - - logger := zerolog.Ctx(ctx).With(). - Str("component", "telegram_client"). - Int64("login_id", loginID). - Logger() - - login.Client = &TelegramClient{ - main: tg, - userLogin: login, - client: telegram.NewClient(tg.Config.AppID, tg.Config.AppHash, telegram.Options{ - SessionStorage: tg.store.GetSessionStore(loginID), - Logger: zap.New(zerozap.New(logger)), - }), - } - return nil +func (tc *TelegramConnector) LoadUserLogin(ctx context.Context, login *bridgev2.UserLogin) (err error) { + login.Client, err = NewTelegramClient(ctx, tc, login) + return } diff --git a/pkg/connector/ids.go b/pkg/connector/ids.go new file mode 100644 index 00000000..43f44c63 --- /dev/null +++ b/pkg/connector/ids.go @@ -0,0 +1,71 @@ +package connector + +import ( + "fmt" + "strconv" + "strings" + + "github.com/gotd/td/tg" + "maunium.net/go/mautrix/bridgev2/networkid" +) + +func makeUserID(userID int64) networkid.UserID { + return networkid.UserID(strconv.FormatInt(userID, 10)) +} + +func parseUserID(userID networkid.UserID) (int64, error) { + return strconv.ParseInt(string(userID), 10, 64) +} + +func makeUserLoginID(userID int64) networkid.UserLoginID { + return networkid.UserLoginID(strconv.FormatInt(userID, 10)) +} + +func makeMessageID(messageID int) networkid.MessageID { + return networkid.MessageID(strconv.Itoa(messageID)) +} + +type peerType string + +const ( + peerTypeUser peerType = "user" + peerTypeChat peerType = "chat" + peerTypeChannel peerType = "channel" +) + +func makePortalID(peer tg.PeerClass) networkid.PortalID { + switch v := peer.(type) { + case *tg.PeerUser: + return networkid.PortalID(fmt.Sprintf("%s:%d", peerTypeUser, v.UserID)) + case *tg.PeerChat: + return networkid.PortalID(fmt.Sprintf("%s:%d", peerTypeChat, v.ChatID)) + case *tg.PeerChannel: + return networkid.PortalID(fmt.Sprintf("%s:%d", peerTypeChannel, v.ChannelID)) + default: + panic(fmt.Errorf("unknown peer class type %T", v)) + } +} + +func parsePortalID(portalID networkid.PortalID) (pt peerType, id int64, err error) { + parts := strings.Split(string(portalID), ":") + pt = peerType(parts[0]) + id, err = strconv.ParseInt(parts[1], 10, 64) + return +} + +func inputPeerForPortalID(portalID networkid.PortalID) (tg.InputPeerClass, error) { + peerType, id, err := parsePortalID(portalID) + if err != nil { + return nil, err + } + switch peerType { + case peerTypeUser: + return &tg.InputPeerUser{UserID: id}, nil + case peerTypeChat: + return &tg.InputPeerChat{ChatID: id}, nil + case peerTypeChannel: + return &tg.InputPeerChannel{ChannelID: id}, nil + default: + panic("invalid peer type") + } +} diff --git a/pkg/connector/login.go b/pkg/connector/login.go index f1f769c2..a6d83bf6 100644 --- a/pkg/connector/login.go +++ b/pkg/connector/login.go @@ -20,8 +20,6 @@ import ( "context" "errors" "fmt" - "strconv" - "strings" "github.com/gotd/td/session" "github.com/gotd/td/telegram" @@ -32,7 +30,6 @@ import ( "go.uber.org/zap" "maunium.net/go/mautrix/bridgev2" "maunium.net/go/mautrix/bridgev2/database" - "maunium.net/go/mautrix/bridgev2/networkid" ) const LoginFlowIDPhone = "phone" @@ -175,10 +172,6 @@ func (p *PhoneLogin) SubmitUserInput(ctx context.Context, input map[string]strin return nil, fmt.Errorf("unexpected state during phone login") } -func makeUserLoginID(userID int64) networkid.UserLoginID { - return networkid.UserLoginID(strconv.FormatInt(userID, 10)) -} - func (p *PhoneLogin) handleAuthSuccess(ctx context.Context, authorization *tg.AuthAuthorization) (*bridgev2.LoginStep, error) { // Now that we have the Telegram user ID, store it in the database and // close the login client. @@ -217,11 +210,10 @@ func (p *PhoneLogin) handleAuthSuccess(ctx context.Context, authorization *tg.Au if err != nil { return nil, err } - name := strings.TrimSpace(fmt.Sprintf("%s %s", user.FirstName, user.LastName)) return &bridgev2.LoginStep{ Type: bridgev2.LoginStepTypeComplete, StepID: completeStep, - Instructions: fmt.Sprintf("Successfully logged in as %d / +%s (%s)", user.ID, user.Phone, name), + Instructions: fmt.Sprintf("Successfully logged in as %d / +%s (%s)", user.ID, user.Phone, getFullName(user)), CompleteParams: &bridgev2.LoginCompleteParams{ UserLoginID: ul.ID, }, diff --git a/pkg/connector/msgconv/from-tg.go b/pkg/connector/msgconv/from-tg.go index 40015999..7cb019b6 100644 --- a/pkg/connector/msgconv/from-tg.go +++ b/pkg/connector/msgconv/from-tg.go @@ -78,12 +78,12 @@ func (mc *MessageConverter) ToMatrix(ctx context.Context, msg tg.MessageClass) * fmt.Printf("photo: %v\n", photo) largest := getLargestPhotoSize(photo.GetSizes()) - file := tg.InputPhotoFileLocation{ - ID: photo.GetID(), - AccessHash: photo.GetAccessHash(), - FileReference: photo.GetFileReference(), - ThumbSize: largest.GetType(), - } + // file := tg.InputPhotoFileLocation{ + // ID: photo.GetID(), + // AccessHash: photo.GetAccessHash(), + // FileReference: photo.GetFileReference(), + // ThumbSize: largest.GetType(), + // } mxc := id.ContentURIString( fmt.Sprintf("mxc://telegram.sumner.user.beeper.com/p.i%d.a%d.f%s.t%s", photo.GetID(), photo.GetAccessHash(), base64.RawURLEncoding.EncodeToString(photo.GetFileReference()), largest.GetType()),