2026-03-19 21:06:02 +00:00
|
|
|
package mirror
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"crypto/rand"
|
|
|
|
|
"fmt"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// JobState names the lifecycle phase a mirror job is currently in. It is a
// plain string type, so it appears as a JSON string wherever a Job is encoded.
type JobState string

// Lifecycle phases of a mirror job.
const (
	// JobStatePending is the initial state assigned by Create, before the
	// job's goroutine has started working.
	JobStatePending = JobState("pending")
	// JobStateRunning is set by runJob once the mirror run begins.
	JobStateRunning = JobState("running")
	// JobStateComplete marks a run that finished without a top-level error.
	JobStateComplete = JobState("complete")
	// JobStateFailed marks a run that returned an error, or one in which
	// nothing completed and at least one item failed.
	JobStateFailed = JobState("failed")
	// JobStateCanceled is set by Cancel on a pending or running job.
	JobStateCanceled = JobState("canceled")
)
|
|
|
|
|
|
|
|
|
|
// Retention settings for finished jobs held in the in-memory store.
const (
	// jobTTL is the age (measured from Job.CreatedAt) past which Cleanup
	// drops a job that is complete, failed, or canceled.
	jobTTL = time.Hour
	// cleanupInterval is the period of the ticker that drives StartCleanup.
	cleanupInterval = 5 * time.Minute //nolint:mnd // cleanup ticker
)
|
|
|
|
|
|
|
|
|
|
// Job represents an async mirror operation.
|
|
|
|
|
type Job struct {
|
|
|
|
|
ID string `json:"id"`
|
|
|
|
|
State JobState `json:"state"`
|
|
|
|
|
Progress Progress `json:"progress"`
|
|
|
|
|
CreatedAt time.Time `json:"created_at"`
|
|
|
|
|
Error string `json:"error,omitempty"`
|
|
|
|
|
|
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// JobRequest is the JSON payload a client sends to start a mirror job
// through the API.
type JobRequest struct {
	// PURLs lists the package URLs to mirror. A request with at least one
	// entry is turned into a PURL-based source.
	PURLs []string `json:"purls,omitempty"`
	// Registry names a registry to mirror. It is still decoded, but requests
	// that set only this field are rejected as not yet implemented.
	Registry string `json:"registry,omitempty"`
}
|
|
|
|
|
|
|
|
|
|
// JobStore manages in-memory mirror jobs.
|
|
|
|
|
type JobStore struct {
|
2026-04-18 07:43:22 -04:00
|
|
|
mu sync.RWMutex
|
|
|
|
|
jobs map[string]*Job
|
|
|
|
|
mirror *Mirror
|
2026-04-01 15:40:18 +01:00
|
|
|
parentCtx context.Context
|
2026-03-19 21:06:02 +00:00
|
|
|
}
|
|
|
|
|
|
2026-04-01 15:40:18 +01:00
|
|
|
// NewJobStore creates a new job store. The parent context is used as the base
|
|
|
|
|
// for all job contexts so that jobs are canceled when the server shuts down.
|
|
|
|
|
func NewJobStore(ctx context.Context, m *Mirror) *JobStore {
|
2026-03-19 21:06:02 +00:00
|
|
|
return &JobStore{
|
2026-04-01 15:40:18 +01:00
|
|
|
jobs: make(map[string]*Job),
|
|
|
|
|
mirror: m,
|
|
|
|
|
parentCtx: ctx,
|
2026-03-19 21:06:02 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create starts a new mirror job and returns its ID.
|
|
|
|
|
func (js *JobStore) Create(req JobRequest) (string, error) {
|
|
|
|
|
source, err := js.sourceFromRequest(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
id := newJobID()
|
2026-04-01 15:40:18 +01:00
|
|
|
ctx, cancel := context.WithCancel(js.parentCtx)
|
2026-03-19 21:06:02 +00:00
|
|
|
|
|
|
|
|
job := &Job{
|
|
|
|
|
ID: id,
|
|
|
|
|
State: JobStatePending,
|
|
|
|
|
CreatedAt: time.Now(),
|
|
|
|
|
cancel: cancel,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
js.mu.Lock()
|
|
|
|
|
js.jobs[id] = job
|
|
|
|
|
js.mu.Unlock()
|
|
|
|
|
|
2026-04-01 15:40:18 +01:00
|
|
|
go js.runJob(ctx, cancel, job, source)
|
2026-03-19 21:06:02 +00:00
|
|
|
|
|
|
|
|
return id, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get returns a snapshot of a job by ID. The returned copy is safe to
|
|
|
|
|
// serialize without holding the lock.
|
|
|
|
|
func (js *JobStore) Get(id string) *Job {
|
|
|
|
|
js.mu.RLock()
|
|
|
|
|
defer js.mu.RUnlock()
|
|
|
|
|
job := js.jobs[id]
|
|
|
|
|
if job == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
snapshot := *job
|
|
|
|
|
snapshot.cancel = nil // don't leak cancel func
|
|
|
|
|
if len(job.Progress.Errors) > 0 {
|
|
|
|
|
snapshot.Progress.Errors = make([]MirrorError, len(job.Progress.Errors))
|
|
|
|
|
copy(snapshot.Progress.Errors, job.Progress.Errors)
|
|
|
|
|
}
|
|
|
|
|
return &snapshot
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Cancel cancels a running job.
|
|
|
|
|
func (js *JobStore) Cancel(id string) bool {
|
|
|
|
|
js.mu.Lock()
|
|
|
|
|
defer js.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
job := js.jobs[id]
|
|
|
|
|
if job == nil || job.cancel == nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if job.State != JobStatePending && job.State != JobStateRunning {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
job.cancel()
|
|
|
|
|
job.State = JobStateCanceled
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Cleanup removes completed/failed/canceled jobs older than jobTTL.
|
|
|
|
|
func (js *JobStore) Cleanup() {
|
|
|
|
|
js.mu.Lock()
|
|
|
|
|
defer js.mu.Unlock()
|
|
|
|
|
for id, job := range js.jobs {
|
|
|
|
|
if job.State == JobStateComplete || job.State == JobStateFailed || job.State == JobStateCanceled {
|
|
|
|
|
if time.Since(job.CreatedAt) > jobTTL {
|
|
|
|
|
delete(js.jobs, id)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// StartCleanup runs periodic cleanup of old jobs until the context is canceled.
|
|
|
|
|
func (js *JobStore) StartCleanup(ctx context.Context) {
|
|
|
|
|
ticker := time.NewTicker(cleanupInterval)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
js.Cleanup()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-01 15:40:18 +01:00
|
|
|
func (js *JobStore) runJob(ctx context.Context, cancel context.CancelFunc, job *Job, source Source) {
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
2026-03-19 21:06:02 +00:00
|
|
|
js.mu.Lock()
|
2026-04-01 16:14:07 +01:00
|
|
|
if job.State == JobStateCanceled {
|
|
|
|
|
js.mu.Unlock()
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-03-19 21:06:02 +00:00
|
|
|
job.State = JobStateRunning
|
|
|
|
|
js.mu.Unlock()
|
|
|
|
|
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
progress, err := js.mirror.Run(ctx, source, func(p Progress) {
|
|
|
|
|
js.mu.Lock()
|
|
|
|
|
defer js.mu.Unlock()
|
|
|
|
|
if job.State == JobStateRunning {
|
|
|
|
|
job.Progress = p
|
|
|
|
|
}
|
|
|
|
|
})
|
2026-03-19 21:06:02 +00:00
|
|
|
|
|
|
|
|
js.mu.Lock()
|
|
|
|
|
defer js.mu.Unlock()
|
|
|
|
|
|
2026-04-01 16:14:07 +01:00
|
|
|
// Cancel() may have already set the state; don't overwrite it.
|
|
|
|
|
if job.State == JobStateCanceled {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-19 21:06:02 +00:00
|
|
|
if err != nil {
|
|
|
|
|
job.State = JobStateFailed
|
|
|
|
|
job.Error = err.Error()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
job.Progress = *progress
|
|
|
|
|
if progress.Failed > 0 && progress.Completed == 0 {
|
|
|
|
|
job.State = JobStateFailed
|
|
|
|
|
} else {
|
|
|
|
|
job.State = JobStateComplete
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (js *JobStore) sourceFromRequest(req JobRequest) (Source, error) { //nolint:ireturn // interface return is the design
|
|
|
|
|
switch {
|
|
|
|
|
case len(req.PURLs) > 0:
|
|
|
|
|
return &PURLSource{PURLs: req.PURLs}, nil
|
|
|
|
|
case req.Registry != "":
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
return nil, fmt.Errorf("registry mirroring is not yet implemented; use purls instead")
|
2026-03-19 21:06:02 +00:00
|
|
|
default:
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
return nil, fmt.Errorf("request must include purls")
|
2026-03-19 21:06:02 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// newJobID returns a fresh job identifier: 16 bytes from crypto/rand rendered
// as 32 lowercase hexadecimal characters.
func newJobID() string {
	var raw [16]byte //nolint:mnd // 128-bit ID

	// The error from rand.Read is deliberately discarded.
	// NOTE(review): crypto/rand.Read is documented as never returning an
	// error from Go 1.24 on; confirm the module's minimum Go version, since
	// on older toolchains a failure here would leave the ID all zeros.
	_, _ = rand.Read(raw[:])

	return fmt.Sprintf("%x", raw[:])
}
|