Skip to content

Commit bd1b3da

Browse files
authored
Add exemplar reservoir parallel benchmarks (#7441)
This also fixes a bug introduced in #7423, where we were only locking around storage and not around other shared fields (e.g. count). Fixing the bug is required for benchmarks to run properly, but wasn't caught by concurrent safe tests because the SDK does not currently call exemplar methods concurrently. ``` goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric/exemplar cpu: Intel(R) Xeon(R) CPU @ 2.20GHz BenchmarkFixedSizeReservoirOffer-24 498955 248.4 ns/op 0 B/op 0 allocs/op BenchmarkHistogramReservoirOffer-24 478068 250.1 ns/op 0 B/op 0 allocs/op ```
1 parent dc906d6 commit bd1b3da

File tree

4 files changed

+59
-3
lines changed

4 files changed

+59
-3
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
5+
6+
import (
7+
"runtime"
8+
"testing"
9+
"time"
10+
)
11+
12+
func BenchmarkFixedSizeReservoirOffer(b *testing.B) {
13+
ts := time.Now()
14+
val := NewValue[int64](25)
15+
ctx := b.Context()
16+
reservoir := NewFixedSizeReservoir(runtime.NumCPU())
17+
b.RunParallel(func(pb *testing.PB) {
18+
i := 0
19+
for pb.Next() {
20+
reservoir.Offer(ctx, ts, val, nil)
21+
// Periodically trigger a reset, because the algorithm for fixed-size
22+
// reservoirs records exemplars very infrequently after a large
23+
// number of collect calls.
24+
if i%100 == 99 {
25+
reservoir.mu.Lock()
26+
reservoir.reset()
27+
reservoir.mu.Unlock()
28+
}
29+
i++
30+
}
31+
})
32+
}
33+
34+
func BenchmarkHistogramReservoirOffer(b *testing.B) {
35+
ts := time.Now()
36+
ctx := b.Context()
37+
buckets := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}
38+
values := make([]Value, len(buckets))
39+
for i, bucket := range buckets {
40+
values[i] = NewValue[float64](bucket + 1)
41+
}
42+
res := NewHistogramReservoir(buckets)
43+
b.RunParallel(func(pb *testing.PB) {
44+
i := 0
45+
for pb.Next() {
46+
res.Offer(ctx, ts, values[i%len(values)], nil)
47+
i++
48+
}
49+
})
50+
}

sdk/metric/exemplar/fixed_size_reservoir.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
125125
// https://github.com/MrAlias/reservoir-sampling for a performance
126126
// comparison of reservoir sampling algorithms.
127127

128+
r.mu.Lock()
129+
defer r.mu.Unlock()
128130
if int(r.count) < cap(r.measurements) {
129131
r.store(int(r.count), newMeasurement(ctx, t, n, a))
130132
} else if r.count == r.next {

sdk/metric/exemplar/histogram_reservoir.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,11 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
6868
default:
6969
panic("unknown value type")
7070
}
71-
r.store(sort.SearchFloat64s(r.bounds, n), newMeasurement(ctx, t, v, a))
71+
72+
idx := sort.SearchFloat64s(r.bounds, n)
73+
m := newMeasurement(ctx, t, v, a)
74+
75+
r.mu.Lock()
76+
defer r.mu.Unlock()
77+
r.store(idx, m)
7278
}

sdk/metric/exemplar/storage.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ func newStorage(n int) *storage {
2727
}
2828

2929
func (r *storage) store(idx int, m measurement) {
30-
r.mu.Lock()
31-
defer r.mu.Unlock()
3230
r.measurements[idx] = m
3331
}
3432

0 commit comments

Comments
 (0)