Skip to content

Commit 7ca3ccc

Browse files
committed
new atomic types and cleanups
1 parent e513754 commit 7ca3ccc

File tree

6 files changed

+143
-98
lines changed

6 files changed

+143
-98
lines changed

sdk/metric/internal/aggregate/atomic.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ type atomicSum[N int64 | float64] struct {
1717
nInt atomic.Uint64
1818
}
1919

20-
// value returns the float or integer value.
21-
func (n *atomicSum[N]) value() N {
20+
// load returns the float or integer value.
21+
func (n *atomicSum[N]) load() N {
2222
fval := math.Float64frombits(n.nFloatBits.Load())
2323
ival := n.nInt.Load()
2424
return N(fval + float64(ival))
@@ -41,3 +41,65 @@ func (n *atomicSum[N]) add(value N) {
4141
}
4242
}
4343
}
44+
45+
type atomicIntOrFloat[N int64 | float64] struct {
46+
nIntOrBits atomic.Uint64
47+
}
48+
49+
func (n *atomicIntOrFloat[N]) store(value N) {
50+
switch v := any(value).(type) {
51+
case int64:
52+
n.nIntOrBits.Store(uint64(v))
53+
case float64:
54+
n.nIntOrBits.Store(math.Float64bits(v))
55+
}
56+
}
57+
58+
func (n *atomicIntOrFloat[N]) load() (value N) {
59+
switch any(value).(type) {
60+
case int64:
61+
value = N(n.nIntOrBits.Load())
62+
case float64:
63+
value = N(math.Float64frombits(n.nIntOrBits.Load()))
64+
}
65+
return
66+
}
67+
68+
func (n *atomicIntOrFloat[N]) compareAndSwap(old, new N) bool {
69+
switch any(old).(type) {
70+
case float64:
71+
return n.nIntOrBits.CompareAndSwap(math.Float64bits(float64(old)), math.Float64bits(float64(new)))
72+
}
73+
return n.nIntOrBits.CompareAndSwap(uint64(old), uint64(new))
74+
}
75+
76+
type atomicMinMax[N int64 | float64] struct {
77+
min atomicIntOrFloat[N]
78+
max atomicIntOrFloat[N]
79+
isSet atomic.Bool
80+
}
81+
82+
func (n *atomicMinMax[N]) observe(value N) {
83+
for {
84+
minLoaded := n.min.load()
85+
if (!n.isSet.Load() || value < minLoaded) && !n.min.compareAndSwap(minLoaded, value) {
86+
// We got a new min value, but lost the race. Try again.
87+
continue
88+
}
89+
maxLoaded := n.max.load()
90+
if (!n.isSet.Load() || value > maxLoaded) && !n.max.compareAndSwap(maxLoaded, value) {
91+
// We got a new max value, but lost the race. Try again.
92+
continue
93+
}
94+
break
95+
}
96+
n.isSet.Store(true)
97+
}
98+
99+
func (n *atomicMinMax[N]) loadMin() (value N) {
100+
return n.min.load()
101+
}
102+
103+
func (n *atomicMinMax[N]) loadMax() (value N) {
104+
return n.max.load()
105+
}

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 43 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"math"
1010
"sync"
11+
"sync/atomic"
1112
"time"
1213

1314
"go.opentelemetry.io/otel"
@@ -29,20 +30,20 @@ const (
2930

3031
// expoHistogramDataPoint is a single data point in an exponential histogram.
3132
type expoHistogramDataPoint[N int64 | float64] struct {
32-
count uint64
33-
min N
34-
max N
35-
sum N
33+
count uint64
34+
zeroCount uint64
35+
sum atomicSum[N]
36+
minMax atomicMinMax[N]
3637

3738
maxSize int
3839
noMinMax bool
3940
noSum bool
4041

41-
scale int32
42+
scaleMux sync.RWMutex
43+
scale int32
4244

4345
posBuckets expoBuckets
4446
negBuckets expoBuckets
45-
zeroCount uint64
4647

4748
attrs attribute.Set
4849
res FilteredExemplarReservoir[N]
@@ -54,17 +55,8 @@ func newExpoHistogramDataPoint[N int64 | float64](
5455
maxScale int32,
5556
noMinMax, noSum bool,
5657
) *expoHistogramDataPoint[N] { // nolint:revive // we need this control flag
57-
f := math.MaxFloat64
58-
ma := N(f) // if N is int64, max will overflow to -9223372036854775808
59-
mi := N(-f)
60-
if N(maxInt64) > N(f) {
61-
ma = N(maxInt64)
62-
mi = N(minInt64)
63-
}
6458
return &expoHistogramDataPoint[N]{
6559
attrs: attrs,
66-
min: ma,
67-
max: mi,
6860
maxSize: maxSize,
6961
noMinMax: noMinMax,
7062
noSum: noSum,
@@ -79,27 +71,23 @@ func (p *expoHistogramDataPoint[N]) measure(ctx context.Context, v N, droppedAtt
7971

8072
// record adds a new measurement to the histogram. It will rescale the buckets if needed.
8173
func (p *expoHistogramDataPoint[N]) record(v N) {
82-
p.count++
74+
atomic.AddUint64(&p.count, 1)
8375

8476
if !p.noMinMax {
85-
if v < p.min {
86-
p.min = v
87-
}
88-
if v > p.max {
89-
p.max = v
90-
}
77+
p.minMax.observe(v)
9178
}
9279
if !p.noSum {
93-
p.sum += v
80+
p.sum.add(v)
9481
}
9582

9683
absV := math.Abs(float64(v))
9784

9885
if float64(absV) == 0.0 {
99-
p.zeroCount++
86+
atomic.AddUint64(&p.zeroCount, 1)
10087
return
10188
}
10289

90+
p.scaleMux.RLock()
10391
bin := p.getBin(absV)
10492

10593
bucket := &p.posBuckets
@@ -109,22 +97,32 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
10997

11098
// If the new bin would make the counts larger than maxScale, we need to
11199
// downscale current measurements.
112-
if scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts)); scaleDelta > 0 {
113-
if p.scale-scaleDelta < expoMinScale {
114-
// With a scale of -10 there is only two buckets for the whole range of float64 values.
115-
// This can only happen if there is a max size of 1.
116-
otel.Handle(errors.New("exponential histogram scale underflow"))
117-
return
118-
}
119-
// Downscale
120-
p.scale -= scaleDelta
121-
p.posBuckets.downscale(scaleDelta)
122-
p.negBuckets.downscale(scaleDelta)
123-
124-
bin = p.getBin(absV)
100+
scaleDelta := p.scaleChange(bin, bucket.startBin, len(bucket.counts))
101+
if scaleDelta <= 0 {
102+
bucket.record(bin)
103+
p.scaleMux.RUnlock()
104+
return
125105
}
126-
127-
bucket.record(bin)
106+
if p.scale-scaleDelta < expoMinScale {
107+
// With a scale of -10 there is only two buckets for the whole range of float64 values.
108+
// This can only happen if there is a max size of 1.
109+
otel.Handle(errors.New("exponential histogram scale underflow"))
110+
p.scaleMux.RUnlock()
111+
return
112+
}
113+
// Switch to a full Lock for downscaling
114+
p.scaleMux.RUnlock()
115+
p.scaleMux.Lock()
116+
defer p.scaleMux.Unlock()
117+
// recompute the scaleDelta now that we hold the exclusive lock
118+
scaleDelta = p.scaleChange(bin, bucket.startBin, len(bucket.counts))
119+
// Downscale
120+
p.scale -= scaleDelta
121+
p.posBuckets.downscale(scaleDelta)
122+
p.negBuckets.downscale(scaleDelta)
123+
124+
// TODO: is it worth switching back to a read-lock?
125+
bucket.record(p.getBin(absV))
128126
}
129127

130128
// getBin returns the bin v should be recorded into.
@@ -409,11 +407,11 @@ func (e *expoHistogram[N]) delta(
409407
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
410408

411409
if !e.noSum {
412-
hDPts[i].Sum = val.sum
410+
hDPts[i].Sum = val.sum.load()
413411
}
414412
if !e.noMinMax {
415-
hDPts[i].Min = metricdata.NewExtrema(val.min)
416-
hDPts[i].Max = metricdata.NewExtrema(val.max)
413+
hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin())
414+
hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax())
417415
}
418416

419417
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
@@ -472,11 +470,11 @@ func (e *expoHistogram[N]) cumulative(
472470
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
473471

474472
if !e.noSum {
475-
hDPts[i].Sum = val.sum
473+
hDPts[i].Sum = val.sum.load()
476474
}
477475
if !e.noMinMax {
478-
hDPts[i].Min = metricdata.NewExtrema(val.min)
479-
hDPts[i].Max = metricdata.NewExtrema(val.max)
476+
hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin())
477+
hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax())
480478
}
481479

482480
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

sdk/metric/internal/aggregate/exponential_histogram_test.go

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,9 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) {
178178
}
179179
dp := h.values[alice.Equivalent()]
180180

181-
assert.Equal(t, tt.expected.max, dp.max)
182-
assert.Equal(t, tt.expected.min, dp.min)
183-
assert.InDelta(t, tt.expected.sum, dp.sum, 0.01)
181+
assert.Equal(t, tt.expected.max, dp.minMax.loadMax())
182+
assert.Equal(t, tt.expected.min, dp.minMax.loadMin())
183+
assert.InDelta(t, tt.expected.sum, dp.sum.load(), 0.01)
184184
})
185185
}
186186
}
@@ -220,9 +220,9 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) {
220220
}
221221
dp := h.values[alice.Equivalent()]
222222

223-
assert.Equal(t, tt.expected.max, dp.max)
224-
assert.Equal(t, tt.expected.min, dp.min)
225-
assert.InDelta(t, tt.expected.sum, dp.sum, 0.01)
223+
assert.Equal(t, tt.expected.max, dp.minMax.loadMax())
224+
assert.Equal(t, tt.expected.min, dp.minMax.loadMin())
225+
assert.InDelta(t, tt.expected.sum, dp.sum.load(), 0.01)
226226
})
227227
}
228228
}
@@ -703,27 +703,24 @@ func BenchmarkExponentialHistogram(b *testing.B) {
703703
}
704704

705705
func TestSubNormal(t *testing.T) {
706-
want := &expoHistogramDataPoint[float64]{
707-
attrs: alice,
708-
maxSize: 4,
709-
count: 3,
710-
min: math.SmallestNonzeroFloat64,
711-
max: math.SmallestNonzeroFloat64,
712-
sum: 3 * math.SmallestNonzeroFloat64,
713-
714-
scale: 20,
715-
posBuckets: expoBuckets{
716-
startBin: -1126170625,
717-
counts: []uint64{3},
718-
},
706+
wantBuckets := expoBuckets{
707+
startBin: -1126170625,
708+
counts: []uint64{3},
719709
}
720710

721711
ehdp := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false)
722712
ehdp.record(math.SmallestNonzeroFloat64)
723713
ehdp.record(math.SmallestNonzeroFloat64)
724714
ehdp.record(math.SmallestNonzeroFloat64)
725715

726-
assert.Equal(t, want, ehdp)
716+
assert.Equal(t, alice, ehdp.attrs)
717+
assert.Equal(t, 4, ehdp.maxSize)
718+
assert.Equal(t, uint64(3), ehdp.count)
719+
assert.Equal(t, math.SmallestNonzeroFloat64, ehdp.minMax.loadMin())
720+
assert.Equal(t, math.SmallestNonzeroFloat64, ehdp.minMax.loadMax())
721+
assert.Equal(t, 3*math.SmallestNonzeroFloat64, ehdp.sum.load())
722+
assert.Equal(t, int32(20), ehdp.scale)
723+
assert.Equal(t, wantBuckets, ehdp.posBuckets)
727724
}
728725

729726
func TestExponentialHistogramAggregation(t *testing.T) {

sdk/metric/internal/aggregate/histogram.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ import (
1818
type buckets[N int64 | float64] struct {
1919
count uint64
2020
counts []uint64
21+
minMax atomicMinMax[N]
22+
total atomicSum[N]
2123
noSum bool
2224
noMinMax bool
23-
min, max atomic.Value
24-
total atomicSum[N]
2525

2626
attrs attribute.Set
2727
res FilteredExemplarReservoir[N]
@@ -36,19 +36,7 @@ func (b *buckets[N]) measure(
3636
atomic.AddUint64(&b.counts[idx], 1)
3737
atomic.AddUint64(&b.count, 1)
3838
if !b.noMinMax {
39-
for {
40-
minLoaded := b.min.Load()
41-
if (minLoaded == nil || value < minLoaded.(N)) && !b.min.CompareAndSwap(minLoaded, value) {
42-
// We got a new min value, but lost the race. Try again.
43-
continue
44-
}
45-
maxLoaded := b.max.Load()
46-
if (maxLoaded == nil || value > maxLoaded.(N)) && !b.max.CompareAndSwap(maxLoaded, value) {
47-
// We got a new max value, but lost the race. Try again.
48-
continue
49-
}
50-
break
51-
}
39+
b.minMax.observe(value)
5240
}
5341
if !b.noSum {
5442
b.total.add(value)
@@ -199,12 +187,12 @@ func (s *histogram[N]) delta(
199187
hDPts[i].BucketCounts = val.counts
200188

201189
if !s.noSum {
202-
hDPts[i].Sum = val.total.value()
190+
hDPts[i].Sum = val.total.load()
203191
}
204192

205193
if !s.noMinMax {
206-
hDPts[i].Min = metricdata.NewExtrema(val.min.Load().(N))
207-
hDPts[i].Max = metricdata.NewExtrema(val.max.Load().(N))
194+
hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin())
195+
hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax())
208196
}
209197

210198
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
@@ -260,12 +248,12 @@ func (s *histogram[N]) cumulative(
260248
hDPts[i].BucketCounts = slices.Clone(val.counts)
261249

262250
if !s.noSum {
263-
hDPts[i].Sum = val.total.value()
251+
hDPts[i].Sum = val.total.load()
264252
}
265253

266254
if !s.noMinMax {
267-
hDPts[i].Min = metricdata.NewExtrema(val.min.Load().(N))
268-
hDPts[i].Max = metricdata.NewExtrema(val.max.Load().(N))
255+
hDPts[i].Min = metricdata.NewExtrema(val.minMax.loadMin())
256+
hDPts[i].Max = metricdata.NewExtrema(val.minMax.loadMax())
269257
}
270258

271259
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)

0 commit comments

Comments
 (0)