diff --git a/CHANGELOG.md b/CHANGELOG.md index c6fe673a8fc..a2d89ac0628 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types. If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442) - Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427) +- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443) diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index 453278a0c38..4dba8bdc64a 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -130,11 +130,11 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a r.mu.Lock() defer r.mu.Unlock() if int(r.count) < cap(r.measurements) { - r.store(int(r.count), newMeasurement(ctx, t, n, a)) + r.store(ctx, int(r.count), t, n, a) } else if r.count == r.next { // Overwrite a random existing measurement with the one offered. 
idx := int(rand.Int64N(int64(cap(r.measurements)))) - r.store(idx, newMeasurement(ctx, t, n, a)) + r.store(ctx, idx, t, n, a) r.advance() } r.count++ diff --git a/sdk/metric/exemplar/fixed_size_reservoir_test.go b/sdk/metric/exemplar/fixed_size_reservoir_test.go index 914a2238d69..76b9a5f1be5 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir_test.go +++ b/sdk/metric/exemplar/fixed_size_reservoir_test.go @@ -45,8 +45,8 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) { } var sum float64 - for _, m := range r.measurements { - sum += m.Value.Float64() + for i := range r.measurements { + sum += r.measurements[i].Value.Float64() } mean := sum / float64(sampleSize) @@ -54,3 +54,12 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) { // ensuring no bias in our random sampling algorithm. assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ. } + +func TestFixedSizeReservoirConcurrentSafe(t *testing.T) { + t.Run("Int64", reservoirConcurrentSafeTest[int64](func(n int) (ReservoirProvider, int) { + return FixedSizeReservoirProvider(n), n + })) + t.Run("Float64", reservoirConcurrentSafeTest[float64](func(n int) (ReservoirProvider, int) { + return FixedSizeReservoirProvider(n), n + })) +} diff --git a/sdk/metric/exemplar/histogram_reservoir.go b/sdk/metric/exemplar/histogram_reservoir.go index 60c871a4432..dacac3ebaec 100644 --- a/sdk/metric/exemplar/histogram_reservoir.go +++ b/sdk/metric/exemplar/histogram_reservoir.go @@ -7,7 +7,6 @@ import ( "context" "slices" "sort" - "sync" "time" "go.opentelemetry.io/otel/attribute" @@ -43,7 +42,6 @@ var _ Reservoir = &HistogramReservoir{} type HistogramReservoir struct { reservoir.ConcurrentSafe *storage - mu sync.Mutex // bounds are bucket bounds in ascending order. 
bounds []float64 @@ -72,18 +70,13 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a } idx := sort.SearchFloat64s(r.bounds, n) - m := newMeasurement(ctx, t, v, a) - r.mu.Lock() - defer r.mu.Unlock() - r.store(idx, m) + r.store(ctx, idx, t, v, a) } // Collect returns all the held exemplars. // // The Reservoir state is preserved after this call. func (r *HistogramReservoir) Collect(dest *[]Exemplar) { - r.mu.Lock() - defer r.mu.Unlock() r.storage.Collect(dest) } diff --git a/sdk/metric/exemplar/histogram_reservoir_test.go b/sdk/metric/exemplar/histogram_reservoir_test.go index 3f43e801e8f..57b07612af8 100644 --- a/sdk/metric/exemplar/histogram_reservoir_test.go +++ b/sdk/metric/exemplar/histogram_reservoir_test.go @@ -15,3 +15,13 @@ func TestHist(t *testing.T) { return HistogramReservoirProvider(bounds), len(bounds) })) } + +func TestHistogramReservoirConcurrentSafe(t *testing.T) { + bounds := []float64{0, 100} + t.Run("Int64", reservoirConcurrentSafeTest[int64](func(int) (ReservoirProvider, int) { + return HistogramReservoirProvider(bounds), len(bounds) + })) + t.Run("Float64", reservoirConcurrentSafeTest[float64](func(int) (ReservoirProvider, int) { + return HistogramReservoirProvider(bounds), len(bounds) + })) +} diff --git a/sdk/metric/exemplar/reservoir_test.go b/sdk/metric/exemplar/reservoir_test.go index 236c7496286..e0754d926be 100644 --- a/sdk/metric/exemplar/reservoir_test.go +++ b/sdk/metric/exemplar/reservoir_test.go @@ -4,6 +4,8 @@ package exemplar import ( + "context" + "sync" "testing" "time" @@ -144,3 +146,66 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { }) } } + +func reservoirConcurrentSafeTest[N int64 | float64](f factory) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + rp, n := f(1) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + r := rp(*attribute.EmptySet()) + + var wg sync.WaitGroup + + // Call Offer concurrently with another Offer, and with 
Collect. + for i := range 2 { + wg.Add(1) + go func() { + ctx, ts, val, attrs := generateOfferInputs[N](t, i+1) + r.Offer(ctx, ts, val, attrs) + wg.Done() + }() + } + var dest []Exemplar + r.Collect(&dest) + for _, e := range dest { + validateExemplar[N](t, e) + } + wg.Wait() + } +} + +func generateOfferInputs[N int64 | float64]( + t *testing.T, + i int, +) (context.Context, time.Time, Value, []attribute.KeyValue) { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID([16]byte{byte(i)}), + SpanID: trace.SpanID([8]byte{byte(i)}), + TraceFlags: trace.FlagsSampled, + }) + ctx := trace.ContextWithSpanContext(t.Context(), sc) + ts := time.Unix(int64(i), int64(i)) + val := NewValue(N(i)) + attrs := []attribute.KeyValue{attribute.Int("i", i)} + return ctx, ts, val, attrs +} + +func validateExemplar[N int64 | float64](t *testing.T, e Exemplar) { + i := 0 + switch e.Value.Type() { + case Int64ValueType: + i = int(e.Value.Int64()) + case Float64ValueType: + i = int(e.Value.Float64()) + } + ctx, ts, _, attrs := generateOfferInputs[N](t, i) + sc := trace.SpanContextFromContext(ctx) + tID := sc.TraceID() + sID := sc.SpanID() + assert.Equal(t, tID[:], e.TraceID) + assert.Equal(t, sID[:], e.SpanID) + assert.Equal(t, ts, e.Time) + assert.Equal(t, attrs, e.FilteredAttributes) +} diff --git a/sdk/metric/exemplar/storage.go b/sdk/metric/exemplar/storage.go index 16b61c07dec..790496027da 100644 --- a/sdk/metric/exemplar/storage.go +++ b/sdk/metric/exemplar/storage.go @@ -5,6 +5,7 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar" import ( "context" + "sync" "time" "go.opentelemetry.io/otel/attribute" @@ -24,8 +25,14 @@ func newStorage(n int) *storage { return &storage{measurements: make([]measurement, n)} } -func (r *storage) store(idx int, m measurement) { - r.measurements[idx] = m +func (r *storage) store(ctx context.Context, idx int, ts time.Time, v Value, droppedAttr []attribute.KeyValue) { + r.measurements[idx].mux.Lock() + 
defer r.measurements[idx].mux.Unlock() + r.measurements[idx].FilteredAttributes = droppedAttr + r.measurements[idx].Time = ts + r.measurements[idx].Value = v + r.measurements[idx].Ctx = ctx + r.measurements[idx].valid = true } // Collect returns all the held exemplars. @@ -34,61 +41,57 @@ func (r *storage) store(idx int, m measurement) { func (r *storage) Collect(dest *[]Exemplar) { *dest = reset(*dest, len(r.measurements), len(r.measurements)) var n int - for _, m := range r.measurements { - if !m.valid { - continue + for i := range r.measurements { + if r.measurements[i].exemplar(&(*dest)[n]) { + n++ } - - m.exemplar(&(*dest)[n]) - n++ } *dest = (*dest)[:n] } // measurement is a measurement made by a telemetry system. type measurement struct { + mux sync.Mutex // FilteredAttributes are the attributes dropped during the measurement. FilteredAttributes []attribute.KeyValue // Time is the time when the measurement was made. Time time.Time // Value is the value of the measurement. Value Value - // SpanContext is the SpanContext active when a measurement was made. - SpanContext trace.SpanContext + // Ctx is the context active when a measurement was made. + Ctx context.Context valid bool } -// newMeasurement returns a new non-empty Measurement. -func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement { - return measurement{ - FilteredAttributes: droppedAttr, - Time: ts, - Value: v, - SpanContext: trace.SpanContextFromContext(ctx), - valid: true, +// exemplar returns m as an [Exemplar]. +// It returns true if it populated dest, and false if m holds no valid measurement. +func (m *measurement) exemplar(dest *Exemplar) bool { + m.mux.Lock() + defer m.mux.Unlock() + if !m.valid { + return false } -} -// exemplar returns m as an [Exemplar]. 
-func (m measurement) exemplar(dest *Exemplar) { dest.FilteredAttributes = m.FilteredAttributes dest.Time = m.Time dest.Value = m.Value - if m.SpanContext.HasTraceID() { - traceID := m.SpanContext.TraceID() + sc := trace.SpanContextFromContext(m.Ctx) + if sc.HasTraceID() { + traceID := sc.TraceID() dest.TraceID = traceID[:] } else { dest.TraceID = dest.TraceID[:0] } - if m.SpanContext.HasSpanID() { - spanID := m.SpanContext.SpanID() + if sc.HasSpanID() { + spanID := sc.SpanID() dest.SpanID = spanID[:] } else { dest.SpanID = dest.SpanID[:0] } + return true } func reset[T any](s []T, length, capacity int) []T {