// Package mirror provides selective package mirroring for pre-populating the proxy cache.
package mirror
import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"
"github.com/git-pkgs/proxy/internal/database"
"github.com/git-pkgs/proxy/internal/handler"
"github.com/git-pkgs/proxy/internal/storage"
"golang.org/x/sync/errgroup"
)
// Mirror pre-populates the proxy cache from various input sources.
type Mirror struct {
	proxy   *handler.Proxy  // used by mirrorOne to fetch-and-cache artifacts via GetOrFetchArtifact
	db      *database.DB    // NOTE(review): held but not referenced in this file — presumably used elsewhere; confirm
	storage storage.Storage // NOTE(review): held but not referenced in this file — presumably used elsewhere; confirm
	logger  *slog.Logger    // structured logger for per-item progress/failure records
	workers int             // concurrency limit for the download pool; New enforces >= 1
}
// New creates a new Mirror with the given dependencies.
// A worker count below one is coerced to one, so the returned Mirror
// always has a usable concurrency limit.
func New(proxy *handler.Proxy, db *database.DB, store storage.Storage, logger *slog.Logger, workers int) *Mirror {
	m := &Mirror{
		proxy:   proxy,
		db:      db,
		storage: store,
		logger:  logger,
		workers: workers,
	}
	// Guard against a nonsensical worker count; at least one worker is required.
	if m.workers < 1 {
		m.workers = 1
	}
	return m
}
// Progress tracks the state of a mirror operation.
type Progress struct {
	Total     int64         `json:"total"`            // number of package versions enumerated for mirroring
	Completed int64         `json:"completed"`        // items freshly downloaded and cached
	Skipped   int64         `json:"skipped"`          // items that were already present in the cache
	Failed    int64         `json:"failed"`           // items whose fetch failed (or whose worker panicked)
	Bytes     int64         `json:"bytes"`            // total bytes of freshly mirrored artifacts
	Errors    []MirrorError `json:"errors,omitempty"` // per-item failures, capped at maxTrackedErrors entries
	StartedAt time.Time     `json:"started_at"`       // wall-clock time the operation began
	Phase     string        `json:"phase"`            // "resolving", "downloading", or "complete"
}
// MirrorError records a single failed mirror attempt.
type MirrorError struct {
	Ecosystem string `json:"ecosystem"` // package ecosystem identifier (e.g. as supplied by the Source)
	Name      string `json:"name"`      // package name within the ecosystem
	Version   string `json:"version"`   // specific version that failed to mirror
	Error     string `json:"error"`     // human-readable failure description
}
// progressTracker is the internal, concurrency-safe counterpart of Progress.
// Counters are lock-free atomics; only the error slice needs the mutex.
type progressTracker struct {
	total     atomic.Int64
	completed atomic.Int64
	skipped   atomic.Int64
	failed    atomic.Int64
	bytes     atomic.Int64
	mu        sync.Mutex    // guards errors
	errors    []MirrorError // capped at maxTrackedErrors; see addError
	startedAt time.Time     // set once at construction, read-only afterwards
	phase     atomic.Value  // string
}
// newProgressTracker returns a tracker whose clock starts now and whose
// initial phase is "resolving".
func newProgressTracker() *progressTracker {
	tracker := &progressTracker{startedAt: time.Now()}
	tracker.phase.Store("resolving")
	return tracker
}
// maxTrackedErrors caps how many per-item failures are retained in memory;
// failures beyond the cap are still counted but not recorded individually.
const maxTrackedErrors = 1000

// progressReportInterval is how often the background reporter in Run invokes
// the caller-supplied ProgressFunc.
const progressReportInterval = 500 * time.Millisecond //nolint:mnd // progress update frequency
// addError records one failed mirror attempt. Once maxTrackedErrors entries
// have accumulated, further errors are silently dropped (the failed counter
// still reflects them).
func (pt *progressTracker) addError(eco, name, version, err string) {
	pt.mu.Lock()
	defer pt.mu.Unlock()
	if len(pt.errors) >= maxTrackedErrors {
		return
	}
	pt.errors = append(pt.errors, MirrorError{
		Ecosystem: eco,
		Name:      name,
		Version:   version,
		Error:     err,
	})
}
// snapshot returns a point-in-time copy of the tracker's state as a Progress.
// The error slice is duplicated under the lock so callers may retain the
// snapshot without racing against concurrent appends.
func (pt *progressTracker) snapshot() Progress {
	pt.mu.Lock()
	errsCopy := make([]MirrorError, len(pt.errors))
	copy(errsCopy, pt.errors)
	pt.mu.Unlock()

	currentPhase, _ := pt.phase.Load().(string)
	return Progress{
		Phase:     currentPhase,
		StartedAt: pt.startedAt,
		Total:     pt.total.Load(),
		Completed: pt.completed.Load(),
		Skipped:   pt.skipped.Load(),
		Failed:    pt.failed.Load(),
		Bytes:     pt.bytes.Load(),
		Errors:    errsCopy,
	}
}
// ProgressFunc is called periodically with a snapshot of the current progress.
// Implementations receive a copy and may retain it; they must not block for
// long, as they run on the reporter's schedule.
type ProgressFunc func(Progress)
// Run mirrors all packages from the source using a bounded worker pool.
// It returns the final progress when complete. If onProgress is non-nil,
// it is called with progress snapshots as work proceeds.
func (m *Mirror) Run(ctx context.Context, source Source, onProgress ...ProgressFunc) (*Progress, error) {
tracker := newProgressTracker()
// Collect items from source
var items []PackageVersion
tracker.phase.Store("resolving")
err := source.Enumerate(ctx, func(pv PackageVersion) error {
items = append(items, pv)
return nil
})
if err != nil {
return nil, fmt.Errorf("enumerating packages: %w", err)
}
tracker.total.Store(int64(len(items)))
tracker.phase.Store("downloading")
// Start periodic progress reporting if a callback was provided
var progressFn ProgressFunc
if len(onProgress) > 0 && onProgress[0] != nil {
progressFn = onProgress[0]
}
progressDone := make(chan struct{})
if progressFn != nil {
progressFn(tracker.snapshot()) // initial snapshot with total set
go func() {
ticker := time.NewTicker(progressReportInterval)
defer ticker.Stop()
for {
select {
case <-progressDone:
return
case <-ticker.C:
progressFn(tracker.snapshot())
}
}
}()
}
// Process items with bounded concurrency
g, gctx := errgroup.WithContext(ctx)
g.SetLimit(m.workers)
for _, item := range items {
g.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
m.logger.Error("panic in mirror worker", "recover", r,
"ecosystem", item.Ecosystem, "name", item.Name, "version", item.Version)
tracker.failed.Add(1)
tracker.addError(item.Ecosystem, item.Name, item.Version, fmt.Sprintf("panic: %v", r))
}
}()
m.mirrorOne(gctx, item, tracker)
return nil // never fail the group; errors are tracked
})
}
_ = g.Wait()
close(progressDone) // stop the progress reporter goroutine
tracker.phase.Store("complete")
p := tracker.snapshot()
// Send final snapshot
if progressFn != nil {
progressFn(p)
}
return &p, nil
}
// RunDryRun enumerates what would be mirrored without downloading.
// It returns the collected package versions along with any enumeration error.
func (m *Mirror) RunDryRun(ctx context.Context, source Source) ([]PackageVersion, error) {
	var collected []PackageVersion
	collect := func(pv PackageVersion) error {
		collected = append(collected, pv)
		return nil
	}
	err := source.Enumerate(ctx, collect)
	return collected, err
}
// mirrorOne fetches a single package version through the proxy and updates
// the tracker according to the outcome: failed, already cached (skipped),
// or freshly mirrored (completed, with byte count).
func (m *Mirror) mirrorOne(ctx context.Context, pv PackageVersion, tracker *progressTracker) {
	result, err := m.proxy.GetOrFetchArtifact(ctx, pv.Ecosystem, pv.Name, pv.Version, "")
	if err != nil {
		tracker.failed.Add(1)
		tracker.addError(pv.Ecosystem, pv.Name, pv.Version, err.Error())
		m.logger.Warn("mirror failed",
			"ecosystem", pv.Ecosystem, "name", pv.Name, "version", pv.Version, "error", err)
		return
	}
	// The body is discarded here; NOTE(review): assumes GetOrFetchArtifact
	// has already persisted the artifact to the cache — confirm.
	_ = result.Reader.Close()

	if result.Cached {
		tracker.skipped.Add(1)
		m.logger.Debug("already cached",
			"ecosystem", pv.Ecosystem, "name", pv.Name, "version", pv.Version)
		return
	}

	tracker.completed.Add(1)
	tracker.bytes.Add(result.Size)
	m.logger.Info("mirrored",
		"ecosystem", pv.Ecosystem, "name", pv.Name, "version", pv.Version,
		"size", result.Size)
}