Skip to content

Commit 8fcb860

Browse files
committed
feat(api): fix Volume File API with litestream restore
- Replace direct GCS meta.db download with litestream restore - Convert journal mode WAL -> DELETE after restore - Replace raw GCS upload with litestream replicate for writes - Add 409 Conflict for writes when volume attached to sandbox - Add 412 Precondition Failed for fresh volumes (not yet mounted) - Add cache invalidation on sandbox start/stop with volume - Install litestream and sqlite3 in API container Fixes: Volume File API 500 errors (volume metadata not found)
1 parent 728dad2 commit 8fcb860

File tree

10 files changed

+381
-80
lines changed

10 files changed

+381
-80
lines changed

packages/api/Dockerfile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ FROM alpine:${ALPINE_VERSION}
5454

5555
COPY --from=builder /build/api/bin/api .
5656

57+
# Install sqlite3 for journal mode conversion (WAL -> DELETE after litestream restore)
58+
RUN apk add --no-cache sqlite
59+
60+
# Install litestream from moru-ai fork (for volume metadata restore/replicate)
61+
ARG LITESTREAM_VERSION=v0.5.6-moru.1
62+
RUN wget -q -O /usr/local/bin/litestream \
63+
"https://github.com/moru-ai/litestream/releases/download/${LITESTREAM_VERSION}/litestream-linux" && \
64+
chmod +x /usr/local/bin/litestream
65+
5766
# Set Gin server to the production mode
5867
ENV GIN_MODE=release
5968

packages/api/internal/handlers/sandbox_create.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,12 @@ func (a *APIStore) PostSandboxes(c *gin.Context) {
268268
return
269269
}
270270

271+
// Invalidate volume client cache when sandbox attaches a volume
272+
// This ensures API sees fresh metadata after sandbox mounts the volume
273+
if volumeConfig != nil && a.juicefsPool != nil {
274+
a.juicefsPool.InvalidateVolume(volumeConfig.VolumeID)
275+
}
276+
271277
c.JSON(http.StatusCreated, &sbx)
272278
}
273279

packages/api/internal/handlers/store.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,6 @@ func NewAPIStore(ctx context.Context, tel *telemetry.Client, config cfg.Config)
149149
// Start the periodic sync of template builds statuses
150150
go templateManager.BuildsStatusPeriodicalSync(ctx)
151151

152-
// Start sandbox runs consumer (writes sandbox events to PostgreSQL)
153-
if redisClient != nil {
154-
sandboxRunsConsumer := sandboxruns.NewConsumer(redisClient, sqlcDB)
155-
go sandboxRunsConsumer.Run(ctx)
156-
}
157-
158152
// Initialize volume events delivery for Redis Streams
159153
var volEventsDelivery events.Delivery[events.VolumeEvent]
160154
if redisClient != nil {
@@ -163,7 +157,7 @@ func NewAPIStore(ctx context.Context, tel *telemetry.Client, config cfg.Config)
163157
}
164158

165159
// JuiceFS pool for volume file operations (list, download, upload, delete)
166-
// Uses SQLite metadata downloaded from GCS for each volume
160+
// Uses litestream restore to get SQLite metadata from GCS for each volume
167161
var juicefsPool *juicefs.Pool
168162
if config.VolumesBucket != "" {
169163
juicefsPool = juicefs.NewPool(juicefs.Config{
@@ -175,6 +169,17 @@ func NewAPIStore(ctx context.Context, tel *telemetry.Client, config cfg.Config)
175169
logger.L().Info(ctx, "Volume file operations disabled (no VOLUMES_BUCKET configured)")
176170
}
177171

172+
// Start sandbox runs consumer (writes sandbox events to PostgreSQL)
173+
// Pass juicefsPool so it can invalidate cache when sandbox with volume terminates
174+
if redisClient != nil {
175+
var consumerOpts []sandboxruns.ConsumerOption
176+
if juicefsPool != nil {
177+
consumerOpts = append(consumerOpts, sandboxruns.WithVolumeInvalidator(juicefsPool.InvalidateVolume))
178+
}
179+
sandboxRunsConsumer := sandboxruns.NewConsumer(redisClient, sqlcDB, consumerOpts...)
180+
go sandboxRunsConsumer.Run(ctx)
181+
}
182+
178183
a := &APIStore{
179184
config: config,
180185
Healthy: false,

packages/api/internal/handlers/volume_files.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/google/uuid"
1818

1919
"github.com/moru-ai/sandbox-infra/packages/api/internal/api"
20+
"github.com/moru-ai/sandbox-infra/packages/api/internal/juicefs"
2021
"github.com/moru-ai/sandbox-infra/packages/db/queries"
2122
)
2223

@@ -92,6 +93,11 @@ func (a *APIStore) GetVolumesVolumeIDFiles(c *gin.Context, volumeID string, para
9293
// Note: redisDB parameter is deprecated, passing 0 (code won't reach here due to nil check above)
9394
client, err := a.juicefsPool.Get(ctx, volume.ID, 0)
9495
if err != nil {
96+
// Handle fresh volumes that haven't been mounted yet
97+
if errors.Is(err, juicefs.ErrVolumeNotInitialized) {
98+
a.sendAPIStoreError(c, http.StatusPreconditionFailed, "Volume not initialized - mount to a sandbox first")
99+
return
100+
}
95101
a.sendAPIStoreError(c, http.StatusInternalServerError, "Failed to connect to volume: "+err.Error())
96102
return
97103
}
@@ -176,6 +182,11 @@ func (a *APIStore) GetVolumesVolumeIDFilesDownload(c *gin.Context, volumeID stri
176182
// Note: redisDB parameter is deprecated, passing 0 (code won't reach here due to nil check above)
177183
client, err := a.juicefsPool.Get(ctx, volume.ID, 0)
178184
if err != nil {
185+
// Handle fresh volumes that haven't been mounted yet
186+
if errors.Is(err, juicefs.ErrVolumeNotInitialized) {
187+
a.sendAPIStoreError(c, http.StatusPreconditionFailed, "Volume not initialized - mount to a sandbox first")
188+
return
189+
}
179190
a.sendAPIStoreError(c, http.StatusInternalServerError, "Failed to connect to volume: "+err.Error())
180191
return
181192
}
@@ -229,6 +240,17 @@ func (a *APIStore) PutVolumesVolumeIDFilesUpload(c *gin.Context, volumeID string
229240
return
230241
}
231242

243+
// Check if volume is attached to a running sandbox (write conflict)
244+
isAttached, err := a.sqlcDB.IsVolumeAttached(ctx, &volume.ID)
245+
if err != nil {
246+
a.sendAPIStoreError(c, http.StatusInternalServerError, "Failed to check volume status")
247+
return
248+
}
249+
if isAttached {
250+
a.sendAPIStoreError(c, http.StatusConflict, "Cannot modify volume while attached to sandbox")
251+
return
252+
}
253+
232254
// Validate path
233255
if !strings.HasPrefix(params.Path, "/") {
234256
a.sendAPIStoreError(c, http.StatusBadRequest, "Path must be absolute")
@@ -242,6 +264,11 @@ func (a *APIStore) PutVolumesVolumeIDFilesUpload(c *gin.Context, volumeID string
242264
// Note: redisDB parameter is deprecated, passing 0 (code won't reach here due to nil check above)
243265
client, err := a.juicefsPool.Get(ctx, volume.ID, 0)
244266
if err != nil {
267+
// Handle fresh volumes that haven't been mounted yet
268+
if errors.Is(err, juicefs.ErrVolumeNotInitialized) {
269+
a.sendAPIStoreError(c, http.StatusPreconditionFailed, "Volume not initialized - mount to a sandbox first")
270+
return
271+
}
245272
a.sendAPIStoreError(c, http.StatusInternalServerError, "Failed to connect to volume: "+err.Error())
246273
return
247274
}
@@ -293,6 +320,17 @@ func (a *APIStore) DeleteVolumesVolumeIDFiles(c *gin.Context, volumeID string, p
293320
return
294321
}
295322

323+
// Check if volume is attached to a running sandbox (write conflict)
324+
isAttached, err := a.sqlcDB.IsVolumeAttached(ctx, &volume.ID)
325+
if err != nil {
326+
a.sendAPIStoreError(c, http.StatusInternalServerError, "Failed to check volume status")
327+
return
328+
}
329+
if isAttached {
330+
a.sendAPIStoreError(c, http.StatusConflict, "Cannot modify volume while attached to sandbox")
331+
return
332+
}
333+
296334
// Validate path
297335
if !strings.HasPrefix(params.Path, "/") {
298336
a.sendAPIStoreError(c, http.StatusBadRequest, "Path must be absolute")
@@ -312,6 +350,11 @@ func (a *APIStore) DeleteVolumesVolumeIDFiles(c *gin.Context, volumeID string, p
312350
// Note: redisDB parameter is deprecated, passing 0 (code won't reach here due to nil check above)
313351
client, err := a.juicefsPool.Get(ctx, volume.ID, 0)
314352
if err != nil {
353+
// Handle fresh volumes that haven't been mounted yet
354+
if errors.Is(err, juicefs.ErrVolumeNotInitialized) {
355+
a.sendAPIStoreError(c, http.StatusPreconditionFailed, "Volume not initialized - mount to a sandbox first")
356+
return
357+
}
315358
a.sendAPIStoreError(c, http.StatusInternalServerError, "Failed to connect to volume: "+err.Error())
316359
return
317360
}

packages/api/internal/juicefs/client.go

Lines changed: 26 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"syscall"
1717
"time"
1818

19-
"cloud.google.com/go/storage"
2019
"github.com/juicedata/juicefs/pkg/chunk"
2120
"github.com/juicedata/juicefs/pkg/fs"
2221
"github.com/juicedata/juicefs/pkg/meta"
@@ -60,57 +59,36 @@ type Client struct {
6059
closed bool
6160
}
6261

62+
// ErrVolumeNotInitialized is returned when a fresh volume has not been mounted to a sandbox yet.
63+
var ErrVolumeNotInitialized = fmt.Errorf("volume not initialized - mount to a sandbox first")
64+
6365
// NewClient creates a new JuiceFS client for a volume.
64-
// Downloads SQLite metadata from GCS and initializes JuiceFS for file operations.
66+
// Uses litestream restore to reconstruct SQLite metadata from LTX files in GCS.
6567
func NewClient(volumeID string, _ int32, config Config) (*Client, error) {
6668
ctx := context.Background()
6769

68-
// Create temporary directory for SQLite file
69-
tmpDir, err := os.MkdirTemp("", "juicefs-client-*")
70+
// Restore metadata from litestream
71+
restoreResult, err := restoreMetaDB(ctx, volumeID, config.GCSBucket)
7072
if err != nil {
71-
return nil, fmt.Errorf("create temp dir: %w", err)
73+
return nil, fmt.Errorf("restore metadata: %w", err)
7274
}
7375

74-
sqlitePath := filepath.Join(tmpDir, "meta.db")
75-
76-
// Download SQLite metadata from GCS
77-
gcsClient, err := storage.NewClient(ctx)
78-
if err != nil {
79-
os.RemoveAll(tmpDir)
80-
return nil, fmt.Errorf("create GCS client: %w", err)
76+
// Fresh volumes must be mounted to a sandbox first to initialize JuiceFS metadata
77+
if restoreResult.IsFreshVolume {
78+
cleanupVolumeDir(volumeID)
79+
return nil, ErrVolumeNotInitialized
8180
}
82-
defer gcsClient.Close()
83-
84-
_, metaPrefix := gcsPathsForVolume(config.GCSBucket, volumeID)
85-
bucket := gcsClient.Bucket(config.GCSBucket)
86-
metaObj := bucket.Object(metaPrefix + "meta.db")
8781

88-
reader, err := metaObj.NewReader(ctx)
89-
if err != nil {
90-
os.RemoveAll(tmpDir)
91-
if err == storage.ErrObjectNotExist {
92-
return nil, fmt.Errorf("volume metadata not found: %s", volumeID)
93-
}
94-
return nil, fmt.Errorf("download metadata: %w", err)
95-
}
96-
97-
sqliteFile, err := os.Create(sqlitePath)
98-
if err != nil {
99-
reader.Close()
100-
os.RemoveAll(tmpDir)
101-
return nil, fmt.Errorf("create sqlite file: %w", err)
102-
}
82+
sqlitePath := restoreResult.MetaDBPath
83+
tmpDir := filepath.Dir(sqlitePath)
10384

104-
if _, err = io.Copy(sqliteFile, reader); err != nil {
105-
sqliteFile.Close()
106-
reader.Close()
107-
os.RemoveAll(tmpDir)
108-
return nil, fmt.Errorf("write sqlite file: %w", err)
85+
// Convert journal mode from WAL to DELETE (required for JuiceFS)
86+
if err := convertJournalMode(ctx, sqlitePath); err != nil {
87+
cleanupVolumeDir(volumeID)
88+
return nil, fmt.Errorf("convert journal mode: %w", err)
10989
}
110-
sqliteFile.Close()
111-
reader.Close()
11290

113-
logger.L().Info(ctx, "Downloaded volume metadata",
91+
logger.L().Info(ctx, "Restored volume metadata via litestream",
11492
zap.String("volume_id", volumeID),
11593
zap.String("path", sqlitePath))
11694

@@ -251,50 +229,30 @@ func (c *Client) Close() error {
251229
return nil
252230
}
253231

254-
// SyncToGCS uploads the current SQLite metadata to GCS.
232+
// SyncToGCS syncs the current SQLite metadata to GCS via litestream.
255233
// This should be called after write operations to persist changes.
256234
func (c *Client) SyncToGCS() error {
257235
c.mu.Lock()
258236
defer c.mu.Unlock()
259237
return c.syncToGCSLocked()
260238
}
261239

262-
// syncToGCSLocked uploads SQLite metadata to GCS (must hold lock).
240+
// syncToGCSLocked syncs SQLite metadata to GCS via litestream (must hold lock).
241+
// Uses litestream replicate to ensure compatibility with sandbox's Litestream daemon.
263242
func (c *Client) syncToGCSLocked() error {
264243
if c.sqlitePath == "" {
265244
return nil
266245
}
267246

268247
ctx := context.Background()
269248

270-
gcsClient, err := storage.NewClient(ctx)
271-
if err != nil {
272-
return fmt.Errorf("create GCS client: %w", err)
273-
}
274-
defer gcsClient.Close()
275-
276-
_, metaPrefix := gcsPathsForVolume(c.config.GCSBucket, c.volumeID)
277-
bucket := gcsClient.Bucket(c.config.GCSBucket)
278-
metaObj := bucket.Object(metaPrefix + "meta.db")
279-
280-
// Read local SQLite file
281-
sqliteFile, err := os.Open(c.sqlitePath)
282-
if err != nil {
283-
return fmt.Errorf("open sqlite file: %w", err)
284-
}
285-
defer sqliteFile.Close()
286-
287-
// Upload to GCS
288-
writer := metaObj.NewWriter(ctx)
289-
if _, err = io.Copy(writer, sqliteFile); err != nil {
290-
writer.Close()
291-
return fmt.Errorf("upload metadata: %w", err)
292-
}
293-
if err = writer.Close(); err != nil {
294-
return fmt.Errorf("close GCS writer: %w", err)
249+
// Use litestream replicate to sync metadata to GCS
250+
// This ensures compatibility with the sandbox's Litestream daemon
251+
if err := syncViaLitestream(ctx, c.volumeID, c.sqlitePath, c.config.GCSBucket); err != nil {
252+
return fmt.Errorf("litestream sync: %w", err)
295253
}
296254

297-
logger.L().Debug(ctx, "Synced metadata to GCS",
255+
logger.L().Debug(ctx, "Synced metadata to GCS via litestream",
298256
zap.String("volume_id", c.volumeID))
299257

300258
return nil

packages/api/internal/juicefs/pool.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,15 @@ import (
77
"fmt"
88
"sync"
99
"time"
10+
11+
"go.uber.org/zap"
12+
13+
"github.com/moru-ai/sandbox-infra/packages/shared/pkg/logger"
1014
)
1115

1216
// Pool manages a pool of JuiceFS clients, one per volume.
1317
// Clients are cached and reused to avoid repeated initialization.
18+
// Cache is invalidated when volume mount state changes (sandbox starts/stops).
1419
type Pool struct {
1520
config Config
1621

@@ -66,6 +71,27 @@ func (p *Pool) Get(ctx context.Context, volumeID string, _ int32) (*Client, erro
6671
return client, nil
6772
}
6873

74+
// InvalidateVolume removes a volume's cached client.
75+
// This should be called when a sandbox starts or stops with the volume attached,
76+
// as the volume's metadata may have changed.
77+
func (p *Pool) InvalidateVolume(volumeID string) {
78+
p.mu.Lock()
79+
defer p.mu.Unlock()
80+
81+
if pc, ok := p.clients[volumeID]; ok {
82+
// Close the client (best effort - ignore errors during invalidation)
83+
if err := pc.client.Close(); err != nil {
84+
logger.L().Warn(context.Background(), "Error closing invalidated volume client",
85+
zap.String("volume_id", volumeID),
86+
zap.Error(err))
87+
}
88+
delete(p.clients, volumeID)
89+
90+
logger.L().Info(context.Background(), "Invalidated volume client cache",
91+
zap.String("volume_id", volumeID))
92+
}
93+
}
94+
6995
// Config returns the pool's configuration.
7096
func (p *Pool) Config() Config {
7197
return p.config

0 commit comments

Comments
 (0)