Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions packages/orchestrator/pkg/sandbox/block/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Cache struct {
blockSize int64
mmap *mmap.MMap
mu sync.RWMutex
dirty sync.Map
dirty *bitset.BitSet
dirtyMu sync.RWMutex // protects dirty bitset for concurrent access
dirtyFile bool
closed atomic.Bool
}
Expand All @@ -71,6 +72,7 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e
filePath: filePath,
size: size,
blockSize: blockSize,
dirty: bitset.New(0),
dirtyFile: dirtyFile,
}, nil
}
Expand All @@ -90,11 +92,14 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e
return nil, fmt.Errorf("error mapping file: %w", err)
}

totalBlocks := uint(header.TotalBlocks(size, blockSize))

return &Cache{
mmap: &mm,
filePath: filePath,
size: size,
blockSize: blockSize,
dirty: bitset.New(totalBlocks),
dirtyFile: dirtyFile,
}, nil
}
Expand Down Expand Up @@ -302,19 +307,33 @@ func (c *Cache) Slice(off, length int64) ([]byte, error) {
}

func (c *Cache) isCached(off, length int64) bool {
// Reject negative offsets
if off < 0 {
return false
}

// Make sure the offset is within the cache size
if off >= c.size {
return false
}

// Cap if the length goes beyond the cache size, so we don't check for blocks that are out of bounds.
end := min(off+length, c.size)
// Recalculate the length based on the capped end, so we check for the correct blocks in case of capping.
length = end - off

for _, blockOff := range header.BlocksOffsets(length, c.blockSize) {
_, dirty := c.dirty.Load(off + blockOff)
if !dirty {
// Handle zero-length or empty range (vacuously true - no blocks to check)
if end <= off {
return true
}

// Check all blocks in the range
startBlock := uint(header.BlockIdx(off, c.blockSize))
endBlock := uint(header.BlockIdx(end-1, c.blockSize))

c.dirtyMu.RLock()
defer c.dirtyMu.RUnlock()

for i := startBlock; i <= endBlock; i++ {
if !c.dirty.Test(i) {
return false
}
}
Expand All @@ -323,9 +342,24 @@ func (c *Cache) isCached(off, length int64) bool {
}

func (c *Cache) setIsCached(off, length int64) {
for _, blockOff := range header.BlocksOffsets(length, c.blockSize) {
c.dirty.Store(off+blockOff, struct{}{})
// Reject negative offsets
if off < 0 {
return
}

// Handle zero-length - nothing to mark
if length <= 0 {
return
}

startBlock := uint(header.BlockIdx(off, c.blockSize))
endBlock := uint(header.BlockIdx(off+length-1, c.blockSize))

c.dirtyMu.Lock()
for i := startBlock; i <= endBlock; i++ {
c.dirty.Set(i)
}
c.dirtyMu.Unlock()
}

// When using WriteAtWithoutLock you must ensure thread safety, ideally by only writing to the same block once and the exposing the slice.
Expand All @@ -338,6 +372,10 @@ func (c *Cache) WriteAtWithoutLock(b []byte, off int64) (int, error) {
return 0, nil
}

if off < 0 || off >= c.size {
return 0, nil
}

end := min(off+int64(len(b)), c.size)

n := copy((*c.mmap)[off:end], b)
Expand All @@ -347,6 +385,22 @@ func (c *Cache) WriteAtWithoutLock(b []byte, off int64) (int, error) {
return n, nil
}

// dirtySortedKeys returns the byte offsets of all dirty blocks in
// ascending order. Ordering comes for free: NextSet walks the bitset
// from low to high indices.
func (c *Cache) dirtySortedKeys() []int64 {
	c.dirtyMu.RLock()
	defer c.dirtyMu.RUnlock()

	// Size the result up front from the number of set bits.
	offsets := make([]int64, 0, c.dirty.Count())

	for idx, found := c.dirty.NextSet(0); found; idx, found = c.dirty.NextSet(idx + 1) {
		offsets = append(offsets, header.BlockOffset(int64(idx), c.blockSize))
	}

	return offsets
}

// FileSize returns the size of the cache on disk.
// The size might differ from the dirty size, as it may not be fully on disk.
func (c *Cache) FileSize() (int64, error) {
Expand Down Expand Up @@ -533,9 +587,7 @@ func (c *Cache) copyProcessMemory(
return fmt.Errorf("failed to read memory: expected %d bytes, got %d", segmentSize, n)
}

for _, blockOff := range header.BlocksOffsets(segmentSize, c.blockSize) {
c.dirty.Store(offset+blockOff, struct{}{})
}
c.setIsCached(offset, segmentSize)

offset += segmentSize

Expand Down
177 changes: 177 additions & 0 deletions packages/orchestrator/pkg/sandbox/block/cache_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package block

import (
"fmt"
"os"
"testing"

"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

// BenchmarkDirtyTracking benchmarks the dirty tracking operations
// which were changed from sync.Map to bitset.BitSet.
func BenchmarkDirtyTracking(b *testing.B) {
	const (
		blockSize = int64(header.PageSize) // 4KB blocks
		cacheSize = 1024 * 1024 * 1024     // 1GB cache = 262144 blocks

		// Total number of blocks in the cache (cacheSize / blockSize).
		blockCount = 262144
	)

	cachePath := b.TempDir() + "/bench_cache"

	cache, err := NewCache(cacheSize, blockSize, cachePath, false)
	if err != nil {
		b.Fatalf("failed to create cache: %v", err)
	}
	defer cache.Close()

	// Payload reused by the single-block write benchmark.
	payload := make([]byte, blockSize)

	b.Run("SetIsCached", func(b *testing.B) {
		b.ReportAllocs()
		b.ResetTimer()

		for i := 0; i < b.N; i++ {
			// Mark one block as cached, cycling across the whole cache.
			cache.setIsCached(int64(i%blockCount)*blockSize, blockSize)
		}
	})

	b.Run("IsCached_Hit", func(b *testing.B) {
		// Warm up: mark the first 1000 blocks as cached so every
		// lookup below is a hit.
		for i := int64(0); i < 1000; i++ {
			cache.setIsCached(i*blockSize, blockSize)
		}

		b.ReportAllocs()
		b.ResetTimer()

		for i := 0; i < b.N; i++ {
			cache.isCached(int64(i%1000)*blockSize, blockSize)
		}
	})

	b.Run("IsCached_Miss", func(b *testing.B) {
		// Use a fresh cache to ensure we're measuring actual misses,
		// not hits from blocks populated by previous sub-benchmarks.
		missCache, err := NewCache(cacheSize, blockSize, b.TempDir()+"/bench_cache_miss", false)
		if err != nil {
			b.Fatalf("failed to create miss cache: %v", err)
		}
		defer missCache.Close()

		b.ReportAllocs()
		b.ResetTimer()

		for i := 0; i < b.N; i++ {
			missCache.isCached(int64(i%blockCount)*blockSize, blockSize)
		}
	})

	b.Run("WriteAt_SingleBlock", func(b *testing.B) {
		b.ReportAllocs()
		b.ResetTimer()

		for i := 0; i < b.N; i++ {
			cache.WriteAt(payload, int64(i%blockCount)*blockSize)
		}
	})

	b.Run("WriteAt_MultiBlock", func(b *testing.B) {
		// Payload spanning 4 blocks; offsets step in 4-block strides.
		wide := make([]byte, blockSize*4)

		b.ReportAllocs()
		b.ResetTimer()

		for i := 0; i < b.N; i++ {
			cache.WriteAt(wide, int64(i%(blockCount/4))*blockSize*4)
		}
	})
}

// BenchmarkDirtySortedKeys benchmarks the dirtySortedKeys operation
// used during export.
func BenchmarkDirtySortedKeys(b *testing.B) {
	const (
		blockSize = int64(header.PageSize)
		cacheSize = 1024 * 1024 * 1024 // 1GB
	)

	cache, err := NewCache(cacheSize, blockSize, b.TempDir()+"/bench_cache", false)
	if err != nil {
		b.Fatalf("failed to create cache: %v", err)
	}
	defer cache.Close()

	// Mark 10% of blocks as dirty (26214 blocks).
	for i := int64(0); i < 26214; i++ {
		cache.setIsCached(i*blockSize, blockSize)
	}

	b.Run("DirtySortedKeys_10pct", func(b *testing.B) {
		b.ReportAllocs()
		b.ResetTimer()

		for i := 0; i < b.N; i++ {
			_ = cache.dirtySortedKeys()
		}
	})
}

// BenchmarkCacheCreation benchmarks cache creation overhead.
//
// Setup and teardown (temp-dir creation, Close, file removal) are kept
// out of the timed region so only NewCache itself is measured.
func BenchmarkCacheCreation(b *testing.B) {
	const blockSize = int64(header.PageSize)

	sizes := []int64{
		1 * 1024 * 1024,         // 1MB
		100 * 1024 * 1024,       // 100MB
		1024 * 1024 * 1024,      // 1GB
		10 * 1024 * 1024 * 1024, // 10GB
	}

	for _, size := range sizes {
		b.Run(formatSize(size), func(b *testing.B) {
			// Create the temp dir once: b.TempDir makes a fresh
			// directory on every call, which would otherwise be
			// measured as part of cache creation.
			dir := b.TempDir()

			b.ReportAllocs()
			b.ResetTimer()

			for i := 0; i < b.N; i++ {
				// Unique path per iteration so each NewCache starts
				// from a nonexistent file; built with the timer off.
				b.StopTimer()
				tmpFile := fmt.Sprintf("%s/bench_cache_%d", dir, i)
				b.StartTimer()

				cache, err := NewCache(size, blockSize, tmpFile, false)
				if err != nil {
					b.Fatalf("failed to create cache: %v", err)
				}

				// Exclude teardown from the measurement.
				b.StopTimer()
				cache.Close()
				os.RemoveAll(tmpFile)
				b.StartTimer()
			}
		})
	}
}

// formatSize renders a byte count as a compact label (GB, MB, or KB)
// used for benchmark sub-test names. Values below 1KB truncate to
// "0KB", matching integer division.
func formatSize(bytes int64) string {
	const unit = int64(1024)

	if bytes >= unit*unit*unit {
		return fmt.Sprintf("%dGB", bytes/(unit*unit*unit))
	}
	if bytes >= unit*unit {
		return fmt.Sprintf("%dMB", bytes/(unit*unit))
	}
	return fmt.Sprintf("%dKB", bytes/unit)
}
Loading
Loading