1
0
Fork 0
mirror of https://github.com/mautrix/whatsapp.git synced 2026-05-15 10:16:52 -04:00
mautrix-whatsapp/pkg/connector/client.go

469 lines
14 KiB
Go

// mautrix-whatsapp - A Matrix-WhatsApp puppeting bridge.
// Copyright (C) 2024 Tulir Asokan
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package connector
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
"go.mau.fi/util/exsync"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/appstate"
waBinary "go.mau.fi/whatsmeow/binary"
"go.mau.fi/whatsmeow/proto/waWa6"
"go.mau.fi/whatsmeow/store"
"go.mau.fi/whatsmeow/types"
"go.mau.fi/whatsmeow/util/keys"
waLog "go.mau.fi/whatsmeow/util/log"
"golang.org/x/sync/semaphore"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/networkid"
"maunium.net/go/mautrix/bridgev2/status"
"go.mau.fi/mautrix-whatsapp/pkg/waid"
)
func (wa *WhatsAppConnector) LoadUserLogin(ctx context.Context, login *bridgev2.UserLogin) error {
w := &WhatsAppClient{
Main: wa,
UserLogin: login,
MC: noopMCInstance,
historySyncWakeup: make(chan struct{}, 1),
resyncQueue: make(map[types.JID]resyncQueueItem),
directMediaRetries: make(map[networkid.MessageID]*directMediaRetry),
mediaRetryLock: semaphore.NewWeighted(wa.Config.HistorySync.MediaRequests.MaxAsyncHandle),
pushNamesSynced: exsync.NewEvent(),
createDedup: exsync.NewSet[types.MessageID](),
appStateFullSyncAttempted: make(map[appstate.WAPatchName]time.Time),
}
login.Client = w
loginMetadata := login.Metadata.(*waid.UserLoginMetadata)
if loginMetadata.WADeviceID == 0 {
return nil
}
var err error
w.JID = waid.ParseUserLoginID(login.ID, loginMetadata.WADeviceID)
w.Device, err = wa.DeviceStore.GetDevice(ctx, w.JID)
if err != nil {
return err
}
if w.Device != nil {
log := w.UserLogin.Log.With().Str("component", "whatsmeow").Logger()
w.Client = whatsmeow.NewClient(w.Device, waLog.Zerolog(log))
w.Client.AddEventHandlerWithSuccessStatus(w.handleWAEvent)
w.Client.SynchronousAck = true
w.Client.EnableDecryptedEventBuffer = bridgev2.PortalEventBuffer == 0
w.Client.ManualHistorySyncDownload = true
w.Client.SendReportingTokens = true
w.Client.AutomaticMessageRerequestFromPhone = true
w.Client.GetMessageForRetry = w.trackNotFoundRetry
w.Client.PreRetryCallback = w.trackFoundRetry
w.Client.BackgroundEventCtx = w.UserLogin.Log.WithContext(wa.Bridge.BackgroundCtx)
w.Client.SetForceActiveDeliveryReceipts(wa.Config.ForceActiveDeliveryReceipts)
w.Client.InitialAutoReconnect = wa.Config.InitialAutoReconnect
w.Client.UseRetryMessageStore = wa.Config.UseWhatsAppRetryStore
} else {
w.UserLogin.Log.Warn().Stringer("jid", w.JID).Msg("No device found for user in whatsmeow store")
}
return nil
}
type resyncQueueItem struct {
portal *bridgev2.Portal
ghost *bridgev2.Ghost
}
type WhatsAppClient struct {
Main *WhatsAppConnector
UserLogin *bridgev2.UserLogin
Client *whatsmeow.Client
Device *store.Device
JID types.JID
MC mClient
historySyncWakeup chan struct{}
stopLoops atomic.Pointer[context.CancelFunc]
resyncQueue map[types.JID]resyncQueueItem
resyncQueueLock sync.Mutex
nextResync time.Time
directMediaRetries map[networkid.MessageID]*directMediaRetry
directMediaLock sync.Mutex
mediaRetryLock *semaphore.Weighted
offlineSyncWaiter atomic.Pointer[chan error]
isNewLogin bool
pushNamesSynced *exsync.Event
lastPresence types.Presence
createDedup *exsync.Set[types.MessageID]
appStateRecoveryLock sync.Mutex
appStateFullSyncAttempted map[appstate.WAPatchName]time.Time
}
var (
_ bridgev2.NetworkAPI = (*WhatsAppClient)(nil)
_ bridgev2.PushableNetworkAPI = (*WhatsAppClient)(nil)
_ bridgev2.BackgroundSyncingNetworkAPI = (*WhatsAppClient)(nil)
_ bridgev2.ChatViewingNetworkAPI = (*WhatsAppClient)(nil)
)
var pushCfg = &bridgev2.PushConfig{
// TODO fetch this from server instead of hardcoding?
Web: &bridgev2.WebPushConfig{VapidKey: "BIt4eFAVqVxe4yOA5_VLbZTbOlV-2y1FYJ_R4RlxWoyYazAq4glIxI7fh_xLbob1SNv7ZtTWn9mmZCsk2YNXYeY"},
FCM: &bridgev2.FCMPushConfig{SenderID: "293955441834"},
APNs: &bridgev2.APNsPushConfig{BundleID: "net.whatsapp.WhatsApp"},
}
func (wa *WhatsAppClient) GetPushConfigs() *bridgev2.PushConfig {
return pushCfg
}
func (wa *WhatsAppClient) RegisterPushNotifications(ctx context.Context, pushType bridgev2.PushType, token string) error {
if wa.Client == nil {
return bridgev2.ErrNotLoggedIn
}
var pc whatsmeow.PushConfig
switch pushType {
case bridgev2.PushTypeFCM:
pc = &whatsmeow.FCMPushConfig{Token: token}
case bridgev2.PushTypeWeb:
meta := wa.UserLogin.Metadata.(*waid.UserLoginMetadata)
if meta.PushKeys == nil {
meta.GeneratePushKeys()
err := wa.UserLogin.Save(ctx)
if err != nil {
return fmt.Errorf("failed to save push key: %w", err)
}
}
pc = &whatsmeow.WebPushConfig{
Endpoint: token,
Auth: meta.PushKeys.Auth,
P256DH: meta.PushKeys.P256DH,
}
case bridgev2.PushTypeAPNs:
meta := wa.UserLogin.Metadata.(*waid.UserLoginMetadata)
if meta.APNSEncPubKey == nil {
k := keys.NewKeyPair()
meta.APNSEncPubKey = k.Pub[:]
meta.APNSEncPrivKey = k.Priv[:]
err := wa.UserLogin.Save(ctx)
if err != nil {
return fmt.Errorf("failed to save push enc key: %w", err)
}
}
// TODO figure out if the key is supposed to be aes or curve25519
pc = &whatsmeow.APNsPushConfig{Token: token, MsgIDEncKey: meta.APNSEncPubKey}
default:
return fmt.Errorf("unsupported push type %s", pushType)
}
return wa.Client.RegisterForPushNotifications(ctx, pc)
}
func (wa *WhatsAppClient) IsThisUser(_ context.Context, userID networkid.UserID) bool {
return userID == waid.MakeUserID(wa.JID)
}
func (wa *WhatsAppClient) Connect(ctx context.Context) {
if wa.Client == nil {
state := status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: WANotLoggedIn,
}
wa.UserLogin.BridgeState.Send(state)
return
}
wa.UserLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnecting})
wa.Main.firstClientConnectOnce.Do(wa.Main.onFirstClientConnect)
if err := wa.Main.updateProxy(ctx, wa.Client, false); err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to update proxy")
}
if ctx.Err() != nil {
return
}
wa.initMC()
wa.startLoops()
wa.Client.BackgroundEventCtx = wa.UserLogin.Log.WithContext(wa.Main.Bridge.BackgroundCtx)
zerolog.Ctx(ctx).Debug().Msg("Connecting to WhatsApp")
if err := wa.Client.ConnectContext(ctx); err != nil {
wa.callStopLoops()
zerolog.Ctx(ctx).Err(err).Msg("Failed to connect to WhatsApp")
state := status.BridgeState{
StateEvent: status.StateUnknownError,
Error: WAConnectionFailed,
Info: map[string]any{
"go_error": err.Error(),
},
}
wa.UserLogin.BridgeState.Send(state)
}
}
func (wa *WhatsAppClient) notifyOfflineSyncWaiter(err error) {
if ch := wa.offlineSyncWaiter.Load(); ch != nil {
select {
case *ch <- err:
default:
wa.UserLogin.Log.Warn().
AnErr("dropped_error", err).
Msg("Offline sync waiter channel was full, dropping input")
}
}
}
type PushNotificationData struct {
PN string `json:"pn"`
EncIV string `json:"enc_iv"`
EncPayload string `json:"enc_p"`
EncTag string `json:"enc_t"`
EncTimeMicros uint64 `json:"enc_c"`
// TODO unencrypted message ID field
}
type wrappedPushNotificationData struct {
Data PushNotificationData `json:"data"`
}
func (wa *WhatsAppClient) ConnectBackground(ctx context.Context, params *bridgev2.ConnectBackgroundParams) error {
if wa.Client == nil {
return bridgev2.ErrNotLoggedIn
}
wa.Client.BackgroundEventCtx = wa.UserLogin.Log.WithContext(wa.Main.Bridge.BackgroundCtx)
ch := make(chan error, 1)
wa.offlineSyncWaiter.Store(&ch)
defer wa.offlineSyncWaiter.Store(nil)
wa.Main.backgroundConnectOnce.Do(wa.Main.onFirstBackgroundConnect)
if err := wa.Main.updateProxy(ctx, wa.Client, false); err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to update proxy")
}
wa.Client.GetClientPayload = func() *waWa6.ClientPayload {
payload := wa.GetStore().GetClientPayload()
payload.ConnectReason = waWa6.ClientPayload_PUSH.Enum()
return payload
}
defer func() {
if cli := wa.Client; cli != nil {
cli.GetClientPayload = nil
}
}()
err := wa.Client.ConnectContext(ctx)
if err != nil {
return err
}
defer wa.Disconnect()
select {
case <-ctx.Done():
return ctx.Err()
case err = <-ch:
if err == nil {
var data wrappedPushNotificationData
err = json.Unmarshal(params.RawData, &data)
if err == nil && data.Data.PN != "" {
pnErr := wa.sendPNData(ctx, data.Data.PN)
if pnErr != nil {
zerolog.Ctx(ctx).Err(pnErr).Msg("Failed to send PN data")
}
}
}
return err
}
}
func (wa *WhatsAppClient) sendPNData(ctx context.Context, pn string) error {
//lint:ignore SA1019 this is supposed to be dangerous
resp, err := wa.Client.DangerousInternals().SendIQ(ctx, whatsmeow.DangerousInfoQuery{
Namespace: "urn:xmpp:whatsapp:push",
Type: "get",
To: types.ServerJID,
Content: []waBinary.Node{{
Tag: "pn",
Content: pn,
}},
})
if err != nil {
return fmt.Errorf("failed to send pn: %w", err)
}
cat, ok := resp.GetOptionalChildByTag("cat")
if !ok {
return fmt.Errorf("cat element not found in response")
}
catContentBytes, ok := cat.Content.([]byte)
if !ok {
return fmt.Errorf("cat element content is not a byte slice")
}
zerolog.Ctx(ctx).Debug().Str("cat_data", string(catContentBytes)).Msg("Received cat response from sending pn data")
//lint:ignore SA1019 this is supposed to be dangerous
err = wa.Client.DangerousInternals().SendNode(ctx, waBinary.Node{
Tag: "ib",
Content: []waBinary.Node{{
Tag: "cat",
Content: cat.Content,
}},
})
if err != nil {
return fmt.Errorf("failed to broadcast cat: %w", err)
}
zerolog.Ctx(ctx).Debug().Msg("Broadcasted cat from pn data")
return nil
}
func (wa *WhatsAppClient) startLoops() {
ctx, cancel := context.WithCancel(wa.Main.Bridge.BackgroundCtx)
oldStop := wa.stopLoops.Swap(&cancel)
if oldStop != nil {
(*oldStop)()
}
ctx = wa.UserLogin.Log.WithContext(ctx)
go wa.historySyncLoop(ctx)
go wa.ghostResyncLoop(ctx)
if mrc := wa.Main.Config.HistorySync.MediaRequests; mrc.AutoRequestMedia && mrc.RequestMethod == MediaRequestMethodLocalTime {
go wa.mediaRequestLoop(ctx)
}
}
func (wa *WhatsAppClient) GetStore() *store.Device {
if cli := wa.Client; cli != nil {
if currentStore := cli.Store; currentStore != nil {
return currentStore
}
}
wa.UserLogin.Log.Warn().Caller(1).Msg("Returning noop device in GetStore")
return store.NoopDevice
}
func (wa *WhatsAppClient) callStopLoops() {
if stopHistorySyncLoop := wa.stopLoops.Swap(nil); stopHistorySyncLoop != nil {
(*stopHistorySyncLoop)()
}
}
func (wa *WhatsAppClient) Disconnect() {
wa.callStopLoops()
if cli := wa.Client; cli != nil {
cli.Disconnect()
}
}
func (wa *WhatsAppClient) LogoutRemote(ctx context.Context) {
if cli := wa.Client; cli != nil {
err := cli.Logout(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to log out")
}
}
wa.Disconnect()
wa.Client = nil
}
func (wa *WhatsAppClient) IsLoggedIn() bool {
return wa.Client != nil && wa.Client.IsLoggedIn()
}
func (wa *WhatsAppClient) syncRemoteProfile(ctx context.Context, ghost *bridgev2.Ghost) {
ownID := waid.MakeUserID(wa.GetStore().GetJID())
if ghost == nil {
var err error
ghost, err = wa.Main.Bridge.GetExistingGhostByID(ctx, ownID)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to get own ghost to sync remote profile")
return
} else if ghost == nil {
return
}
}
if ghost.ID != ownID {
return
}
name := wa.GetStore().BusinessName
if name == "" {
name = wa.GetStore().PushName
}
if name == "" || wa.UserLogin.RemoteProfile.Name == name && wa.UserLogin.RemoteProfile.Avatar == ghost.AvatarMXC {
return
}
wa.UserLogin.RemoteProfile.Name = name
wa.UserLogin.RemoteProfile.Avatar = ghost.AvatarMXC
err := wa.UserLogin.Save(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to save remote profile")
}
// FIXME this might be racy, should invent a proper way to send last state with info filled
if wa.Client.IsConnected() {
wa.UserLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
}
zerolog.Ctx(ctx).Info().Msg("Remote profile updated")
}
func (wa *WhatsAppClient) HandleMatrixViewingChat(ctx context.Context, msg *bridgev2.MatrixViewingChat) error {
var presence types.Presence
if msg.Portal != nil {
presence = types.PresenceAvailable
} else {
presence = types.PresenceUnavailable
}
if wa.lastPresence != presence {
err := wa.updatePresence(ctx, presence)
if err != nil {
zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to set presence when viewing chat")
}
}
if msg.Portal == nil || msg.Portal.Metadata.(*waid.PortalMetadata).LastSync.Add(5*time.Minute).After(time.Now()) {
// If we resynced this portal within the last 5 minutes, don't do it again
return nil
}
// Reset, but don't save, portal last sync time for immediate sync now
msg.Portal.Metadata.(*waid.PortalMetadata).LastSync.Time = time.Time{}
// Enqueue for the sync, don't block on it completing
wa.EnqueuePortalResync(msg.Portal, true)
if msg.Portal.OtherUserID != "" {
// If this is a DM, also sync the ghost of the other user immediately
ghost, err := wa.Main.Bridge.GetExistingGhostByID(ctx, msg.Portal.OtherUserID)
if err != nil {
return fmt.Errorf("failed to get ghost for sync: %w", err)
} else if ghost == nil {
zerolog.Ctx(ctx).Warn().
Str("other_user_id", string(msg.Portal.OtherUserID)).
Msg("No ghost found for other user in portal")
} else {
// Reset, but don't save, portal last sync time for immediate sync now
ghost.Metadata.(*waid.GhostMetadata).LastSync.Time = time.Time{}
wa.EnqueueGhostResync(ghost)
}
}
return nil
}
func (wa *WhatsAppClient) updatePresence(ctx context.Context, presence types.Presence) error {
err := wa.Client.SendPresence(ctx, presence)
if err == nil {
wa.lastPresence = presence
}
return err
}