
Commit e513754

fix race where map elements could be overwritten
1 parent 0a20e5c commit e513754

File tree: 7 files changed, +201 -115 lines changed


sdk/metric/internal/aggregate/atomic.go

Lines changed: 11 additions & 11 deletions

@@ -8,35 +8,35 @@ import (
 	"sync/atomic"
 )
 
-// counter is an efficient way of adding to a number which is either an
+// atomicSum is an efficient way of adding to a number which is either an
 // int64 or float64.
-type counter[N int64 | float64] struct {
+type atomicSum[N int64 | float64] struct {
 	// nFloatBits contains only the non-integer portion of the counter.
-	nFloatBits uint64
+	nFloatBits atomic.Uint64
 	// nInt contains only the integer portion of the counter.
-	nInt uint64
+	nInt atomic.Uint64
 }
 
 // value returns the float or integer value.
-func (n *counter[N]) value() N {
-	fval := math.Float64frombits(atomic.LoadUint64(&n.nFloatBits))
-	ival := atomic.LoadUint64(&n.nInt)
+func (n *atomicSum[N]) value() N {
+	fval := math.Float64frombits(n.nFloatBits.Load())
+	ival := n.nInt.Load()
 	return N(fval + float64(ival))
 }
 
-func (n *counter[N]) add(value N) {
+func (n *atomicSum[N]) add(value N) {
 	ival := uint64(value)
 	// This case is where the value is an int, or if it is a whole-numbered float.
 	if float64(ival) == float64(value) {
-		atomic.AddUint64(&n.nInt, ival)
+		n.nInt.Add(ival)
 		return
 	}
 
 	// Value must be a float below.
 	for {
-		oldBits := atomic.LoadUint64(&n.nFloatBits)
+		oldBits := n.nFloatBits.Load()
 		newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
-		if atomic.CompareAndSwapUint64(&n.nFloatBits, oldBits, newBits) {
+		if n.nFloatBits.CompareAndSwap(oldBits, newBits) {
 			return
 		}
 	}
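
The float path above has no atomic add available, so atomicSum keeps the IEEE-754 bit pattern in an atomic.Uint64 and folds new values in with a compare-and-swap retry loop. A minimal standalone sketch of that technique follows; atomicFloat64 and the demo program are illustrative only, not part of the SDK:

package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// atomicFloat64 illustrates the CAS technique used by atomicSum: the float is
// kept as its bit pattern in an atomic.Uint64 and updated in a retry loop.
type atomicFloat64 struct {
	bits atomic.Uint64
}

func (a *atomicFloat64) add(v float64) {
	for {
		oldBits := a.bits.Load()
		newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
		// If another goroutine updated the value between Load and
		// CompareAndSwap, retry with the fresh bits.
		if a.bits.CompareAndSwap(oldBits, newBits) {
			return
		}
	}
}

func (a *atomicFloat64) load() float64 {
	return math.Float64frombits(a.bits.Load())
}

func main() {
	var (
		sum atomicFloat64
		wg  sync.WaitGroup
	)
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				sum.add(0.5)
			}
		}()
	}
	wg.Wait()
	fmt.Println(sum.load()) // 4000
}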

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 34 additions & 18 deletions

@@ -72,6 +72,11 @@ func newExpoHistogramDataPoint[N int64 | float64](
 	}
 }
 
+func (p *expoHistogramDataPoint[N]) measure(ctx context.Context, v N, droppedAttr []attribute.KeyValue) {
+	p.record(v)
+	p.res.Offer(ctx, v, droppedAttr)
+}
+
 // record adds a new measurement to the histogram. It will rescale the buckets if needed.
 func (p *expoHistogramDataPoint[N]) record(v N) {
 	p.count++
@@ -316,10 +321,10 @@ type expoHistogram[N int64 | float64] struct {
 	maxSize  int
 	maxScale int32
 
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	limit    limiter[expoHistogramDataPoint[N]]
-	values   map[attribute.Distinct]*expoHistogramDataPoint[N]
-	valuesMu sync.Mutex
+	newRes func(attribute.Set) FilteredExemplarReservoir[N]
+	limit  limiter[expoHistogramDataPoint[N]]
+	values map[attribute.Distinct]*expoHistogramDataPoint[N]
+	sync.RWMutex
 
 	start time.Time
 }
@@ -335,19 +340,30 @@ func (e *expoHistogram[N]) measure(
 		return
 	}
 
-	e.valuesMu.Lock()
-	defer e.valuesMu.Unlock()
-
+	// Hold the RLock even after we are done reading from the values map to
+	// ensure we don't race with collection.
+	e.RLock()
 	attr := e.limit.Attributes(fltrAttr, e.values)
 	v, ok := e.values[attr.Equivalent()]
-	if !ok {
-		v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
-		v.res = e.newRes(attr)
-
-		e.values[attr.Equivalent()] = v
+	if ok {
+		v.measure(ctx, value, droppedAttr)
+		e.RUnlock()
+		return
+	}
+	e.RUnlock()
+	// Switch to a full lock to add a new element to the map.
+	e.Lock()
+	defer e.Unlock()
+	// Check that the element wasn't added since we last checked.
+	v, ok = e.values[attr.Equivalent()]
+	if ok {
+		v.measure(ctx, value, droppedAttr)
+		return
 	}
-	v.record(value)
-	v.res.Offer(ctx, value, droppedAttr)
+	v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
+	v.res = e.newRes(attr)
+	v.measure(ctx, value, droppedAttr)
+	e.values[attr.Equivalent()] = v
 }
 
 func (e *expoHistogram[N]) delta(
@@ -360,8 +376,8 @@ func (e *expoHistogram[N]) delta(
 	h, _ := (*dest).(metricdata.ExponentialHistogram[N])
 	h.Temporality = metricdata.DeltaTemporality
 
-	e.valuesMu.Lock()
-	defer e.valuesMu.Unlock()
+	e.Lock()
+	defer e.Unlock()
 
 	n := len(e.values)
 	hDPts := reset(h.DataPoints, n, n)
@@ -423,8 +439,8 @@ func (e *expoHistogram[N]) cumulative(
 	h, _ := (*dest).(metricdata.ExponentialHistogram[N])
 	h.Temporality = metricdata.CumulativeTemporality
 
-	e.valuesMu.Lock()
-	defer e.valuesMu.Unlock()
+	e.Lock()
+	defer e.Unlock()
 
 	n := len(e.values)
 	hDPts := reset(h.DataPoints, n, n)
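
The rewritten measure above is a double-checked locking pattern: look the data point up under the read lock, and only when it is missing upgrade to the write lock and re-check before inserting, so two goroutines measuring the same attribute set can no longer each create a data point and have one overwrite the other. The commit additionally keeps the read lock held while recording into an existing data point so collection, which takes the write lock, cannot observe a half-recorded measurement. A minimal sketch of just the map-insertion part, using a hypothetical store type rather than the SDK's aggregators:

package main

import (
	"fmt"
	"sync"
)

// store illustrates the read-mostly map pattern adopted by the commit:
// readers share the RLock, and only a goroutine that must insert a new key
// takes the full Lock and re-checks the map before writing.
type store struct {
	sync.RWMutex
	values map[string]*int
}

func (s *store) get(key string) *int {
	// Fast path: most lookups hit an existing element.
	s.RLock()
	v, ok := s.values[key]
	s.RUnlock()
	if ok {
		return v
	}

	// Slow path: take the write lock and re-check, because another goroutine
	// may have inserted the element between RUnlock and Lock.
	s.Lock()
	defer s.Unlock()
	if v, ok = s.values[key]; ok {
		return v
	}
	v = new(int)
	s.values[key] = v
	return v
}

func main() {
	s := &store{values: make(map[string]*int)}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.get("alice") // all goroutines race to create the same key
		}()
	}
	wg.Wait()
	fmt.Println(len(s.values)) // 1: one shared element, none overwritten
}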

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 4 additions & 2 deletions

@@ -47,6 +47,8 @@ func NewFilteredExemplarReservoir[N int64 | float64](
 
 func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
 	if f.filter(ctx) {
+		// We need to lock here because the individual aggregation only holds a
+		// read lock.
 		f.mu.Lock()
 		defer f.mu.Unlock()
 		// only record the current time if we are sampling this measurement.
@@ -55,7 +57,7 @@ func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []
 }
 
 func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) {
-	f.mu.Lock()
-	defer f.mu.Unlock()
+	// No need to lock here because the individual aggregation already holds
+	// the RW lock.
 	f.reservoir.Collect(dest)
 }
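
This change flips who owns the reservoir's synchronization: Offer now locks internally because measurements run under the aggregation's shared read lock, while Collect relies on the aggregation holding its exclusive write lock for the whole collection. A rough sketch of that contract with simplified, hypothetical types (reservoir here is not the SDK's exemplar reservoir):

package main

import (
	"fmt"
	"sync"
)

// reservoir is a stand-in illustrating the locking contract: Offer may be
// called concurrently and locks internally; Collect must only be called while
// the owning aggregation holds its write lock, so it takes no lock of its own.
type reservoir struct {
	mu   sync.Mutex
	vals []float64
}

// Offer is safe for concurrent use; callers only hold a shared read lock.
func (r *reservoir) Offer(v float64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.vals = append(r.vals, v)
}

// Collect assumes the caller has excluded all writers, so no lock is taken.
func (r *reservoir) Collect(dest *[]float64) {
	*dest = append((*dest)[:0], r.vals...)
}

func main() {
	var (
		agg sync.RWMutex // stands in for the aggregation's embedded RWMutex
		r   reservoir
		wg  sync.WaitGroup
	)
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(v float64) {
			defer wg.Done()
			agg.RLock() // measurement path: shared lock only
			r.Offer(v)
			agg.RUnlock()
		}(float64(i))
	}
	wg.Wait()

	agg.Lock() // collection path: exclusive lock, no reservoir lock needed
	var out []float64
	r.Collect(&out)
	agg.Unlock()
	fmt.Println(len(out)) // 4
}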

sdk/metric/internal/aggregate/histogram.go

Lines changed: 66 additions & 52 deletions

@@ -18,37 +18,42 @@ import (
 type buckets[N int64 | float64] struct {
 	count    uint64
 	counts   []uint64
+	noSum    bool
+	noMinMax bool
 	min, max atomic.Value
-	total    *counter[N]
+	total    atomicSum[N]
 
 	attrs attribute.Set
 	res   FilteredExemplarReservoir[N]
 }
 
-// newBuckets returns buckets with n bins.
-func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
-	return &buckets[N]{attrs: attrs, counts: make([]uint64, n), total: &counter[N]{}}
-}
-
-func (b *buckets[N]) bin(idx int) {
+func (b *buckets[N]) measure(
+	ctx context.Context,
+	value N,
+	idx int,
+	droppedAttr []attribute.KeyValue,
+) {
 	atomic.AddUint64(&b.counts[idx], 1)
 	atomic.AddUint64(&b.count, 1)
-}
-
-func (b *buckets[N]) minMax(value N) {
-	for {
-		minLoaded := b.min.Load()
-		if value < minLoaded.(N) && !b.min.CompareAndSwap(minLoaded, value) {
-			// We got a new min value, but lost the race. Try again.
-			continue
-		}
-		maxLoaded := b.max.Load()
-		if value > maxLoaded.(N) && !b.max.CompareAndSwap(maxLoaded, value) {
-			// We got a new max value, but lost the race. Try again.
-			continue
+	if !b.noMinMax {
+		for {
+			minLoaded := b.min.Load()
+			if (minLoaded == nil || value < minLoaded.(N)) && !b.min.CompareAndSwap(minLoaded, value) {
+				// We got a new min value, but lost the race. Try again.
+				continue
+			}
+			maxLoaded := b.max.Load()
+			if (maxLoaded == nil || value > maxLoaded.(N)) && !b.max.CompareAndSwap(maxLoaded, value) {
+				// We got a new max value, but lost the race. Try again.
+				continue
+			}
+			break
 		}
-		return
 	}
+	if !b.noSum {
+		b.total.add(value)
+	}
+	b.res.Offer(ctx, value, droppedAttr)
 }
 
 // histValues summarizes a set of measurements as an histValues with
@@ -58,10 +63,10 @@ type histValues[N int64 | float64] struct {
 	noMinMax bool
 	bounds   []float64
 
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-	limit    limiter[buckets[N]]
-	values   map[attribute.Distinct]*buckets[N]
-	valuesMu sync.RWMutex
+	newRes func(attribute.Set) FilteredExemplarReservoir[N]
+	limit  limiter[buckets[N]]
+	values map[attribute.Distinct]*buckets[N]
+	sync.RWMutex
 }
 
 func newHistValues[N int64 | float64](
@@ -102,39 +107,42 @@ func (s *histValues[N]) measure(
 	// (s.bounds[len(s.bounds)-1], +∞).
 	idx := sort.SearchFloat64s(s.bounds, float64(value))
 
-	s.valuesMu.RLock()
-
+	// Hold the RLock even after we are done reading from the values map to
+	// ensure we don't race with collection.
+	s.RLock()
 	attr := s.limit.Attributes(fltrAttr, s.values)
 	b, ok := s.values[attr.Equivalent()]
-	s.valuesMu.RUnlock()
-	if !ok {
+	if ok {
+		b.measure(ctx, value, idx, droppedAttr)
+		s.RUnlock()
+		return
+	}
+	s.RUnlock()
+	// Switch to a full lock to add a new element to the map.
+	s.Lock()
+	defer s.Unlock()
+	// Check that the element wasn't added since we last checked.
+	b, ok = s.values[attr.Equivalent()]
+	if ok {
+		b.measure(ctx, value, idx, droppedAttr)
+		return
+	}
+	b = &buckets[N]{
+		attrs: attr,
 		// N+1 buckets. For example:
 		//
 		// bounds = [0, 5, 10]
 		//
 		// Then,
 		//
 		// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
-		b = newBuckets[N](attr, len(s.bounds)+1)
-		b.res = s.newRes(attr)
-
-		// Ensure min and max are recorded values (not zero), for new buckets.
-		if !s.noMinMax {
-			b.min.Store(value)
-			b.max.Store(value)
-		}
-		s.valuesMu.Lock()
-		s.values[attr.Equivalent()] = b
-		s.valuesMu.Unlock()
-	}
-	b.bin(idx)
-	if !s.noMinMax {
-		b.minMax(value)
+		counts:   make([]uint64, len(s.bounds)+1),
+		res:      s.newRes(attr),
+		noSum:    s.noSum,
+		noMinMax: s.noMinMax,
 	}
-	if !s.noSum {
-		b.total.add(value)
-	}
-	b.res.Offer(ctx, value, droppedAttr)
+	b.measure(ctx, value, idx, droppedAttr)
+	s.values[attr.Equivalent()] = b
 }
 
 // newHistogram returns an Aggregator that summarizes a set of measurements as
@@ -169,8 +177,11 @@ func (s *histogram[N]) delta(
 	h, _ := (*dest).(metricdata.Histogram[N])
 	h.Temporality = metricdata.DeltaTemporality
 
-	s.valuesMu.Lock()
-	defer s.valuesMu.Unlock()
+	// Acquire a full lock to ensure there are no concurrent measure() calls.
+	// If we only used a RLock, we could observe "partial" measurements, such
+	// as a histogram count increment without a histogram total increment.
+	s.Lock()
+	defer s.Unlock()
 
 	// Do not allow modification of our copy of bounds.
 	bounds := slices.Clone(s.bounds)
@@ -221,8 +232,11 @@ func (s *histogram[N]) cumulative(
 	h, _ := (*dest).(metricdata.Histogram[N])
 	h.Temporality = metricdata.CumulativeTemporality
 
-	s.valuesMu.Lock()
-	defer s.valuesMu.Unlock()
+	// Acquire a full lock to ensure there are no concurrent measure() calls.
+	// If we only used a RLock, we could observe "partial" measurements, such
+	// as a histogram count increment without a histogram total increment.
+	s.Lock()
+	defer s.Unlock()
 
 	// Do not allow modification of our copy of bounds.
 	bounds := slices.Clone(s.bounds)
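
Min and max tracking now happens inside buckets.measure and treats a nil atomic.Value load as "nothing recorded yet", replacing the old approach of pre-seeding min and max when a bucket was created. A small standalone sketch of that nil-aware compare-and-swap loop; updateMin is an illustrative helper, not SDK code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// updateMin records v as the minimum seen so far. A nil load means no value
// has been recorded yet, so the first caller simply swaps its value in.
func updateMin(m *atomic.Value, v float64) {
	for {
		cur := m.Load()
		if cur != nil && v >= cur.(float64) {
			return // the existing minimum is already smaller or equal
		}
		// First value, or a new minimum: try to swap it in. If another
		// goroutine won the race, reload and re-evaluate.
		if m.CompareAndSwap(cur, v) {
			return
		}
	}
}

func main() {
	var (
		minVal atomic.Value
		wg     sync.WaitGroup
	)
	for _, v := range []float64{3, 1, 4, 1.5, 9, 2.6} {
		wg.Add(1)
		go func(v float64) {
			defer wg.Done()
			updateMin(&minVal, v)
		}(v)
	}
	wg.Wait()
	fmt.Println(minVal.Load()) // 1
}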

sdk/metric/internal/aggregate/histogram_test.go

Lines changed: 12 additions & 10 deletions

@@ -277,9 +277,11 @@ func TestBucketsBin(t *testing.T) {
 
 func testBucketsBin[N int64 | float64]() func(t *testing.T) {
 	return func(t *testing.T) {
-		b := newBuckets[N](alice, 3)
-		b.min.Store(N(0))
-		b.max.Store(N(0))
+		b := &buckets[N]{
+			attrs:  alice,
+			counts: make([]uint64, 3),
+			res:    dropExemplars[N](alice),
+		}
 		assertB := func(counts []uint64, count uint64, mi, ma N) {
 			t.Helper()
 			assert.Equal(t, counts, b.counts)
@@ -288,12 +290,9 @@ func testBucketsBin[N int64 | float64]() func(t *testing.T) {
 			assert.Equal(t, ma, b.max.Load().(N))
 		}
 
-		assertB([]uint64{0, 0, 0}, 0, 0, 0)
-		b.bin(1)
-		b.minMax(2)
-		assertB([]uint64{0, 1, 0}, 1, 0, 2)
-		b.bin(0)
-		b.minMax(-1)
+		b.measure(context.Background(), 2, 1, nil)
+		assertB([]uint64{0, 1, 0}, 1, 2, 2)
+		b.measure(context.Background(), -1, 0, nil)
 		assertB([]uint64{1, 1, 0}, 2, -1, 2)
 	}
 }
@@ -305,7 +304,10 @@ func TestBucketsSum(t *testing.T) {
 
 func testBucketsSum[N int64 | float64]() func(t *testing.T) {
 	return func(t *testing.T) {
-		b := newBuckets[N](alice, 3)
+		b := &buckets[N]{
+			attrs:  alice,
+			counts: make([]uint64, 3),
+		}
 
 		var want N
 		assert.Equal(t, want, b.total.value())
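
These tests exercise the new buckets API directly; the concurrent paths the commit fixes are the kind of thing Go's race detector is built to catch, so a reasonable local check (assuming it is run from within the sdk/metric module directory) would be:

go test -race ./internal/aggregate/...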
