1
0
Fork 0
mirror of https://github.com/mautrix/whatsapp.git synced 2026-05-14 17:56:53 -04:00

Compare commits

...

1 commit

Author SHA1 Message Date
Adam Van Ymeren
cd7f229ca6 resync: resync portals/ghosts immediately upon viewing
- refactor all the resync delays to a single per ghost/portal resync interval
- trigger resync immediately upon opening a room
- also resync portals if bridge capabilities has changed
2025-09-25 09:05:02 -07:00
5 changed files with 76 additions and 118 deletions

View file

@ -45,7 +45,6 @@ func (wa *WhatsAppClient) getChatInfo(ctx context.Context, portalJID types.JID,
return nil, err
}
wrapped = wa.wrapGroupInfo(ctx, info)
wrapped.ExtraUpdates = bridgev2.MergeExtraUpdaters(wrapped.ExtraUpdates, updatePortalLastSyncAt)
case types.NewsletterServer:
info, err := wa.Client.GetNewsletterInfo(portalJID)
if err != nil {
@ -60,6 +59,9 @@ func (wa *WhatsAppClient) getChatInfo(ctx context.Context, portalJID types.JID,
}
func (wa *WhatsAppClient) addExtrasToWrapped(ctx context.Context, portalJID types.JID, wrapped *bridgev2.ChatInfo, conv *wadb.Conversation) {
_, capVer := wa.Main.GetBridgeInfoVersion()
wrapped.ExtraUpdates = bridgev2.MergeExtraUpdaters(wrapped.ExtraUpdates, updatePortalSyncMeta(capVer))
if conv == nil {
var err error
conv, err = wa.Main.DB.Conversation.Get(ctx, wa.UserLogin.ID, portalJID)
@ -73,11 +75,15 @@ func (wa *WhatsAppClient) addExtrasToWrapped(ctx context.Context, portalJID type
wa.applyChatSettings(ctx, portalJID, wrapped)
}
func updatePortalLastSyncAt(_ context.Context, portal *bridgev2.Portal) bool {
meta := portal.Metadata.(*waid.PortalMetadata)
forceSave := time.Since(meta.LastSync.Time) > 24*time.Hour
meta.LastSync = jsontime.UnixNow()
return forceSave
func updatePortalSyncMeta(expectedCapVer int) bridgev2.ExtraUpdater[*bridgev2.Portal] {
return func(_ context.Context, portal *bridgev2.Portal) bool {
meta := portal.Metadata.(*waid.PortalMetadata)
meta.LastSync = jsontime.UnixNow()
if meta.BridgeCapsVersion != expectedCapVer {
meta.BridgeCapsVersion = expectedCapVer
}
return true
}
}
func updateDisappearingTimerSetAt(ts int64) bridgev2.ExtraUpdater[*bridgev2.Portal] {

View file

@ -22,7 +22,6 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
"go.mau.fi/util/exsync"
@ -49,7 +48,7 @@ func (wa *WhatsAppConnector) LoadUserLogin(ctx context.Context, login *bridgev2.
historySyncs: make(chan *waHistorySync.HistorySync, 64),
historySyncWakeup: make(chan struct{}, 1),
resyncQueue: make(map[types.JID]resyncQueueItem),
resyncQueueCh: make(chan resyncQueueItem, 256),
directMediaRetries: make(map[networkid.MessageID]*directMediaRetry),
mediaRetryLock: semaphore.NewWeighted(wa.Config.HistorySync.MediaRequests.MaxAsyncHandle),
pushNamesSynced: exsync.NewEvent(),
@ -107,9 +106,8 @@ type WhatsAppClient struct {
historySyncs chan *waHistorySync.HistorySync
historySyncWakeup chan struct{}
stopLoops atomic.Pointer[context.CancelFunc]
resyncQueue map[types.JID]resyncQueueItem
resyncQueueCh chan resyncQueueItem
resyncQueueLock sync.Mutex
nextResync time.Time
directMediaRetries map[networkid.MessageID]*directMediaRetry
directMediaLock sync.Mutex
mediaRetryLock *semaphore.Weighted
@ -325,7 +323,7 @@ func (wa *WhatsAppClient) startLoops() {
(*oldStop)()
}
go wa.historySyncLoop(ctx)
go wa.ghostResyncLoop(ctx)
go wa.resyncLoop(ctx)
if mrc := wa.Main.Config.HistorySync.MediaRequests; mrc.AutoRequestMedia && mrc.RequestMethod == MediaRequestMethodLocalTime {
go wa.mediaRequestLoop(ctx)
}
@ -415,14 +413,11 @@ func (wa *WhatsAppClient) HandleMatrixViewingChat(ctx context.Context, msg *brid
}
}
if msg.Portal == nil || msg.Portal.Metadata.(*waid.PortalMetadata).LastSync.Add(5*time.Minute).After(time.Now()) {
// If we resynced this portal within the last 5 minutes, don't do it again
if msg.Portal == nil {
return nil
}
// Reset, but don't save, portal last sync time for immediate sync now
msg.Portal.Metadata.(*waid.PortalMetadata).LastSync.Time = time.Time{}
// Enqueue for the sync, don't block on it completing
// Ask the queue to resync if needed (old or version mismatch). No-op otherwise.
wa.EnqueuePortalResync(msg.Portal)
if msg.Portal.OtherUserID != "" {
@ -435,8 +430,6 @@ func (wa *WhatsAppClient) HandleMatrixViewingChat(ctx context.Context, msg *brid
Str("other_user_id", string(msg.Portal.OtherUserID)).
Msg("No ghost found for other user in portal")
} else {
// Reset, but don't save, portal last sync time for immediate sync now
ghost.Metadata.(*waid.GhostMetadata).LastSync.Time = time.Time{}
wa.EnqueueGhostResync(ghost)
}
}

View file

@ -119,9 +119,10 @@ func fnSync(ce *commands.Event) {
ce.Reply("Failed to get joined groups: %v", err)
return
}
_, capVer := wa.Main.GetBridgeInfoVersion()
for _, group := range groups {
wrapped := wa.wrapGroupInfo(ce.Ctx, group)
wrapped.ExtraUpdates = bridgev2.MergeExtraUpdaters(wrapped.ExtraUpdates, updatePortalLastSyncAt)
wrapped.ExtraUpdates = bridgev2.MergeExtraUpdaters(wrapped.ExtraUpdates, updatePortalSyncMeta(capVer))
wa.addExtrasToWrapped(ce.Ctx, group.JID, wrapped, nil)
login.QueueRemoteEvent(&simplevent.ChatResync{
EventMeta: simplevent.EventMeta{

View file

@ -5,13 +5,11 @@ import (
"crypto/sha256"
"errors"
"fmt"
"math/rand/v2"
"regexp"
"strconv"
"time"
"github.com/rs/zerolog"
"go.mau.fi/util/exzerolog"
"go.mau.fi/util/jsontime"
"go.mau.fi/util/ptr"
"go.mau.fi/whatsmeow"
@ -32,139 +30,97 @@ func (wa *WhatsAppClient) EnqueueGhostResync(ghost *bridgev2.Ghost) {
if ghost.Metadata.(*waid.GhostMetadata).LastSync.Add(ResyncMinInterval).After(time.Now()) {
return
}
wa.resyncQueueLock.Lock()
jid := waid.ParseUserID(ghost.ID)
if _, exists := wa.resyncQueue[jid]; !exists {
wa.resyncQueue[jid] = resyncQueueItem{ghost: ghost}
nextResyncIn := time.Until(wa.nextResync).String()
if wa.nextResync.IsZero() {
nextResyncIn = "never"
}
wa.UserLogin.Log.Debug().
Stringer("jid", jid).
Str("next_resync_in", nextResyncIn).
Msg("Enqueued resync for ghost")
wa.UserLogin.Log.Debug().Stringer("jid", jid).Msg("Enqueued resync for ghost")
select {
case wa.resyncQueueCh <- resyncQueueItem{ghost: ghost}:
default:
wa.UserLogin.Log.Warn().Stringer("jid", jid).Msg("Resync queue channel full, dropping ghost resync")
}
wa.resyncQueueLock.Unlock()
}
func (wa *WhatsAppClient) EnqueuePortalResync(portal *bridgev2.Portal) {
jid, _ := waid.ParsePortalID(portal.ID)
if jid.Server != types.GroupServer || portal.Metadata.(*waid.PortalMetadata).LastSync.Add(ResyncMinInterval).After(time.Now()) {
meta := portal.Metadata.(*waid.PortalMetadata)
_, capVer := wa.Main.GetBridgeInfoVersion()
isOld := meta.LastSync.Add(ResyncMinInterval).Before(time.Now())
versionMismatch := meta.BridgeCapsVersion != capVer
if !isOld && !versionMismatch {
return
}
wa.resyncQueueLock.Lock()
if _, exists := wa.resyncQueue[jid]; !exists {
wa.resyncQueue[jid] = resyncQueueItem{portal: portal}
wa.UserLogin.Log.Debug().
Stringer("jid", jid).
Stringer("next_resync_in", time.Until(wa.nextResync)).
Msg("Enqueued resync for portal")
wa.UserLogin.Log.Debug().Stringer("jid", jid).Msg("Enqueued resync for portal")
select {
case wa.resyncQueueCh <- resyncQueueItem{portal: portal}:
default:
wa.UserLogin.Log.Warn().Stringer("jid", jid).Msg("Resync queue channel full, dropping portal resync")
}
wa.resyncQueueLock.Unlock()
}
func (wa *WhatsAppClient) ghostResyncLoop(ctx context.Context) {
log := wa.UserLogin.Log.With().Str("action", "ghost resync loop").Logger()
func (wa *WhatsAppClient) resyncLoop(ctx context.Context) {
log := wa.UserLogin.Log.With().Str("action", "resync loop").Logger()
ctx = log.WithContext(ctx)
wa.nextResync = time.Now().Add(ResyncLoopInterval).Add(-time.Duration(rand.IntN(ResyncJitterSeconds)) * time.Second)
timer := time.NewTimer(time.Until(wa.nextResync))
log.Info().Time("first_resync", wa.nextResync).Msg("Ghost resync queue starting")
log.Info().Msg("Resync queue starting")
for {
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
queue := wa.rotateResyncQueue()
timer.Reset(time.Until(wa.nextResync))
if len(queue) > 0 {
wa.doGhostResync(ctx, queue)
} else {
log.Trace().Msg("Nothing in background resync queue")
case item := <-wa.resyncQueueCh:
// Re-check gating after dequeue
if item.ghost != nil {
lastSync := item.ghost.Metadata.(*waid.GhostMetadata).LastSync.Time
if time.Since(lastSync) < ResyncMinInterval {
continue
}
} else if item.portal != nil {
meta := item.portal.Metadata.(*waid.PortalMetadata)
_, capVer := wa.Main.GetBridgeInfoVersion()
if time.Since(meta.LastSync.Time) < ResyncMinInterval && meta.BridgeCapsVersion == capVer {
continue
}
}
wa.doResync(ctx, item)
}
}
}
func (wa *WhatsAppClient) rotateResyncQueue() map[types.JID]resyncQueueItem {
wa.resyncQueueLock.Lock()
defer wa.resyncQueueLock.Unlock()
wa.nextResync = time.Now().Add(ResyncLoopInterval)
if len(wa.resyncQueue) == 0 {
return nil
}
queue := wa.resyncQueue
wa.resyncQueue = make(map[types.JID]resyncQueueItem)
return queue
}
func (wa *WhatsAppClient) doGhostResync(ctx context.Context, queue map[types.JID]resyncQueueItem) {
func (wa *WhatsAppClient) doResync(ctx context.Context, item resyncQueueItem) {
log := zerolog.Ctx(ctx)
if !wa.IsLoggedIn() {
log.Warn().Msg("Not logged in, skipping background resyncs")
log.Warn().Msg("Not logged in, skipping background resync")
return
}
log.Debug().Msg("Starting background resyncs")
defer log.Debug().Msg("Background resyncs finished")
var ghostJIDs []types.JID
var ghosts []*bridgev2.Ghost
var portals []*bridgev2.Portal
for jid, item := range queue {
var lastSync time.Time
if item.ghost != nil {
lastSync = item.ghost.Metadata.(*waid.GhostMetadata).LastSync.Time
} else if item.portal != nil {
lastSync = item.portal.Metadata.(*waid.PortalMetadata).LastSync.Time
}
if lastSync.Add(ResyncMinInterval).After(time.Now()) {
log.Debug().
Stringer("jid", jid).
Time("last_sync", lastSync).
Msg("Not resyncing, last sync was too recent")
continue
}
if item.ghost != nil {
ghosts = append(ghosts, item.ghost)
ghostJIDs = append(ghostJIDs, jid)
} else if item.portal != nil {
portals = append(portals, item.portal)
}
}
for _, portal := range portals {
log.Debug().Msg("Starting background resync")
defer log.Debug().Msg("Background resync finished")
if item.portal != nil {
portal := item.portal
wa.UserLogin.QueueRemoteEvent(&simplevent.ChatResync{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatResync,
LogContext: func(c zerolog.Context) zerolog.Context {
return c.Str("sync_reason", "queue")
},
PortalKey: portal.PortalKey,
Type: bridgev2.RemoteEventChatResync,
LogContext: func(c zerolog.Context) zerolog.Context { return c.Str("sync_reason", "resync_queue") },
PortalKey: portal.PortalKey,
},
GetChatInfoFunc: wa.GetChatInfo,
})
}
if len(ghostJIDs) == 0 {
return
}
log.Debug().Array("jids", exzerolog.ArrayOfStringers(ghostJIDs)).Msg("Doing background sync for users")
infos, err := wa.Client.GetUserInfo(ghostJIDs)
if err != nil {
log.Err(err).Msg("Failed to get user info for background sync")
return
}
for _, ghost := range ghosts {
jid := waid.ParseUserID(ghost.ID)
if item.ghost != nil {
jid := waid.ParseUserID(item.ghost.ID)
infos, err := wa.Client.GetUserInfo([]types.JID{jid})
if err != nil {
log.Err(err).Stringer("jid", jid).Msg("Failed to get user info for background sync")
return
}
info, ok := infos[jid]
if !ok {
log.Warn().Stringer("jid", jid).Msg("Didn't get info for puppet in background sync")
continue
return
}
userInfo, err := wa.getUserInfo(ctx, jid, info.PictureID != "" && string(ghost.AvatarID) != info.PictureID)
userInfo, err := wa.getUserInfo(ctx, jid, info.PictureID != "" && string(item.ghost.AvatarID) != info.PictureID)
if err != nil {
log.Err(err).Stringer("jid", jid).Msg("Failed to get user info for puppet in background sync")
continue
return
}
ghost.UpdateInfo(ctx, userInfo)
item.ghost.UpdateInfo(ctx, userInfo)
wa.syncAltGhostWithInfo(ctx, jid, userInfo)
}
}
@ -243,9 +199,8 @@ func (wa *WhatsAppClient) contactToUserInfo(ctx context.Context, jid types.JID,
func updateGhostLastSyncAt(_ context.Context, ghost *bridgev2.Ghost) bool {
meta := ghost.Metadata.(*waid.GhostMetadata)
forceSave := time.Since(meta.LastSync.Time) > 24*time.Hour
meta.LastSync = jsontime.UnixNow()
return forceSave
return true
}
var expiryRegex = regexp.MustCompile("oe=([0-9A-Fa-f]+)")

View file

@ -111,6 +111,9 @@ type PortalMetadata struct {
CommunityAnnouncementGroup bool `json:"is_cag,omitempty"`
AddressingMode types.AddressingMode `json:"addressing_mode,omitempty"`
LIDMigrationAttempted bool `json:"lid_migration_attempted,omitempty"`
// BridgeCapsVersion stores the last seen capabilities version for this portal,
// used to decide if a lazy resync-on-view is needed when the bridge version changes.
BridgeCapsVersion int `json:"bridge_caps_version,omitempty"`
}
type GhostMetadata struct {