diff --git a/packages/api/go.mod b/packages/api/go.mod index 9d197cf26d..302402ef2a 100644 --- a/packages/api/go.mod +++ b/packages/api/go.mod @@ -102,7 +102,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bep/godartsass/v2 v2.3.2 // indirect github.com/bep/golibsass v1.2.0 // indirect - github.com/bits-and-blooms/bitset v1.22.0 // indirect + github.com/bits-and-blooms/bitset v1.24.2 // indirect github.com/bits-and-blooms/bloom/v3 v3.7.0 // indirect github.com/bytedance/sonic v1.13.3 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect diff --git a/packages/api/go.sum b/packages/api/go.sum index 1d207bd808..7fec5a8acb 100644 --- a/packages/api/go.sum +++ b/packages/api/go.sum @@ -149,8 +149,8 @@ github.com/bep/overlayfs v0.9.2/go.mod h1:aYY9W7aXQsGcA7V9x/pzeR8LjEgIxbtisZm8Q7 github.com/bep/tmc v0.5.1 h1:CsQnSC6MsomH64gw0cT5f+EwQDcvZz4AazKunFwTpuI= github.com/bep/tmc v0.5.1/go.mod h1:tGYHN8fS85aJPhDLgXETVKp+PR382OvFi2+q2GkGsq0= github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= -github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bits-and-blooms/bitset v1.24.2 h1:M7/NzVbsytmtfHbumG+K2bremQPMJuqv1JD3vOaFxp0= +github.com/bits-and-blooms/bitset v1.24.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bloom/v3 v3.7.0 h1:VfknkqV4xI+PsaDIsoHueyxVDZrfvMn56jeWUzvzdls= github.com/bits-and-blooms/bloom/v3 v3.7.0/go.mod h1:VKlUSvp0lFIYqxJjzdnSsZEw4iHb1kOL2tfHTgyJBHg= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= diff --git a/packages/orchestrator/Makefile b/packages/orchestrator/Makefile index a80330edd1..8053f856c2 100644 --- a/packages/orchestrator/Makefile +++ b/packages/orchestrator/Makefile @@ -46,7 +46,7 @@ build-local: fetch-busybox .PHONY: build-debug build-debug: 
fetch-busybox - CGO_ENABLED=1 GOOS=linux GOARCH=$(BUILD_ARCH) go build -race -gcflags=all="-N -l" -o bin/orchestrator . + CGO_ENABLED=1 GOOS=linux GOARCH=$(BUILD_ARCH) go build -gcflags=all="-N -l" -o bin/orchestrator . .PHONY: run-debug run-debug: diff --git a/packages/orchestrator/go.mod b/packages/orchestrator/go.mod index 99f65ec53d..b701e3bc08 100644 --- a/packages/orchestrator/go.mod +++ b/packages/orchestrator/go.mod @@ -20,7 +20,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.32.6 github.com/aws/aws-sdk-go-v2/credentials v1.19.6 github.com/aws/aws-sdk-go-v2/service/ecr v1.44.0 - github.com/bits-and-blooms/bitset v1.22.0 + github.com/bits-and-blooms/bitset v1.24.2 github.com/bmatcuk/doublestar/v4 v4.9.1 github.com/caarlos0/env/v11 v11.3.1 github.com/containernetworking/plugins v1.9.0 @@ -97,6 +97,7 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/RoaringBitmap/roaring/v2 v2.16.0 // indirect github.com/andybalholm/brotli v1.2.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/aws/aws-sdk-go-v2 v1.41.0 // indirect @@ -252,6 +253,7 @@ require ( github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/oapi-codegen/oapi-codegen/v2 v2.5.1 // indirect github.com/oasdiff/yaml v0.0.0-20250309154309-f31be36b4037 // indirect github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect diff --git a/packages/orchestrator/go.sum b/packages/orchestrator/go.sum index f75731182b..2cd276d75f 100644 --- a/packages/orchestrator/go.sum +++ b/packages/orchestrator/go.sum @@ -112,6 +112,8 @@ github.com/NYTimes/gziphandler 
v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/RoaringBitmap/roaring/v2 v2.16.0 h1:Kys1UNf49d5W8Tq3bpuAhIr/Z8/yPB+59CO8A6c/BbE= +github.com/RoaringBitmap/roaring/v2 v2.16.0/go.mod h1:eq4wdNXxtJIS/oikeCzdX1rBzek7ANzbth041hrU8Q4= github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -178,8 +180,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngEKAMDJEczWVA= -github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= -github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bits-and-blooms/bitset v1.24.2 h1:M7/NzVbsytmtfHbumG+K2bremQPMJuqv1JD3vOaFxp0= +github.com/bits-and-blooms/bitset v1.24.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE= @@ -982,6 +984,8 @@ github.com/montanaflynn/stats 
v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= diff --git a/packages/orchestrator/pkg/sandbox/block/cache.go b/packages/orchestrator/pkg/sandbox/block/cache.go index 7bf39d7a20..3b80b17ab4 100644 --- a/packages/orchestrator/pkg/sandbox/block/cache.go +++ b/packages/orchestrator/pkg/sandbox/block/cache.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" "golang.org/x/sys/unix" + "github.com/e2b-dev/infra/packages/shared/pkg/atomicbitset" "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" @@ -52,13 +53,16 @@ type Cache struct { blockSize int64 mmap *mmap.MMap mu sync.RWMutex - dirty sync.Map + dirty atomicbitset.Bitset dirtyFile bool closed atomic.Bool } -// When we are passing filePath that is a file that has content we want to server want to use dirtyFile = true. 
func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, error) { + return newCache(size, blockSize, filePath, dirtyFile, "") +} + +func newCache(size, blockSize int64, filePath string, dirtyFile bool, bitsetImpl string) (*Cache, error) { f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o644) if err != nil { return nil, fmt.Errorf("error opening file: %w", err) @@ -96,6 +100,7 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e size: size, blockSize: blockSize, dirtyFile: dirtyFile, + dirty: atomicbitset.New(uint(header.TotalBlocks(size, blockSize)), bitsetImpl), }, nil } @@ -138,15 +143,10 @@ func (c *Cache) ExportToDiff(ctx context.Context, out *os.File) (*header.DiffMet logger.L().Warn(ctx, "error syncing file", zap.Error(err)) } - buildStart := time.Now() - builder := header.NewDiffMetadataBuilder(c.size, c.blockSize) - - // We don't need to sort the keys as the bitset handles the ordering. - c.dirty.Range(func(key, _ any) bool { - builder.AddDirtyOffset(key.(int64)) + dirty := c.dirty.BitSet() - return true - }) + buildStart := time.Now() + builder := header.NewDiffMetadataBuilderFromDirtyBitSet(c.size, c.blockSize, dirty) diffMetadata := builder.Build() telemetry.SetAttributes(ctx, attribute.Int64("build_metadata_ms", time.Since(buildStart).Milliseconds())) @@ -301,6 +301,14 @@ func (c *Cache) Slice(off, length int64) ([]byte, error) { return nil, BytesNotAvailableError{} } +func (c *Cache) startBlock(off int64) uint { + return uint(header.BlockIdx(off, c.blockSize)) +} + +func (c *Cache) endBlock(off int64) uint { + return uint((off + c.blockSize - 1) / c.blockSize) +} + func (c *Cache) isCached(off, length int64) bool { // Make sure the offset is within the cache size if off >= c.size { @@ -309,23 +317,24 @@ func (c *Cache) isCached(off, length int64) bool { // Cap if the length goes beyond the cache size, so we don't check for blocks that are out of bounds. 
end := min(off+length, c.size) - // Recalculate the length based on the capped end, so we check for the correct blocks in case of capping. length = end - off - for _, blockOff := range header.BlocksOffsets(length, c.blockSize) { _, dirty := c.dirty.Load(off + blockOff) - if !dirty { - return false - } + lo := c.startBlock(off) + hi := c.endBlock(end) + + // Fast path: single-block check (common case for NBD reads). + if hi-lo == 1 { + return c.dirty.Has(lo) } - return true + return c.dirty.HasRange(lo, hi) } func (c *Cache) setIsCached(off, length int64) { - for _, blockOff := range header.BlocksOffsets(length, c.blockSize) { - c.dirty.Store(off+blockOff, struct{}{}) + if length <= 0 { + return } + + c.dirty.SetRange(c.startBlock(off), c.endBlock(off+length)) } // When using WriteAtWithoutLock you must ensure thread safety, ideally by only writing to the same block once and the exposing the slice. @@ -533,9 +542,7 @@ func (c *Cache) copyProcessMemory( return fmt.Errorf("failed to read memory: expected %d bytes, got %d", segmentSize, n) } - for _, blockOff := range header.BlocksOffsets(segmentSize, c.blockSize) { - c.dirty.Store(offset+blockOff, struct{}{}) - } + c.setIsCached(offset, segmentSize) offset += segmentSize diff --git a/packages/orchestrator/pkg/sandbox/block/chunk.go b/packages/orchestrator/pkg/sandbox/block/chunk.go index ad2017d2aa..9bb03b9a49 100644 --- a/packages/orchestrator/pkg/sandbox/block/chunk.go +++ b/packages/orchestrator/pkg/sandbox/block/chunk.go @@ -98,17 +98,17 @@ func NewChunker( cachePath string, metrics metrics.Metrics, ) (Chunker, error) { - useStreaming, minReadBatchSizeKB := getChunkerConfig(ctx, featureFlags) + useStreaming, minReadBatchSizeKB, bitsetImpl := getChunkerConfig(ctx, featureFlags) if useStreaming { - return NewStreamingChunker(size, blockSize, upstream,
cachePath, metrics, int64(minReadBatchSizeKB)*1024, featureFlags) + return newStreamingChunker(size, blockSize, upstream, cachePath, metrics, int64(minReadBatchSizeKB)*1024, featureFlags, bitsetImpl) } - return NewFullFetchChunker(size, blockSize, upstream, cachePath, metrics) + return newFullFetchChunker(size, blockSize, upstream, cachePath, metrics, bitsetImpl) } // getChunkerConfig fetches the chunker-config feature flag and returns the parsed values. -func getChunkerConfig(ctx context.Context, ff *featureflags.Client) (useStreaming bool, minReadBatchSizeKB int) { +func getChunkerConfig(ctx context.Context, ff *featureflags.Client) (useStreaming bool, minReadBatchSizeKB int, bitsetImpl string) { value := ff.JSONFlag(ctx, featureflags.ChunkerConfigFlag) if v := value.GetByKey("useStreaming"); v.IsDefined() { @@ -119,7 +121,11 @@ func getChunkerConfig(ctx context.Context, ff *featureflags.Client) (useStreamin minReadBatchSizeKB = v.IntValue() } - return useStreaming, minReadBatchSizeKB + if v := value.GetByKey("bitset"); v.IsDefined() { + bitsetImpl = v.StringValue() + } + + return useStreaming, minReadBatchSizeKB, bitsetImpl } type FullFetchChunker struct { @@ -138,7 +144,17 @@ func NewFullFetchChunker( cachePath string, metrics metrics.Metrics, ) (*FullFetchChunker, error) { - cache, err := NewCache(size, blockSize, cachePath, false) + return newFullFetchChunker(size, blockSize, base, cachePath, metrics, "") +} + +func newFullFetchChunker( + size, blockSize int64, + base storage.SeekableReader, + cachePath string, + metrics metrics.Metrics, + bitsetImpl string, +) (*FullFetchChunker, error) { + cache, err := newCache(size, blockSize, cachePath, false, bitsetImpl) if err != nil { return nil, fmt.Errorf("failed to create file cache: %w", err) } diff --git a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go index 7e40b35c4e..d1f7f9ab5b 100644 --- 
a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go +++ b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go @@ -172,7 +172,19 @@ func NewStreamingChunker( minReadBatchSize int64, ff *featureflags.Client, ) (*StreamingChunker, error) { - cache, err := NewCache(size, blockSize, cachePath, false) + return newStreamingChunker(size, blockSize, upstream, cachePath, metrics, minReadBatchSize, ff, "") +} + +func newStreamingChunker( + size, blockSize int64, + upstream storage.StreamingReader, + cachePath string, + metrics metrics.Metrics, + minReadBatchSize int64, + ff *featureflags.Client, + bitsetImpl string, +) (*StreamingChunker, error) { + cache, err := newCache(size, blockSize, cachePath, false, bitsetImpl) if err != nil { return nil, fmt.Errorf("failed to create file cache: %w", err) } @@ -441,7 +453,7 @@ func (c *StreamingChunker) progressiveRead(ctx context.Context, s *fetchSession, // it can be tuned without restarting the service. func (c *StreamingChunker) getMinReadBatchSize(ctx context.Context) int64 { if c.featureFlags != nil { - _, minKB := getChunkerConfig(ctx, c.featureFlags) + _, minKB, _ := getChunkerConfig(ctx, c.featureFlags) if minKB > 0 { return int64(minKB) * 1024 } diff --git a/packages/shared/go.mod b/packages/shared/go.mod index 093aa0ae83..25b42a6b76 100644 --- a/packages/shared/go.mod +++ b/packages/shared/go.mod @@ -8,12 +8,13 @@ require ( cloud.google.com/go/artifactregistry v1.17.1 cloud.google.com/go/storage v1.59.2 connectrpc.com/connect v1.18.1 + github.com/RoaringBitmap/roaring/v2 v2.16.0 github.com/aws/aws-sdk-go-v2 v1.41.0 github.com/aws/aws-sdk-go-v2/config v1.32.6 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.74 github.com/aws/aws-sdk-go-v2/service/ecr v1.44.0 github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3 - github.com/bits-and-blooms/bitset v1.22.0 + github.com/bits-and-blooms/bitset v1.24.2 github.com/bsm/redislock v0.9.4 github.com/dchest/uniuri v1.2.0 github.com/gin-gonic/gin v1.10.1 @@ -267,6 +268,7 
@@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/oklog/ulid v1.3.1 // indirect diff --git a/packages/shared/go.sum b/packages/shared/go.sum index a58712d7d6..6abc73af5f 100644 --- a/packages/shared/go.sum +++ b/packages/shared/go.sum @@ -78,6 +78,8 @@ github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpz github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/RoaringBitmap/roaring/v2 v2.16.0 h1:Kys1UNf49d5W8Tq3bpuAhIr/Z8/yPB+59CO8A6c/BbE= +github.com/RoaringBitmap/roaring/v2 v2.16.0/go.mod h1:eq4wdNXxtJIS/oikeCzdX1rBzek7ANzbth041hrU8Q4= github.com/Workiva/go-datastructures v1.1.6 h1:e2eUkTi+YlNRw6YxH2c+DmgXENTKjCofaiVeDIv6e/U= github.com/Workiva/go-datastructures v1.1.6/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= @@ -153,8 +155,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.10.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= -github.com/bits-and-blooms/bitset v1.22.0/go.mod 
h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bits-and-blooms/bitset v1.24.2 h1:M7/NzVbsytmtfHbumG+K2bremQPMJuqv1JD3vOaFxp0= +github.com/bits-and-blooms/bitset v1.24.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bloom/v3 v3.7.0 h1:VfknkqV4xI+PsaDIsoHueyxVDZrfvMn56jeWUzvzdls= github.com/bits-and-blooms/bloom/v3 v3.7.0/go.mod h1:VKlUSvp0lFIYqxJjzdnSsZEw4iHb1kOL2tfHTgyJBHg= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= @@ -648,6 +650,8 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= diff --git a/packages/shared/pkg/atomicbitset/bab.go b/packages/shared/pkg/atomicbitset/bab.go new file mode 100644 index 0000000000..27807e555c --- /dev/null +++ b/packages/shared/pkg/atomicbitset/bab.go @@ -0,0 +1,77 @@ +package atomicbitset + +import ( + "sync" + + "github.com/bits-and-blooms/bitset" +) + +// BitsAndBlooms wraps a bits-and-blooms/bitset.BitSet with an internal RWMutex. 
+type BitsAndBlooms struct { + mu sync.RWMutex + bs *bitset.BitSet + n uint +} + +func NewBitsAndBlooms(n uint) *BitsAndBlooms { + return &BitsAndBlooms{ + bs: bitset.New(n), + n: n, + } +} + +func (b *BitsAndBlooms) Len() uint { return b.n } + +func (b *BitsAndBlooms) Has(i uint) bool { + if i >= b.n { + return false + } + + b.mu.RLock() + v := b.bs.Test(i) + b.mu.RUnlock() + + return v +} + +func (b *BitsAndBlooms) HasRange(lo, hi uint) bool { + if lo >= hi { + return true + } + if hi > b.n { + hi = b.n + } + if lo >= hi { + return false + } + + b.mu.RLock() + defer b.mu.RUnlock() + + for i := lo; i < hi; i++ { + if !b.bs.Test(i) { + return false + } + } + + return true +} + +func (b *BitsAndBlooms) SetRange(lo, hi uint) { + if hi > b.n { + hi = b.n + } + if lo >= hi { + return + } + + b.mu.Lock() + for i := lo; i < hi; i++ { + b.bs.Set(i) + } + b.mu.Unlock() +} + +func (b *BitsAndBlooms) BitSet() *bitset.BitSet { + b.mu.RLock(); defer b.mu.RUnlock(); return b.bs.Clone() +} diff --git a/packages/shared/pkg/atomicbitset/bitset.go b/packages/shared/pkg/atomicbitset/bitset.go new file mode 100644 index 0000000000..fa60b6c4bf --- /dev/null +++ b/packages/shared/pkg/atomicbitset/bitset.go @@ -0,0 +1,50 @@ +// Package atomicbitset provides fixed-size bitset implementations. +// All implementations are safe for concurrent HasRange and SetRange. +package atomicbitset + +import ( + "fmt" + + "github.com/bits-and-blooms/bitset" +) + +type Bitset interface { + Has(i uint) bool + HasRange(lo, hi uint) bool + SetRange(lo, hi uint) + BitSet() *bitset.BitSet + Len() uint +} + +const ( + autoThreshold uint = 524_288 // 64 KB flat bitmap + + // Valid impl values for New.
+ BitsetDefault = "" + BitsetRoaring = "roaring" + BitsetRoaring64 = "roaring64" + BitsetAtomic = "atomic" + BitsetBitsAndBlooms = "bits-and-blooms" + BitsetSyncMap = "syncmap" +) + +func New(n uint, impl string) Bitset { + switch impl { + case BitsetDefault, BitsetRoaring: + return NewRoaring(n) + case BitsetRoaring64: + return NewRoaring64(n) + case BitsetBitsAndBlooms: + return NewBitsAndBlooms(n) + case BitsetAtomic: + if n <= autoThreshold { + return NewFlat(n) + } + + return NewSharded(n, DefaultShardBits) + case BitsetSyncMap: + return NewSyncMap(n) + default: + panic(fmt.Sprintf("atomicbitset: unknown implementation %q", impl)) + } +} diff --git a/packages/shared/pkg/atomicbitset/bitset_test.go b/packages/shared/pkg/atomicbitset/bitset_test.go new file mode 100644 index 0000000000..11a11855cd --- /dev/null +++ b/packages/shared/pkg/atomicbitset/bitset_test.go @@ -0,0 +1,457 @@ +package atomicbitset + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +type implFactory struct { + name string + make func(n uint) Bitset +} + +var impls = []implFactory{ + {"Flat", func(n uint) Bitset { return NewFlat(n) }}, + {"Roaring", func(n uint) Bitset { return NewRoaring(n) }}, + {"Roaring64", func(n uint) Bitset { return NewRoaring64(n) }}, + {"BitsAndBlooms", func(n uint) Bitset { return NewBitsAndBlooms(n) }}, + {"Sharded", func(n uint) Bitset { return NewSharded(n, DefaultShardBits) }}, + {"Sharded/small", func(n uint) Bitset { return NewSharded(n, 64) }}, + {"SyncMap", func(n uint) Bitset { return NewSyncMap(n) }}, +} + +func TestHasRange(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(128) + b.SetRange(10, 20) + + require.True(t, b.HasRange(10, 20)) + require.True(t, b.HasRange(12, 18)) + require.False(t, b.HasRange(9, 20)) + require.False(t, b.HasRange(10, 21)) + require.True(t, b.HasRange(50, 50)) // empty + }) + } +} + +func 
TestSetRange_WordBoundary(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(256) + b.SetRange(60, 68) // crosses word boundary at 64 + + require.True(t, b.HasRange(60, 68)) + require.False(t, b.HasRange(59, 60)) + require.False(t, b.HasRange(68, 69)) + }) + } +} + +func TestSetRange_Large(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(512) + b.SetRange(50, 250) + + require.True(t, b.HasRange(50, 250)) + require.False(t, b.HasRange(49, 50)) + require.False(t, b.HasRange(250, 251)) + }) + } +} + +func TestSetRange_All(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(256) + b.SetRange(0, 256) + + require.True(t, b.HasRange(0, 256)) + }) + } +} + +func TestSetRange_PastLen(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(128) + b.SetRange(126, 200) // should not panic, capped to 128 + + require.True(t, b.HasRange(126, 128)) + }) + } +} + +func TestSetRange_Idempotent(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(128) + b.SetRange(5, 6) + b.SetRange(5, 6) + + require.True(t, b.Has(5)) + require.False(t, b.Has(4)) + require.False(t, b.Has(6)) + }) + } +} + +func TestSetRange_Overlapping(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(128) + b.SetRange(5, 10) + b.SetRange(8, 13) + + require.True(t, b.HasRange(5, 13)) + require.False(t, b.HasRange(4, 5)) + require.False(t, b.HasRange(13, 14)) + }) + } +} + +func TestSetRange_Concurrent(t *testing.T) { + t.Parallel() + + // All impls are safe for concurrent use (atomic via atomic ops, others via internal mutex). 
+ for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(512) + + var wg sync.WaitGroup + for g := range 8 { + wg.Go(func() { + lo := uint(g) * 32 + b.SetRange(lo, lo+128) + }) + } + wg.Wait() + + last := uint(7*32 + 128) + require.True(t, b.HasRange(0, last)) + require.False(t, b.HasRange(last, last+1)) + }) + } +} + +func TestHas_Empty(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(128) + for i := range uint(128) { + require.False(t, b.Has(i), "bit %d should be unset", i) + } + require.False(t, b.HasRange(0, 128)) + }) + } +} + +func TestHas_Individual(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(256) + set := []uint{100, 5, 200, 63, 64} + for _, i := range set { + b.SetRange(i, i+1) + } + for _, i := range set { + require.True(t, b.Has(i), "bit %d should be set", i) + } + for _, i := range []uint{0, 4, 6, 62, 65, 99, 101, 199, 201, 255} { + require.False(t, b.Has(i), "bit %d should be unset", i) + } + }) + } +} + +func TestHasRange_Contiguous(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(128) + b.SetRange(10, 15) + require.True(t, b.HasRange(10, 15)) + require.False(t, b.Has(9)) + require.False(t, b.Has(15)) + }) + } +} + +func TestZero(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(0) + require.Equal(t, uint(0), b.Len()) + require.False(t, b.Has(0)) + b.SetRange(0, 1) // should not panic + }) + } +} + +func TestSetRange_NonAligned(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(100) // not a multiple of 64 + b.SetRange(95, 100) + + require.True(t, b.HasRange(95, 100)) + require.True(t, 
b.Has(99)) + require.False(t, b.Has(94)) + }) + } +} + +func TestHasRange_OutOfBounds(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(128) + b.SetRange(0, 128) + + require.False(t, b.HasRange(200, 300)) // lo past capacity + require.True(t, b.HasRange(50, 50)) // empty, in range + require.True(t, b.HasRange(128, 128)) // empty, at boundary + require.False(t, b.HasRange(128, 200)) // lo at capacity, non-empty after cap + }) + } +} + +func TestLen(t *testing.T) { + t.Parallel() + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(100) + require.Equal(t, uint(100), b.Len()) + }) + } +} + +func TestNew(t *testing.T) { + t.Parallel() + + require.IsType(t, (*Roaring)(nil), New(1000, "")) + require.IsType(t, (*Roaring)(nil), New(autoThreshold+1, "")) + require.IsType(t, (*Flat)(nil), New(1000, "atomic")) + require.IsType(t, (*Sharded)(nil), New(autoThreshold+1, "atomic")) + require.IsType(t, (*Roaring)(nil), New(1000, "roaring")) + require.IsType(t, (*BitsAndBlooms)(nil), New(1000, "bits-and-blooms")) + require.IsType(t, (*SyncMap)(nil), New(1000, "syncmap")) + + require.Panics(t, func() { New(1000, "bogus") }) +} + +// TestCachePattern reproduces the exact SetRange/HasRange sequence that the +// block cache uses: chunk-aligned writes followed by arbitrary sub-block reads. +// Parameters mirror a real 6.5 MB rootfs with 4 KB blocks and 4 MB chunks. 
+func TestCachePattern(t *testing.T) { + t.Parallel() + + const ( + fileSize int64 = 6_815_744 // bytes + blockSize int64 = 4096 // bytes + chunkSize int64 = 4_194_304 // 4 MB + ) + + totalBlocks := uint((fileSize + blockSize - 1) / blockSize) // ceil + startBlock := func(off int64) uint { return uint(off / blockSize) } + endBlock := func(off int64) uint { return uint((off + blockSize - 1) / blockSize) } + + for _, impl := range impls { + t.Run(impl.name, func(t *testing.T) { + t.Parallel() + b := impl.make(totalBlocks) + + // Simulate chunk fetcher writing all chunks. + for fetchOff := int64(0); fetchOff < fileSize; fetchOff += chunkSize { + readBytes := min(chunkSize, fileSize-fetchOff) + lo := startBlock(fetchOff) + hi := endBlock(fetchOff + readBytes) + b.SetRange(lo, hi) + } + + // Every individual block should now be cached. + for blk := range totalBlocks { + require.True(t, b.Has(blk), "block %d not set", blk) + } + + // Full-range check. + require.True(t, b.HasRange(0, totalBlocks), "full range not set") + + // Simulate NBD reads: 4K-aligned reads across the entire file. + for off := int64(0); off < fileSize; off += blockSize { + end := min(off+blockSize, fileSize) + lo := startBlock(off) + hi := endBlock(end) + require.True(t, b.HasRange(lo, hi), + "HasRange(%d, %d) false for read at offset %d", lo, hi, off) + } + + // Simulate isCached(fetchOff, MemoryChunkSize) — the exact call + // the chunker makes to check whether a chunk is already cached. + for fetchOff := int64(0); fetchOff < fileSize; fetchOff += chunkSize { + end := min(fetchOff+chunkSize, fileSize) + lo := startBlock(fetchOff) + hi := endBlock(end) + require.True(t, b.HasRange(lo, hi), + "chunk HasRange(%d, %d) false for fetchOff %d", lo, hi, fetchOff) + } + }) + } +} + +// --- Benchmarks --- +// +// Realistic sizes: +// rootfs: 512 MB / 4 KB blocks = 131072 bits +// memfile: 2 GB / 2 MB hugepages = 1024 bits + +// Realistic size: 1 GB rootfs / 4 KB blocks = 262144 bits. 
+// Chunk = 1024 blocks (4 MB chunk at 4 KB block size). +// Sharded: DefaultShardBits=32768 → 8 shards (128 MB / 4 KB each). +const ( + benchBits uint = 262144 + benchChunk uint = 1024 +) + +// benchImpls excludes Sharded/small (not a production config). +var benchImpls = []implFactory{ + {"Flat", func(n uint) Bitset { return NewFlat(n) }}, + {"Roaring", func(n uint) Bitset { return NewRoaring(n) }}, + {"BitsAndBlooms", func(n uint) Bitset { return NewBitsAndBlooms(n) }}, + {"Sharded", func(n uint) Bitset { return NewSharded(n, DefaultShardBits) }}, +} + +func BenchmarkSetRange(b *testing.B) { + for _, impl := range benchImpls { + b.Run(impl.name, func(b *testing.B) { + bs := impl.make(benchBits) + b.ResetTimer() + for i := range b.N { + lo := uint(i) % (benchBits / benchChunk) * benchChunk + bs.SetRange(lo, lo+benchChunk) + } + }) + } +} + +func BenchmarkHas_Hit(b *testing.B) { + for _, impl := range benchImpls { + b.Run(impl.name, func(b *testing.B) { + bs := impl.make(benchBits) + bs.SetRange(0, benchBits) + b.ResetTimer() + for i := range b.N { + bit := uint(i) % benchBits + if !bs.Has(bit) { + b.Fatal("expected set") + } + } + }) + } +} + +func BenchmarkHas_Miss(b *testing.B) { + for _, impl := range benchImpls { + b.Run(impl.name, func(b *testing.B) { + bs := impl.make(benchBits) + b.ResetTimer() + for i := range b.N { + bit := uint(i) % benchBits + if bs.Has(bit) { + b.Fatal("expected unset") + } + } + }) + } +} + +func BenchmarkHasRange_Hit(b *testing.B) { + for _, impl := range benchImpls { + b.Run(impl.name, func(b *testing.B) { + bs := impl.make(benchBits) + bs.SetRange(0, benchBits) + b.ResetTimer() + for i := range b.N { + lo := uint(i) % (benchBits / benchChunk) * benchChunk + if !bs.HasRange(lo, lo+benchChunk) { + b.Fatal("expected set") + } + } + }) + } +} + +func BenchmarkHasRange_Miss(b *testing.B) { + for _, impl := range benchImpls { + b.Run(impl.name, func(b *testing.B) { + bs := impl.make(benchBits) + b.ResetTimer() + for i := range b.N { + 
lo := uint(i) % (benchBits / benchChunk) * benchChunk + if bs.HasRange(lo, lo+benchChunk) { + b.Fatal("expected unset") + } + } + }) + } +} + +// --- Concurrent read benchmarks --- + +var concurrencyLevels = []int{1, 4, 16, 64} + +func BenchmarkHas_HitConcurrent(b *testing.B) { + for _, impl := range benchImpls { + b.Run(impl.name, func(b *testing.B) { + for _, p := range concurrencyLevels { + b.Run(fmt.Sprintf("P%d", p), func(b *testing.B) { + bs := impl.make(benchBits) + bs.SetRange(0, benchBits) + + b.SetParallelism(p) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := uint(0) + for pb.Next() { + bit := i % benchBits + if !bs.Has(bit) { + b.Fatal("expected set") + } + i++ + } + }) + }) + } + }) + } +} diff --git a/packages/shared/pkg/atomicbitset/flat.go b/packages/shared/pkg/atomicbitset/flat.go new file mode 100644 index 0000000000..6d2860e6f5 --- /dev/null +++ b/packages/shared/pkg/atomicbitset/flat.go @@ -0,0 +1,95 @@ +package atomicbitset + +import ( + "math" + "sync/atomic" + + "github.com/bits-and-blooms/bitset" +) + +type Flat struct { + words []atomic.Uint64 + n uint +} + +func NewFlat(n uint) *Flat { + return &Flat{ + words: make([]atomic.Uint64, (n+63)/64), + n: n, + } +} + +func (b *Flat) Len() uint { return b.n } + +func (b *Flat) Has(i uint) bool { + if i >= b.n { + return false + } + + wordIndex := i / 64 + bitIndex := i % 64 + + mask := uint64(1) << bitIndex + + return b.words[wordIndex].Load()&mask != 0 +} + +func (b *Flat) HasRange(lo, hi uint) bool { + if lo >= hi { + return true + } + if hi > b.n { + hi = b.n + } + if lo >= hi { + return false + } + for i := lo; i < hi; { + w := i / 64 + bit := i % 64 + top := min(hi-w*64, 64) + mask := wordMask(bit, top) + + if b.words[w].Load()&mask != mask { + return false + } + i = (w + 1) * 64 + } + + return true +} + +func (b *Flat) SetRange(lo, hi uint) { + if hi > b.n { + hi = b.n + } + if lo >= hi { + return + } + for i := lo; i < hi; { + w := i / 64 + bit := i % 64 + top := min(hi-w*64, 
64) + + b.words[w].Or(wordMask(bit, top)) + + i = (w + 1) * 64 + } +} + +func (b *Flat) BitSet() *bitset.BitSet { + words := make([]uint64, len(b.words)) + for i := range b.words { + words[i] = b.words[i].Load() + } + + return bitset.FromWithLength(b.n, words) +} + +func wordMask(lo, hi uint) uint64 { + if hi-lo == 64 { + return math.MaxUint64 + } + + return ((1 << (hi - lo)) - 1) << lo +} diff --git a/packages/shared/pkg/atomicbitset/roaring.go b/packages/shared/pkg/atomicbitset/roaring.go new file mode 100644 index 0000000000..f69be7a68f --- /dev/null +++ b/packages/shared/pkg/atomicbitset/roaring.go @@ -0,0 +1,81 @@ +package atomicbitset + +import ( + "sync" + + "github.com/RoaringBitmap/roaring/v2" + "github.com/bits-and-blooms/bitset" +) + +// Roaring wraps a roaring bitmap (32-bit) with an internal RWMutex. +type Roaring struct { + mu sync.RWMutex + bm *roaring.Bitmap + + n uint +} + +func NewRoaring(n uint) *Roaring { + return &Roaring{ + bm: roaring.New(), + n: n, + } +} + +func (r *Roaring) Len() uint { + return r.n +} + +func (r *Roaring) Has(i uint) bool { + if i >= r.n { + return false + } + + r.mu.RLock() + v := r.bm.Contains(uint32(i)) + r.mu.RUnlock() + + return v +} + +func (r *Roaring) HasRange(lo, hi uint) bool { + if lo >= hi { + return true + } + + if hi > r.n { + hi = r.n + } + + r.mu.RLock() + defer r.mu.RUnlock() + + for i := lo; i < hi; i++ { + if !r.bm.Contains(uint32(i)) { + return false + } + } + + return true +} + +func (r *Roaring) SetRange(lo, hi uint) { + if hi > r.n { + hi = r.n + } + + if lo >= hi { + return + } + + r.mu.Lock() + r.bm.AddRange(uint64(lo), uint64(hi)) + r.mu.Unlock() +} + +func (r *Roaring) BitSet() *bitset.BitSet { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.bm.ToBitSet() +} diff --git a/packages/shared/pkg/atomicbitset/roaring64.go b/packages/shared/pkg/atomicbitset/roaring64.go new file mode 100644 index 0000000000..8916dadca5 --- /dev/null +++ b/packages/shared/pkg/atomicbitset/roaring64.go @@ -0,0 +1,85 
@@ +package atomicbitset + +import ( + "sync" + + "github.com/RoaringBitmap/roaring/v2/roaring64" + "github.com/bits-and-blooms/bitset" +) + +// Roaring64 wraps a roaring64 bitmap (64-bit keys) with an internal RWMutex. +type Roaring64 struct { + mu sync.RWMutex + bm *roaring64.Bitmap + n uint +} + +func NewRoaring64(n uint) *Roaring64 { + return &Roaring64{ + bm: roaring64.New(), + n: n, + } +} + +func (r *Roaring64) Len() uint { return r.n } + +func (r *Roaring64) Has(i uint) bool { + if i >= r.n { + return false + } + + r.mu.RLock() + v := r.bm.Contains(uint64(i)) + r.mu.RUnlock() + + return v +} + +func (r *Roaring64) HasRange(lo, hi uint) bool { + if lo >= hi { + return true + } + if hi > r.n { + hi = r.n + } + if lo >= hi { + return false + } + + r.mu.RLock() + defer r.mu.RUnlock() + + for i := lo; i < hi; i++ { + if !r.bm.Contains(uint64(i)) { + return false + } + } + + return true +} + +func (r *Roaring64) SetRange(lo, hi uint) { + if hi > r.n { + hi = r.n + } + if lo >= hi { + return + } + + r.mu.Lock() + r.bm.AddRange(uint64(lo), uint64(hi)) + r.mu.Unlock() +} + +func (r *Roaring64) BitSet() *bitset.BitSet { + r.mu.RLock() + defer r.mu.RUnlock() + + bs := bitset.New(r.n) + it := r.bm.Iterator() + for it.HasNext() { + bs.Set(uint(it.Next())) + } + + return bs +} diff --git a/packages/shared/pkg/atomicbitset/sharded.go b/packages/shared/pkg/atomicbitset/sharded.go new file mode 100644 index 0000000000..9d32aa3b4a --- /dev/null +++ b/packages/shared/pkg/atomicbitset/sharded.go @@ -0,0 +1,184 @@ +package atomicbitset + +import ( + "sync/atomic" + + "github.com/bits-and-blooms/bitset" +) + +// DefaultShardBits is 128 MB / 4 KB = 32768 bits per shard (4 KB bitmap, one OS page). 
+const DefaultShardBits uint = 32768 + +type shard struct { + words []atomic.Uint64 +} + +func newShard(bitsPerShard uint) *shard { + return &shard{ + words: make([]atomic.Uint64, (bitsPerShard+63)/64), + } +} + +// Sharded is a two-level lock-free bitset with lazily allocated shard bitmaps. +type Sharded struct { + shards []atomic.Pointer[shard] + n uint + bitsPerShard uint +} + +func NewSharded(n, bitsPerShard uint) *Sharded { + if bitsPerShard == 0 { + bitsPerShard = DefaultShardBits + } + numShards := (n + bitsPerShard - 1) / bitsPerShard + + return &Sharded{ + shards: make([]atomic.Pointer[shard], numShards), + n: n, + bitsPerShard: bitsPerShard, + } +} + +func (s *Sharded) Len() uint { return s.n } + +func (s *Sharded) getShard(idx uint) *shard { + return s.shards[idx].Load() +} + +func (s *Sharded) getOrCreateShard(idx uint) *shard { + p := &s.shards[idx] + sh := p.Load() + if sh != nil { + return sh + } + candidate := newShard(s.bitsPerShard) + if p.CompareAndSwap(nil, candidate) { + return candidate + } + + return p.Load() +} + +func (s *Sharded) Has(i uint) bool { + if i >= s.n { + return false + } + sh := s.getShard(i / s.bitsPerShard) + if sh == nil { + return false + } + local := i % s.bitsPerShard + wordIndex := local / 64 + bitIndex := local % 64 + + mask := uint64(1) << bitIndex + + return sh.words[wordIndex].Load()&mask != 0 +} + +func (s *Sharded) HasRange(lo, hi uint) bool { + if lo >= hi { + return true + } + if hi > s.n { + hi = s.n + } + if lo >= hi { + return false + } + + for i := lo; i < hi; { + shardIdx := i / s.bitsPerShard + localLo := i % s.bitsPerShard + localHi := min(hi-shardIdx*s.bitsPerShard, s.bitsPerShard) + + sh := s.getShard(shardIdx) + if sh == nil { + return false + } + if !shardHasRange(sh, localLo, localHi) { + return false + } + + i = (shardIdx + 1) * s.bitsPerShard + } + + return true +} + +func (s *Sharded) SetRange(lo, hi uint) { + if hi > s.n { + hi = s.n + } + if lo >= hi { + return + } + + for i := lo; i < hi; { + 
shardIdx := i / s.bitsPerShard + localLo := i % s.bitsPerShard + localHi := min(hi-shardIdx*s.bitsPerShard, s.bitsPerShard) + + shardSetRange(s.getOrCreateShard(shardIdx), localLo, localHi) + + i = (shardIdx + 1) * s.bitsPerShard + } +} + +func (s *Sharded) BitSet() *bitset.BitSet { + totalWords := (s.n + 63) / 64 + words := make([]uint64, totalWords) + + for si := range s.shards { + sh := s.shards[si].Load() + if sh == nil { + continue + } + baseWord := uint(si) * s.bitsPerShard / 64 + for wi := range sh.words { + dst := baseWord + uint(wi) + if dst >= totalWords { + break + } + words[dst] = sh.words[wi].Load() + } + } + + return bitset.FromWithLength(s.n, words) +} + +func shardHasRange(sh *shard, lo, hi uint) bool { + for i := lo; i < hi; { + w := i / 64 + bit := i % 64 + top := min(hi-w*64, 64) + mask := wordMask(bit, top) + + if sh.words[w].Load()&mask != mask { + return false + } + i = (w + 1) * 64 + } + + return true +} + +func shardSetRange(sh *shard, lo, hi uint) { + for i := lo; i < hi; { + w := i / 64 + bit := i % 64 + top := min(hi-w*64, 64) + + sh.words[w].Or(wordMask(bit, top)) + + i = (w + 1) * 64 + } +} + +var ( + _ Bitset = (*Flat)(nil) + _ Bitset = (*Roaring)(nil) + _ Bitset = (*Roaring64)(nil) + _ Bitset = (*Sharded)(nil) + _ Bitset = (*SyncMap)(nil) +) diff --git a/packages/shared/pkg/atomicbitset/syncmap.go b/packages/shared/pkg/atomicbitset/syncmap.go new file mode 100644 index 0000000000..04d01695ba --- /dev/null +++ b/packages/shared/pkg/atomicbitset/syncmap.go @@ -0,0 +1,71 @@ +package atomicbitset + +import ( + "sync" + + "github.com/bits-and-blooms/bitset" +) + +// SyncMap wraps a sync.Map to implement Bitset using block-index keys. +// This mirrors the original sync.Map-based dirty tracking for A/B testing. 
+type SyncMap struct { + m sync.Map + n uint +} + +func NewSyncMap(n uint) *SyncMap { + return &SyncMap{n: n} +} + +func (s *SyncMap) Len() uint { return s.n } + +func (s *SyncMap) Has(i uint) bool { + if i >= s.n { + return false + } + + _, ok := s.m.Load(i) + + return ok +} + +func (s *SyncMap) HasRange(lo, hi uint) bool { + if lo >= hi { + return true + } + if hi > s.n { + hi = s.n + } + if lo >= hi { + return false + } + + for i := lo; i < hi; i++ { + if _, ok := s.m.Load(i); !ok { + return false + } + } + + return true +} + +func (s *SyncMap) SetRange(lo, hi uint) { + if hi > s.n { + hi = s.n + } + + for i := lo; i < hi; i++ { + s.m.Store(i, struct{}{}) + } +} + +func (s *SyncMap) BitSet() *bitset.BitSet { + bs := bitset.New(s.n) + s.m.Range(func(key, _ any) bool { + bs.Set(key.(uint)) + + return true + }) + + return bs +} diff --git a/packages/shared/pkg/featureflags/flags.go b/packages/shared/pkg/featureflags/flags.go index 4f9c31eec8..51b5e0a7ec 100644 --- a/packages/shared/pkg/featureflags/flags.go +++ b/packages/shared/pkg/featureflags/flags.go @@ -312,17 +312,19 @@ func GetTrackedTemplatesSet(ctx context.Context, ff *Client) map[string]struct{} return result } -// ChunkerConfigFlag is a JSON flag controlling the chunker implementation and tuning. +// ChunkerConfigFlag is a JSON flag controlling the chunker implementation and +// tuning. // -// NOTE: Changing useStreaming has no effect on chunkers already created for -// cached templates. A service restart (redeploy) is required for that change -// to take effect. minReadBatchSizeKB is checked just-in-time on each fetch, -// so it takes effect immediately. +// NOTE: Changing useStreaming or bitset has no effect on chunkers already +// created for cached templates. A service restart (redeploy) is required for +// those changes to take effect. minReadBatchSizeKB is checked just-in-time on +// each fetch, so it takes effect immediately. 
// -// JSON format: {"useStreaming": false, "minReadBatchSizeKB": 16} +// JSON format: {"useStreaming": false, "minReadBatchSizeKB": 16, "bitset": ""} var ChunkerConfigFlag = newJSONFlag("chunker-config", ldvalue.FromJSONMarshal(map[string]any{ "useStreaming": false, "minReadBatchSizeKB": 16, + "bitset": "", })) // TCPFirewallEgressThrottleConfig controls per-sandbox egress throttling via Firecracker's diff --git a/packages/shared/pkg/storage/header/metadata.go b/packages/shared/pkg/storage/header/metadata.go index 574dea78bf..2a41b280f2 100644 --- a/packages/shared/pkg/storage/header/metadata.go +++ b/packages/shared/pkg/storage/header/metadata.go @@ -112,6 +112,14 @@ type DiffMetadataBuilder struct { blockSize int64 } +func NewDiffMetadataBuilderFromDirtyBitSet(_, blockSize int64, dirty *bitset.BitSet) *DiffMetadataBuilder { + return &DiffMetadataBuilder{ + dirty: dirty, + empty: bitset.New(0), + blockSize: blockSize, + } +} + func NewDiffMetadataBuilder(size, blockSize int64) *DiffMetadataBuilder { return &DiffMetadataBuilder{ // TODO: We might be able to start with 0 as preallocating here actually takes space. diff --git a/tests/integration/internal/tests/orchestrator/filesystem_pause_resume_integrity_test.go b/tests/integration/internal/tests/orchestrator/filesystem_pause_resume_integrity_test.go index 4be2314e3a..b8c1ab0ec3 100644 --- a/tests/integration/internal/tests/orchestrator/filesystem_pause_resume_integrity_test.go +++ b/tests/integration/internal/tests/orchestrator/filesystem_pause_resume_integrity_test.go @@ -192,7 +192,7 @@ PY`, filePath)) expectedHash := exec(`sha256sum "` + filePath + `" | awk '{print $1}'`) expectedSize := exec(`stat -c %s "` + filePath + `"`) - cycles := getenvInt(t, "TESTS_FS_INTEGRITY_TRUNCATE_CYCLES", 1) + cycles := getenvInt(t, "TESTS_FS_INTEGRITY_TRUNCATE_CYCLES", 10) for i := range cycles { pause()