Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `ErrorType` function in `go.opentelemetry.io/otel/semconv/v1.37.0` now handles custom error types.
If an error implements an `ErrorType() string` method, the return value of that method will be used as the error type. (#7442)
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7427)
- Improve the concurrent performance of `HistogramReservoir` in `go.opentelemetry.io/otel/sdk/metric/exemplar` by 4x. (#7443)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/exemplar/fixed_size_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
r.mu.Lock()
defer r.mu.Unlock()
if int(r.count) < cap(r.measurements) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive-by: I think this (and all similar code) should be len not cap.
In the current code they are always the same, but it's a slight jar when reading it to wonder what was intended.

r.store(int(r.count), newMeasurement(ctx, t, n, a))
r.store(ctx, int(r.count), t, n, a)
} else if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rand.Int64N(int64(cap(r.measurements))))
r.store(idx, newMeasurement(ctx, t, n, a))
r.store(ctx, idx, t, n, a)
r.advance()
}
r.count++
Expand Down
13 changes: 11 additions & 2 deletions sdk/metric/exemplar/fixed_size_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,21 @@ func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
}

var sum float64
for _, m := range r.measurements {
sum += m.Value.Float64()
for i := range r.measurements {
sum += r.measurements[i].Value.Float64()
}
mean := sum / float64(sampleSize)

// Check the intensity/rate of the sampled distribution is preserved
// ensuring no bias in our random sampling algorithm.
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
}

func TestFixedSizeReservoirConcurrentSafe(t *testing.T) {
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(n int) (ReservoirProvider, int) {
return FixedSizeReservoirProvider(n), n
}))
}
9 changes: 1 addition & 8 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"slices"
"sort"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -43,7 +42,6 @@ var _ Reservoir = &HistogramReservoir{}
type HistogramReservoir struct {
reservoir.ConcurrentSafe
*storage
mu sync.Mutex

// bounds are bucket bounds in ascending order.
bounds []float64
Expand Down Expand Up @@ -72,18 +70,13 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
}

idx := sort.SearchFloat64s(r.bounds, n)
m := newMeasurement(ctx, t, v, a)

r.mu.Lock()
defer r.mu.Unlock()
r.store(idx, m)
r.store(ctx, idx, t, v, a)
}

// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *HistogramReservoir) Collect(dest *[]Exemplar) {
r.mu.Lock()
defer r.mu.Unlock()
r.storage.Collect(dest)
}
10 changes: 10 additions & 0 deletions sdk/metric/exemplar/histogram_reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@ func TestHist(t *testing.T) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
}

func TestHistogramReservoirConcurrentSafe(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", reservoirConcurrentSafeTest[int64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
t.Run("Float64", reservoirConcurrentSafeTest[float64](func(int) (ReservoirProvider, int) {
return HistogramReservoirProvider(bounds), len(bounds)
}))
}
65 changes: 65 additions & 0 deletions sdk/metric/exemplar/reservoir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package exemplar

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -144,3 +146,66 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) {
})
}
}

func reservoirConcurrentSafeTest[N int64 | float64](f factory) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
rp, n := f(1)
if n < 1 {
t.Skip("skipping, reservoir capacity less than 1:", n)
}
r := rp(*attribute.EmptySet())

var wg sync.WaitGroup

// Call Offer concurrently with another Offer, and with Collect.
for i := range 2 {
wg.Add(1)
go func() {
ctx, ts, val, attrs := generateOfferInputs[N](t, i+1)
r.Offer(ctx, ts, val, attrs)
wg.Done()
}()
}
var dest []Exemplar
r.Collect(&dest)
for _, e := range dest {
validateExemplar[N](t, e)
}
wg.Wait()
}
}

func generateOfferInputs[N int64 | float64](
t *testing.T,
i int,
) (context.Context, time.Time, Value, []attribute.KeyValue) {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID([16]byte{byte(i)}),
SpanID: trace.SpanID([8]byte{byte(i)}),
TraceFlags: trace.FlagsSampled,
})
ctx := trace.ContextWithSpanContext(t.Context(), sc)
ts := time.Unix(int64(i), int64(i))
val := NewValue(N(i))
attrs := []attribute.KeyValue{attribute.Int("i", i)}
return ctx, ts, val, attrs
}

func validateExemplar[N int64 | float64](t *testing.T, e Exemplar) {
i := 0
switch e.Value.Type() {
case Int64ValueType:
i = int(e.Value.Int64())
case Float64ValueType:
i = int(e.Value.Float64())
}
ctx, ts, _, attrs := generateOfferInputs[N](t, i)
sc := trace.SpanContextFromContext(ctx)
tID := sc.TraceID()
sID := sc.SpanID()
assert.Equal(t, tID[:], e.TraceID)
assert.Equal(t, sID[:], e.SpanID)
assert.Equal(t, ts, e.Time)
assert.Equal(t, attrs, e.FilteredAttributes)
}
53 changes: 28 additions & 25 deletions sdk/metric/exemplar/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -24,8 +25,14 @@ func newStorage(n int) *storage {
return &storage{measurements: make([]measurement, n)}
}

func (r *storage) store(idx int, m measurement) {
r.measurements[idx] = m
func (r *storage) store(ctx context.Context, idx int, ts time.Time, v Value, droppedAttr []attribute.KeyValue) {
r.measurements[idx].mux.Lock()
defer r.measurements[idx].mux.Unlock()
r.measurements[idx].FilteredAttributes = droppedAttr
r.measurements[idx].Time = ts
r.measurements[idx].Value = v
r.measurements[idx].Ctx = ctx
r.measurements[idx].valid = true
}

// Collect returns all the held exemplars.
Expand All @@ -34,61 +41,57 @@ func (r *storage) store(idx int, m measurement) {
func (r *storage) Collect(dest *[]Exemplar) {
*dest = reset(*dest, len(r.measurements), len(r.measurements))
var n int
for _, m := range r.measurements {
if !m.valid {
continue
for i := range r.measurements {
if r.measurements[i].exemplar(&(*dest)[n]) {
n++
}

m.exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
}

// measurement is a measurement made by a telemetry system.
type measurement struct {
mux sync.Mutex
// FilteredAttributes are the attributes dropped during the measurement.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was made.
Time time.Time
// Value is the value of the measurement.
Value Value
// SpanContext is the SpanContext active when a measurement was made.
SpanContext trace.SpanContext
// Ctx is the context active when a measurement was made.
Ctx context.Context

valid bool
}

// newMeasurement returns a new non-empty Measurement.
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement {
return measurement{
FilteredAttributes: droppedAttr,
Time: ts,
Value: v,
SpanContext: trace.SpanContextFromContext(ctx),
valid: true,
// exemplar returns m as an [Exemplar].
// returns true if it populated the exemplar.
func (m *measurement) exemplar(dest *Exemplar) bool {
m.mux.Lock()
defer m.mux.Unlock()
if !m.valid {
return false
}
}

// exemplar returns m as an [Exemplar].
func (m measurement) exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value

if m.SpanContext.HasTraceID() {
traceID := m.SpanContext.TraceID()
sc := trace.SpanContextFromContext(m.Ctx)
if sc.HasTraceID() {
traceID := sc.TraceID()
dest.TraceID = traceID[:]
} else {
dest.TraceID = dest.TraceID[:0]
}

if m.SpanContext.HasSpanID() {
spanID := m.SpanContext.SpanID()
if sc.HasSpanID() {
spanID := sc.SpanID()
dest.SpanID = spanID[:]
} else {
dest.SpanID = dest.SpanID[:0]
}
return true
}

func reset[T any](s []T, length, capacity int) []T {
Expand Down