Skip to content

Commit 6b71c54

Browse files
committed
Merge branch 'lev-block-cache-bitmap' of github.com:e2b-dev/infra into lev-block-cache-bitmap
2 parents f8e5216 + 791950e commit 6b71c54

File tree

5 files changed

+160
-55
lines changed

5 files changed

+160
-55
lines changed

packages/orchestrator/pkg/sandbox/block/cache_dirty_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,9 +282,9 @@ func TestSetIsCached_ConcurrentOverlapping(t *testing.T) {
282282

283283
const (
284284
benchBlockSize int64 = 4096
285-
benchNumBlocks int64 = 16384 // 64 MiB at 4K blocks — realistic memfile size
285+
benchNumBlocks int64 = 16384 // 64 MiB at 4K blocks — realistic memfile size
286286
benchCacheSize int64 = benchNumBlocks * benchBlockSize
287-
benchChunkSize int64 = 4 * 1024 * 1024 // 4 MiB — MemoryChunkSize
287+
benchChunkSize int64 = 4 * 1024 * 1024 // 4 MiB — MemoryChunkSize
288288
benchChunkCount int64 = benchCacheSize / benchChunkSize
289289
)
290290

packages/orchestrator/pkg/sandbox/block/chunk.go

Lines changed: 68 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strconv"
99

1010
"go.opentelemetry.io/otel/attribute"
11+
"go.opentelemetry.io/otel/metric"
1112
"go.uber.org/zap"
1213
"golang.org/x/sync/errgroup"
1314
"golang.org/x/sync/singleflight"
@@ -17,8 +18,67 @@ import (
1718
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
1819
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
1920
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
21+
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
2022
)
2123

24+
const (
25+
pullType = "pull-type"
26+
pullTypeLocal = "local"
27+
pullTypeRemote = "remote"
28+
29+
failureReason = "failure-reason"
30+
31+
failureTypeLocalRead = "local-read"
32+
failureTypeLocalReadAgain = "local-read-again"
33+
failureTypeRemoteRead = "remote-read"
34+
failureTypeCacheFetch = "cache-fetch"
35+
)
36+
37+
type precomputedAttrs struct {
38+
successFromCache metric.MeasurementOption
39+
successFromRemote metric.MeasurementOption
40+
41+
failCacheRead metric.MeasurementOption
42+
failRemoteFetch metric.MeasurementOption
43+
failLocalReadAgain metric.MeasurementOption
44+
45+
// RemoteReads timer (runFetch)
46+
remoteSuccess metric.MeasurementOption
47+
remoteFailure metric.MeasurementOption
48+
}
49+
50+
var chunkerAttrs = precomputedAttrs{
51+
successFromCache: telemetry.PrecomputeAttrs(
52+
telemetry.Success,
53+
attribute.String(pullType, pullTypeLocal)),
54+
55+
successFromRemote: telemetry.PrecomputeAttrs(
56+
telemetry.Success,
57+
attribute.String(pullType, pullTypeRemote)),
58+
59+
failCacheRead: telemetry.PrecomputeAttrs(
60+
telemetry.Failure,
61+
attribute.String(pullType, pullTypeLocal),
62+
attribute.String(failureReason, failureTypeLocalRead)),
63+
64+
failRemoteFetch: telemetry.PrecomputeAttrs(
65+
telemetry.Failure,
66+
attribute.String(pullType, pullTypeRemote),
67+
attribute.String(failureReason, failureTypeCacheFetch)),
68+
69+
failLocalReadAgain: telemetry.PrecomputeAttrs(
70+
telemetry.Failure,
71+
attribute.String(pullType, pullTypeLocal),
72+
attribute.String(failureReason, failureTypeLocalReadAgain)),
73+
74+
remoteSuccess: telemetry.PrecomputeAttrs(
75+
telemetry.Success),
76+
77+
remoteFailure: telemetry.PrecomputeAttrs(
78+
telemetry.Failure,
79+
attribute.String(failureReason, failureTypeRemoteRead)),
80+
}
81+
2282
// Chunker is the interface satisfied by both FullFetchChunker and StreamingChunker.
2383
type Chunker interface {
2484
Slice(ctx context.Context, off, length int64) ([]byte, error)
@@ -125,40 +185,32 @@ func (c *FullFetchChunker) Slice(ctx context.Context, off, length int64) ([]byte
125185

126186
b, err := c.cache.Slice(off, length)
127187
if err == nil {
128-
timer.Success(ctx, length,
129-
attribute.String(pullType, pullTypeLocal))
188+
timer.RecordRaw(ctx, length, chunkerAttrs.successFromCache)
130189

131190
return b, nil
132191
}
133192

134193
if !errors.As(err, &BytesNotAvailableError{}) {
135-
timer.Failure(ctx, length,
136-
attribute.String(pullType, pullTypeLocal),
137-
attribute.String(failureReason, failureTypeLocalRead))
194+
timer.RecordRaw(ctx, length, chunkerAttrs.failCacheRead)
138195

139196
return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err)
140197
}
141198

142199
chunkErr := c.fetchToCache(ctx, off, length)
143200
if chunkErr != nil {
144-
timer.Failure(ctx, length,
145-
attribute.String(pullType, pullTypeRemote),
146-
attribute.String(failureReason, failureTypeCacheFetch))
201+
timer.RecordRaw(ctx, length, chunkerAttrs.failRemoteFetch)
147202

148203
return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", off, off+length, chunkErr)
149204
}
150205

151206
b, cacheErr := c.cache.Slice(off, length)
152207
if cacheErr != nil {
153-
timer.Failure(ctx, length,
154-
attribute.String(pullType, pullTypeLocal),
155-
attribute.String(failureReason, failureTypeLocalReadAgain))
208+
timer.RecordRaw(ctx, length, chunkerAttrs.failLocalReadAgain)
156209

157210
return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr)
158211
}
159212

160-
timer.Success(ctx, length,
161-
attribute.String(pullType, pullTypeRemote))
213+
timer.RecordRaw(ctx, length, chunkerAttrs.successFromRemote)
162214

163215
return b, nil
164216
}
@@ -210,24 +262,20 @@ func (c *FullFetchChunker) fetchToCache(ctx context.Context, off, length int64)
210262

211263
readBytes, err := c.base.ReadAt(ctx, b, fetchOff)
212264
if err != nil {
213-
fetchSW.Failure(ctx, int64(readBytes),
214-
attribute.String(failureReason, failureTypeRemoteRead),
215-
)
265+
fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteFailure)
216266

217267
return nil, fmt.Errorf("failed to read chunk from base %d: %w", fetchOff, err)
218268
}
219269

220270
if readBytes != len(b) {
221-
fetchSW.Failure(ctx, int64(readBytes),
222-
attribute.String(failureReason, failureTypeRemoteRead),
223-
)
271+
fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteFailure)
224272

225273
return nil, fmt.Errorf("failed to read chunk from base %d: expected %d bytes, got %d bytes", fetchOff, len(b), readBytes)
226274
}
227275

228276
c.cache.setIsCached(fetchOff, int64(readBytes))
229277

230-
fetchSW.Success(ctx, int64(readBytes))
278+
fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteSuccess)
231279

232280
return nil, nil
233281
})
@@ -251,16 +299,3 @@ func (c *FullFetchChunker) Close() error {
251299
func (c *FullFetchChunker) FileSize() (int64, error) {
252300
return c.cache.FileSize()
253301
}
254-
255-
const (
256-
pullType = "pull-type"
257-
pullTypeLocal = "local"
258-
pullTypeRemote = "remote"
259-
260-
failureReason = "failure-reason"
261-
262-
failureTypeLocalRead = "local-read"
263-
failureTypeLocalReadAgain = "local-read-again"
264-
failureTypeRemoteRead = "remote-read"
265-
failureTypeCacheFetch = "cache-fetch"
266-
)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package block
2+
3+
import (
4+
"context"
5+
"path/filepath"
6+
"testing"
7+
8+
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
9+
10+
blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics"
11+
)
12+
13+
const (
14+
cbBlockSize int64 = 4096
15+
cbNumBlocks int64 = 16384 // 64 MiB
16+
cbCacheSize int64 = cbNumBlocks * cbBlockSize
17+
cbChunkSize int64 = 4 * 1024 * 1024 // 4 MiB — MemoryChunkSize
18+
cbChunkCount int64 = cbCacheSize / cbChunkSize
19+
)
20+
21+
// BenchmarkChunkerSlice_CacheHit benchmarks the full FullFetchChunker.Slice
22+
// hot path on a cache hit: bitmap check + mmap slice return + OTEL
23+
// timer.RecordRaw with precomputed attributes.
24+
func BenchmarkChunkerSlice_CacheHit(b *testing.B) {
25+
provider := sdkmetric.NewMeterProvider()
26+
b.Cleanup(func() { provider.Shutdown(context.Background()) })
27+
28+
m, err := blockmetrics.NewMetrics(provider)
29+
if err != nil {
30+
b.Fatal(err)
31+
}
32+
33+
chunker, err := NewFullFetchChunker(
34+
cbCacheSize, cbBlockSize,
35+
nil, // base is never called on cache hit
36+
filepath.Join(b.TempDir(), "cache"),
37+
m,
38+
)
39+
if err != nil {
40+
b.Fatal(err)
41+
}
42+
b.Cleanup(func() { chunker.Close() })
43+
44+
// Pre-populate the cache so every Slice hits.
45+
chunker.cache.setIsCached(0, cbCacheSize)
46+
47+
ctx := context.Background()
48+
49+
b.ResetTimer()
50+
for i := range b.N {
51+
off := int64(i%int(cbChunkCount)) * cbChunkSize
52+
s, sliceErr := chunker.Slice(ctx, off, cbChunkSize)
53+
if sliceErr != nil {
54+
b.Fatal(sliceErr)
55+
}
56+
if len(s) == 0 {
57+
b.Fatal("empty slice")
58+
}
59+
}
60+
}

packages/orchestrator/pkg/sandbox/block/streaming_chunk.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14-
"go.opentelemetry.io/otel/attribute"
1514
"golang.org/x/sync/errgroup"
1615

1716
"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics"
@@ -227,16 +226,13 @@ func (c *StreamingChunker) Slice(ctx context.Context, off, length int64) ([]byte
227226
// Fast path: already cached
228227
b, err := c.cache.Slice(off, length)
229228
if err == nil {
230-
timer.Success(ctx, length,
231-
attribute.String(pullType, pullTypeLocal))
229+
timer.RecordRaw(ctx, length, chunkerAttrs.successFromCache)
232230

233231
return b, nil
234232
}
235233

236234
if !errors.As(err, &BytesNotAvailableError{}) {
237-
timer.Failure(ctx, length,
238-
attribute.String(pullType, pullTypeLocal),
239-
attribute.String(failureReason, failureTypeLocalRead))
235+
timer.RecordRaw(ctx, length, chunkerAttrs.failCacheRead)
240236

241237
return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err)
242238
}
@@ -269,24 +265,19 @@ func (c *StreamingChunker) Slice(ctx context.Context, off, length int64) ([]byte
269265
}
270266

271267
if err := eg.Wait(); err != nil {
272-
timer.Failure(ctx, length,
273-
attribute.String(pullType, pullTypeRemote),
274-
attribute.String(failureReason, failureTypeCacheFetch))
268+
timer.RecordRaw(ctx, length, chunkerAttrs.failRemoteFetch)
275269

276270
return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", off, off+length, err)
277271
}
278272

279273
b, cacheErr := c.cache.Slice(off, length)
280274
if cacheErr != nil {
281-
timer.Failure(ctx, length,
282-
attribute.String(pullType, pullTypeLocal),
283-
attribute.String(failureReason, failureTypeLocalReadAgain))
275+
timer.RecordRaw(ctx, length, chunkerAttrs.failLocalReadAgain)
284276

285277
return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr)
286278
}
287279

288-
timer.Success(ctx, length,
289-
attribute.String(pullType, pullTypeRemote))
280+
timer.RecordRaw(ctx, length, chunkerAttrs.successFromRemote)
290281

291282
return b, nil
292283
}
@@ -386,15 +377,14 @@ func (c *StreamingChunker) runFetch(ctx context.Context, s *fetchSession) {
386377

387378
err = c.progressiveRead(ctx, s, mmapSlice)
388379
if err != nil {
389-
fetchTimer.Failure(ctx, s.chunkLen,
390-
attribute.String(failureReason, failureTypeRemoteRead))
380+
fetchTimer.RecordRaw(ctx, s.chunkLen, chunkerAttrs.remoteFailure)
391381

392382
s.setError(err, false)
393383

394384
return
395385
}
396386

397-
fetchTimer.Success(ctx, s.chunkLen)
387+
fetchTimer.RecordRaw(ctx, s.chunkLen, chunkerAttrs.remoteSuccess)
398388
s.setDone()
399389
}
400390

packages/shared/pkg/telemetry/meters.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,12 @@ const (
411411
resultTypeFailure = "failure"
412412
)
413413

414+
var (
415+
// Pre-allocated result attributes for use with PrecomputeAttrs.
416+
Success = attribute.String(resultAttr, resultTypeSuccess)
417+
Failure = attribute.String(resultAttr, resultTypeFailure)
418+
)
419+
414420
func (t Stopwatch) Success(ctx context.Context, total int64, kv ...attribute.KeyValue) {
415421
t.end(ctx, resultTypeSuccess, total, kv...)
416422
}
@@ -422,9 +428,23 @@ func (t Stopwatch) Failure(ctx context.Context, total int64, kv ...attribute.Key
422428
func (t Stopwatch) end(ctx context.Context, result string, total int64, kv ...attribute.KeyValue) {
423429
kv = append(kv, attribute.KeyValue{Key: resultAttr, Value: attribute.StringValue(result)})
424430
kv = append(t.kv, kv...)
431+
opt := metric.WithAttributeSet(attribute.NewSet(kv...))
432+
t.RecordRaw(ctx, total, opt)
433+
}
434+
435+
// PrecomputeAttrs builds a reusable MeasurementOption from the given attribute
436+
// key-values. The option must include all attributes (including "result").
437+
// Use with Stopwatch.RecordRaw to avoid per-call attribute allocation.
438+
func PrecomputeAttrs(kv ...attribute.KeyValue) metric.MeasurementOption {
439+
return metric.WithAttributeSet(attribute.NewSet(kv...))
440+
}
425441

442+
// RecordRaw records an operation using a precomputed attribute option; it does
443+
// not include any previous attributes passed at Begin(). Zero-allocation
444+
// alternative to Success/Failure for hot paths.
445+
func (t Stopwatch) RecordRaw(ctx context.Context, total int64, precomputedAttrs metric.MeasurementOption) {
426446
amount := time.Since(t.start).Milliseconds()
427-
t.histogram.Record(ctx, amount, metric.WithAttributes(kv...))
428-
t.sum.Add(ctx, total, metric.WithAttributes(kv...))
429-
t.count.Add(ctx, 1, metric.WithAttributes(kv...))
447+
t.histogram.Record(ctx, amount, precomputedAttrs)
448+
t.sum.Add(ctx, total, precomputedAttrs)
449+
t.count.Add(ctx, 1, precomputedAttrs)
430450
}

0 commit comments

Comments
 (0)