101 changes: 33 additions & 68 deletions packages/orchestrator/pkg/sandbox/block/chunk.go
@@ -8,7 +8,6 @@ import (
"strconv"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
@@ -18,67 +17,8 @@ import (
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
)

const (
pullType = "pull-type"
pullTypeLocal = "local"
pullTypeRemote = "remote"

failureReason = "failure-reason"

failureTypeLocalRead = "local-read"
failureTypeLocalReadAgain = "local-read-again"
failureTypeRemoteRead = "remote-read"
failureTypeCacheFetch = "cache-fetch"
)

type precomputedAttrs struct {
successFromCache metric.MeasurementOption
successFromRemote metric.MeasurementOption

failCacheRead metric.MeasurementOption
failRemoteFetch metric.MeasurementOption
failLocalReadAgain metric.MeasurementOption

// RemoteReads timer (runFetch)
remoteSuccess metric.MeasurementOption
remoteFailure metric.MeasurementOption
}

var chunkerAttrs = precomputedAttrs{
successFromCache: telemetry.PrecomputeAttrs(
telemetry.Success,
attribute.String(pullType, pullTypeLocal)),

successFromRemote: telemetry.PrecomputeAttrs(
telemetry.Success,
attribute.String(pullType, pullTypeRemote)),

failCacheRead: telemetry.PrecomputeAttrs(
telemetry.Failure,
attribute.String(pullType, pullTypeLocal),
attribute.String(failureReason, failureTypeLocalRead)),

failRemoteFetch: telemetry.PrecomputeAttrs(
telemetry.Failure,
attribute.String(pullType, pullTypeRemote),
attribute.String(failureReason, failureTypeCacheFetch)),

failLocalReadAgain: telemetry.PrecomputeAttrs(
telemetry.Failure,
attribute.String(pullType, pullTypeLocal),
attribute.String(failureReason, failureTypeLocalReadAgain)),

remoteSuccess: telemetry.PrecomputeAttrs(
telemetry.Success),

remoteFailure: telemetry.PrecomputeAttrs(
telemetry.Failure,
attribute.String(failureReason, failureTypeRemoteRead)),
}

// Chunker is the interface satisfied by both FullFetchChunker and StreamingChunker.
type Chunker interface {
Slice(ctx context.Context, off, length int64) ([]byte, error)
@@ -185,32 +125,40 @@ func (c *FullFetchChunker) Slice(ctx context.Context, off, length int64) ([]byte

b, err := c.cache.Slice(off, length)
if err == nil {
timer.RecordRaw(ctx, length, chunkerAttrs.successFromCache)
timer.Success(ctx, length,
attribute.String(pullType, pullTypeLocal))

Comment on lines 125 to 130
🟡 The benchmark chunk_bench_test.go was deleted with the stated justification that it 'relied on the removed PrecomputeAttrs API', but this is factually incorrect — the file never imported the telemetry package and never called PrecomputeAttrs, RecordRaw, or referenced chunkerAttrs. All four APIs it used (NewFullFetchChunker, cache.setIsCached, chunker.Slice, chunker.Close) still exist unchanged. Consider restoring the benchmark, especially since this PR changes the Slice cache-hit hot path from a zero-allocation precomputed MeasurementOption to three metric.WithAttributes calls per invocation.

Extended reasoning...

What the bug is

The PR description states chunk_bench_test.go was deleted because it "relied on the removed PrecomputeAttrs API." This is factually incorrect. Examining the deleted file's content from the diff shows its imports are exclusively: context, path/filepath, testing, go.opentelemetry.io/otel/sdk/metric, and github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics. The telemetry package is never imported. The functions PrecomputeAttrs, RecordRaw, and the chunkerAttrs variable are never referenced anywhere in the file.

The specific code path

The benchmark exercises four operations on FullFetchChunker:

  1. NewFullFetchChunker(cbCacheSize, cbBlockSize, nil, filepath.Join(b.TempDir(), "cache"), m) — signature is unchanged in the new code
  2. chunker.cache.setIsCached(0, cbCacheSize) — cache field and setIsCached method still exist (same package, unexported but accessible)
  3. chunker.Slice(ctx, off, cbChunkSize) — still exists with the same signature
  4. chunker.Close() — still exists

The benchmark would compile and run correctly against the refactored code without any modifications.

Why existing code doesn't prevent this

The PR's stated justification is simply a factual error. The benchmark comment even says it tests "timer.Success with attribute construction" — which is precisely what the new code path does (inline attribute.String(pullType, pullTypeLocal) construction inside timer.Success). The benchmark was more relevant after this change than before, not less.

What the impact is

The PR introduces a performance characteristic change in the FullFetchChunker.Slice cache-hit path: previously a single precomputed metric.MeasurementOption (one attribute.NewSet computed once at init time, zero per-call allocation) was used; now three metric.WithAttributes(kv...) calls are made per Slice invocation (each calling attribute.NewSet at call time). The benchmark explicitly measures "the full FullFetchChunker.Slice hot path on a cache hit: bitmap check + mmap slice return + OTEL timer.Success with attribute construction." Its deletion removes the only automated way to detect or quantify this allocation change and any future regressions on this hot path.

How to fix it

Restore chunk_bench_test.go — it requires zero changes to compile and run against the new code.

Step-by-step proof

  1. The deleted file's import block (visible in the diff): context, path/filepath, testing, sdkmetric, blockmetrics. No telemetry import. ✓
  2. Search the deleted file for PrecomputeAttrs: not found. Search for RecordRaw: not found. Search for chunkerAttrs: not found. ✓
  3. NewFullFetchChunker in the new chunk.go (line ~72): signature func NewFullFetchChunker(size, blockSize int64, base storage.SeekableReader, cachePath string, metrics metrics.Metrics) (*FullFetchChunker, error) — identical to what the benchmark calls. ✓
  4. cache field on FullFetchChunker (line ~63 in new code): cache *Cache — still present. setIsCached on *Cache: called in the new code at multiple points, still exists. ✓
  5. Conclusion: the benchmark would have compiled and passed with go test -bench=BenchmarkChunkerSlice_CacheHit ./packages/orchestrator/pkg/sandbox/block/ unchanged.

Member Author

Fixed — benchmark restored in follow-up commit. It compiles against the refactored code without modifications.

return b, nil
}

if !errors.As(err, &BytesNotAvailableError{}) {
timer.RecordRaw(ctx, length, chunkerAttrs.failCacheRead)
timer.Failure(ctx, length,
attribute.String(pullType, pullTypeLocal),
attribute.String(failureReason, failureTypeLocalRead))

return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err)
}

chunkErr := c.fetchToCache(ctx, off, length)
if chunkErr != nil {
timer.RecordRaw(ctx, length, chunkerAttrs.failRemoteFetch)
timer.Failure(ctx, length,
attribute.String(pullType, pullTypeRemote),
attribute.String(failureReason, failureTypeCacheFetch))

return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", off, off+length, chunkErr)
}

b, cacheErr := c.cache.Slice(off, length)
if cacheErr != nil {
timer.RecordRaw(ctx, length, chunkerAttrs.failLocalReadAgain)
timer.Failure(ctx, length,
attribute.String(pullType, pullTypeLocal),
attribute.String(failureReason, failureTypeLocalReadAgain))

return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr)
}

timer.RecordRaw(ctx, length, chunkerAttrs.successFromRemote)
timer.Success(ctx, length,
attribute.String(pullType, pullTypeRemote))

return b, nil
}
@@ -262,20 +210,24 @@ func (c *FullFetchChunker) fetchToCache(ctx context.Context, off, length int64)

readBytes, err := c.base.ReadAt(ctx, b, fetchOff)
if err != nil {
fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteFailure)
fetchSW.Failure(ctx, int64(readBytes),
attribute.String(failureReason, failureTypeRemoteRead),
)

return nil, fmt.Errorf("failed to read chunk from base %d: %w", fetchOff, err)
}

if readBytes != len(b) {
fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteFailure)
fetchSW.Failure(ctx, int64(readBytes),
attribute.String(failureReason, failureTypeRemoteRead),
)

return nil, fmt.Errorf("failed to read chunk from base %d: expected %d bytes, got %d bytes", fetchOff, len(b), readBytes)
}

c.cache.setIsCached(fetchOff, int64(readBytes))

fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteSuccess)
fetchSW.Success(ctx, int64(readBytes))

return nil, nil
})
@@ -299,3 +251,16 @@ func (c *FullFetchChunker) Close() error {
func (c *FullFetchChunker) FileSize() (int64, error) {
return c.cache.FileSize()
}

const (
pullType = "pull-type"
pullTypeLocal = "local"
pullTypeRemote = "remote"

failureReason = "failure-reason"

failureTypeLocalRead = "local-read"
failureTypeLocalReadAgain = "local-read-again"
failureTypeRemoteRead = "remote-read"
failureTypeCacheFetch = "cache-fetch"
)
24 changes: 17 additions & 7 deletions packages/orchestrator/pkg/sandbox/block/streaming_chunk.go
@@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"

"go.opentelemetry.io/otel/attribute"
"golang.org/x/sync/errgroup"

"github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics"
@@ -226,13 +227,16 @@ func (c *StreamingChunker) Slice(ctx context.Context, off, length int64) ([]byte
// Fast path: already cached
b, err := c.cache.Slice(off, length)
if err == nil {
timer.RecordRaw(ctx, length, chunkerAttrs.successFromCache)
timer.Success(ctx, length,
attribute.String(pullType, pullTypeLocal))

return b, nil
}

if !errors.As(err, &BytesNotAvailableError{}) {
timer.RecordRaw(ctx, length, chunkerAttrs.failCacheRead)
timer.Failure(ctx, length,
attribute.String(pullType, pullTypeLocal),
attribute.String(failureReason, failureTypeLocalRead))

return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err)
}
@@ -265,19 +269,24 @@ func (c *StreamingChunker) Slice(ctx context.Context, off, length int64) ([]byte
}

if err := eg.Wait(); err != nil {
timer.RecordRaw(ctx, length, chunkerAttrs.failRemoteFetch)
timer.Failure(ctx, length,
attribute.String(pullType, pullTypeRemote),
attribute.String(failureReason, failureTypeCacheFetch))

return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", off, off+length, err)
}

b, cacheErr := c.cache.Slice(off, length)
if cacheErr != nil {
timer.RecordRaw(ctx, length, chunkerAttrs.failLocalReadAgain)
timer.Failure(ctx, length,
attribute.String(pullType, pullTypeLocal),
attribute.String(failureReason, failureTypeLocalReadAgain))

return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr)
}

timer.RecordRaw(ctx, length, chunkerAttrs.successFromRemote)
timer.Success(ctx, length,
attribute.String(pullType, pullTypeRemote))

return b, nil
}
@@ -377,14 +386,15 @@ func (c *StreamingChunker) runFetch(ctx context.Context, s *fetchSession) {

err = c.progressiveRead(ctx, s, mmapSlice)
if err != nil {
fetchTimer.RecordRaw(ctx, s.chunkLen, chunkerAttrs.remoteFailure)
fetchTimer.Failure(ctx, s.chunkLen,
attribute.String(failureReason, failureTypeRemoteRead))

s.setError(err, false)

return
}

fetchTimer.RecordRaw(ctx, s.chunkLen, chunkerAttrs.remoteSuccess)
fetchTimer.Success(ctx, s.chunkLen)
s.setDone()
}

25 changes: 3 additions & 22 deletions packages/shared/pkg/telemetry/meters.go
@@ -411,12 +411,6 @@ const (
resultTypeFailure = "failure"
)

var (
// Pre-allocated result attributes for use with PrecomputeAttrs.
Success = attribute.String(resultAttr, resultTypeSuccess)
Failure = attribute.String(resultAttr, resultTypeFailure)
)

func (t Stopwatch) Success(ctx context.Context, total int64, kv ...attribute.KeyValue) {
t.end(ctx, resultTypeSuccess, total, kv...)
}
@@ -429,22 +423,9 @@ func (t Stopwatch) end(ctx context.Context, result string, total int64, kv ...at
kv = append(kv, attribute.KeyValue{Key: resultAttr, Value: attribute.StringValue(result)})
kv = append(t.kv, kv...)
opt := metric.WithAttributeSet(attribute.NewSet(kv...))
t.RecordRaw(ctx, total, opt)
}

// PrecomputeAttrs builds a reusable MeasurementOption from the given attribute
// key-values. The option must include all attributes (including "result").
// Use with Stopwatch.Record to avoid per-call attribute allocation.
func PrecomputeAttrs(kv ...attribute.KeyValue) metric.MeasurementOption {
return metric.WithAttributeSet(attribute.NewSet(kv...))
}

// RecordRaw records an operation using a precomputed attribute option, it does
// not include any previous attributes passed at Begin(). Zero-allocation
// alternative to Success/Failure for hot paths.
func (t Stopwatch) RecordRaw(ctx context.Context, total int64, precomputedAttrs metric.MeasurementOption) {
amount := time.Since(t.start).Milliseconds()
t.histogram.Record(ctx, amount, precomputedAttrs)
t.sum.Add(ctx, total, precomputedAttrs)
t.count.Add(ctx, 1, precomputedAttrs)
t.histogram.Record(ctx, amount, opt)
t.sum.Add(ctx, total, opt)
Comment on lines 427 to +429

🟡 The new end() method calls metric.WithAttributes(kv...) three separate times — once each for histogram.Record, sum.Add, and count.Add — causing the OTel SDK to independently sort and deduplicate the key-value slice (via attribute.NewSet) three times per recording instead of once. The fix is to compute opt := metric.WithAttributeSet(attribute.NewSet(kv...)) once before the three calls and reuse it.

Extended reasoning...

What the bug is and how it manifests

In the refactored Stopwatch.end() method (meters.go lines 427-429), metric.WithAttributes(kv...) is called three independent times — once for t.histogram.Record, once for t.sum.Add, and once for t.count.Add. The OTel Go SDK's metric.WithAttributes creates an attributeOption that wraps raw key-values; when each instrument processes its MeasurementOption, it internally calls attribute.NewSet(attrs...) to sort and deduplicate the slice, allocating a distinct attribute.Set each time. The result is 3x attribute-set construction per end() invocation instead of 1x.

The specific code path that triggers it

Every call to FullFetchChunker.Slice and StreamingChunker.Slice invokes either timer.Success or timer.Failure, both of which delegate to end(). Additionally, the remote-read inner timer (fetchSW / fetchTimer) goes through the same path. These are block-IO hot paths executed on every chunk read.

Why existing code does not prevent it

The previous end() computed the attribute set once: opt := metric.WithAttributeSet(attribute.NewSet(kv...)) and then called RecordRaw(ctx, total, opt), which passed that single pre-built MeasurementOption to all three instrument calls. RecordRaw was explicitly documented as a zero-allocation alternative for hot paths. This PR removes RecordRaw and PrecomputeAttrs to simplify the API, but the simplified end() does not preserve the single-allocation property.

What the impact is

Each attribute-set construction for a typical 2-4 KV slice requires sorting, deduplication, and a heap allocation. At 3x per recording on the chunker hot path, this is measurable under high block-IO throughput. The deleted benchmark (BenchmarkChunkerSlice_CacheHit) was specifically designed to expose this cost; its removal means the regression is now invisible to CI.

How to fix it

Compute the option once before the three instrument calls inside end():

opt := metric.WithAttributeSet(attribute.NewSet(kv...))
amount := time.Since(t.start).Milliseconds()
t.histogram.Record(ctx, amount, opt)
t.sum.Add(ctx, total, opt)
t.count.Add(ctx, 1, opt)

This restores 1x attribute-set construction per recording without requiring any change to callers or re-adding RecordRaw/PrecomputeAttrs.

Step-by-step proof

  1. FullFetchChunker.Slice calls timer.Success(ctx, length, attribute.String("pull-type", "local")) on a cache hit.
  2. Success delegates to end(ctx, "success", length, attribute.String("pull-type", "local")).
  3. end appends result=success and the Stopwatch base kv, producing [{pull-type, local}, {result, success}].
  4. t.histogram.Record(ctx, amount, metric.WithAttributes(kv...)) — OTel SDK builds attribute.NewSet(kv...) internally. Allocation #1.
  5. t.sum.Add(ctx, total, metric.WithAttributes(kv...)) — OTel SDK builds attribute.NewSet(kv...) again. Allocation #2.
  6. t.count.Add(ctx, 1, metric.WithAttributes(kv...)) — OTel SDK builds attribute.NewSet(kv...) a third time. Allocation #3.
  7. On every Slice call (cache hit or miss), this triple allocation occurs. The old path did this once.

Member Author

Fixed — same as above, end() now computes the attribute set once.

t.count.Add(ctx, 1, opt)
}