pkg-proxy/internal/storage/blob.go
Andrew Nesbitt 24d5e77443
Fix cross-device link error when running in Docker with volumes (#66)
`fileblob` creates temp files in `os.TempDir()` (`/tmp`) by default,
then uses `os.Rename` to move them to the final path. When the storage
directory is on a different filesystem (e.g. a Docker volume mount at
`/data`), the rename fails with "invalid cross-device link".

Set `no_tmp_dir=true` on file:// bucket URLs so fileblob creates temp
files next to the final destination instead.

Fixes #65
2026-04-06 13:07:31 +01:00

180 lines
4.5 KiB
Go

package storage
import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"net/url"
	"os"
	"path/filepath"
	"runtime"
	"strings"

	"gocloud.dev/blob"
	_ "gocloud.dev/blob/fileblob"
	_ "gocloud.dev/blob/s3blob"
	"gocloud.dev/gcerrors"
)
// osWindows is the runtime.GOOS value that selects the Windows-specific
// handling of file:// URLs (drive-letter paths such as file:///C:/path).
const (
	osWindows = "windows"
)
// Blob implements Storage using gocloud.dev/blob.
// Supports local filesystem (file://) and S3 (s3://) URLs.
//
// Values are constructed by OpenBucket; Close releases the underlying bucket.
type Blob struct {
	// bucket is the opened gocloud bucket every object operation goes through.
	bucket *blob.Bucket
	// url is the URL the bucket was opened with (for file:// URLs, the
	// normalized form built by OpenBucket). Exposed via URL.
	url string
}
// OpenBucket opens a blob bucket from a URL.
//
// Supported URL schemes:
//   - file:///path/to/dir - Local filesystem storage
//   - s3://bucket-name - Amazon S3 (uses AWS_* environment variables)
//   - s3://bucket-name?region=us-east-1&endpoint=http://localhost:9000 - S3-compatible (MinIO, etc.)
//
// For local filesystem, the directory is created if it doesn't exist. Any
// query parameters on a file:// URL are preserved and passed to fileblob,
// and no_tmp_dir is enabled unless the caller set it explicitly.
func OpenBucket(ctx context.Context, urlStr string) (*Blob, error) {
	if strings.HasPrefix(urlStr, "file://") {
		fileURL, err := prepareFileURL(urlStr)
		if err != nil {
			return nil, err
		}
		urlStr = fileURL
	}
	bucket, err := blob.OpenBucket(ctx, urlStr)
	if err != nil {
		return nil, fmt.Errorf("opening bucket: %w", err)
	}
	return &Blob{bucket: bucket, url: urlStr}, nil
}

// prepareFileURL creates the directory named by a file:// URL and returns the
// URL fileblob should open: an absolute forward-slash path, the caller's query
// parameters preserved, and no_tmp_dir set unless already present.
func prepareFileURL(urlStr string) (string, error) {
	// Split off the query before touching the filesystem so parameters such
	// as ?create_dir=true never become part of the directory name.
	rawPath, rawQuery, _ := strings.Cut(strings.TrimPrefix(urlStr, "file://"), "?")

	query, err := url.ParseQuery(rawQuery)
	if err != nil {
		return "", fmt.Errorf("parsing file URL query: %w", err)
	}
	// Create temp files next to the final path instead of in os.TempDir.
	// This avoids "invalid cross-device link" errors from os.Rename when
	// the bucket directory and os.TempDir are on different filesystems
	// (e.g. Docker volume mounts). An explicit caller setting wins.
	if _, set := query["no_tmp_dir"]; !set {
		query.Set("no_tmp_dir", "true")
	}

	// fileblob opens the percent-decoded URL path, so decode here too;
	// otherwise e.g. %20 would create a different directory than fileblob uses.
	path, err := url.PathUnescape(rawPath)
	if err != nil {
		return "", fmt.Errorf("parsing file URL path: %w", err)
	}
	if runtime.GOOS == osWindows {
		// Windows: file:///C:/path -> C:/path. Unix: file:///path -> /path as-is.
		path = strings.TrimPrefix(path, "/")
	}

	// Convert forward slashes to native path separators for filesystem operations.
	nativePath := filepath.FromSlash(path)
	if err := os.MkdirAll(nativePath, dirPermissions); err != nil {
		return "", fmt.Errorf("creating directory: %w", err)
	}
	absPath, err := filepath.Abs(nativePath)
	if err != nil {
		return "", fmt.Errorf("resolving path: %w", err)
	}

	// fileblob requires an absolute path with forward slashes. On Windows the
	// URL path must be /C:/path so the drive letter is not parsed as a host.
	urlPath := filepath.ToSlash(absPath)
	if runtime.GOOS == osWindows {
		urlPath = "/" + urlPath
	}
	// url.URL escapes characters such as '?', '#', and spaces in the path.
	u := url.URL{Scheme: "file", Path: urlPath, RawQuery: query.Encode()}
	return u.String(), nil
}
// Store writes the contents of r to path. It returns the number of bytes
// written and the hex-encoded SHA-256 digest of those bytes.
//
// If reading r or writing the object fails, the write is aborted. The
// writer's context is cancelled before Close, so a partially written object
// is not committed under path.
func (b *Blob) Store(ctx context.Context, path string, r io.Reader) (int64, string, error) {
	// gocloud's NewWriter contract: cancelling the writer's context aborts the
	// write. Closing on a live context would commit whatever was written.
	writeCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	w, err := b.bucket.NewWriter(writeCtx, path, &blob.WriterOptions{})
	if err != nil {
		return 0, "", fmt.Errorf("creating writer: %w", err)
	}

	// Hash the bytes as they stream through so the content is read only once.
	h := sha256.New()
	size, err := io.Copy(w, io.TeeReader(r, h))
	if err != nil {
		cancel()
		// Close is still required after an abort. Its error only reports the
		// cancellation and adds nothing to the copy error.
		_ = w.Close()
		return 0, "", fmt.Errorf("writing content: %w", err)
	}
	if err := w.Close(); err != nil {
		return 0, "", fmt.Errorf("closing writer: %w", err)
	}
	return size, hex.EncodeToString(h.Sum(nil)), nil
}
// Open returns a reader for the object stored at path; the caller closes it.
// A missing object is reported as ErrNotFound.
func (b *Blob) Open(ctx context.Context, path string) (io.ReadCloser, error) {
	reader, err := b.bucket.NewReader(ctx, path, nil)
	switch {
	case err == nil:
		return reader, nil
	case isNotExist(err):
		return nil, ErrNotFound
	default:
		return nil, fmt.Errorf("opening reader: %w", err)
	}
}
// Exists reports whether an object is stored at path.
// Lookup failures are wrapped and returned alongside false.
func (b *Blob) Exists(ctx context.Context, path string) (bool, error) {
	found, lookupErr := b.bucket.Exists(ctx, path)
	if lookupErr == nil {
		return found, nil
	}
	return false, fmt.Errorf("checking existence: %w", lookupErr)
}
// Delete removes the object at path. Deleting an object that does not exist
// is treated as success.
func (b *Blob) Delete(ctx context.Context, path string) error {
	switch err := b.bucket.Delete(ctx, path); {
	case err == nil, isNotExist(err):
		return nil
	default:
		return fmt.Errorf("deleting object: %w", err)
	}
}
// Size returns the size in bytes recorded in the attributes of the object at
// path, or ErrNotFound if no such object exists.
func (b *Blob) Size(ctx context.Context, path string) (int64, error) {
	attrs, err := b.bucket.Attributes(ctx, path)
	switch {
	case err == nil:
		return attrs.Size, nil
	case isNotExist(err):
		return 0, ErrNotFound
	default:
		return 0, fmt.Errorf("getting attributes: %w", err)
	}
}
// UsedSpace returns the sum of the sizes of every object in the bucket
// listing. It walks the full listing on each call.
func (b *Blob) UsedSpace(ctx context.Context) (int64, error) {
	var used int64
	it := b.bucket.List(nil)
	for {
		obj, err := it.Next(ctx)
		switch {
		case err == io.EOF:
			// End of listing.
			return used, nil
		case err != nil:
			return 0, fmt.Errorf("listing objects: %w", err)
		}
		used += obj.Size
	}
}
// Close closes the underlying bucket and returns its error, if any.
func (b *Blob) Close() error {
	bucket := b.bucket
	return bucket.Close()
}
// URL returns the URL the bucket was opened with. For file:// URLs this is
// the normalized form built by OpenBucket, not necessarily the caller's input.
func (b *Blob) URL() string {
	opened := b.url
	return opened
}
// isNotExist reports whether err carries gocloud's NotFound error code.
// A nil error is not a NotFound error.
func isNotExist(err error) bool {
	code := gcerrors.Code(err)
	return code == gcerrors.NotFound
}