package connector

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rs/zerolog"
	"go.mau.fi/util/ptr"
	"go.mau.fi/whatsmeow"
	"go.mau.fi/whatsmeow/proto/waE2E"
	"go.mau.fi/whatsmeow/proto/waHistorySync"
	"go.mau.fi/whatsmeow/proto/waWeb"
	"go.mau.fi/whatsmeow/types"
	"google.golang.org/protobuf/proto"
	"maunium.net/go/mautrix/bridgev2"
	"maunium.net/go/mautrix/bridgev2/networkid"
	"maunium.net/go/mautrix/bridgev2/simplevent"

	"go.mau.fi/mautrix-whatsapp/pkg/connector/wadb"
	"go.mau.fi/mautrix-whatsapp/pkg/waid"
)

// Compile-time check that WhatsAppClient implements the backfilling API.
var _ bridgev2.BackfillingNetworkAPI = (*WhatsAppClient)(nil)

// historySyncLoop processes queued history sync notifications until ctx is cancelled.
//
// It runs two loops:
//   - a background goroutine that calls createPortalsFromHistorySync whenever the
//     dispatch timer fires, and
//   - the main loop (on the calling goroutine) that, on every wakeup signal, fetches
//     the next queued notification from the database, downloads it and stores its data.
//
// The dispatch timer is stopped while a notification is being handled and re-armed
// afterwards if portal creation is pending, so portals are only created once the
// configured DispatchWait has passed without further wakeups.
func (wa *WhatsAppClient) historySyncLoop(ctx context.Context) {
	dispatchTimer := time.NewTimer(wa.Main.Config.HistorySync.DispatchWait)
	// Make sure the timer is released when the loop exits instead of lingering until it fires.
	defer dispatchTimer.Stop()
	var timerPending atomic.Bool
	if !wa.isNewLogin && wa.UserLogin.Metadata.(*waid.UserLoginMetadata).HistorySyncPortalsNeedCreating {
		// An existing login still has the "portals need creating" flag set, so schedule a creation run shortly.
		dispatchTimer.Reset(5 * time.Second)
		timerPending.Store(true)
	} else {
		dispatchTimer.Stop()
	}
	if wa.Client.ManualHistorySyncDownload {
		// Wake up the queue once to check if there are pending notifications
		select {
		case wa.historySyncWakeup <- struct{}{}:
		default:
		}
	}
	wa.UserLogin.Log.Debug().Msg("Starting history sync loops")
	// Separate loop for creating portals to ensure it doesn't block processing new history sync payloads.
	go func() {
		for {
			select {
			case <-dispatchTimer.C:
				timerPending.Store(false)
				wa.createPortalsFromHistorySync(ctx)
			case <-ctx.Done():
				wa.UserLogin.Log.Debug().Msg("Stopping portal creation history sync loop")
				return
			}
		}
	}()
	for {
		var resetTimer bool
		select {
		case <-wa.historySyncWakeup:
			// Pause portal creation while there is still queued data to process.
			dispatchTimer.Stop()
			notif, rowid, err := wa.Main.DB.HSNotif.GetNext(ctx, wa.UserLogin.ID)
			if err != nil {
				wa.UserLogin.Log.Err(err).Msg("Failed to get next history sync notification")
			} else if notif == nil {
				wa.UserLogin.Log.Debug().Msg("No more queued history sync notifications")
			} else {
				resetTimer = wa.downloadAndSaveWAHistorySyncData(ctx, notif, rowid)
				// Continue waking up the loop until all queued notifications are processed
				select {
				case wa.historySyncWakeup <- struct{}{}:
				default:
				}
			}
		case <-ctx.Done():
			wa.UserLogin.Log.Debug().Msg("Stopping main history sync loop")
			return
		}
		if resetTimer {
			timerPending.Store(true)
		}
		if timerPending.Load() {
			dispatchTimer.Reset(wa.Main.Config.HistorySync.DispatchWait)
		}
	}
}

// saveWAHistorySyncNotification stores a history sync notification in the database queue
// and signals the history sync loop (without blocking) that there is work to do.
// Failures to store the notification are logged and the loop is not woken up.
func (wa *WhatsAppClient) saveWAHistorySyncNotification(ctx context.Context, evt *waE2E.HistorySyncNotification) {
	err := wa.Main.DB.HSNotif.Put(ctx, wa.UserLogin.ID, evt)
	if err != nil {
		wa.UserLogin.Log.Err(err).Msg("Failed to store history sync notification in queue")
		return
	}
	wa.UserLogin.Log.Debug().
		Stringer("sync_type", evt.GetSyncType()).
		Uint32("chunk_order", evt.GetChunkOrder()).
		Uint32("progress", evt.GetProgress()).
		Msg("Stored history sync notification in queue")
	select {
	case wa.historySyncWakeup <- struct{}{}:
	default:
	}
}

// downloadAndSaveWAHistorySyncData downloads the history sync blob referenced by evt and,
// inside a single database transaction, stores its contents and deletes the queued
// notification row identified by rowid.
//
// The returned resetTimer is the value reported by handleWAHistorySync, i.e. whether the
// portal creation dispatch timer should be (re)armed. It is false if the download fails.
// Errors are logged rather than returned.
func (wa *WhatsAppClient) downloadAndSaveWAHistorySyncData(ctx context.Context, evt *waE2E.HistorySyncNotification, rowid int) (resetTimer bool) {
	log := wa.UserLogin.Log.With().
		Str("action", "download history sync").
		Stringer("sync_type", evt.GetSyncType()).
		Uint32("chunk_order", evt.GetChunkOrder()).
		Uint32("progress", evt.GetProgress()).
		Logger()
	log.Debug().Msg("Downloading history sync")
	blob, err := wa.Client.DownloadHistorySync(log.WithContext(ctx), evt, true)
	if err != nil {
		log.Err(err).Msg("Failed to download history sync")
		return false
	}
	err = wa.Main.DB.DoTxn(ctx, nil, func(ctx context.Context) error {
		var innerErr error
		resetTimer, innerErr = wa.handleWAHistorySync(ctx, evt, blob, true)
		if innerErr != nil {
			return innerErr
		}
		innerErr = wa.Main.DB.HSNotif.Delete(ctx, rowid)
		if innerErr != nil {
			return fmt.Errorf("failed to delete queued history sync notification: %w", innerErr)
		}
		return nil
	})
	if err != nil {
		log.Err(err).Msg("Failed to store history sync notification data")
	}
	return resetTimer
}

// handleWAHistorySync stores the conversations and messages of a history sync payload in
// the database so that they can be backfilled later.
//
// Payloads of type INITIAL_STATUS_V3, PUSH_NAME and NON_BLOCKING_DATA are not stored
// (PUSH_NAME additionally marks push names as synced). For every other payload, each
// conversation's messages are parsed, messages with ignored/unknown types are dropped,
// and the rest are saved together with the conversation metadata. The corresponding
// backfill task is then marked as not done.
//
// If stopOnError is true, the first database error aborts processing and is returned.
// Otherwise database errors are logged and processing continues.
//
// The returned bool is true when the payload type is RECENT or FULL, which the caller
// uses to decide whether to re-arm the portal creation timer.
func (wa *WhatsAppClient) handleWAHistorySync(
	ctx context.Context,
	notif *waE2E.HistorySyncNotification,
	evt *waHistorySync.HistorySync,
	stopOnError bool,
) (bool, error) {
	if evt == nil || evt.SyncType == nil {
		return false, nil
	}
	log := wa.UserLogin.Log.With().
		Str("action", "store history sync").
		Stringer("sync_type", evt.GetSyncType()).
		Uint32("chunk_order", evt.GetChunkOrder()).
		Uint32("progress", evt.GetProgress()).
		Logger()
	ctx = log.WithContext(ctx)
	if evt.GetGlobalSettings() != nil {
		log.Debug().Interface("global_settings", evt.GetGlobalSettings()).Msg("Got global settings in history sync")
	}
	switch evt.GetSyncType() {
	case waHistorySync.HistorySync_INITIAL_STATUS_V3,
		waHistorySync.HistorySync_PUSH_NAME,
		waHistorySync.HistorySync_NON_BLOCKING_DATA:
		if evt.GetSyncType() == waHistorySync.HistorySync_PUSH_NAME {
			wa.pushNamesSynced.Set()
		}
		log.Debug().
			Int("conversation_count", len(evt.GetConversations())).
			Int("pushname_count", len(evt.GetPushnames())).
			Int("status_count", len(evt.GetStatusV3Messages())).
			Int("recent_sticker_count", len(evt.GetRecentStickers())).
			Int("past_participant_count", len(evt.GetPastParticipants())).
			Msg("Ignoring history sync")
		return false, nil
	}
	log.Info().
		Int("conversation_count", len(evt.GetConversations())).
		Int("past_participant_count", len(evt.GetPastParticipants())).
		Dict("notification_metadata", zerolog.Dict().
			Int64("oldest_msg_in_chunk_ts", notif.GetOldestMsgInChunkTimestampSec()).
			Any("full_request_meta", notif.GetFullHistorySyncOnDemandRequestMetadata()).
			Any("access_status", notif.GetMessageAccessStatus())).
		Msg("Storing history sync")
	start := time.Now()
	successfullySavedTotal := 0
	failedToSaveTotal := 0
	totalMessageCount := 0
	for _, conv := range evt.GetConversations() {
		log := log.With().
			Int("msg_count", len(conv.GetMessages())).
			Logger()
		jid, err := types.ParseJID(conv.GetID())
		if err != nil {
			totalMessageCount += len(conv.GetMessages())
			log.Warn().Err(err).
				Str("chat_jid", conv.GetID()).
				Msg("Failed to parse chat JID in history sync")
			continue
		}
		if jid.Server == types.BroadcastServer {
			// Broadcast lists are skipped entirely and not included in the total message count.
			log.Debug().Stringer("chat_jid", jid).Msg("Skipping broadcast list in history sync")
			continue
		}
		totalMessageCount += len(conv.GetMessages())
		if jid.Server == types.HiddenUserServer {
			// The chat is addressed by LID; store it under the phone number JID if one is known.
			pn, err := wa.GetStore().LIDs.GetPNForLID(ctx, jid)
			if err != nil {
				log.Err(err).Stringer("lid", jid).Msg("Failed to get PN for LID in history sync")
			} else if pn.IsEmpty() {
				log.Warn().Stringer("lid", jid).Msg("No PN found for LID in history sync")
			} else {
				log.Debug().
					Stringer("lid", jid).
					Stringer("pn", pn).
					Msg("Rerouting LID DM to phone number in history sync")
				jid = pn
			}
		}
		log.UpdateContext(func(c zerolog.Context) zerolog.Context {
			return c.Stringer("chat_jid", jid)
		})

		// minTime/maxTime track the timestamp range of all successfully parsed messages
		// (including ones whose type is ignored below); the indexes are only used for logging.
		var minTime, maxTime time.Time
		var minTimeIndex, maxTimeIndex int

		ignoredTypes := 0
		messages := make([]*wadb.HistorySyncMessageTuple, 0, len(conv.GetMessages()))
		for i, rawMsg := range conv.GetMessages() {
			// Don't store messages that will just be skipped.
			msgEvt, err := wa.Client.ParseWebMessage(jid, rawMsg.GetMessage())
			if err != nil {
				log.Warn().Err(err).
					Int("msg_index", i).
					Str("msg_id", rawMsg.GetMessage().GetKey().GetID()).
					Uint64("msg_time_seconds", rawMsg.GetMessage().GetMessageTimestamp()).
					Msg("Dropping historical message due to parse error")
				continue
			}
			if minTime.IsZero() || msgEvt.Info.Timestamp.Before(minTime) {
				minTime = msgEvt.Info.Timestamp
				minTimeIndex = i
			}
			if maxTime.IsZero() || msgEvt.Info.Timestamp.After(maxTime) {
				maxTime = msgEvt.Info.Timestamp
				maxTimeIndex = i
			}

			msgType := getMessageType(msgEvt.Message)
			if msgType == "ignore" || strings.HasPrefix(msgType, "unknown_protocol_") {
				ignoredTypes++
				continue
			}

			marshaled, err := proto.Marshal(rawMsg)
			if err != nil {
				log.Warn().Err(err).
					Int("msg_index", i).
					Str("msg_id", msgEvt.Info.ID).
					Msg("Failed to marshal message")
				continue
			}
			messages = append(messages, &wadb.HistorySyncMessageTuple{Info: &msgEvt.Info, Message: marshaled})
		}
		log.Debug().
			Int("wrapped_count", len(messages)).
			Int("ignored_msg_type_count", ignoredTypes).
			Time("lowest_time", minTime).
			Int("lowest_time_index", minTimeIndex).
			Time("highest_time", maxTime).
			Int("highest_time_index", maxTimeIndex).
			Dict("metadata", zerolog.Dict().
				Uint32("ephemeral_expiration", conv.GetEphemeralExpiration()).
				Int64("ephemeral_setting_timestamp", conv.GetEphemeralSettingTimestamp()).
				Uint64("last_message_ts", conv.GetLastMsgTimestamp()).
				Bool("marked_unread", conv.GetMarkedAsUnread()).
				Bool("archived", conv.GetArchived()).
				Uint32("pinned", conv.GetPinned()).
				Uint64("mute_end", conv.GetMuteEndTime()).
				Uint32("unread_count", conv.GetUnreadCount()),
			).
			Msg("Collected messages to save from history sync conversation")

		if len(messages) > 0 {
			err = wa.Main.DB.Conversation.Put(ctx, wadb.NewConversation(wa.UserLogin.ID, jid, conv, maxTime))
			if err != nil {
				if stopOnError {
					return false, fmt.Errorf("failed to save conversation metadata for %s: %w", jid, err)
				}
				log.Err(err).Msg("Failed to save conversation metadata")
				// The messages of this conversation are not saved either, so count them as failed
				// to keep the totals in the summary log accurate.
				failedToSaveTotal += len(messages)
				continue
			}
			err = wa.Main.DB.Message.Put(ctx, wa.UserLogin.ID, jid, messages)
			if err != nil {
				if stopOnError {
					return false, fmt.Errorf("failed to save messages in %s: %w", jid, err)
				}
				log.Err(err).Msg("Failed to save messages")
				failedToSaveTotal += len(messages)
			} else {
				successfullySavedTotal += len(messages)
			}
			err = wa.Main.Bridge.DB.BackfillTask.MarkNotDone(ctx, wa.makeWAPortalKey(jid), wa.UserLogin.ID)
			if err != nil {
				if stopOnError {
					return false, fmt.Errorf("failed to mark backfill task as not done for %s: %w", jid, err)
				}
				log.Err(err).Msg("Failed to mark backfill task as not done")
			}
		}
	}
	log.Info().
		Int("total_saved_count", successfullySavedTotal).
		Int("total_failed_count", failedToSaveTotal).
		Int("total_message_count", totalMessageCount).
		Dur("duration", time.Since(start)).
		Msg("Finished storing history sync")
	resetTimer := evt.GetSyncType() == waHistorySync.HistorySync_RECENT ||
		evt.GetSyncType() == waHistorySync.HistorySync_FULL
	return resetTimer, nil
}

// createPortalsFromHistorySync queues chat resync events (with portal creation enabled)
// for the most recent conversations stored from history sync, up to the configured
// MaxInitialConversations limit.
//
// Conversations the user is no longer a participant of are deleted from the database.
// Rate limit errors from fetching chat info cause the same conversation to be retried
// after a sleep that grows with the number of rate limit errors seen so far.
//
// After all events have been queued, a goroutine waits for every conversation to be
// handled (or skipped) and then clears the HistorySyncPortalsNeedCreating flag on the
// user login. If the function returns early (context cancelled, client gone, or the
// event queue rejecting an event), that goroutine is never started and the flag is
// left untouched.
func (wa *WhatsAppClient) createPortalsFromHistorySync(ctx context.Context) {
	log := wa.UserLogin.Log.With().
		Str("action", "create portals from history sync").
		Logger()
	ctx = log.WithContext(ctx)
	limit := wa.Main.Config.HistorySync.MaxInitialConversations
	loginTS := wa.UserLogin.Metadata.(*waid.UserLoginMetadata).LoggedInAt
	conversations, err := wa.Main.DB.Conversation.GetRecent(ctx, wa.UserLogin.ID, limit, loginTS)
	if err != nil {
		log.Err(err).Msg("Failed to get recent conversations from database")
		return
	}
	log.Info().
		Int("limit", limit).
		Int("conversation_count", len(conversations)).
		Int64("login_timestamp", loginTS.Unix()).
		Msg("Creating portals from history sync")
	rateLimitErrors := 0
	// One unit per conversation; released either when the conversation is skipped here
	// or in the PostHandleFunc of its resync event.
	var wg sync.WaitGroup
	wg.Add(len(conversations))
	for i := 0; i < len(conversations); i++ {
		if ctx.Err() != nil {
			log.Warn().Err(ctx.Err()).Msg("Context cancelled, stopping history sync portal creation")
			return
		} else if wa.Client == nil {
			log.Warn().Msg("Client is nil, stopping history sync portal creation")
			return
		}
		conv := conversations[i]
		if conv.ChatJID == types.StatusBroadcastJID && !wa.Main.Config.EnableStatusBroadcast {
			wg.Done()
			continue
		} else if conv.ChatJID == types.PSAJID || conv.ChatJID == types.LegacyPSAJID {
			// We don't currently support new PSAs, so don't bother backfilling them either
			wg.Done()
			continue
		}
		// TODO can the chat info fetch be avoided entirely?
		// Slow down by one second per rate limit error seen so far (no wait before the first error).
		select {
		case <-time.After(time.Duration(rateLimitErrors) * time.Second):
		case <-ctx.Done():
			log.Warn().Err(ctx.Err()).Msg("Context cancelled, stopping history sync portal creation")
			return
		}
		wrappedInfo, err := wa.getChatInfo(ctx, conv.ChatJID, conv, true)
		if errors.Is(err, whatsmeow.ErrNotInGroup) {
			log.Debug().Stringer("chat_jid", conv.ChatJID).
				Msg("Skipping creating room because the user is not a participant")
			//err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, conv.ChatJID)
			//if err != nil {
			//	log.Err(err).Msg("Failed to delete historical messages for portal")
			//}
			err = wa.Main.DB.Conversation.Delete(ctx, wa.UserLogin.ID, conv.ChatJID)
			if err != nil {
				log.Err(err).Msg("Failed to delete conversation user is not in")
			}
			wg.Done()
			continue
		} else if errors.Is(err, whatsmeow.ErrIQRateOverLimit) {
			rateLimitErrors++
			// Step back so the loop increment retries the same conversation.
			i--
			log.Err(err).Stringer("chat_jid", conv.ChatJID).
				Int("error_count", rateLimitErrors).
				Msg("Ratelimit error getting chat info, retrying after sleep")
			select {
			case <-time.After(time.Duration(rateLimitErrors) * time.Second):
			case <-ctx.Done():
				log.Warn().Err(ctx.Err()).Msg("Context cancelled, stopping history sync portal creation")
				return
			}
			continue
		} else if err != nil {
			log.Err(err).Stringer("chat_jid", conv.ChatJID).Msg("Failed to get chat info")
			wg.Done()
			continue
		}
		res := wa.UserLogin.QueueRemoteEvent(&simplevent.ChatResync{
			EventMeta: simplevent.EventMeta{
				Type: bridgev2.RemoteEventChatResync,
				LogContext: func(c zerolog.Context) zerolog.Context {
					return c.
						Stringer("chat_jid", conv.ChatJID).
						Time("latest_message_ts", conv.LastMessageTimestamp)
				},
				PortalKey:    wa.makeWAPortalKey(conv.ChatJID),
				CreatePortal: true,
				PostHandleFunc: func(ctx context.Context, portal *bridgev2.Portal) {
					err := wa.Main.DB.Conversation.MarkSynced(ctx, wa.UserLogin.ID, conv.ChatJID, loginTS)
					if err != nil {
						zerolog.Ctx(ctx).Err(err).Msg("Failed to mark conversation as bridged")
					}
					wg.Done()
				},
			},
			ChatInfo:        wrappedInfo,
			LatestMessageTS: conv.LastMessageTimestamp,
		})
		if !res.Success {
			log.Debug().Msg("Cancelling history sync portal creation loop")
			return
		}
	}
	log.Info().Int("conversation_count", len(conversations)).Msg("Finished creating portals from history sync")
	go func() {
		wg.Wait()
		wa.UserLogin.Metadata.(*waid.UserLoginMetadata).HistorySyncPortalsNeedCreating = false
		// Use a goroutine-local error instead of writing to the enclosing function's
		// variable after that function has already returned.
		if err := wa.UserLogin.Save(ctx); err != nil {
			log.Err(err).Msg("Failed to save user login history sync portals created flag")
		}
		log.Info().Msg("Finished processing all history sync chat resync events")
	}()
}

// FetchMessages implements bridgev2.BackfillingNetworkAPI by returning messages that
// were previously stored from history sync for the requested portal.
//
// For forward backfill, messages after the anchor message (if any) are returned and
// MarkRead is derived from the stored conversation's unread state. For backward
// backfill, messages up to the cursor timestamp (or, without a cursor, the anchor
// message timestamp) are returned. One more message than params.Count is requested to
// detect whether more messages are available.
//
// The response's CompleteCallback deletes the backfilled messages from the database
// and stores (and optionally immediately sends) media requests for messages whose
// media could not be bridged.
func (wa *WhatsAppClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
	portalJID, err := waid.ParsePortalID(params.Portal.ID)
	if err != nil {
		return nil, err
	}
	var markRead bool
	var startTime, endTime *time.Time
	if params.Forward {
		if params.AnchorMessage != nil {
			startTime = ptr.Ptr(params.AnchorMessage.Timestamp)
		}
		conv, err := wa.Main.DB.Conversation.Get(ctx, wa.UserLogin.ID, portalJID)
		if err != nil {
			return nil, fmt.Errorf("failed to get conversation from database: %w", err)
		} else if conv != nil {
			markRead = !ptr.Val(conv.MarkedAsUnread) && ptr.Val(conv.UnreadCount) == 0
		}
	} else if params.Cursor != "" {
		endTimeUnix, err := strconv.ParseInt(string(params.Cursor), 10, 64)
		if err != nil {
			return nil, fmt.Errorf("failed to parse cursor: %w", err)
		}
		endTime = ptr.Ptr(time.Unix(endTimeUnix, 0))
	} else if params.AnchorMessage != nil {
		endTime = ptr.Ptr(params.AnchorMessage.Timestamp)
	}
	var anchorID types.MessageID
	if params.AnchorMessage != nil {
		// A parse failure just leaves anchorID empty, which disables the
		// "only the anchor itself was returned" check below.
		parsedID, _ := waid.ParseMessageID(params.AnchorMessage.ID)
		if parsedID != nil {
			anchorID = parsedID.ID
		}
	}
	messages, err := wa.Main.DB.Message.GetBetween(ctx, wa.UserLogin.ID, portalJID, startTime, endTime, params.Count+1)
	if err != nil {
		return nil, fmt.Errorf("failed to load messages from database: %w", err)
	} else if len(messages) == 0 || (len(messages) == 1 && anchorID != "" && messages[0].GetKey().GetID() == anchorID) {
		return &bridgev2.FetchMessagesResponse{
			HasMore: false,
			Forward: params.Forward,
		}, nil
	}
	hasMore := false
	// The code below treats messages as ordered newest first: the last entry provides
	// the oldest timestamp and the slice is reversed before being returned.
	oldestTS := messages[len(messages)-1].GetMessageTimestamp()
	newestTS := messages[0].GetMessageTimestamp()
	if len(messages) > params.Count {
		hasMore = true
		// For safety, cut off messages with the oldest timestamp in the response.
		// Otherwise, if there are multiple messages with the same timestamp, the next fetch may miss some.
		for i := len(messages) - 2; i >= 0; i-- {
			if messages[i].GetMessageTimestamp() > oldestTS {
				messages = messages[:i+1]
				break
			}
		}
	}
	convertedMessages := make([]*bridgev2.BackfillMessage, len(messages))
	var mediaRequests []*wadb.MediaRequest
	for i, msg := range messages {
		evt, err := wa.Client.ParseWebMessage(portalJID, msg)
		if err != nil {
			// This should never happen because the info is already parsed once before being stored in the database
			return nil, fmt.Errorf("failed to parse info of message %s: %w", msg.GetKey().GetID(), err)
		}
		var mediaReq *wadb.MediaRequest
		isViewOnce := evt.IsViewOnce || evt.IsViewOnceV2 || evt.IsViewOnceV2Extension
		convertedMessages[i], mediaReq = wa.convertHistorySyncMessage(
			ctx, params.Portal, &evt.Info, evt.Message, evt.RawMessage, isViewOnce, msg.Reactions,
		)
		if mediaReq != nil {
			mediaRequests = append(mediaRequests, mediaReq)
		}
	}
	slices.Reverse(convertedMessages)
	return &bridgev2.FetchMessagesResponse{
		Messages: convertedMessages,
		Cursor:   networkid.PaginationCursor(strconv.FormatUint(oldestTS, 10)),
		HasMore:  hasMore,
		Forward:  endTime == nil,
		MarkRead: markRead,
		// TODO set remaining or total count
		CompleteCallback: func() {
			// TODO this only deletes after backfilling. If there's no need for backfill after a relogin,
			//      the messages will be stuck in the database
			var err error
			if !wa.Main.Bridge.Config.Backfill.Queue.Enabled && !wa.Main.Bridge.Config.Backfill.WillPaginateManually {
				// If the backfill queue isn't enabled, delete all messages after backfilling a batch.
				err = wa.Main.DB.Message.DeleteAllInChat(ctx, wa.UserLogin.ID, portalJID)
			} else {
				// Otherwise just delete the messages that got backfilled
				// NOTE(review): oldestTS was captured before the same-timestamp trimming above, so whether
				// the trimmed (not yet backfilled) messages survive this delete depends on the bound
				// semantics of DeleteBetween — confirm against the query.
				err = wa.Main.DB.Message.DeleteBetween(ctx, wa.UserLogin.ID, portalJID, newestTS, oldestTS)
			}
			if err != nil {
				zerolog.Ctx(ctx).Warn().Err(err).Msg("Failed to delete messages from database after backfill")
			}
			if len(mediaRequests) > 0 {
				// Run in the background with a context that is not cancelled together with the request context.
				go func(ctx context.Context) {
					for _, req := range mediaRequests {
						err := wa.Main.DB.MediaRequest.Put(ctx, req)
						if err != nil {
							zerolog.Ctx(ctx).Err(err).Msg("Failed to save media request to database")
						}
						if wa.Main.Config.HistorySync.MediaRequests.AutoRequestMedia && wa.Main.Config.HistorySync.MediaRequests.RequestMethod == MediaRequestMethodImmediate {
							wa.sendMediaRequest(ctx, req)
						}
					}
				}(context.WithoutCancel(ctx))
			}
		},
	}, nil
}

// convertHistorySyncMessage converts a stored history sync message and its reactions
// into a bridgev2.BackfillMessage.
//
// It also returns the media request produced by processFailedMedia for the converted
// message, which may be nil.
//
// Reactions whose sender cannot be determined are dropped. The sender is resolved as:
// the logged-in user for reactions marked as "from me", the reaction key's participant
// if set, or the chat JID itself when the chat is on the default user or bot server.
func (wa *WhatsAppClient) convertHistorySyncMessage(
	ctx context.Context,
	portal *bridgev2.Portal,
	info *types.MessageInfo,
	msg, rawMsg *waE2E.Message,
	isViewOnce bool,
	reactions []*waWeb.Reaction,
) (*bridgev2.BackfillMessage, *wadb.MediaRequest) {
	// New messages turn these into edits, but in backfill we only have the last version,
	// so no need to do the edit thing. Instead, just unwrap the message.
	if msg.GetAssociatedChildMessage().GetMessage() != nil {
		msg = msg.GetAssociatedChildMessage().GetMessage()
	}

	// TODO use proper intent
	intent := wa.Main.Bridge.Bot
	// The same ID is used both as the message ID and as the transaction ID.
	msgID := waid.MakeMessageID(info.Chat, info.Sender, info.ID)
	wrapped := &bridgev2.BackfillMessage{
		ConvertedMessage: wa.Main.MsgConv.ToMatrix(ctx, portal, wa.Client, intent, msg, rawMsg, info, isViewOnce, true, nil),
		Sender:           wa.makeEventSender(ctx, info.Sender),
		ID:               msgID,
		TxnID:            networkid.TransactionID(msgID),
		Timestamp:        info.Timestamp,
		StreamOrder:      info.Timestamp.Unix(),
		Reactions:        make([]*bridgev2.BackfillReaction, 0, len(reactions)),
	}
	mediaReq := wa.processFailedMedia(ctx, portal.PortalKey, wrapped.ID, wrapped.ConvertedMessage, true)
	for _, reaction := range reactions {
		var sender types.JID
		if reaction.GetKey().GetFromMe() {
			sender = wa.JID
		} else if reaction.GetKey().GetParticipant() != "" {
			// A parse failure leaves sender empty, so the reaction is skipped below.
			sender, _ = types.ParseJID(reaction.GetKey().GetParticipant())
		} else if info.Chat.Server == types.DefaultUserServer || info.Chat.Server == types.BotServer {
			sender = info.Chat
		}
		if sender.IsEmpty() {
			continue
		}
		wrapped.Reactions = append(wrapped.Reactions, &bridgev2.BackfillReaction{
			TargetPart: ptr.Ptr(networkid.PartID("")),
			Timestamp:  time.UnixMilli(reaction.GetSenderTimestampMS()),
			Sender:     wa.makeEventSender(ctx, sender),
			Emoji:      reaction.GetText(),
		})
	}
	return wrapped, mediaReq
}