Skip to content

Commit 3f0daf4

Browse files
levb and claude
committed
feat(block): add flat/roaring/sharded bitset implementations behind FF
Introduce three atomicbitset implementations (flat, sharded, roaring) behind a shared Bitset interface. The implementation is selectable at runtime via the "bitset" key of the chunker-config feature flag: "roaring" (the default) or "atomic" (which picks flat or sharded automatically based on size).
- Flat: []atomic.Uint64, lock-free, for small bitmaps (≤ 64 KB)
- Sharded: two-level with lazy shard allocation, lock-free, for large sparse bitmaps
- Roaring: compressed bitmap guarded by an RWMutex; the default selected by the feature flag
- Iterator/UnsafeIterator split for safe vs. caller-synchronized iteration
- Conformance tests across all implementations, run with the race detector
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 19a857a commit 3f0daf4

File tree

11 files changed

+704
-248
lines changed

11 files changed

+704
-248
lines changed

packages/orchestrator/pkg/sandbox/block/cache.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,11 @@ type Cache struct {
5454
closed atomic.Bool
5555
}
5656

57-
// When filePath points to a file that already has content we want to serve, pass dirtyFile = true.
5857
func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, error) {
58+
return newCache(size, blockSize, filePath, dirtyFile, "")
59+
}
60+
61+
func newCache(size, blockSize int64, filePath string, dirtyFile bool, bitsetImpl string) (*Cache, error) {
5962
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o644)
6063
if err != nil {
6164
return nil, fmt.Errorf("error opening file: %w", err)
@@ -95,7 +98,7 @@ func NewCache(size, blockSize int64, filePath string, dirtyFile bool) (*Cache, e
9598
size: size,
9699
blockSize: blockSize,
97100
dirtyFile: dirtyFile,
98-
dirty: atomicbitset.New(uint(numBlocks)),
101+
dirty: atomicbitset.New(uint(numBlocks), bitsetImpl),
99102
}, nil
100103
}
101104

@@ -295,6 +298,10 @@ func (c *Cache) WriteAtWithoutLock(b []byte, off int64) (int, error) {
295298

296299
// dirtySortedKeys returns a sorted list of dirty block offsets.
297300
func (c *Cache) dirtySortedKeys() []int64 {
301+
if c.dirty == nil {
302+
return nil
303+
}
304+
298305
var keys []int64
299306

300307
for i := range c.dirty.Iterator() {

packages/orchestrator/pkg/sandbox/block/chunk.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,17 @@ func NewChunker(
9898
cachePath string,
9999
metrics metrics.Metrics,
100100
) (Chunker, error) {
101-
useStreaming, minReadBatchSizeKB := getChunkerConfig(ctx, featureFlags)
101+
useStreaming, minReadBatchSizeKB, bitsetImpl := getChunkerConfig(ctx, featureFlags)
102102

103103
if useStreaming {
104-
return NewStreamingChunker(size, blockSize, upstream, cachePath, metrics, int64(minReadBatchSizeKB)*1024, featureFlags)
104+
return newStreamingChunker(size, blockSize, upstream, cachePath, metrics, int64(minReadBatchSizeKB)*1024, featureFlags, bitsetImpl)
105105
}
106106

107-
return NewFullFetchChunker(size, blockSize, upstream, cachePath, metrics)
107+
return newFullFetchChunker(size, blockSize, upstream, cachePath, metrics, bitsetImpl)
108108
}
109109

110110
// getChunkerConfig fetches the chunker-config feature flag and returns the parsed values.
111-
func getChunkerConfig(ctx context.Context, ff *featureflags.Client) (useStreaming bool, minReadBatchSizeKB int) {
111+
func getChunkerConfig(ctx context.Context, ff *featureflags.Client) (useStreaming bool, minReadBatchSizeKB int, bitsetImpl string) {
112112
value := ff.JSONFlag(ctx, featureflags.ChunkerConfigFlag)
113113

114114
if v := value.GetByKey("useStreaming"); v.IsDefined() {
@@ -119,7 +119,11 @@ func getChunkerConfig(ctx context.Context, ff *featureflags.Client) (useStreamin
119119
minReadBatchSizeKB = v.IntValue()
120120
}
121121

122-
return useStreaming, minReadBatchSizeKB
122+
if v := value.GetByKey("bitset"); v.IsDefined() {
123+
bitsetImpl = v.StringValue()
124+
}
125+
126+
return useStreaming, minReadBatchSizeKB, bitsetImpl
123127
}
124128

125129
type FullFetchChunker struct {
@@ -138,7 +142,17 @@ func NewFullFetchChunker(
138142
cachePath string,
139143
metrics metrics.Metrics,
140144
) (*FullFetchChunker, error) {
141-
cache, err := NewCache(size, blockSize, cachePath, false)
145+
return newFullFetchChunker(size, blockSize, base, cachePath, metrics, "")
146+
}
147+
148+
func newFullFetchChunker(
149+
size, blockSize int64,
150+
base storage.SeekableReader,
151+
cachePath string,
152+
metrics metrics.Metrics,
153+
bitsetImpl string,
154+
) (*FullFetchChunker, error) {
155+
cache, err := newCache(size, blockSize, cachePath, false, bitsetImpl)
142156
if err != nil {
143157
return nil, fmt.Errorf("failed to create file cache: %w", err)
144158
}

packages/orchestrator/pkg/sandbox/block/streaming_chunk.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,19 @@ func NewStreamingChunker(
172172
minReadBatchSize int64,
173173
ff *featureflags.Client,
174174
) (*StreamingChunker, error) {
175-
cache, err := NewCache(size, blockSize, cachePath, false)
175+
return newStreamingChunker(size, blockSize, upstream, cachePath, metrics, minReadBatchSize, ff, "")
176+
}
177+
178+
func newStreamingChunker(
179+
size, blockSize int64,
180+
upstream storage.StreamingReader,
181+
cachePath string,
182+
metrics metrics.Metrics,
183+
minReadBatchSize int64,
184+
ff *featureflags.Client,
185+
bitsetImpl string,
186+
) (*StreamingChunker, error) {
187+
cache, err := newCache(size, blockSize, cachePath, false, bitsetImpl)
176188
if err != nil {
177189
return nil, fmt.Errorf("failed to create file cache: %w", err)
178190
}
@@ -441,7 +453,7 @@ func (c *StreamingChunker) progressiveRead(ctx context.Context, s *fetchSession,
441453
// it can be tuned without restarting the service.
442454
func (c *StreamingChunker) getMinReadBatchSize(ctx context.Context) int64 {
443455
if c.featureFlags != nil {
444-
_, minKB := getChunkerConfig(ctx, c.featureFlags)
456+
_, minKB, _ := getChunkerConfig(ctx, c.featureFlags)
445457
if minKB > 0 {
446458
return int64(minKB) * 1024
447459
}

packages/shared/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.74
1414
github.com/aws/aws-sdk-go-v2/service/ecr v1.44.0
1515
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3
16-
github.com/bits-and-blooms/bitset v1.22.0
16+
github.com/bits-and-blooms/bitset v1.24.2
1717
github.com/bsm/redislock v0.9.4
1818
github.com/dchest/uniuri v1.2.0
1919
github.com/gin-gonic/gin v1.10.1
@@ -87,6 +87,7 @@ require (
8787
github.com/Masterminds/semver/v3 v3.4.0 // indirect
8888
github.com/Masterminds/sprig/v3 v3.3.0 // indirect
8989
github.com/Microsoft/go-winio v0.6.2 // indirect
90+
github.com/RoaringBitmap/roaring/v2 v2.16.0 // indirect
9091
github.com/Workiva/go-datastructures v1.1.6 // indirect
9192
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b // indirect
9293
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
@@ -267,6 +268,7 @@ require (
267268
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
268269
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
269270
github.com/morikuni/aec v1.0.0 // indirect
271+
github.com/mschoch/smat v0.2.0 // indirect
270272
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
271273
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
272274
github.com/oklog/ulid v1.3.1 // indirect

packages/shared/go.sum

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 27 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,115 +1,40 @@
1-
// Package atomicbitset provides a fixed-size bitset with atomic set operations.
1+
// Package atomicbitset provides fixed-size bitset implementations.
2+
// All implementations are safe for concurrent HasRange and SetRange.
23
package atomicbitset
34

45
import (
6+
"fmt"
57
"iter"
6-
"math"
7-
"math/bits"
8-
"sync/atomic"
98
)
109

11-
// Bitset is a fixed-size bitset backed by atomic uint64 words.
12-
// SetRange uses atomic OR, so concurrent writers are safe without
13-
// external locking.
14-
//
15-
// A Bitset must not be copied after first use (copies share the
16-
// underlying array).
17-
type Bitset struct {
18-
words []atomic.Uint64
19-
n uint
10+
type Bitset interface {
11+
Has(i uint) bool
12+
HasRange(lo, hi uint) bool
13+
SetRange(lo, hi uint)
14+
Iterator() iter.Seq[uint]
15+
UnsafeIterator() iter.Seq[uint]
16+
Len() uint
2017
}
2118

22-
// New returns a Bitset with capacity for n bits, all initially zero.
23-
func New(n uint) Bitset {
24-
return Bitset{
25-
words: make([]atomic.Uint64, (n+63)/64),
26-
n: n,
27-
}
28-
}
29-
30-
// Len returns the capacity in bits.
31-
func (b *Bitset) Len() uint { return b.n }
32-
33-
// Has reports whether bit i is set. Out-of-range returns false.
34-
func (b *Bitset) Has(i uint) bool {
35-
if i >= b.n {
36-
return false
37-
}
38-
39-
return b.words[i/64].Load()&(1<<(i%64)) != 0
40-
}
19+
const (
20+
autoThreshold uint = 524_288 // 64 KB flat bitmap
4121

42-
// wordMask returns a bitmask covering bits [lo, hi) within a single uint64 word.
43-
func wordMask(lo, hi uint) uint64 {
44-
if hi-lo == 64 {
45-
return math.MaxUint64
46-
}
47-
48-
return ((1 << (hi - lo)) - 1) << lo
49-
}
50-
51-
// HasRange reports whether every bit in [lo, hi) is set.
52-
// An empty range returns true. hi is capped to Len().
53-
// Returns false if lo is out of range and the range is non-empty.
54-
func (b *Bitset) HasRange(lo, hi uint) bool {
55-
if lo >= hi {
56-
return true
57-
}
58-
if hi > b.n {
59-
hi = b.n
60-
}
61-
if lo >= hi {
62-
return false
63-
}
64-
for i := lo; i < hi; {
65-
w := i / 64
66-
bit := i % 64
67-
top := min(hi-w*64, 64)
68-
mask := wordMask(bit, top)
69-
70-
if b.words[w].Load()&mask != mask {
71-
return false
72-
}
73-
i = (w + 1) * 64
74-
}
75-
76-
return true
77-
}
78-
79-
// SetRange sets every bit in [lo, hi) using atomic OR.
80-
// hi is capped to Len().
81-
func (b *Bitset) SetRange(lo, hi uint) {
82-
if hi > b.n {
83-
hi = b.n
84-
}
85-
if lo >= hi {
86-
return
87-
}
88-
for i := lo; i < hi; {
89-
w := i / 64
90-
bit := i % 64
91-
top := min(hi-w*64, 64)
92-
93-
b.words[w].Or(wordMask(bit, top))
94-
95-
i = (w + 1) * 64
96-
}
97-
}
22+
// Valid impl values for New.
23+
BitsetDefault = ""
24+
BitsetRoaring = "roaring"
25+
BitsetAtomic = "atomic"
26+
)
9827

99-
// Iterator returns an iterator over the indices of set bits
100-
// in ascending order.
101-
func (b *Bitset) Iterator() iter.Seq[uint] {
102-
return func(yield func(uint) bool) {
103-
for wi := range b.words {
104-
word := b.words[wi].Load()
105-
base := uint(wi) * 64
106-
for word != 0 {
107-
tz := uint(bits.TrailingZeros64(word))
108-
if !yield(base + tz) {
109-
return
110-
}
111-
word &= word - 1
112-
}
28+
func New(n uint, impl string) Bitset {
29+
switch impl {
30+
case BitsetDefault, BitsetRoaring:
31+
return NewRoaring(n)
32+
case BitsetAtomic:
33+
if n <= autoThreshold {
34+
return NewFlat(n)
11335
}
36+
return NewSharded(n, DefaultShardBits)
37+
default:
38+
panic(fmt.Sprintf("atomicbitset: unknown implementation %q", impl))
11439
}
11540
}

0 commit comments

Comments
 (0)