2026-03-19 21:06:02 +00:00
|
|
|
// Package mirror provides selective package mirroring for pre-populating the proxy cache.
|
|
|
|
|
package mirror
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"log/slog"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/git-pkgs/proxy/internal/database"
|
|
|
|
|
"github.com/git-pkgs/proxy/internal/handler"
|
|
|
|
|
"github.com/git-pkgs/proxy/internal/storage"
|
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Mirror pre-populates the proxy cache from various input sources.
|
|
|
|
|
type Mirror struct {
|
|
|
|
|
proxy *handler.Proxy
|
|
|
|
|
db *database.DB
|
|
|
|
|
storage storage.Storage
|
|
|
|
|
logger *slog.Logger
|
|
|
|
|
workers int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// New creates a new Mirror with the given dependencies.
|
|
|
|
|
func New(proxy *handler.Proxy, db *database.DB, store storage.Storage, logger *slog.Logger, workers int) *Mirror {
|
|
|
|
|
if workers < 1 {
|
|
|
|
|
workers = 1
|
|
|
|
|
}
|
|
|
|
|
return &Mirror{
|
|
|
|
|
proxy: proxy,
|
|
|
|
|
db: db,
|
|
|
|
|
storage: store,
|
|
|
|
|
logger: logger,
|
|
|
|
|
workers: workers,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Progress tracks the state of a mirror operation.
|
|
|
|
|
type Progress struct {
|
2026-04-18 07:43:22 -04:00
|
|
|
Total int64 `json:"total"`
|
|
|
|
|
Completed int64 `json:"completed"`
|
|
|
|
|
Skipped int64 `json:"skipped"`
|
|
|
|
|
Failed int64 `json:"failed"`
|
|
|
|
|
Bytes int64 `json:"bytes"`
|
2026-03-19 21:06:02 +00:00
|
|
|
Errors []MirrorError `json:"errors,omitempty"`
|
2026-04-18 07:43:22 -04:00
|
|
|
StartedAt time.Time `json:"started_at"`
|
|
|
|
|
Phase string `json:"phase"`
|
2026-03-19 21:06:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MirrorError records a single failed mirror attempt.
//
// The first three fields identify the package version; Error carries the
// failure message as text.
type MirrorError struct {
	Ecosystem string `json:"ecosystem"`
	Name      string `json:"name"`
	Version   string `json:"version"`
	Error     string `json:"error"`
}
|
|
|
|
|
|
|
|
|
|
type progressTracker struct {
|
|
|
|
|
total atomic.Int64
|
|
|
|
|
completed atomic.Int64
|
|
|
|
|
skipped atomic.Int64
|
|
|
|
|
failed atomic.Int64
|
|
|
|
|
bytes atomic.Int64
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
errors []MirrorError
|
|
|
|
|
startedAt time.Time
|
|
|
|
|
phase atomic.Value // string
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newProgressTracker() *progressTracker {
|
|
|
|
|
pt := &progressTracker{
|
|
|
|
|
startedAt: time.Now(),
|
|
|
|
|
}
|
|
|
|
|
pt.phase.Store("resolving")
|
|
|
|
|
return pt
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-01 15:40:18 +01:00
|
|
|
const (
	// maxTrackedErrors caps how many MirrorError entries a tracker retains.
	maxTrackedErrors = 1000

	// progressReportInterval is how often the progress callback is invoked
	// while a mirror run is in flight.
	progressReportInterval = 500 * time.Millisecond //nolint:mnd // progress update frequency
)
|
2026-04-01 15:40:18 +01:00
|
|
|
|
2026-03-19 21:06:02 +00:00
|
|
|
func (pt *progressTracker) addError(eco, name, version, err string) {
|
|
|
|
|
pt.mu.Lock()
|
2026-04-01 15:40:18 +01:00
|
|
|
if len(pt.errors) < maxTrackedErrors {
|
|
|
|
|
pt.errors = append(pt.errors, MirrorError{
|
|
|
|
|
Ecosystem: eco,
|
|
|
|
|
Name: name,
|
|
|
|
|
Version: version,
|
|
|
|
|
Error: err,
|
|
|
|
|
})
|
|
|
|
|
}
|
2026-03-19 21:06:02 +00:00
|
|
|
pt.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (pt *progressTracker) snapshot() Progress {
|
|
|
|
|
pt.mu.Lock()
|
|
|
|
|
errs := make([]MirrorError, len(pt.errors))
|
|
|
|
|
copy(errs, pt.errors)
|
|
|
|
|
pt.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
phase, _ := pt.phase.Load().(string)
|
|
|
|
|
return Progress{
|
|
|
|
|
Total: pt.total.Load(),
|
|
|
|
|
Completed: pt.completed.Load(),
|
|
|
|
|
Skipped: pt.skipped.Load(),
|
|
|
|
|
Failed: pt.failed.Load(),
|
|
|
|
|
Bytes: pt.bytes.Load(),
|
|
|
|
|
Errors: errs,
|
|
|
|
|
StartedAt: pt.startedAt,
|
|
|
|
|
Phase: phase,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
// ProgressFunc is called periodically with a snapshot of the current progress.
|
|
|
|
|
type ProgressFunc func(Progress)
|
|
|
|
|
|
2026-03-19 21:06:02 +00:00
|
|
|
// Run mirrors all packages from the source using a bounded worker pool.
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
// It returns the final progress when complete. If onProgress is non-nil,
|
|
|
|
|
// it is called with progress snapshots as work proceeds.
|
|
|
|
|
func (m *Mirror) Run(ctx context.Context, source Source, onProgress ...ProgressFunc) (*Progress, error) {
|
2026-03-19 21:06:02 +00:00
|
|
|
tracker := newProgressTracker()
|
|
|
|
|
|
|
|
|
|
// Collect items from source
|
|
|
|
|
var items []PackageVersion
|
|
|
|
|
tracker.phase.Store("resolving")
|
|
|
|
|
err := source.Enumerate(ctx, func(pv PackageVersion) error {
|
|
|
|
|
items = append(items, pv)
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("enumerating packages: %w", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tracker.total.Store(int64(len(items)))
|
|
|
|
|
tracker.phase.Store("downloading")
|
|
|
|
|
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
// Start periodic progress reporting if a callback was provided
|
|
|
|
|
var progressFn ProgressFunc
|
|
|
|
|
if len(onProgress) > 0 && onProgress[0] != nil {
|
|
|
|
|
progressFn = onProgress[0]
|
|
|
|
|
}
|
|
|
|
|
progressDone := make(chan struct{})
|
|
|
|
|
if progressFn != nil {
|
|
|
|
|
progressFn(tracker.snapshot()) // initial snapshot with total set
|
|
|
|
|
go func() {
|
|
|
|
|
ticker := time.NewTicker(progressReportInterval)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-progressDone:
|
|
|
|
|
return
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
progressFn(tracker.snapshot())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-19 21:06:02 +00:00
|
|
|
// Process items with bounded concurrency
|
|
|
|
|
g, gctx := errgroup.WithContext(ctx)
|
|
|
|
|
g.SetLimit(m.workers)
|
|
|
|
|
|
|
|
|
|
for _, item := range items {
|
2026-04-01 15:40:18 +01:00
|
|
|
g.Go(func() (err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
|
m.logger.Error("panic in mirror worker", "recover", r,
|
|
|
|
|
"ecosystem", item.Ecosystem, "name", item.Name, "version", item.Version)
|
|
|
|
|
tracker.failed.Add(1)
|
|
|
|
|
tracker.addError(item.Ecosystem, item.Name, item.Version, fmt.Sprintf("panic: %v", r))
|
|
|
|
|
}
|
|
|
|
|
}()
|
2026-03-19 21:06:02 +00:00
|
|
|
m.mirrorOne(gctx, item, tracker)
|
|
|
|
|
return nil // never fail the group; errors are tracked
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ = g.Wait()
|
|
|
|
|
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
close(progressDone) // stop the progress reporter goroutine
|
|
|
|
|
|
2026-03-19 21:06:02 +00:00
|
|
|
tracker.phase.Store("complete")
|
|
|
|
|
p := tracker.snapshot()
|
Fix metadata caching, 404 propagation, mirror progress, and registry stubs
- ProxyCached now stores upstream Last-Modified in the cache and uses it
(along with ETag) for conditional request handling, returning 304 when
client validators match. Adds Content-Length to cached responses.
- Handlers calling FetchOrCacheMetadata (pypi, composer, pub, nuget) now
check for ErrUpstreamNotFound and return 404 instead of 502, matching
the existing npm and cargo behavior.
- Mirror jobs report live progress via a periodic callback while running,
so API polls return real counts instead of zeroed progress.
- Registry mirroring removed from CLI flags, API acceptance, README, and
docs since every enumerator was a stub returning "not yet implemented".
- Added tests for the conditional metadata path (ETag/If-None-Match,
Last-Modified/If-Modified-Since, 304 responses, header omission).
2026-04-01 20:14:11 +01:00
|
|
|
|
|
|
|
|
// Send final snapshot
|
|
|
|
|
if progressFn != nil {
|
|
|
|
|
progressFn(p)
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-19 21:06:02 +00:00
|
|
|
return &p, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RunDryRun enumerates what would be mirrored without downloading.
|
|
|
|
|
func (m *Mirror) RunDryRun(ctx context.Context, source Source) ([]PackageVersion, error) {
|
|
|
|
|
var items []PackageVersion
|
|
|
|
|
err := source.Enumerate(ctx, func(pv PackageVersion) error {
|
|
|
|
|
items = append(items, pv)
|
|
|
|
|
return nil
|
|
|
|
|
})
|
|
|
|
|
return items, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (m *Mirror) mirrorOne(ctx context.Context, pv PackageVersion, tracker *progressTracker) {
|
|
|
|
|
result, err := m.proxy.GetOrFetchArtifact(ctx, pv.Ecosystem, pv.Name, pv.Version, "")
|
|
|
|
|
if err != nil {
|
|
|
|
|
tracker.failed.Add(1)
|
|
|
|
|
tracker.addError(pv.Ecosystem, pv.Name, pv.Version, err.Error())
|
|
|
|
|
m.logger.Warn("mirror failed",
|
|
|
|
|
"ecosystem", pv.Ecosystem, "name", pv.Name, "version", pv.Version, "error", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ = result.Reader.Close()
|
|
|
|
|
|
|
|
|
|
if result.Cached {
|
|
|
|
|
tracker.skipped.Add(1)
|
|
|
|
|
m.logger.Debug("already cached",
|
|
|
|
|
"ecosystem", pv.Ecosystem, "name", pv.Name, "version", pv.Version)
|
|
|
|
|
} else {
|
|
|
|
|
tracker.completed.Add(1)
|
|
|
|
|
tracker.bytes.Add(result.Size)
|
|
|
|
|
m.logger.Info("mirrored",
|
|
|
|
|
"ecosystem", pv.Ecosystem, "name", pv.Name, "version", pv.Version,
|
|
|
|
|
"size", result.Size)
|
|
|
|
|
}
|
|
|
|
|
}
|