Skip to content

Commit 4adb38d

Browse files
committed
use RWMutex for map access in the metrics SDK
1 parent 8c8cd0a commit 4adb38d

File tree

10 files changed

+169
-85
lines changed

10 files changed

+169
-85
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
88

99
## [Unreleased]
1010

11+
### Changed
12+
13+
- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189)
14+
1115
### Removed
1216

1317
- Drop support for [Go 1.23]. (#7274)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
5+
6+
import (
7+
"math"
8+
"sync/atomic"
9+
)
10+
11+
// counter is an efficient way of adding to a number which is either an
12+
// int64 or float64.
13+
type counter[N int64 | float64] struct {
14+
// nFloatBits contains only the non-integer portion of the counter.
15+
nFloatBits uint64
16+
// nInt contains only the integer portion of the counter.
17+
nInt uint64
18+
}
19+
20+
// value returns the float or integer value.
21+
func (n *counter[N]) value() N {
22+
fval := math.Float64frombits(atomic.LoadUint64(&n.nFloatBits))
23+
ival := atomic.LoadUint64(&n.nInt)
24+
return N(fval + float64(ival))
25+
}
26+
27+
func (n *counter[N]) add(value N) {
28+
ival := uint64(value)
29+
// This case is where the value is an int, or if it is a whole-numbered float.
30+
if float64(ival) == float64(value) {
31+
atomic.AddUint64(&n.nInt, ival)
32+
return
33+
}
34+
35+
// Value must be a float below.
36+
for {
37+
oldBits := atomic.LoadUint64(&n.nFloatBits)
38+
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
39+
if atomic.CompareAndSwapUint64(&n.nFloatBits, oldBits, newBits) {
40+
return
41+
}
42+
}
43+
}

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
301301
maxScale: maxScale,
302302

303303
newRes: r,
304-
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
304+
limit: newLimiter[expoHistogramDataPoint[N]](limit),
305305
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
306306

307307
start: now(),
@@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
317317
maxScale int32
318318

319319
newRes func(attribute.Set) FilteredExemplarReservoir[N]
320-
limit limiter[*expoHistogramDataPoint[N]]
320+
limit limiter[expoHistogramDataPoint[N]]
321321
values map[attribute.Distinct]*expoHistogramDataPoint[N]
322322
valuesMu sync.Mutex
323323

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
55

66
import (
77
"context"
8+
"sync"
89
"time"
910

1011
"go.opentelemetry.io/otel/attribute"
@@ -27,6 +28,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface {
2728

2829
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
2930
type filteredExemplarReservoir[N int64 | float64] struct {
31+
mu sync.Mutex
3032
filter exemplar.Filter
3133
reservoir exemplar.Reservoir
3234
}
@@ -45,9 +47,15 @@ func NewFilteredExemplarReservoir[N int64 | float64](
4547

4648
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
4749
if f.filter(ctx) {
50+
f.mu.Lock()
51+
defer f.mu.Unlock()
4852
// only record the current time if we are sampling this measurement.
4953
f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
5054
}
5155
}
5256

53-
func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }
57+
func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) {
58+
f.mu.Lock()
59+
defer f.mu.Unlock()
60+
f.reservoir.Collect(dest)
61+
}

sdk/metric/internal/aggregate/histogram.go

Lines changed: 57 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,54 +8,66 @@ import (
88
"slices"
99
"sort"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"go.opentelemetry.io/otel/attribute"
1415
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1516
)
1617

1718
type buckets[N int64 | float64] struct {
19+
count uint64
20+
counts []uint64
21+
min, max atomic.Value
22+
total *counter[N]
23+
1824
attrs attribute.Set
1925
res FilteredExemplarReservoir[N]
20-
21-
counts []uint64
22-
count uint64
23-
total N
24-
min, max N
2526
}
2627

2728
// newBuckets returns buckets with n bins.
2829
func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
29-
return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
30+
return &buckets[N]{attrs: attrs, counts: make([]uint64, n), total: &counter[N]{}}
3031
}
3132

32-
func (b *buckets[N]) sum(value N) { b.total += value }
33+
func (b *buckets[N]) bin(idx int) {
34+
atomic.AddUint64(&b.counts[idx], 1)
35+
atomic.AddUint64(&b.count, 1)
36+
}
3337

34-
func (b *buckets[N]) bin(idx int, value N) {
35-
b.counts[idx]++
36-
b.count++
37-
if value < b.min {
38-
b.min = value
39-
} else if value > b.max {
40-
b.max = value
38+
func (b *buckets[N]) minMax(value N) {
39+
for {
40+
minLoaded := b.min.Load()
41+
if value < minLoaded.(N) && !b.min.CompareAndSwap(minLoaded, value) {
42+
// We got a new min value, but lost the race. Try again.
43+
continue
44+
}
45+
maxLoaded := b.max.Load()
46+
if value > maxLoaded.(N) && !b.max.CompareAndSwap(maxLoaded, value) {
47+
// We got a new max value, but lost the race. Try again.
48+
continue
49+
}
50+
return
4151
}
4252
}
4353

4454
// histValues summarizes a set of measurements as an histValues with
4555
// explicitly defined buckets.
4656
type histValues[N int64 | float64] struct {
47-
noSum bool
48-
bounds []float64
57+
noSum bool
58+
noMinMax bool
59+
bounds []float64
4960

5061
newRes func(attribute.Set) FilteredExemplarReservoir[N]
51-
limit limiter[*buckets[N]]
62+
limit limiter[buckets[N]]
5263
values map[attribute.Distinct]*buckets[N]
53-
valuesMu sync.Mutex
64+
valuesMu sync.RWMutex
5465
}
5566

5667
func newHistValues[N int64 | float64](
5768
bounds []float64,
5869
noSum bool,
70+
noMinMax bool,
5971
limit int,
6072
r func(attribute.Set) FilteredExemplarReservoir[N],
6173
) *histValues[N] {
@@ -66,11 +78,12 @@ func newHistValues[N int64 | float64](
6678
b := slices.Clone(bounds)
6779
slices.Sort(b)
6880
return &histValues[N]{
69-
noSum: noSum,
70-
bounds: b,
71-
newRes: r,
72-
limit: newLimiter[*buckets[N]](limit),
73-
values: make(map[attribute.Distinct]*buckets[N]),
81+
noSum: noSum,
82+
noMinMax: noMinMax,
83+
bounds: b,
84+
newRes: r,
85+
limit: newLimiter[buckets[N]](limit),
86+
values: make(map[attribute.Distinct]*buckets[N]),
7487
}
7588
}
7689

@@ -89,11 +102,11 @@ func (s *histValues[N]) measure(
89102
// (s.bounds[len(s.bounds)-1], +∞).
90103
idx := sort.SearchFloat64s(s.bounds, float64(value))
91104

92-
s.valuesMu.Lock()
93-
defer s.valuesMu.Unlock()
105+
s.valuesMu.RLock()
94106

95107
attr := s.limit.Attributes(fltrAttr, s.values)
96108
b, ok := s.values[attr.Equivalent()]
109+
s.valuesMu.RUnlock()
97110
if !ok {
98111
// N+1 buckets. For example:
99112
//
@@ -106,12 +119,20 @@ func (s *histValues[N]) measure(
106119
b.res = s.newRes(attr)
107120

108121
// Ensure min and max are recorded values (not zero), for new buckets.
109-
b.min, b.max = value, value
122+
if !s.noMinMax {
123+
b.min.Store(value)
124+
b.max.Store(value)
125+
}
126+
s.valuesMu.Lock()
110127
s.values[attr.Equivalent()] = b
128+
s.valuesMu.Unlock()
129+
}
130+
b.bin(idx)
131+
if !s.noMinMax {
132+
b.minMax(value)
111133
}
112-
b.bin(idx, value)
113134
if !s.noSum {
114-
b.sum(value)
135+
b.total.add(value)
115136
}
116137
b.res.Offer(ctx, value, droppedAttr)
117138
}
@@ -125,8 +146,7 @@ func newHistogram[N int64 | float64](
125146
r func(attribute.Set) FilteredExemplarReservoir[N],
126147
) *histogram[N] {
127148
return &histogram[N]{
128-
histValues: newHistValues[N](boundaries, noSum, limit, r),
129-
noMinMax: noMinMax,
149+
histValues: newHistValues[N](boundaries, noSum, noMinMax, limit, r),
130150
start: now(),
131151
}
132152
}
@@ -136,8 +156,7 @@ func newHistogram[N int64 | float64](
136156
type histogram[N int64 | float64] struct {
137157
*histValues[N]
138158

139-
noMinMax bool
140-
start time.Time
159+
start time.Time
141160
}
142161

143162
func (s *histogram[N]) delta(
@@ -169,12 +188,12 @@ func (s *histogram[N]) delta(
169188
hDPts[i].BucketCounts = val.counts
170189

171190
if !s.noSum {
172-
hDPts[i].Sum = val.total
191+
hDPts[i].Sum = val.total.value()
173192
}
174193

175194
if !s.noMinMax {
176-
hDPts[i].Min = metricdata.NewExtrema(val.min)
177-
hDPts[i].Max = metricdata.NewExtrema(val.max)
195+
hDPts[i].Min = metricdata.NewExtrema(val.min.Load().(N))
196+
hDPts[i].Max = metricdata.NewExtrema(val.max.Load().(N))
178197
}
179198

180199
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
@@ -227,12 +246,12 @@ func (s *histogram[N]) cumulative(
227246
hDPts[i].BucketCounts = slices.Clone(val.counts)
228247

229248
if !s.noSum {
230-
hDPts[i].Sum = val.total
249+
hDPts[i].Sum = val.total.value()
231250
}
232251

233252
if !s.noMinMax {
234-
hDPts[i].Min = metricdata.NewExtrema(val.min)
235-
hDPts[i].Max = metricdata.NewExtrema(val.max)
253+
hDPts[i].Min = metricdata.NewExtrema(val.min.Load().(N))
254+
hDPts[i].Max = metricdata.NewExtrema(val.max.Load().(N))
236255
}
237256

238257
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

sdk/metric/internal/aggregate/histogram_test.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -278,18 +278,22 @@ func TestBucketsBin(t *testing.T) {
278278
func testBucketsBin[N int64 | float64]() func(t *testing.T) {
279279
return func(t *testing.T) {
280280
b := newBuckets[N](alice, 3)
281+
b.min.Store(N(0))
282+
b.max.Store(N(0))
281283
assertB := func(counts []uint64, count uint64, mi, ma N) {
282284
t.Helper()
283285
assert.Equal(t, counts, b.counts)
284286
assert.Equal(t, count, b.count)
285-
assert.Equal(t, mi, b.min)
286-
assert.Equal(t, ma, b.max)
287+
assert.Equal(t, mi, b.min.Load().(N))
288+
assert.Equal(t, ma, b.max.Load().(N))
287289
}
288290

289291
assertB([]uint64{0, 0, 0}, 0, 0, 0)
290-
b.bin(1, 2)
292+
b.bin(1)
293+
b.minMax(2)
291294
assertB([]uint64{0, 1, 0}, 1, 0, 2)
292-
b.bin(0, -1)
295+
b.bin(0)
296+
b.minMax(-1)
293297
assertB([]uint64{1, 1, 0}, 2, -1, 2)
294298
}
295299
}
@@ -304,15 +308,15 @@ func testBucketsSum[N int64 | float64]() func(t *testing.T) {
304308
b := newBuckets[N](alice, 3)
305309

306310
var want N
307-
assert.Equal(t, want, b.total)
311+
assert.Equal(t, want, b.total.value())
308312

309-
b.sum(2)
313+
b.total.add(2)
310314
want = 2
311-
assert.Equal(t, want, b.total)
315+
assert.Equal(t, want, b.total.value())
312316

313-
b.sum(-1)
317+
b.total.add(-1)
314318
want = 1
315-
assert.Equal(t, want, b.total)
319+
assert.Equal(t, want, b.total.value())
316320
}
317321
}
318322

0 commit comments

Comments
 (0)