Commit 778d81f

RWLock for aggregation attribute lookup
1 parent eda888f commit 778d81f

File tree

7 files changed: +97 -47 lines changed


sdk/metric/internal/aggregate/exponential_histogram.go
Lines changed: 9 additions & 6 deletions

@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
         maxScale: maxScale,
 
         newRes: r,
-        limit:  newLimiter[*expoHistogramDataPoint[N]](limit),
+        limit:  newLimiter[expoHistogramDataPoint[N]](limit),
         values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
 
         start: now(),
@@ -317,9 +317,9 @@ type expoHistogram[N int64 | float64] struct {
     maxScale int32
 
     newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-    limit    limiter[*expoHistogramDataPoint[N]]
+    limit    limiter[expoHistogramDataPoint[N]]
     values   map[attribute.Distinct]*expoHistogramDataPoint[N]
-    valuesMu sync.Mutex
+    valuesMu sync.RWMutex
 
     start time.Time
 }
@@ -335,18 +335,21 @@ func (e *expoHistogram[N]) measure(
         return
     }
 
-    e.valuesMu.Lock()
-    defer e.valuesMu.Unlock()
-
+    e.valuesMu.RLock()
     attr := e.limit.Attributes(fltrAttr, e.values)
     v, ok := e.values[attr.Equivalent()]
+    e.valuesMu.RUnlock()
     if !ok {
         v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
         v.res = e.newRes(attr)
 
+        e.valuesMu.Lock()
         e.values[attr.Equivalent()] = v
+        e.valuesMu.Unlock()
     }
+    e.valuesMu.Lock()
     v.record(value)
+    e.valuesMu.Unlock()
     v.res.Offer(ctx, value, droppedAttr)
 }
 
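
For readers skimming the diff, here is a minimal standalone sketch of the locking pattern this hunk (and the histogram, lastvalue, and sum changes below) adopts: look the per-attribute value up under a read lock, and take the write lock only to insert a missing entry or to mutate it. The counterStore type, its record method, and the string keys are illustrative stand-ins, not SDK code.

package main

import (
    "fmt"
    "sync"
)

// counterStore mirrors the shape of the aggregators above: a map of
// pointer values guarded by an RWMutex.
type counterStore struct {
    mu     sync.RWMutex
    values map[string]*int64
}

func (s *counterStore) record(key string, v int64) {
    // Fast path: look up an existing entry under the read lock only.
    s.mu.RLock()
    c, ok := s.values[key]
    s.mu.RUnlock()
    if !ok {
        // Slow path: create the entry and take the write lock to insert it.
        // Like the diffed measure method, this sketch does not re-check
        // under the write lock, so two first measurements for the same key
        // can race and the later insert wins.
        c = new(int64)
        s.mu.Lock()
        s.values[key] = c
        s.mu.Unlock()
    }
    // Mutate the entry under the write lock, matching v.record(value) above.
    s.mu.Lock()
    *c += v
    s.mu.Unlock()
}

func main() {
    s := &counterStore{values: make(map[string]*int64)}
    s.record("alice", 1)
    s.record("alice", 2)
    fmt.Println(*s.values["alice"]) // 3
}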

sdk/metric/internal/aggregate/filtered_reservoir.go
Lines changed: 4 additions & 0 deletions

@@ -5,6 +5,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 
 import (
     "context"
+    "sync"
     "time"
 
     "go.opentelemetry.io/otel/attribute"
@@ -27,6 +28,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface {
 
 // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
 type filteredExemplarReservoir[N int64 | float64] struct {
+    mu        sync.Mutex
     filter    exemplar.Filter
     reservoir exemplar.Reservoir
 }
@@ -45,6 +47,8 @@ func NewFilteredExemplarReservoir[N int64 | float64](
 
 func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
     if f.filter(ctx) {
+        f.mu.Lock()
+        defer f.mu.Unlock()
         // only record the current time if we are sampling this measurement.
         f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
     }

sdk/metric/internal/aggregate/histogram.go
Lines changed: 9 additions & 5 deletions

@@ -48,9 +48,9 @@ type histValues[N int64 | float64] struct {
     bounds []float64
 
     newRes   func(attribute.Set) FilteredExemplarReservoir[N]
-    limit    limiter[*buckets[N]]
+    limit    limiter[buckets[N]]
     values   map[attribute.Distinct]*buckets[N]
-    valuesMu sync.Mutex
+    valuesMu sync.RWMutex
 }
 
 func newHistValues[N int64 | float64](
@@ -69,7 +69,7 @@ func newHistValues[N int64 | float64](
         noSum:  noSum,
         bounds: b,
         newRes: r,
-        limit:  newLimiter[*buckets[N]](limit),
+        limit:  newLimiter[buckets[N]](limit),
         values: make(map[attribute.Distinct]*buckets[N]),
     }
 }
@@ -89,11 +89,11 @@ func (s *histValues[N]) measure(
     // (s.bounds[len(s.bounds)-1], +∞).
     idx := sort.SearchFloat64s(s.bounds, float64(value))
 
-    s.valuesMu.Lock()
-    defer s.valuesMu.Unlock()
+    s.valuesMu.RLock()
 
     attr := s.limit.Attributes(fltrAttr, s.values)
     b, ok := s.values[attr.Equivalent()]
+    s.valuesMu.RUnlock()
     if !ok {
         // N+1 buckets. For example:
         //
@@ -107,12 +107,16 @@ func (s *histValues[N]) measure(
 
         // Ensure min and max are recorded values (not zero), for new buckets.
         b.min, b.max = value, value
+        s.valuesMu.Lock()
         s.values[attr.Equivalent()] = b
+        s.valuesMu.Unlock()
     }
+    s.valuesMu.Lock()
     b.bin(idx, value)
     if !s.noSum {
         b.sum(value)
     }
+    s.valuesMu.Unlock()
     b.res.Offer(ctx, value, droppedAttr)
 }
 

sdk/metric/internal/aggregate/lastvalue.go
Lines changed: 21 additions & 14 deletions

@@ -6,6 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 import (
     "context"
     "sync"
+    "sync/atomic"
     "time"
 
     "go.opentelemetry.io/otel/attribute"
@@ -15,44 +16,50 @@ import (
 // datapoint is timestamped measurement data.
 type datapoint[N int64 | float64] struct {
     attrs attribute.Set
-    value N
+    value atomic.Value
     res   FilteredExemplarReservoir[N]
 }
 
 func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
     return &lastValue[N]{
         newRes: r,
         limit:  newLimiter[datapoint[N]](limit),
-        values: make(map[attribute.Distinct]datapoint[N]),
+        values: make(map[attribute.Distinct]*datapoint[N]),
         start:  now(),
     }
 }
 
 // lastValue summarizes a set of measurements as the last one made.
 type lastValue[N int64 | float64] struct {
-    sync.Mutex
+    sync.RWMutex
 
     newRes func(attribute.Set) FilteredExemplarReservoir[N]
     limit  limiter[datapoint[N]]
-    values map[attribute.Distinct]datapoint[N]
+    values map[attribute.Distinct]*datapoint[N]
     start  time.Time
 }
 
-func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
-    s.Lock()
-    defer s.Unlock()
-
+func (s *lastValue[N]) getOrCreateDatapoint(fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) *datapoint[N] {
+    s.RLock()
     attr := s.limit.Attributes(fltrAttr, s.values)
     d, ok := s.values[attr.Equivalent()]
+    s.RUnlock()
     if !ok {
-        d.res = s.newRes(attr)
+        d = &datapoint[N]{
+            res:   s.newRes(attr),
+            attrs: attr,
+        }
+        s.Lock()
+        s.values[attr.Equivalent()] = d
+        s.Unlock()
     }
+    return d
+}
 
-    d.attrs = attr
-    d.value = value
+func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
+    d := s.getOrCreateDatapoint(fltrAttr, droppedAttr)
     d.res.Offer(ctx, value, droppedAttr)
-
-    s.values[attr.Equivalent()] = d
+    d.value.Store(value)
 }
 
 func (s *lastValue[N]) delta(
@@ -109,7 +116,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
         (*dest)[i].Attributes = v.attrs
         (*dest)[i].StartTime = s.start
         (*dest)[i].Time = t
-        (*dest)[i].Value = v.value
+        (*dest)[i].Value = v.value.Load().(N)
         collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
         i++
     }

sdk/metric/internal/aggregate/limit.go
Lines changed: 1 addition & 1 deletion

@@ -30,7 +30,7 @@ func newLimiter[V any](aggregation int) limiter[V] {
 // aggregation cardinality limit for the existing measurements. If it will,
 // overflowSet is returned. Otherwise, if it will not exceed the limit, or the
 // limit is not set (limit <= 0), attr is returned.
-func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
+func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set {
     if l.aggLimit > 0 {
         _, exists := measurements[attrs.Equivalent()]
         if !exists && len(measurements) >= l.aggLimit-1 {

sdk/metric/internal/aggregate/limit_test.go
Lines changed: 4 additions & 2 deletions

@@ -12,7 +12,8 @@ import (
 )
 
 func TestLimiterAttributes(t *testing.T) {
-    m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
+    var val struct{}
+    m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
     t.Run("NoLimit", func(t *testing.T) {
         l := newLimiter[struct{}](0)
         assert.Equal(t, alice, l.Attributes(alice, m))
@@ -43,7 +44,8 @@ func TestLimiterAttributes(t *testing.T) {
 var limitedAttr attribute.Set
 
 func BenchmarkLimiterAttributes(b *testing.B) {
-    m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
+    var val struct{}
+    m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
     l := newLimiter[struct{}](2)
 
     b.ReportAllocs()

sdk/metric/internal/aggregate/sum.go
Lines changed: 49 additions & 19 deletions

@@ -5,50 +5,80 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 
 import (
     "context"
+    "math"
     "sync"
+    "sync/atomic"
     "time"
 
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
 type sumValue[N int64 | float64] struct {
-    n     N
-    res   FilteredExemplarReservoir[N]
-    attrs attribute.Set
+    nFloatBits uint64
+    nInt       uint64
+    res        FilteredExemplarReservoir[N]
+    attrs      attribute.Set
+}
+
+func (s *sumValue[N]) val() N {
+    fval := math.Float64frombits(atomic.LoadUint64(&s.nFloatBits))
+    ival := atomic.LoadUint64(&s.nInt)
+    return N(fval + float64(ival))
 }
 
 // valueMap is the storage for sums.
 type valueMap[N int64 | float64] struct {
-    sync.Mutex
+    sync.RWMutex
     newRes func(attribute.Set) FilteredExemplarReservoir[N]
     limit  limiter[sumValue[N]]
-    values map[attribute.Distinct]sumValue[N]
+    values map[attribute.Distinct]*sumValue[N]
 }
 
 func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
     return &valueMap[N]{
         newRes: r,
         limit:  newLimiter[sumValue[N]](limit),
-        values: make(map[attribute.Distinct]sumValue[N]),
+        values: make(map[attribute.Distinct]*sumValue[N]),
     }
 }
 
-func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
-    s.Lock()
-    defer s.Unlock()
-
+func (s *valueMap[N]) getOrCreateSumValue(fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) *sumValue[N] {
+    s.RLock()
     attr := s.limit.Attributes(fltrAttr, s.values)
     v, ok := s.values[attr.Equivalent()]
+    s.RUnlock()
     if !ok {
-        v.res = s.newRes(attr)
+        v = &sumValue[N]{
+            res:   s.newRes(attr),
+            attrs: attr,
+        }
+        s.Lock()
+        s.values[attr.Equivalent()] = v
+        s.Unlock()
     }
+    return v
+}
 
-    v.attrs = attr
-    v.n += value
+func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
+    v := s.getOrCreateSumValue(fltrAttr, droppedAttr)
     v.res.Offer(ctx, value, droppedAttr)
 
-    s.values[attr.Equivalent()] = v
+    ival := uint64(value)
+    // This case is where the value is an int, or if it is a whole-numbered float.
+    if float64(ival) == float64(value) {
+        atomic.AddUint64(&v.nInt, ival)
+        return
+    }
+
+    // Value must be a float below.
+    for {
+        oldBits := atomic.LoadUint64(&v.nFloatBits)
+        newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
+        if atomic.CompareAndSwapUint64(&v.nFloatBits, oldBits, newBits) {
+            return
+        }
+    }
 }
 
 // newSum returns an aggregator that summarizes a set of measurements as their
@@ -92,7 +122,7 @@ func (s *sum[N]) delta(
         dPts[i].Attributes = val.attrs
         dPts[i].StartTime = s.start
         dPts[i].Time = t
-        dPts[i].Value = val.n
+        dPts[i].Value = val.val()
         collectExemplars(&dPts[i].Exemplars, val.res.Collect)
         i++
     }
@@ -129,7 +159,7 @@ func (s *sum[N]) cumulative(
         dPts[i].Attributes = value.attrs
         dPts[i].StartTime = s.start
         dPts[i].Time = t
-        dPts[i].Value = value.n
+        dPts[i].Value = value.val()
         collectExemplars(&dPts[i].Exemplars, value.res.Collect)
         // TODO (#3006): This will use an unbounded amount of memory if there
         // are unbounded number of attribute sets being aggregated. Attribute
@@ -189,15 +219,15 @@ func (s *precomputedSum[N]) delta(
 
     var i int
     for key, value := range s.values {
-        delta := value.n - s.reported[key]
+        delta := value.val() - s.reported[key]
 
         dPts[i].Attributes = value.attrs
         dPts[i].StartTime = s.start
         dPts[i].Time = t
         dPts[i].Value = delta
         collectExemplars(&dPts[i].Exemplars, value.res.Collect)
 
-        newReported[key] = value.n
+        newReported[key] = value.val()
         i++
     }
     // Unused attribute sets do not report.
@@ -234,7 +264,7 @@ func (s *precomputedSum[N]) cumulative(
         dPts[i].Attributes = val.attrs
         dPts[i].StartTime = s.start
         dPts[i].Time = t
-        dPts[i].Value = val.n
+        dPts[i].Value = val.val()
         collectExemplars(&dPts[i].Exemplars, val.res.Collect)
 
         i++
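
A standalone sketch of the accumulation scheme sumValue uses after this change: whole-numbered measurements are added to an integer counter with atomic.AddUint64, fractional measurements are folded into a float64 kept as raw bits and updated in a compare-and-swap loop that retries when another writer got there first, and the read side combines both parts, as val() does above. The atomicSum type and its methods are illustrative, not the SDK API.

package main

import (
    "fmt"
    "math"
    "sync/atomic"
)

type atomicSum struct {
    intPart   uint64 // whole-number contributions
    floatBits uint64 // fractional contributions, stored as math.Float64bits
}

func (s *atomicSum) add(v float64) {
    // Whole-numbered values take the cheap atomic-add path.
    if i := uint64(v); float64(i) == v {
        atomic.AddUint64(&s.intPart, i)
        return
    }
    // Fractional values use a CAS loop: reload and retry if another
    // goroutine updated the bits between the load and the swap.
    for {
        oldBits := atomic.LoadUint64(&s.floatBits)
        newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
        if atomic.CompareAndSwapUint64(&s.floatBits, oldBits, newBits) {
            return
        }
    }
}

func (s *atomicSum) value() float64 {
    return math.Float64frombits(atomic.LoadUint64(&s.floatBits)) +
        float64(atomic.LoadUint64(&s.intPart))
}

func main() {
    var s atomicSum
    s.add(2)
    s.add(0.5)
    s.add(0.25)
    fmt.Println(s.value()) // 2.75
}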
