forked from mirrors/pkg-proxy
730 lines
23 KiB
Go
730 lines
23 KiB
Go
// Package handler provides HTTP protocol handlers for package manager proxying.
|
|
package handler
|
|
|
|
import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/git-pkgs/proxy/internal/cooldown"
	"github.com/git-pkgs/proxy/internal/database"
	"github.com/git-pkgs/proxy/internal/metrics"
	"github.com/git-pkgs/proxy/internal/storage"
	"github.com/git-pkgs/purl"
	"github.com/git-pkgs/registries/fetch"
)
|
|
|
|
// containsPathTraversal reports whether any "/"-separated segment of path is
// exactly "..", i.e. whether the path could climb out of its base directory.
// Segments that merely contain dots (such as "a..b" or "...") do not count.
func containsPathTraversal(path string) bool {
	remaining := path
	for {
		// Peel off one segment at a time instead of materialising a slice.
		segment, rest, more := strings.Cut(remaining, "/")
		if segment == ".." {
			return true
		}
		if !more {
			return false
		}
		remaining = rest
	}
}
|
|
|
|
const (
	// defaultHTTPTimeout is the timeout given to the HTTP client that NewProxy
	// creates.
	defaultHTTPTimeout = 30 * time.Second

	// contentTypeJSON is the JSON media type, used both as the default Accept
	// header and as the fallback Content-Type for metadata.
	contentTypeJSON = "application/json"

	// maxMetadataSize caps how many bytes of metadata are read into memory
	// (100 MB). Metadata can legitimately be large (e.g. npm packages with
	// many versions), but an unbounded read could exhaust memory if an
	// upstream misbehaves.
	maxMetadataSize = 100 << 20
)

// ErrMetadataTooLarge is returned when a metadata body exceeds maxMetadataSize.
var ErrMetadataTooLarge = errors.New("metadata response exceeds size limit")
|
|
|
|
// ReadMetadata reads an upstream response body with a size limit to prevent OOM
|
|
// from unexpectedly large responses. Returns ErrMetadataTooLarge if the response
|
|
// is truncated by the limit.
|
|
func ReadMetadata(r io.Reader) ([]byte, error) {
|
|
data, err := io.ReadAll(io.LimitReader(r, maxMetadataSize+1))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if int64(len(data)) > maxMetadataSize {
|
|
return nil, ErrMetadataTooLarge
|
|
}
|
|
return data, nil
|
|
}
|
|
|
|
// Proxy provides shared functionality for protocol handlers.
|
|
type Proxy struct {
|
|
DB *database.DB
|
|
Storage storage.Storage
|
|
Fetcher fetch.FetcherInterface
|
|
Resolver *fetch.Resolver
|
|
Logger *slog.Logger
|
|
Cooldown *cooldown.Config
|
|
CacheMetadata bool
|
|
MetadataTTL time.Duration
|
|
HTTPClient *http.Client
|
|
}
|
|
|
|
// NewProxy creates a new Proxy with the given dependencies.
|
|
func NewProxy(db *database.DB, store storage.Storage, fetcher fetch.FetcherInterface, resolver *fetch.Resolver, logger *slog.Logger) *Proxy {
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
return &Proxy{
|
|
DB: db,
|
|
Storage: store,
|
|
Fetcher: fetcher,
|
|
Resolver: resolver,
|
|
Logger: logger,
|
|
HTTPClient: &http.Client{
|
|
Timeout: defaultHTTPTimeout,
|
|
},
|
|
}
|
|
}
|
|
|
|
// CacheResult describes an artifact that is ready to be served, whether it
// came from the cache or was just fetched from upstream.
type CacheResult struct {
	// Reader yields the artifact bytes; the consumer must close it
	// (ServeArtifact does).
	Reader io.ReadCloser
	// Size is the artifact length; ServeArtifact sends it as Content-Length
	// when it is positive.
	Size int64
	// ContentType is the media type to report, or "" when unknown.
	ContentType string
	// Hash is the content hash reported by storage; ServeArtifact uses it as
	// the ETag.
	Hash string
	// Cached is true when the artifact was served from the cache and false
	// when it was fetched from upstream for this request.
	Cached bool
}
|
|
|
|
// GetOrFetchArtifact retrieves an artifact from cache or fetches from upstream.
|
|
func (p *Proxy) GetOrFetchArtifact(ctx context.Context, ecosystem, name, version, filename string) (*CacheResult, error) {
|
|
pkgPURL := purl.MakePURLString(ecosystem, name, "")
|
|
versionPURL := purl.MakePURLString(ecosystem, name, version)
|
|
|
|
if cached, err := p.checkCache(ctx, pkgPURL, versionPURL, filename); err != nil {
|
|
return nil, err
|
|
} else if cached != nil {
|
|
return cached, nil
|
|
}
|
|
|
|
return p.fetchAndCache(ctx, ecosystem, name, version, filename, pkgPURL, versionPURL)
|
|
}
|
|
|
|
// checkCache looks up an artifact in the cache. Returns nil if not cached.
|
|
func (p *Proxy) checkCache(ctx context.Context, pkgPURL, versionPURL, filename string) (*CacheResult, error) {
|
|
pkg, err := p.DB.GetPackageByPURL(pkgPURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("checking package cache: %w", err)
|
|
}
|
|
if pkg == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
ver, err := p.DB.GetVersionByPURL(versionPURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("checking version cache: %w", err)
|
|
}
|
|
if ver == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
artifact, err := p.DB.GetArtifact(versionPURL, filename)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("checking artifact cache: %w", err)
|
|
}
|
|
if artifact == nil || !artifact.IsCached() {
|
|
return nil, nil
|
|
}
|
|
|
|
start := time.Now()
|
|
reader, err := p.Storage.Open(ctx, artifact.StoragePath.String)
|
|
metrics.RecordStorageOperation("read", time.Since(start))
|
|
if err != nil {
|
|
metrics.RecordStorageError("read")
|
|
p.Logger.Warn("cached artifact missing from storage, will refetch",
|
|
"path", artifact.StoragePath.String, "error", err)
|
|
return nil, nil
|
|
}
|
|
|
|
_ = p.DB.RecordArtifactHit(versionPURL, filename)
|
|
|
|
// Extract ecosystem from pkgPURL for metrics
|
|
if p, err := purl.Parse(pkgPURL); err == nil {
|
|
metrics.RecordCacheHit(purl.PURLTypeToEcosystem(p.Type))
|
|
}
|
|
|
|
return &CacheResult{
|
|
Reader: reader,
|
|
Size: artifact.Size.Int64,
|
|
ContentType: artifact.ContentType.String,
|
|
Hash: artifact.ContentHash.String,
|
|
Cached: true,
|
|
}, nil
|
|
}
|
|
|
|
func (p *Proxy) fetchAndCache(ctx context.Context, ecosystem, name, version, filename, pkgPURL, versionPURL string) (*CacheResult, error) {
|
|
// Record cache miss
|
|
metrics.RecordCacheMiss(ecosystem)
|
|
|
|
// Resolve download URL
|
|
info, err := p.Resolver.Resolve(ctx, ecosystem, name, version)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("resolving download URL: %w", err)
|
|
}
|
|
|
|
// Use resolved filename if provided filename is empty
|
|
if filename == "" {
|
|
filename = info.Filename
|
|
}
|
|
|
|
p.Logger.Info("fetching from upstream",
|
|
"ecosystem", ecosystem, "name", name, "version", version, "url", info.URL)
|
|
|
|
// Fetch from upstream with timing
|
|
fetchStart := time.Now()
|
|
artifact, err := p.Fetcher.Fetch(ctx, info.URL)
|
|
fetchDuration := time.Since(fetchStart)
|
|
|
|
if err != nil {
|
|
metrics.RecordUpstreamFetch(ecosystem, fetchDuration)
|
|
metrics.RecordUpstreamError(ecosystem, "fetch_failed")
|
|
return nil, fmt.Errorf("fetching from upstream: %w", err)
|
|
}
|
|
metrics.RecordUpstreamFetch(ecosystem, fetchDuration)
|
|
|
|
// Store in cache
|
|
storagePath := storage.ArtifactPath(ecosystem, "", name, version, filename)
|
|
storeStart := time.Now()
|
|
size, hash, err := p.Storage.Store(ctx, storagePath, artifact.Body)
|
|
_ = artifact.Body.Close()
|
|
metrics.RecordStorageOperation("write", time.Since(storeStart))
|
|
|
|
if err != nil {
|
|
metrics.RecordStorageError("write")
|
|
return nil, fmt.Errorf("storing artifact: %w", err)
|
|
}
|
|
|
|
// Update database
|
|
if err := p.updateCacheDB(ecosystem, name, filename, pkgPURL, versionPURL, info.URL, storagePath, hash, size, artifact.ContentType); err != nil {
|
|
p.Logger.Warn("failed to update cache database", "error", err)
|
|
// Continue anyway - we have the file
|
|
}
|
|
|
|
// Open the stored file to return
|
|
readStart := time.Now()
|
|
reader, err := p.Storage.Open(ctx, storagePath)
|
|
metrics.RecordStorageOperation("read", time.Since(readStart))
|
|
|
|
if err != nil {
|
|
metrics.RecordStorageError("read")
|
|
return nil, fmt.Errorf("opening cached artifact: %w", err)
|
|
}
|
|
|
|
return &CacheResult{
|
|
Reader: reader,
|
|
Size: size,
|
|
ContentType: artifact.ContentType,
|
|
Hash: hash,
|
|
Cached: false,
|
|
}, nil
|
|
}
|
|
|
|
func (p *Proxy) updateCacheDB(ecosystem, name, filename, pkgPURL, versionPURL, upstreamURL, storagePath, hash string, size int64, contentType string) error {
|
|
now := time.Now()
|
|
|
|
// Upsert package
|
|
pkg := &database.Package{
|
|
PURL: pkgPURL,
|
|
Ecosystem: ecosystem,
|
|
Name: name,
|
|
RegistryURL: sql.NullString{String: upstreamURL, Valid: true},
|
|
EnrichedAt: sql.NullTime{Time: now, Valid: true},
|
|
}
|
|
if err := p.DB.UpsertPackage(pkg); err != nil {
|
|
return fmt.Errorf("upserting package: %w", err)
|
|
}
|
|
|
|
// Upsert version
|
|
ver := &database.Version{
|
|
PURL: versionPURL,
|
|
PackagePURL: pkgPURL,
|
|
EnrichedAt: sql.NullTime{Time: now, Valid: true},
|
|
}
|
|
if err := p.DB.UpsertVersion(ver); err != nil {
|
|
return fmt.Errorf("upserting version: %w", err)
|
|
}
|
|
|
|
// Upsert artifact
|
|
art := &database.Artifact{
|
|
VersionPURL: versionPURL,
|
|
Filename: filename,
|
|
UpstreamURL: upstreamURL,
|
|
StoragePath: sql.NullString{String: storagePath, Valid: true},
|
|
ContentHash: sql.NullString{String: hash, Valid: true},
|
|
Size: sql.NullInt64{Int64: size, Valid: true},
|
|
ContentType: sql.NullString{String: contentType, Valid: true},
|
|
FetchedAt: sql.NullTime{Time: now, Valid: true},
|
|
}
|
|
if err := p.DB.UpsertArtifact(art); err != nil {
|
|
return fmt.Errorf("upserting artifact: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ServeArtifact writes a CacheResult to an HTTP response.
|
|
func ServeArtifact(w http.ResponseWriter, result *CacheResult) {
|
|
defer func() { _ = result.Reader.Close() }()
|
|
|
|
if result.ContentType != "" {
|
|
w.Header().Set("Content-Type", result.ContentType)
|
|
}
|
|
if result.Size > 0 {
|
|
w.Header().Set("Content-Length", fmt.Sprintf("%d", result.Size))
|
|
}
|
|
if result.Hash != "" {
|
|
w.Header().Set("ETag", fmt.Sprintf(`"%s"`, result.Hash))
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = io.Copy(w, result.Reader)
|
|
}
|
|
|
|
// ProxyUpstream forwards a request to an upstream URL without caching.
|
|
// It copies the request, forwards specified headers, and streams the response back.
|
|
// If forwardHeaders is nil, all response headers are copied.
|
|
func (p *Proxy) ProxyUpstream(w http.ResponseWriter, r *http.Request, upstreamURL string, forwardHeaders []string) {
|
|
p.Logger.Debug("proxying to upstream", "url", upstreamURL)
|
|
|
|
req, err := http.NewRequestWithContext(r.Context(), r.Method, upstreamURL, nil)
|
|
if err != nil {
|
|
http.Error(w, "failed to create request", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Copy request headers that affect content negotiation / caching
|
|
for _, header := range forwardHeaders {
|
|
if v := r.Header.Get(header); v != "" {
|
|
req.Header.Set(header, v)
|
|
}
|
|
}
|
|
|
|
resp, err := p.HTTPClient.Do(req)
|
|
if err != nil {
|
|
p.Logger.Error("upstream request failed", "error", err)
|
|
http.Error(w, "upstream request failed", http.StatusBadGateway)
|
|
return
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
for k, vv := range resp.Header {
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
|
|
w.WriteHeader(resp.StatusCode)
|
|
_, _ = io.Copy(w, resp.Body)
|
|
}
|
|
|
|
// ProxyFile forwards a file request to upstream, copying all response headers.
|
|
func (p *Proxy) ProxyFile(w http.ResponseWriter, r *http.Request, upstreamURL string) {
|
|
req, err := http.NewRequestWithContext(r.Context(), r.Method, upstreamURL, nil)
|
|
if err != nil {
|
|
http.Error(w, "failed to create request", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
resp, err := p.HTTPClient.Do(req)
|
|
if err != nil {
|
|
http.Error(w, "failed to fetch from upstream", http.StatusBadGateway)
|
|
return
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
for key, values := range resp.Header {
|
|
for _, v := range values {
|
|
w.Header().Add(key, v)
|
|
}
|
|
}
|
|
|
|
w.WriteHeader(resp.StatusCode)
|
|
_, _ = io.Copy(w, resp.Body)
|
|
}
|
|
|
|
// JSONError writes a JSON error response.
|
|
func JSONError(w http.ResponseWriter, status int, message string) {
|
|
w.Header().Set("Content-Type", contentTypeJSON)
|
|
w.WriteHeader(status)
|
|
_, _ = fmt.Fprintf(w, `{"error":%q}`, message)
|
|
}
|
|
|
|
var (
	// ErrUpstreamNotFound reports that the upstream answered 404 Not Found.
	ErrUpstreamNotFound = errors.New("upstream: not found")

	// errStale304 reports that the upstream answered 304 Not Modified but the
	// cached copy it refers to could not be opened or read.
	errStale304 = errors.New("upstream returned 304 but cached file is missing")
)
|
|
|
|
// metadataStoragePath returns the storage key under which metadata for
// cacheKey is kept: "_metadata/<ecosystem>/<cacheKey>/metadata". The parts
// are joined verbatim, with no cleaning or escaping.
func metadataStoragePath(ecosystem, cacheKey string) string {
	parts := []string{"_metadata", ecosystem, cacheKey, "metadata"}
	return strings.Join(parts, "/")
}
|
|
|
|
// FetchOrCacheMetadata fetches metadata from upstream with caching.
|
|
// On success it returns the raw response bytes and content type.
|
|
// If upstream fails and a cached copy exists, the cached version is returned.
|
|
// cacheKey is typically the package name but can include subpath components.
|
|
// Optional acceptHeaders specify the Accept header(s) to send; defaults to application/json.
|
|
func (p *Proxy) FetchOrCacheMetadata(ctx context.Context, ecosystem, cacheKey, upstreamURL string, acceptHeaders ...string) ([]byte, string, error) {
|
|
if containsPathTraversal(cacheKey) {
|
|
return nil, "", fmt.Errorf("invalid cache key: %q", cacheKey)
|
|
}
|
|
|
|
storagePath := metadataStoragePath(ecosystem, cacheKey)
|
|
|
|
// Check for existing cache entry (for ETag revalidation and TTL)
|
|
var entry *database.MetadataCacheEntry
|
|
if p.CacheMetadata && p.DB != nil {
|
|
entry, _ = p.DB.GetMetadataCache(ecosystem, cacheKey)
|
|
}
|
|
|
|
// Serve from cache if within TTL (skip upstream entirely)
|
|
if entry != nil && p.MetadataTTL > 0 && entry.FetchedAt.Valid {
|
|
if time.Since(entry.FetchedAt.Time) < p.MetadataTTL {
|
|
cached, readErr := p.Storage.Open(ctx, entry.StoragePath)
|
|
if readErr == nil {
|
|
defer func() { _ = cached.Close() }()
|
|
data, readErr := ReadMetadata(cached)
|
|
if readErr == nil {
|
|
ct := contentTypeJSON
|
|
if entry.ContentType.Valid {
|
|
ct = entry.ContentType.String
|
|
}
|
|
return data, ct, nil
|
|
}
|
|
}
|
|
// Cache file missing/unreadable, fall through to upstream
|
|
}
|
|
}
|
|
|
|
accept := contentTypeJSON
|
|
if len(acceptHeaders) > 0 && acceptHeaders[0] != "" {
|
|
accept = acceptHeaders[0]
|
|
}
|
|
|
|
// Try upstream
|
|
body, contentType, etag, lastModified, err := p.fetchUpstreamMetadata(ctx, upstreamURL, entry, accept)
|
|
if errors.Is(err, errStale304) {
|
|
// 304 but cached file is gone; retry without ETag
|
|
body, contentType, etag, lastModified, err = p.fetchUpstreamMetadata(ctx, upstreamURL, nil, accept)
|
|
}
|
|
if err == nil {
|
|
if p.CacheMetadata {
|
|
p.cacheMetadataBlob(ctx, ecosystem, cacheKey, storagePath, body, contentType, etag, lastModified)
|
|
}
|
|
return body, contentType, nil
|
|
}
|
|
|
|
// Upstream failed -- fall back to cache if available
|
|
if !p.CacheMetadata || entry == nil {
|
|
return nil, "", fmt.Errorf("upstream failed and no cached metadata: %w", err)
|
|
}
|
|
|
|
p.Logger.Warn("upstream metadata fetch failed, checking cache",
|
|
"ecosystem", ecosystem, "key", cacheKey, "error", err)
|
|
|
|
cached, readErr := p.Storage.Open(ctx, entry.StoragePath)
|
|
if readErr != nil {
|
|
return nil, "", fmt.Errorf("upstream failed and cached file missing: %w", err)
|
|
}
|
|
defer func() { _ = cached.Close() }()
|
|
|
|
data, readErr := ReadMetadata(cached)
|
|
if readErr != nil {
|
|
return nil, "", fmt.Errorf("upstream failed and cached read error: %w", err)
|
|
}
|
|
|
|
ct := contentTypeJSON
|
|
if entry.ContentType.Valid {
|
|
ct = entry.ContentType.String
|
|
}
|
|
p.Logger.Info("serving metadata from cache",
|
|
"ecosystem", ecosystem, "key", cacheKey)
|
|
return data, ct, nil
|
|
}
|
|
|
|
// fetchUpstreamMetadata fetches metadata from upstream, using ETag for conditional revalidation.
|
|
// Returns the body, content type, ETag, upstream Last-Modified time, and any error.
|
|
func (p *Proxy) fetchUpstreamMetadata(ctx context.Context, upstreamURL string, entry *database.MetadataCacheEntry, accept string) ([]byte, string, string, time.Time, error) {
|
|
var zeroTime time.Time
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, upstreamURL, nil)
|
|
if err != nil {
|
|
return nil, "", "", zeroTime, fmt.Errorf("creating request: %w", err)
|
|
}
|
|
req.Header.Set("Accept", accept)
|
|
|
|
if entry != nil && entry.ETag.Valid {
|
|
req.Header.Set("If-None-Match", entry.ETag.String)
|
|
}
|
|
|
|
resp, err := p.HTTPClient.Do(req)
|
|
if err != nil {
|
|
return nil, "", "", zeroTime, fmt.Errorf("fetching metadata: %w", err)
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
// 304 Not Modified -- our cached copy is still good
|
|
if resp.StatusCode == http.StatusNotModified && entry != nil {
|
|
cached, readErr := p.Storage.Open(ctx, entry.StoragePath)
|
|
if readErr != nil {
|
|
return nil, "", "", zeroTime, errStale304
|
|
}
|
|
defer func() { _ = cached.Close() }()
|
|
data, readErr := ReadMetadata(cached)
|
|
if readErr != nil {
|
|
return nil, "", "", zeroTime, errStale304
|
|
}
|
|
ct := contentTypeJSON
|
|
if entry.ContentType.Valid {
|
|
ct = entry.ContentType.String
|
|
}
|
|
lm := zeroTime
|
|
if entry.LastModified.Valid {
|
|
lm = entry.LastModified.Time
|
|
}
|
|
return data, ct, entry.ETag.String, lm, nil
|
|
}
|
|
|
|
if resp.StatusCode == http.StatusNotFound {
|
|
return nil, "", "", zeroTime, ErrUpstreamNotFound
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, "", "", zeroTime, fmt.Errorf("upstream returned %d", resp.StatusCode)
|
|
}
|
|
|
|
body, err := ReadMetadata(resp.Body)
|
|
if err != nil {
|
|
return nil, "", "", zeroTime, fmt.Errorf("reading response: %w", err)
|
|
}
|
|
|
|
contentType := resp.Header.Get("Content-Type")
|
|
if contentType == "" {
|
|
contentType = contentTypeJSON
|
|
}
|
|
|
|
etag := resp.Header.Get("ETag")
|
|
|
|
var lastModified time.Time
|
|
if lm := resp.Header.Get("Last-Modified"); lm != "" {
|
|
lastModified, _ = http.ParseTime(lm)
|
|
}
|
|
|
|
return body, contentType, etag, lastModified, nil
|
|
}
|
|
|
|
// cacheMetadataBlob stores metadata bytes in storage and updates the database.
|
|
func (p *Proxy) cacheMetadataBlob(ctx context.Context, ecosystem, cacheKey, storagePath string, data []byte, contentType, etag string, lastModified time.Time) {
|
|
if p.DB == nil || p.Storage == nil {
|
|
return
|
|
}
|
|
|
|
size, _, err := p.Storage.Store(ctx, storagePath, bytes.NewReader(data))
|
|
if err != nil {
|
|
p.Logger.Warn("failed to cache metadata", "ecosystem", ecosystem, "key", cacheKey, "error", err)
|
|
return
|
|
}
|
|
|
|
_ = p.DB.UpsertMetadataCache(&database.MetadataCacheEntry{
|
|
Ecosystem: ecosystem,
|
|
Name: cacheKey,
|
|
StoragePath: storagePath,
|
|
ETag: sql.NullString{String: etag, Valid: etag != ""},
|
|
ContentType: sql.NullString{String: contentType, Valid: contentType != ""},
|
|
Size: sql.NullInt64{Int64: size, Valid: true},
|
|
LastModified: sql.NullTime{Time: lastModified, Valid: !lastModified.IsZero()},
|
|
FetchedAt: sql.NullTime{Time: time.Now(), Valid: true},
|
|
})
|
|
}
|
|
|
|
// cachedMeta carries the validators and freshness flag read from a metadata
// cache entry.
type cachedMeta struct {
	// etag is the stored ETag, or "" when the entry has none.
	etag string
	// lastModified is the stored Last-Modified time, or the zero time when
	// the entry has none.
	lastModified time.Time
	// stale is true when the entry was fetched longer ago than the TTL.
	stale bool
}
|
|
|
|
// lookupCachedMeta retrieves cache validators for a metadata entry.
|
|
func (p *Proxy) lookupCachedMeta(ecosystem, cacheKey string) cachedMeta {
|
|
if p.DB == nil {
|
|
return cachedMeta{}
|
|
}
|
|
entry, err := p.DB.GetMetadataCache(ecosystem, cacheKey)
|
|
if err != nil || entry == nil {
|
|
return cachedMeta{}
|
|
}
|
|
var cm cachedMeta
|
|
if entry.ETag.Valid {
|
|
cm.etag = entry.ETag.String
|
|
}
|
|
if entry.LastModified.Valid {
|
|
cm.lastModified = entry.LastModified.Time
|
|
}
|
|
// If FetchedAt is older than TTL, upstream must have failed and
|
|
// we served from stale cache (successful fetches update FetchedAt).
|
|
if p.MetadataTTL > 0 && entry.FetchedAt.Valid && time.Since(entry.FetchedAt.Time) > p.MetadataTTL {
|
|
cm.stale = true
|
|
}
|
|
return cm
|
|
}
|
|
|
|
// ProxyCached fetches metadata from upstream (with optional caching for offline fallback)
|
|
// and writes it to the response. Optional acceptHeaders specify the Accept header to send.
|
|
// When metadata caching is disabled, the response is streamed directly to avoid buffering
|
|
// large metadata responses (e.g. npm packages with many versions) in memory.
|
|
func (p *Proxy) ProxyCached(w http.ResponseWriter, r *http.Request, upstreamURL, ecosystem, cacheKey string, acceptHeaders ...string) {
|
|
if !p.CacheMetadata {
|
|
// Stream directly without buffering when caching is off.
|
|
p.proxyMetadataStream(w, r, upstreamURL, acceptHeaders...)
|
|
return
|
|
}
|
|
|
|
body, contentType, err := p.FetchOrCacheMetadata(r.Context(), ecosystem, cacheKey, upstreamURL, acceptHeaders...)
|
|
if err != nil {
|
|
if errors.Is(err, ErrUpstreamNotFound) {
|
|
http.Error(w, "not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
p.Logger.Error("metadata fetch failed", "error", err)
|
|
http.Error(w, "failed to fetch from upstream", http.StatusBadGateway)
|
|
return
|
|
}
|
|
|
|
cm := p.lookupCachedMeta(ecosystem, cacheKey)
|
|
|
|
// Honor client conditional request headers
|
|
if cm.etag != "" {
|
|
if match := r.Header.Get("If-None-Match"); match != "" && match == cm.etag {
|
|
w.WriteHeader(http.StatusNotModified)
|
|
return
|
|
}
|
|
}
|
|
if !cm.lastModified.IsZero() {
|
|
if ims := r.Header.Get("If-Modified-Since"); ims != "" {
|
|
if t, err := http.ParseTime(ims); err == nil && !cm.lastModified.After(t) {
|
|
w.WriteHeader(http.StatusNotModified)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
w.Header().Set("Content-Type", contentType)
|
|
w.Header().Set("Content-Length", strconv.Itoa(len(body)))
|
|
if cm.etag != "" {
|
|
w.Header().Set("ETag", cm.etag)
|
|
}
|
|
if !cm.lastModified.IsZero() {
|
|
w.Header().Set("Last-Modified", cm.lastModified.UTC().Format(http.TimeFormat))
|
|
}
|
|
if cm.stale {
|
|
w.Header().Set("Warning", `110 - "Response is Stale"`)
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(body)
|
|
}
|
|
|
|
// proxyMetadataStream forwards an upstream metadata response by streaming it to the client
|
|
// without buffering the full body in memory.
|
|
func (p *Proxy) proxyMetadataStream(w http.ResponseWriter, r *http.Request, upstreamURL string, acceptHeaders ...string) {
|
|
req, err := http.NewRequestWithContext(r.Context(), http.MethodGet, upstreamURL, nil)
|
|
if err != nil {
|
|
http.Error(w, "failed to create request", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
accept := contentTypeJSON
|
|
if len(acceptHeaders) > 0 && acceptHeaders[0] != "" {
|
|
accept = acceptHeaders[0]
|
|
}
|
|
req.Header.Set("Accept", accept)
|
|
|
|
for _, header := range []string{"Accept-Encoding", "If-Modified-Since", "If-None-Match"} {
|
|
if v := r.Header.Get(header); v != "" {
|
|
req.Header.Set(header, v)
|
|
}
|
|
}
|
|
|
|
resp, err := p.HTTPClient.Do(req)
|
|
if err != nil {
|
|
http.Error(w, "failed to fetch from upstream", http.StatusBadGateway)
|
|
return
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
for _, header := range []string{"Content-Type", "Content-Length", "Last-Modified", "ETag"} {
|
|
if v := resp.Header.Get(header); v != "" {
|
|
w.Header().Set(header, v)
|
|
}
|
|
}
|
|
|
|
w.WriteHeader(resp.StatusCode)
|
|
_, _ = io.Copy(w, resp.Body)
|
|
}
|
|
|
|
// GetOrFetchArtifactFromURL retrieves an artifact from cache or fetches from a specific URL.
|
|
// This is useful for registries where download URLs are determined from metadata.
|
|
func (p *Proxy) GetOrFetchArtifactFromURL(ctx context.Context, ecosystem, name, version, filename, downloadURL string) (*CacheResult, error) {
|
|
return p.GetOrFetchArtifactFromURLWithHeaders(ctx, ecosystem, name, version, filename, downloadURL, nil)
|
|
}
|
|
|
|
// GetOrFetchArtifactFromURLWithHeaders retrieves an artifact from cache or fetches from a URL
|
|
// with additional HTTP headers. This is needed for registries that require authentication
|
|
// (e.g. Docker Hub requires a Bearer token even for public images).
|
|
func (p *Proxy) GetOrFetchArtifactFromURLWithHeaders(ctx context.Context, ecosystem, name, version, filename, downloadURL string, headers http.Header) (*CacheResult, error) {
|
|
pkgPURL := purl.MakePURLString(ecosystem, name, "")
|
|
versionPURL := purl.MakePURLString(ecosystem, name, version)
|
|
|
|
if cached, err := p.checkCache(ctx, pkgPURL, versionPURL, filename); err != nil {
|
|
return nil, err
|
|
} else if cached != nil {
|
|
return cached, nil
|
|
}
|
|
|
|
return p.fetchAndCacheFromURL(ctx, ecosystem, name, version, filename, pkgPURL, versionPURL, downloadURL, headers)
|
|
}
|
|
|
|
func (p *Proxy) fetchAndCacheFromURL(ctx context.Context, ecosystem, name, version, filename, pkgPURL, versionPURL, downloadURL string, headers http.Header) (*CacheResult, error) {
|
|
p.Logger.Info("fetching from upstream",
|
|
"ecosystem", ecosystem, "name", name, "version", version, "url", downloadURL)
|
|
|
|
artifact, err := p.Fetcher.FetchWithHeaders(ctx, downloadURL, headers)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("fetching from upstream: %w", err)
|
|
}
|
|
|
|
storagePath := storage.ArtifactPath(ecosystem, "", name, version, filename)
|
|
size, hash, err := p.Storage.Store(ctx, storagePath, artifact.Body)
|
|
_ = artifact.Body.Close()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("storing artifact: %w", err)
|
|
}
|
|
|
|
if err := p.updateCacheDB(ecosystem, name, filename, pkgPURL, versionPURL, downloadURL, storagePath, hash, size, artifact.ContentType); err != nil {
|
|
p.Logger.Warn("failed to update cache database", "error", err)
|
|
}
|
|
|
|
reader, err := p.Storage.Open(ctx, storagePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("opening cached artifact: %w", err)
|
|
}
|
|
|
|
return &CacheResult{
|
|
Reader: reader,
|
|
Size: size,
|
|
ContentType: artifact.ContentType,
|
|
Hash: hash,
|
|
Cached: false,
|
|
}, nil
|
|
}
|