
Commit 774cbb8

use RWMutex for map access in the metrics SDK

1 parent 666f95c · commit 774cbb8

File tree

10 files changed: +149 -76 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/trace` synchronously de-duplicates the passed attributes instead of delegating it to the returned `TracerOption`. (#7266)
 - `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/meter` synchronously de-duplicates the passed attributes instead of delegating it to the returned `MeterOption`. (#7266)
 - `WithInstrumentationAttributes` in `go.opentelemetry.io/otel/log` synchronously de-duplicates the passed attributes instead of delegating it to the returned `LoggerOption`. (#7266)
+- Improve performance of concurrent measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7189)

 <!-- Released section -->
 <!-- Don't change this section unless doing release -->

sdk/metric/internal/aggregate (new file)

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+	"math"
+	"sync/atomic"
+)
+
+// counter is an efficient way of adding to a number which is either an
+// int64 or float64.
+type counter[N int64 | float64] struct {
+	// nFloatBits contains only the non-integer portion of the counter.
+	nFloatBits uint64
+	// nInt contains only the integer portion of the counter.
+	nInt uint64
+}
+
+// value returns the float or integer value.
+func (n *counter[N]) value() N {
+	fval := math.Float64frombits(atomic.LoadUint64(&n.nFloatBits))
+	ival := atomic.LoadUint64(&n.nInt)
+	return N(fval + float64(ival))
+}
+
+func (n *counter[N]) add(value N) {
+	ival := uint64(value)
+	// This case is where the value is an int, or if it is a whole-numbered float.
+	if float64(ival) == float64(value) {
+		atomic.AddUint64(&n.nInt, ival)
+		return
+	}
+
+	// Value must be a float below.
+	for {
+		oldBits := atomic.LoadUint64(&n.nFloatBits)
+		newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
+		if atomic.CompareAndSwapUint64(&n.nFloatBits, oldBits, newBits) {
+			return
+		}
+	}
+}
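
The counter type above is the core of the lock-free sum path: whole-numbered measurements take the cheap atomic.AddUint64 fast path, fractional measurements fall back to a compare-and-swap loop over the float bits, and value() recombines the two halves. Below is a minimal standalone sketch of the same split-counter idea; splitCounter and the demo in main are illustrative names, not part of this commit.

package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// splitCounter mirrors the split int/float design: whole numbers accumulate in
// an atomic integer, fractional values are CAS-merged into the float bits.
type splitCounter struct {
	floatBits uint64 // math.Float64bits of the fractional accumulator
	intPart   uint64
}

func (c *splitCounter) add(v float64) {
	if i := uint64(v); float64(i) == v {
		atomic.AddUint64(&c.intPart, i) // fast path: whole-numbered value
		return
	}
	for { // slow path: retry until the float-bits CAS succeeds
		oldBits := atomic.LoadUint64(&c.floatBits)
		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
		if atomic.CompareAndSwapUint64(&c.floatBits, oldBits, newBits) {
			return
		}
	}
}

func (c *splitCounter) value() float64 {
	return math.Float64frombits(atomic.LoadUint64(&c.floatBits)) +
		float64(atomic.LoadUint64(&c.intPart))
}

func main() {
	var c splitCounter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); c.add(1); c.add(0.5) }()
	}
	wg.Wait()
	fmt.Println(c.value()) // 150
}

The payoff of this design is that integer-valued additions never enter the CAS retry loop, so heavily contended counters mostly ride a single atomic add.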

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 2 additions & 2 deletions
@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
 		maxScale: maxScale,

 		newRes: r,
-		limit:  newLimiter[*expoHistogramDataPoint[N]](limit),
+		limit:  newLimiter[expoHistogramDataPoint[N]](limit),
 		values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),

 		start: now(),
@@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
 	maxScale int32

 	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	limit    limiter[*expoHistogramDataPoint[N]]
+	limit    limiter[expoHistogramDataPoint[N]]
 	values   map[attribute.Distinct]*expoHistogramDataPoint[N]
 	valuesMu sync.Mutex

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 9 additions & 1 deletion
@@ -5,6 +5,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg

 import (
 	"context"
+	"sync"
 	"time"

 	"go.opentelemetry.io/otel/attribute"
@@ -27,6 +28,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface {

 // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
 type filteredExemplarReservoir[N int64 | float64] struct {
+	mu        sync.Mutex
 	filter    exemplar.Filter
 	reservoir exemplar.Reservoir
 }
@@ -45,9 +47,15 @@ func NewFilteredExemplarReservoir[N int64 | float64](

 func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
 	if f.filter(ctx) {
+		f.mu.Lock()
+		defer f.mu.Unlock()
 		// only record the current time if we are sampling this measurement.
 		f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
 	}
 }

-func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }
+func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.reservoir.Collect(dest)
+}
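
Because the aggregators no longer hold their map lock for the entire measurement, each exemplar reservoir now protects itself: Offer and Collect share the new mu. A rough standalone sketch of that guarded-reservoir pattern follows; naiveReservoir is a stand-in type for illustration, not the SDK's exemplar.Reservoir.

package main

import (
	"fmt"
	"sync"
)

// naiveReservoir stands in for a reservoir that is not safe for concurrent use
// on its own.
type naiveReservoir struct{ stored []float64 }

func (r *naiveReservoir) Offer(v float64)         { r.stored = append(r.stored, v) }
func (r *naiveReservoir) Collect(dest *[]float64) { *dest = append((*dest)[:0], r.stored...) }

// guardedReservoir serializes Offer and Collect, mirroring the mutex added to
// filteredExemplarReservoir.
type guardedReservoir struct {
	mu sync.Mutex
	r  naiveReservoir
}

func (g *guardedReservoir) Offer(v float64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.r.Offer(v)
}

func (g *guardedReservoir) Collect(dest *[]float64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.r.Collect(dest)
}

func main() {
	var g guardedReservoir
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(v float64) { defer wg.Done(); g.Offer(v) }(float64(i))
	}
	wg.Wait()
	var out []float64
	g.Collect(&out)
	fmt.Println(len(out)) // 10
}

A plain Mutex (rather than an RWMutex) fits here because both Offer and Collect mutate reservoir state.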

sdk/metric/internal/aggregate/histogram.go

Lines changed: 44 additions & 31 deletions
@@ -8,58 +8,66 @@ import (
 	"slices"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"

 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

 type buckets[N int64 | float64] struct {
+	count    uint64
+	counts   []uint64
+	min, max atomic.Value
+	total    *counter[N]
+
 	attrs attribute.Set
 	res   FilteredExemplarReservoir[N]
-
-	counts   []uint64
-	count    uint64
-	total    N
-	min, max N
 }

 // newBuckets returns buckets with n bins.
 func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
-	return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
+	return &buckets[N]{attrs: attrs, counts: make([]uint64, n), total: &counter[N]{}}
 }

-func (b *buckets[N]) sum(value N) { b.total += value }
-
 func (b *buckets[N]) bin(idx int) {
-	b.counts[idx]++
-	b.count++
+	atomic.AddUint64(&b.counts[idx], 1)
+	atomic.AddUint64(&b.count, 1)
 }

 func (b *buckets[N]) minMax(value N) {
-	if value < b.min {
-		b.min = value
-	} else if value > b.max {
-		b.max = value
+	for {
+		minLoaded := b.min.Load()
+		if value < minLoaded.(N) && !b.min.CompareAndSwap(minLoaded, value) {
+			// We got a new min value, but lost the race. Try again.
+			continue
+		}
+		maxLoaded := b.max.Load()
+		if value > maxLoaded.(N) && !b.max.CompareAndSwap(maxLoaded, value) {
+			// We got a new max value, but lost the race. Try again.
+			continue
+		}
+		return
 	}
 }

 // histValues summarizes a set of measurements as an histValues with
 // explicitly defined buckets.
 type histValues[N int64 | float64] struct {
-	noMinMax bool
 	noSum    bool
+	noMinMax bool
 	bounds   []float64

 	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	limit    limiter[*buckets[N]]
+	limit    limiter[buckets[N]]
 	values   map[attribute.Distinct]*buckets[N]
-	valuesMu sync.Mutex
+	valuesMu sync.RWMutex
 }

 func newHistValues[N int64 | float64](
 	bounds []float64,
-	noMinMax, noSum bool,
+	noSum bool,
+	noMinMax bool,
 	limit int,
 	r func(attribute.Set) FilteredExemplarReservoir[N],
 ) *histValues[N] {
@@ -70,11 +78,11 @@ func newHistValues[N int64 | float64](
 	b := slices.Clone(bounds)
 	slices.Sort(b)
 	return &histValues[N]{
-		noMinMax: noMinMax,
 		noSum:    noSum,
+		noMinMax: noMinMax,
 		bounds:   b,
 		newRes:   r,
-		limit:    newLimiter[*buckets[N]](limit),
+		limit:    newLimiter[buckets[N]](limit),
 		values:   make(map[attribute.Distinct]*buckets[N]),
 	}
 }
@@ -94,11 +102,11 @@ func (s *histValues[N]) measure(
 	// (s.bounds[len(s.bounds)-1], +∞).
 	idx := sort.SearchFloat64s(s.bounds, float64(value))

-	s.valuesMu.Lock()
-	defer s.valuesMu.Unlock()
+	s.valuesMu.RLock()

 	attr := s.limit.Attributes(fltrAttr, s.values)
 	b, ok := s.values[attr.Equivalent()]
+	s.valuesMu.RUnlock()
 	if !ok {
 		// N+1 buckets. For example:
 		//
@@ -111,15 +119,20 @@ func (s *histValues[N]) measure(
 		b.res = s.newRes(attr)

 		// Ensure min and max are recorded values (not zero), for new buckets.
-		b.min, b.max = value, value
+		if !s.noMinMax {
+			b.min.Store(value)
+			b.max.Store(value)
+		}
+		s.valuesMu.Lock()
 		s.values[attr.Equivalent()] = b
+		s.valuesMu.Unlock()
 	}
 	b.bin(idx)
 	if !s.noMinMax {
 		b.minMax(value)
 	}
 	if !s.noSum {
-		b.sum(value)
+		b.total.add(value)
 	}
 	b.res.Offer(ctx, value, droppedAttr)
 }
@@ -133,7 +146,7 @@ func newHistogram[N int64 | float64](
 	r func(attribute.Set) FilteredExemplarReservoir[N],
 ) *histogram[N] {
 	return &histogram[N]{
-		histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r),
+		histValues: newHistValues[N](boundaries, noSum, noMinMax, limit, r),
 		start:      now(),
 	}
 }
@@ -175,12 +188,12 @@ func (s *histogram[N]) delta(
 		hDPts[i].BucketCounts = val.counts

 		if !s.noSum {
-			hDPts[i].Sum = val.total
+			hDPts[i].Sum = val.total.value()
 		}

 		if !s.noMinMax {
-			hDPts[i].Min = metricdata.NewExtrema(val.min)
-			hDPts[i].Max = metricdata.NewExtrema(val.max)
+			hDPts[i].Min = metricdata.NewExtrema(val.min.Load().(N))
+			hDPts[i].Max = metricdata.NewExtrema(val.max.Load().(N))
 		}

 		collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
@@ -233,12 +246,12 @@ func (s *histogram[N]) cumulative(
 		hDPts[i].BucketCounts = slices.Clone(val.counts)

 		if !s.noSum {
-			hDPts[i].Sum = val.total
+			hDPts[i].Sum = val.total.value()
 		}

 		if !s.noMinMax {
-			hDPts[i].Min = metricdata.NewExtrema(val.min)
-			hDPts[i].Max = metricdata.NewExtrema(val.max)
+			hDPts[i].Min = metricdata.NewExtrema(val.min.Load().(N))
+			hDPts[i].Max = metricdata.NewExtrema(val.max.Load().(N))
 		}

 		collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
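
The measure path above is the read-mostly locking shape the commit is named for: the common case, where the attribute set already has a bucket, takes only the read lock; a missing entry briefly upgrades to the write lock to insert; and the per-bucket updates (bin counts, total, min/max) use atomics so they stay safe under the shared read lock. A compact sketch of that RWMutex-plus-atomics pattern follows, with illustrative names; note the sketch also re-checks the key under the write lock so racing creators agree on a single entry.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// hotMap sketches the locking shape of histValues.measure: RLock for lookups,
// a short write lock only when a key is missing, atomics for the hot update.
type hotMap struct {
	mu sync.RWMutex
	m  map[string]*uint64
}

func (h *hotMap) inc(key string) {
	h.mu.RLock()
	c, ok := h.m[key]
	h.mu.RUnlock()
	if !ok {
		h.mu.Lock()
		// Re-check under the write lock so concurrent creators share one counter.
		if c, ok = h.m[key]; !ok {
			c = new(uint64)
			h.m[key] = c
		}
		h.mu.Unlock()
	}
	atomic.AddUint64(c, 1) // the per-entry update needs no map lock
}

func main() {
	h := &hotMap{m: make(map[string]*uint64)}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); h.inc("alpha") }()
	}
	wg.Wait()
	fmt.Println(atomic.LoadUint64(h.m["alpha"])) // 100
}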

sdk/metric/internal/aggregate/histogram_test.go

Lines changed: 9 additions & 7 deletions
@@ -278,12 +278,14 @@ func TestBucketsBin(t *testing.T) {
 func testBucketsBin[N int64 | float64]() func(t *testing.T) {
 	return func(t *testing.T) {
 		b := newBuckets[N](alice, 3)
+		b.min.Store(N(0))
+		b.max.Store(N(0))
 		assertB := func(counts []uint64, count uint64, mi, ma N) {
 			t.Helper()
 			assert.Equal(t, counts, b.counts)
 			assert.Equal(t, count, b.count)
-			assert.Equal(t, mi, b.min)
-			assert.Equal(t, ma, b.max)
+			assert.Equal(t, mi, b.min.Load().(N))
+			assert.Equal(t, ma, b.max.Load().(N))
 		}

 		assertB([]uint64{0, 0, 0}, 0, 0, 0)
@@ -306,15 +308,15 @@ func testBucketsSum[N int64 | float64]() func(t *testing.T) {
 		b := newBuckets[N](alice, 3)

 		var want N
-		assert.Equal(t, want, b.total)
+		assert.Equal(t, want, b.total.value())

-		b.sum(2)
+		b.total.add(2)
 		want = 2
-		assert.Equal(t, want, b.total)
+		assert.Equal(t, want, b.total.value())

-		b.sum(-1)
+		b.total.add(-1)
 		want = 1
-		assert.Equal(t, want, b.total)
+		assert.Equal(t, want, b.total.value())
 	}
 }

sdk/metric/internal/aggregate/lastvalue.go

Lines changed: 16 additions & 14 deletions
@@ -6,6 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"

 	"go.opentelemetry.io/otel/attribute"
@@ -14,45 +15,46 @@ import (

 // datapoint is timestamped measurement data.
 type datapoint[N int64 | float64] struct {
+	value atomic.Value
 	attrs attribute.Set
-	value N
 	res   FilteredExemplarReservoir[N]
 }

 func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
 	return &lastValue[N]{
 		newRes: r,
 		limit:  newLimiter[datapoint[N]](limit),
-		values: make(map[attribute.Distinct]datapoint[N]),
+		values: make(map[attribute.Distinct]*datapoint[N]),
 		start:  now(),
 	}
 }

 // lastValue summarizes a set of measurements as the last one made.
 type lastValue[N int64 | float64] struct {
-	sync.Mutex
+	sync.RWMutex

 	newRes func(attribute.Set) FilteredExemplarReservoir[N]
 	limit  limiter[datapoint[N]]
-	values map[attribute.Distinct]datapoint[N]
+	values map[attribute.Distinct]*datapoint[N]
 	start  time.Time
 }

 func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
-	s.Lock()
-	defer s.Unlock()
-
+	s.RLock()
 	attr := s.limit.Attributes(fltrAttr, s.values)
 	d, ok := s.values[attr.Equivalent()]
+	s.RUnlock()
 	if !ok {
-		d.res = s.newRes(attr)
+		d = &datapoint[N]{
+			res:   s.newRes(attr),
+			attrs: attr,
+		}
+		s.Lock()
+		s.values[attr.Equivalent()] = d
+		s.Unlock()
 	}
-
-	d.attrs = attr
-	d.value = value
 	d.res.Offer(ctx, value, droppedAttr)
-
-	s.values[attr.Equivalent()] = d
+	d.value.Store(value)
 }

 func (s *lastValue[N]) delta(
@@ -109,7 +111,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
 		(*dest)[i].Attributes = v.attrs
 		(*dest)[i].StartTime = s.start
 		(*dest)[i].Time = t
-		(*dest)[i].Value = v.value
+		(*dest)[i].Value = v.value.Load().(N)
 		collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
 		i++
 	}
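
lastValue follows the same recipe: the map now holds *datapoint entries guarded by the RWMutex, and each datapoint stores its measurement through an atomic.Value, so writers recording against an existing attribute set never need the aggregator-wide write lock. A small sketch of the atomic.Value last-value idea follows; lastVal and its methods are illustrative, not the SDK's API.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lastVal keeps only the most recent measurement. atomic.Value makes the
// Store/Load pair safe for concurrent writers and readers without a lock.
type lastVal struct{ v atomic.Value }

func (l *lastVal) record(x float64) { l.v.Store(x) }

func (l *lastVal) read() (float64, bool) {
	x := l.v.Load()
	if x == nil {
		return 0, false // nothing recorded yet
	}
	return x.(float64), true
}

func main() {
	var l lastVal
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(v float64) { defer wg.Done(); l.record(v) }(float64(i))
	}
	wg.Wait()
	if v, ok := l.read(); ok {
		fmt.Println("last recorded value:", v) // whichever Store ran last
	}
}

One constraint worth noting: atomic.Value panics if successive Stores use different concrete types, which datapoint[N] satisfies by always storing an N.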
