diff --git a/packages/orchestrator/pkg/sandbox/block/chunk.go b/packages/orchestrator/pkg/sandbox/block/chunk.go index ad2017d2aa..c3f29ea15f 100644 --- a/packages/orchestrator/pkg/sandbox/block/chunk.go +++ b/packages/orchestrator/pkg/sandbox/block/chunk.go @@ -8,7 +8,6 @@ import ( "strconv" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/singleflight" @@ -18,67 +17,8 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/logger" "github.com/e2b-dev/infra/packages/shared/pkg/storage" "github.com/e2b-dev/infra/packages/shared/pkg/storage/header" - "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" ) -const ( - pullType = "pull-type" - pullTypeLocal = "local" - pullTypeRemote = "remote" - - failureReason = "failure-reason" - - failureTypeLocalRead = "local-read" - failureTypeLocalReadAgain = "local-read-again" - failureTypeRemoteRead = "remote-read" - failureTypeCacheFetch = "cache-fetch" -) - -type precomputedAttrs struct { - successFromCache metric.MeasurementOption - successFromRemote metric.MeasurementOption - - failCacheRead metric.MeasurementOption - failRemoteFetch metric.MeasurementOption - failLocalReadAgain metric.MeasurementOption - - // RemoteReads timer (runFetch) - remoteSuccess metric.MeasurementOption - remoteFailure metric.MeasurementOption -} - -var chunkerAttrs = precomputedAttrs{ - successFromCache: telemetry.PrecomputeAttrs( - telemetry.Success, - attribute.String(pullType, pullTypeLocal)), - - successFromRemote: telemetry.PrecomputeAttrs( - telemetry.Success, - attribute.String(pullType, pullTypeRemote)), - - failCacheRead: telemetry.PrecomputeAttrs( - telemetry.Failure, - attribute.String(pullType, pullTypeLocal), - attribute.String(failureReason, failureTypeLocalRead)), - - failRemoteFetch: telemetry.PrecomputeAttrs( - telemetry.Failure, - attribute.String(pullType, pullTypeRemote), - attribute.String(failureReason, failureTypeCacheFetch)), - - failLocalReadAgain: telemetry.PrecomputeAttrs( - telemetry.Failure, - attribute.String(pullType, pullTypeLocal), - attribute.String(failureReason, failureTypeLocalReadAgain)), - - remoteSuccess: telemetry.PrecomputeAttrs( - telemetry.Success), - - remoteFailure: telemetry.PrecomputeAttrs( - telemetry.Failure, - attribute.String(failureReason, failureTypeRemoteRead)), -} - // Chunker is the interface satisfied by both FullFetchChunker and StreamingChunker. type Chunker interface { Slice(ctx context.Context, off, length int64) ([]byte, error) @@ -185,32 +125,40 @@ func (c *FullFetchChunker) Slice(ctx context.Context, off, length int64) ([]byte b, err := c.cache.Slice(off, length) if err == nil { - timer.RecordRaw(ctx, length, chunkerAttrs.successFromCache) + timer.Success(ctx, length, + attribute.String(pullType, pullTypeLocal)) return b, nil } if !errors.As(err, &BytesNotAvailableError{}) { - timer.RecordRaw(ctx, length, chunkerAttrs.failCacheRead) + timer.Failure(ctx, length, + attribute.String(pullType, pullTypeLocal), + attribute.String(failureReason, failureTypeLocalRead)) return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err) } chunkErr := c.fetchToCache(ctx, off, length) if chunkErr != nil { - timer.RecordRaw(ctx, length, chunkerAttrs.failRemoteFetch) + timer.Failure(ctx, length, + attribute.String(pullType, pullTypeRemote), + attribute.String(failureReason, failureTypeCacheFetch)) return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", off, off+length, chunkErr) } b, cacheErr := c.cache.Slice(off, length) if cacheErr != nil { - timer.RecordRaw(ctx, length, chunkerAttrs.failLocalReadAgain) + timer.Failure(ctx, length, + attribute.String(pullType, pullTypeLocal), + attribute.String(failureReason, failureTypeLocalReadAgain)) return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr) } - timer.RecordRaw(ctx, length, chunkerAttrs.successFromRemote) + timer.Success(ctx, length, + attribute.String(pullType, pullTypeRemote)) return b, nil } @@ -262,20 +210,24 @@ func (c *FullFetchChunker) fetchToCache(ctx context.Context, off, length int64) readBytes, err := c.base.ReadAt(ctx, b, fetchOff) if err != nil { - fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteFailure) + fetchSW.Failure(ctx, int64(readBytes), + attribute.String(failureReason, failureTypeRemoteRead), + ) return nil, fmt.Errorf("failed to read chunk from base %d: %w", fetchOff, err) } if readBytes != len(b) { - fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteFailure) + fetchSW.Failure(ctx, int64(readBytes), + attribute.String(failureReason, failureTypeRemoteRead), + ) return nil, fmt.Errorf("failed to read chunk from base %d: expected %d bytes, got %d bytes", fetchOff, len(b), readBytes) } c.cache.setIsCached(fetchOff, int64(readBytes)) - fetchSW.RecordRaw(ctx, int64(readBytes), chunkerAttrs.remoteSuccess) + fetchSW.Success(ctx, int64(readBytes)) return nil, nil }) @@ -299,3 +251,16 @@ func (c *FullFetchChunker) Close() error { func (c *FullFetchChunker) FileSize() (int64, error) { return c.cache.FileSize() } + +const ( + pullType = "pull-type" + pullTypeLocal = "local" + pullTypeRemote = "remote" + + failureReason = "failure-reason" + + failureTypeLocalRead = "local-read" + failureTypeLocalReadAgain = "local-read-again" + failureTypeRemoteRead = "remote-read" + failureTypeCacheFetch = "cache-fetch" +) diff --git a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go index 7e40b35c4e..956d71e0b3 100644 --- a/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go +++ b/packages/orchestrator/pkg/sandbox/block/streaming_chunk.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" "github.com/e2b-dev/infra/packages/orchestrator/pkg/sandbox/block/metrics" @@ -226,13 +227,16 @@ func (c *StreamingChunker) Slice(ctx context.Context, off, length int64) ([]byte // Fast path: already cached b, err := c.cache.Slice(off, length) if err == nil { - timer.RecordRaw(ctx, length, chunkerAttrs.successFromCache) + timer.Success(ctx, length, + attribute.String(pullType, pullTypeLocal)) return b, nil } if !errors.As(err, &BytesNotAvailableError{}) { - timer.RecordRaw(ctx, length, chunkerAttrs.failCacheRead) + timer.Failure(ctx, length, + attribute.String(pullType, pullTypeLocal), + attribute.String(failureReason, failureTypeLocalRead)) return nil, fmt.Errorf("failed read from cache at offset %d: %w", off, err) } @@ -265,19 +269,24 @@ func (c *StreamingChunker) Slice(ctx context.Context, off, length int64) ([]byte } if err := eg.Wait(); err != nil { - timer.RecordRaw(ctx, length, chunkerAttrs.failRemoteFetch) + timer.Failure(ctx, length, + attribute.String(pullType, pullTypeRemote), + attribute.String(failureReason, failureTypeCacheFetch)) return nil, fmt.Errorf("failed to ensure data at %d-%d: %w", off, off+length, err) } b, cacheErr := c.cache.Slice(off, length) if cacheErr != nil { - timer.RecordRaw(ctx, length, chunkerAttrs.failLocalReadAgain) + timer.Failure(ctx, length, + attribute.String(pullType, pullTypeLocal), + attribute.String(failureReason, failureTypeLocalReadAgain)) return nil, fmt.Errorf("failed to read from cache after ensuring data at %d-%d: %w", off, off+length, cacheErr) } - timer.RecordRaw(ctx, length, chunkerAttrs.successFromRemote) + timer.Success(ctx, length, + attribute.String(pullType, pullTypeRemote)) return b, nil } @@ -377,14 +386,15 @@ func (c *StreamingChunker) runFetch(ctx context.Context, s *fetchSession) { err = c.progressiveRead(ctx, s, mmapSlice) if err != nil { - fetchTimer.RecordRaw(ctx, s.chunkLen, chunkerAttrs.remoteFailure) + fetchTimer.Failure(ctx, s.chunkLen, + attribute.String(failureReason, failureTypeRemoteRead)) s.setError(err, false) return } - fetchTimer.RecordRaw(ctx, s.chunkLen, chunkerAttrs.remoteSuccess) + fetchTimer.Success(ctx, s.chunkLen) s.setDone() } diff --git a/packages/shared/pkg/telemetry/meters.go b/packages/shared/pkg/telemetry/meters.go index b6174169c4..012de489d3 100644 --- a/packages/shared/pkg/telemetry/meters.go +++ b/packages/shared/pkg/telemetry/meters.go @@ -411,12 +411,6 @@ const ( resultTypeFailure = "failure" ) -var ( - // Pre-allocated result attributes for use with PrecomputeAttrs. - Success = attribute.String(resultAttr, resultTypeSuccess) - Failure = attribute.String(resultAttr, resultTypeFailure) -) - func (t Stopwatch) Success(ctx context.Context, total int64, kv ...attribute.KeyValue) { t.end(ctx, resultTypeSuccess, total, kv...) } @@ -429,22 +423,9 @@ func (t Stopwatch) end(ctx context.Context, result string, total int64, kv ...at kv = append(kv, attribute.KeyValue{Key: resultAttr, Value: attribute.StringValue(result)}) kv = append(t.kv, kv...) opt := metric.WithAttributeSet(attribute.NewSet(kv...)) - t.RecordRaw(ctx, total, opt) -} - -// PrecomputeAttrs builds a reusable MeasurementOption from the given attribute -// key-values. The option must include all attributes (including "result"). -// Use with Stopwatch.Record to avoid per-call attribute allocation. -func PrecomputeAttrs(kv ...attribute.KeyValue) metric.MeasurementOption { - return metric.WithAttributeSet(attribute.NewSet(kv...)) -} -// RecordRaw records an operation using a precomputed attribute option, it does -// not include any previous attributes passed at Begin(). Zero-allocation -// alternative to Success/Failure for hot paths. -func (t Stopwatch) RecordRaw(ctx context.Context, total int64, precomputedAttrs metric.MeasurementOption) { amount := time.Since(t.start).Milliseconds() - t.histogram.Record(ctx, amount, precomputedAttrs) - t.sum.Add(ctx, total, precomputedAttrs) - t.count.Add(ctx, 1, precomputedAttrs) + t.histogram.Record(ctx, amount, opt) + t.sum.Add(ctx, total, opt) + t.count.Add(ctx, 1, opt) }