Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions packages/orchestrator/pkg/sandbox/block/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math"
"math/rand"
"os"
"slices"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -49,7 +48,8 @@ type Cache struct {
blockSize int64
mmap *mmap.MMap
mu sync.RWMutex
dirty sync.Map
dirty *bitset.BitSet
dirtyMu sync.RWMutex // protects dirty bitset for concurrent access
dirtyFile bool
closed atomic.Bool
}
Expand All @@ -68,6 +68,7 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e
filePath: filePath,
size: size,
blockSize: blockSize,
dirty: bitset.New(0),
dirtyFile: dirtyFile,
}, nil
}
Expand All @@ -87,11 +88,14 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e
return nil, fmt.Errorf("error mapping file: %w", err)
}

totalBlocks := uint(header.TotalBlocks(size, blockSize))

return &Cache{
mmap: &mm,
filePath: filePath,
size: size,
blockSize: blockSize,
dirty: bitset.New(totalBlocks),
dirtyFile: dirtyFile,
}, nil
}
Expand Down Expand Up @@ -253,12 +257,16 @@ func (c *Cache) isCached(off, length int64) bool {

// Cap if the length goes beyond the cache size, so we don't check for blocks that are out of bounds.
end := min(off+length, c.size)
// Recalculate the length based on the capped end, so we check for the correct blocks in case of capping.
length = end - off

for _, blockOff := range header.BlocksOffsets(length, c.blockSize) {
_, dirty := c.dirty.Load(off + blockOff)
if !dirty {
// Check all blocks in the range
startBlock := uint(header.BlockIdx(off, c.blockSize))
endBlock := uint(header.BlockIdx(end-1, c.blockSize))

c.dirtyMu.RLock()
defer c.dirtyMu.RUnlock()

for i := startBlock; i <= endBlock; i++ {
if !c.dirty.Test(i) {
return false
}
}
Expand All @@ -267,9 +275,14 @@ func (c *Cache) isCached(off, length int64) bool {
}

func (c *Cache) setIsCached(off, length int64) {
for _, blockOff := range header.BlocksOffsets(length, c.blockSize) {
c.dirty.Store(off+blockOff, struct{}{})
startBlock := uint(header.BlockIdx(off, c.blockSize))
endBlock := uint(header.BlockIdx(off+length-1, c.blockSize))

c.dirtyMu.Lock()
for i := startBlock; i <= endBlock; i++ {
c.dirty.Set(i)
}
c.dirtyMu.Unlock()
}

// When using WriteAtWithoutLock you must ensure thread safety, ideally by only writing to the same block once and the exposing the slice.
Expand All @@ -291,16 +304,18 @@ func (c *Cache) WriteAtWithoutLock(b []byte, off int64) (int, error) {
return n, nil
}

// dirtySortedKeys returns a sorted list of dirty block offsets (in bytes).
// Bitset iteration via NextSet is already in ascending index order, so no
// explicit sort is needed after conversion to byte offsets.
func (c *Cache) dirtySortedKeys() []int64 {
	c.dirtyMu.RLock()
	defer c.dirtyMu.RUnlock()

	// Pre-allocate exactly: Count() is the number of set bits.
	keys := make([]int64, 0, c.dirty.Count())

	for i, ok := c.dirty.NextSet(0); ok; i, ok = c.dirty.NextSet(i + 1) {
		keys = append(keys, header.BlockOffset(int64(i), c.blockSize))
	}

	return keys
}
Expand Down Expand Up @@ -491,9 +506,13 @@ func (c *Cache) copyProcessMemory(
return fmt.Errorf("failed to read memory: expected %d bytes, got %d", segmentSize, n)
}

for _, blockOff := range header.BlocksOffsets(segmentSize, c.blockSize) {
c.dirty.Store(offset+blockOff, struct{}{})
startBlock := uint(header.BlockIdx(offset, c.blockSize))
endBlock := uint(header.BlockIdx(offset+segmentSize-1, c.blockSize))
c.dirtyMu.Lock()
for i := startBlock; i <= endBlock; i++ {
c.dirty.Set(i)
}
c.dirtyMu.Unlock()

offset += segmentSize

Expand Down
169 changes: 169 additions & 0 deletions packages/orchestrator/pkg/sandbox/block/cache_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package block

import (
	"os"
	"strconv"
	"testing"

	"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

// BenchmarkDirtyTracking benchmarks the dirty tracking operations
// which were changed from sync.Map to bitset.BitSet.
func BenchmarkDirtyTracking(b *testing.B) {
const (
blockSize = int64(header.PageSize) // 4KB blocks
cacheSize = 1024 * 1024 * 1024 // 1GB cache = 262144 blocks
)

tmpFile := b.TempDir() + "/bench_cache"

cache, err := NewCache(cacheSize, blockSize, tmpFile, false)
if err != nil {
b.Fatalf("failed to create cache: %v", err)
}
defer cache.Close()

// Simulate write data
data := make([]byte, blockSize)

b.Run("SetIsCached", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {

Check failure on line 33 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
// Simulate marking a block as cached
off := int64(i%262144) * blockSize
cache.setIsCached(off, blockSize)
}
})

b.Run("IsCached_Hit", func(b *testing.B) {
// Pre-populate some blocks as cached
for i := int64(0); i < 1000; i++ {

Check failure on line 42 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
cache.setIsCached(i*blockSize, blockSize)
}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {

Check failure on line 49 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
off := int64(i%1000) * blockSize
cache.isCached(off, blockSize)
}
})

b.Run("IsCached_Miss", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {

Check failure on line 59 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
// Check blocks that are definitely not cached (high offsets)
off := int64(100000+i%100000) * blockSize
cache.isCached(off, blockSize)
}
})

b.Run("WriteAt_SingleBlock", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {

Check failure on line 70 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
off := int64(i%262144) * blockSize
cache.WriteAt(data, off)
}
})

b.Run("WriteAt_MultiBlock", func(b *testing.B) {
// Write spanning 4 blocks
multiBlockData := make([]byte, blockSize*4)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {

Check failure on line 83 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
off := int64(i%65536) * blockSize * 4
cache.WriteAt(multiBlockData, off)
}
})
}

// BenchmarkDirtySortedKeys benchmarks the dirtySortedKeys operation
// used during export.
func BenchmarkDirtySortedKeys(b *testing.B) {
const (
blockSize = int64(header.PageSize)
cacheSize = 1024 * 1024 * 1024 // 1GB
)

tmpFile := b.TempDir() + "/bench_cache"

cache, err := NewCache(cacheSize, blockSize, tmpFile, false)
if err != nil {
b.Fatalf("failed to create cache: %v", err)
}
defer cache.Close()

// Mark 10% of blocks as dirty (26214 blocks)
for i := int64(0); i < 26214; i++ {

Check failure on line 107 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
cache.setIsCached(i*blockSize, blockSize)
}

b.Run("DirtySortedKeys_10pct", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {

Check failure on line 115 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
keys := cache.dirtySortedKeys()
_ = keys
}
})
}

// BenchmarkCacheCreation benchmarks cache creation overhead.
func BenchmarkCacheCreation(b *testing.B) {
const (
blockSize = int64(header.PageSize)
)

sizes := []int64{
1 * 1024 * 1024, // 1MB
100 * 1024 * 1024, // 100MB
1024 * 1024 * 1024, // 1GB
10 * 1024 * 1024 * 1024, // 10GB
}

for _, size := range sizes {
name := formatSize(size)
b.Run(name, func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {

Check failure on line 141 in packages/orchestrator/pkg/sandbox/block/cache_bench_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint (/home/runner/work/infra/infra/packages/orchestrator)

for loop can be changed to use an integer range (Go 1.22+) (intrange)
tmpFile := b.TempDir() + "/bench_cache"
cache, err := NewCache(size, blockSize, tmpFile, false)
if err != nil {
b.Fatalf("failed to create cache: %v", err)
}
cache.Close()
os.RemoveAll(tmpFile)
}
})
}
}

// formatSize renders a byte count as a short human-readable label
// (e.g. "1MB", "10GB") for use as a benchmark sub-test name.
// The previous rune-arithmetic version (string(rune('0'+n))) only worked
// for single-digit quotients, producing garbage for 10GB or 100MB.
func formatSize(bytes int64) string {
	const (
		KB = 1024
		MB = 1024 * KB
		GB = 1024 * MB
	)

	switch {
	case bytes >= GB:
		return strconv.FormatInt(bytes/GB, 10) + "GB"
	case bytes >= MB:
		return strconv.FormatInt(bytes/MB, 10) + "MB"
	default:
		return strconv.FormatInt(bytes/KB, 10) + "KB"
	}
}
Loading