Skip to content

Commit f4f3a87

Browse files
committed
RWLock for sum and last value aggregations
1 parent eda888f commit f4f3a87

File tree

7 files changed

+84
-40
lines changed

7 files changed

+84
-40
lines changed

sdk/metric/internal/aggregate/exponential_histogram.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func newExponentialHistogram[N int64 | float64](
301301
maxScale: maxScale,
302302

303303
newRes: r,
304-
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
304+
limit: newLimiter[expoHistogramDataPoint[N]](limit),
305305
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
306306

307307
start: now(),
@@ -317,7 +317,7 @@ type expoHistogram[N int64 | float64] struct {
317317
maxScale int32
318318

319319
newRes func(attribute.Set) FilteredExemplarReservoir[N]
320-
limit limiter[*expoHistogramDataPoint[N]]
320+
limit limiter[expoHistogramDataPoint[N]]
321321
values map[attribute.Distinct]*expoHistogramDataPoint[N]
322322
valuesMu sync.Mutex
323323

sdk/metric/internal/aggregate/filtered_reservoir.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
55

66
import (
77
"context"
8+
"sync"
89
"time"
910

1011
"go.opentelemetry.io/otel/attribute"
@@ -27,6 +28,7 @@ type FilteredExemplarReservoir[N int64 | float64] interface {
2728

2829
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
2930
type filteredExemplarReservoir[N int64 | float64] struct {
31+
mu sync.Mutex
3032
filter exemplar.Filter
3133
reservoir exemplar.Reservoir
3234
}
@@ -45,6 +47,8 @@ func NewFilteredExemplarReservoir[N int64 | float64](
4547

4648
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
4749
if f.filter(ctx) {
50+
f.mu.Lock()
51+
defer f.mu.Unlock()
4852
// only record the current time if we are sampling this measurement.
4953
f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
5054
}

sdk/metric/internal/aggregate/histogram.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type histValues[N int64 | float64] struct {
4848
bounds []float64
4949

5050
newRes func(attribute.Set) FilteredExemplarReservoir[N]
51-
limit limiter[*buckets[N]]
51+
limit limiter[buckets[N]]
5252
values map[attribute.Distinct]*buckets[N]
5353
valuesMu sync.Mutex
5454
}
@@ -69,7 +69,7 @@ func newHistValues[N int64 | float64](
6969
noSum: noSum,
7070
bounds: b,
7171
newRes: r,
72-
limit: newLimiter[*buckets[N]](limit),
72+
limit: newLimiter[buckets[N]](limit),
7373
values: make(map[attribute.Distinct]*buckets[N]),
7474
}
7575
}

sdk/metric/internal/aggregate/lastvalue.go

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
66
import (
77
"context"
88
"sync"
9+
"sync/atomic"
910
"time"
1011

1112
"go.opentelemetry.io/otel/attribute"
@@ -15,44 +16,50 @@ import (
1516
// datapoint is timestamped measurement data.
1617
type datapoint[N int64 | float64] struct {
1718
attrs attribute.Set
18-
value N
19+
value atomic.Value
1920
res FilteredExemplarReservoir[N]
2021
}
2122

2223
func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
2324
return &lastValue[N]{
2425
newRes: r,
2526
limit: newLimiter[datapoint[N]](limit),
26-
values: make(map[attribute.Distinct]datapoint[N]),
27+
values: make(map[attribute.Distinct]*datapoint[N]),
2728
start: now(),
2829
}
2930
}
3031

3132
// lastValue summarizes a set of measurements as the last one made.
3233
type lastValue[N int64 | float64] struct {
33-
sync.Mutex
34+
sync.RWMutex
3435

3536
newRes func(attribute.Set) FilteredExemplarReservoir[N]
3637
limit limiter[datapoint[N]]
37-
values map[attribute.Distinct]datapoint[N]
38+
values map[attribute.Distinct]*datapoint[N]
3839
start time.Time
3940
}
4041

41-
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
42-
s.Lock()
43-
defer s.Unlock()
44-
42+
func (s *lastValue[N]) getOrCreateDatapoint(fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) *datapoint[N] {
43+
s.RLock()
4544
attr := s.limit.Attributes(fltrAttr, s.values)
4645
d, ok := s.values[attr.Equivalent()]
46+
s.RUnlock()
4747
if !ok {
48-
d.res = s.newRes(attr)
48+
d = &datapoint[N]{
49+
res: s.newRes(attr),
50+
attrs: attr,
51+
}
52+
s.Lock()
53+
s.values[attr.Equivalent()] = d
54+
s.Unlock()
4955
}
56+
return d
57+
}
5058

51-
d.attrs = attr
52-
d.value = value
59+
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
60+
d := s.getOrCreateDatapoint(fltrAttr, droppedAttr)
5361
d.res.Offer(ctx, value, droppedAttr)
54-
55-
s.values[attr.Equivalent()] = d
62+
d.value.Store(value)
5663
}
5764

5865
func (s *lastValue[N]) delta(
@@ -109,7 +116,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
109116
(*dest)[i].Attributes = v.attrs
110117
(*dest)[i].StartTime = s.start
111118
(*dest)[i].Time = t
112-
(*dest)[i].Value = v.value
119+
(*dest)[i].Value = v.value.Load().(N)
113120
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
114121
i++
115122
}

sdk/metric/internal/aggregate/limit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func newLimiter[V any](aggregation int) limiter[V] {
3030
// aggregation cardinality limit for the existing measurements. If it will,
3131
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
3232
// limit is not set (limit <= 0), attr is returned.
33-
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
33+
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]*V) attribute.Set {
3434
if l.aggLimit > 0 {
3535
_, exists := measurements[attrs.Equivalent()]
3636
if !exists && len(measurements) >= l.aggLimit-1 {

sdk/metric/internal/aggregate/limit_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
)
1313

1414
func TestLimiterAttributes(t *testing.T) {
15-
m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
15+
var val struct{}
16+
m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
1617
t.Run("NoLimit", func(t *testing.T) {
1718
l := newLimiter[struct{}](0)
1819
assert.Equal(t, alice, l.Attributes(alice, m))
@@ -43,7 +44,8 @@ func TestLimiterAttributes(t *testing.T) {
4344
var limitedAttr attribute.Set
4445

4546
func BenchmarkLimiterAttributes(b *testing.B) {
46-
m := map[attribute.Distinct]struct{}{alice.Equivalent(): {}}
47+
var val struct{}
48+
m := map[attribute.Distinct]*struct{}{alice.Equivalent(): &val}
4749
l := newLimiter[struct{}](2)
4850

4951
b.ReportAllocs()

sdk/metric/internal/aggregate/sum.go

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,50 +5,81 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
55

66
import (
77
"context"
8+
"math"
89
"sync"
10+
"sync/atomic"
911
"time"
1012

1113
"go.opentelemetry.io/otel/attribute"
1214
"go.opentelemetry.io/otel/sdk/metric/metricdata"
1315
)
1416

1517
type sumValue[N int64 | float64] struct {
16-
n N
17-
res FilteredExemplarReservoir[N]
18-
attrs attribute.Set
18+
nFloatBits uint64
19+
nInt uint64
20+
value atomic.Value // Contains N
21+
res FilteredExemplarReservoir[N]
22+
attrs attribute.Set
23+
}
24+
25+
func (s *sumValue[N]) val() N {
26+
fval := math.Float64frombits(atomic.LoadUint64(&s.nFloatBits))
27+
ival := atomic.LoadUint64(&s.nInt)
28+
return N(fval + float64(ival))
1929
}
2030

2131
// valueMap is the storage for sums.
2232
type valueMap[N int64 | float64] struct {
23-
sync.Mutex
33+
sync.RWMutex
2434
newRes func(attribute.Set) FilteredExemplarReservoir[N]
2535
limit limiter[sumValue[N]]
26-
values map[attribute.Distinct]sumValue[N]
36+
values map[attribute.Distinct]*sumValue[N]
2737
}
2838

2939
func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
3040
return &valueMap[N]{
3141
newRes: r,
3242
limit: newLimiter[sumValue[N]](limit),
33-
values: make(map[attribute.Distinct]sumValue[N]),
43+
values: make(map[attribute.Distinct]*sumValue[N]),
3444
}
3545
}
3646

37-
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
38-
s.Lock()
39-
defer s.Unlock()
40-
47+
func (s *valueMap[N]) getOrCreateSumValue(fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) *sumValue[N] {
48+
s.RLock()
4149
attr := s.limit.Attributes(fltrAttr, s.values)
4250
v, ok := s.values[attr.Equivalent()]
51+
s.RUnlock()
4352
if !ok {
44-
v.res = s.newRes(attr)
53+
v = &sumValue[N]{
54+
res: s.newRes(attr),
55+
attrs: attr,
56+
}
57+
s.Lock()
58+
s.values[attr.Equivalent()] = v
59+
s.Unlock()
4560
}
61+
return v
62+
}
4663

47-
v.attrs = attr
48-
v.n += value
64+
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
65+
v := s.getOrCreateSumValue(fltrAttr, droppedAttr)
4966
v.res.Offer(ctx, value, droppedAttr)
5067

51-
s.values[attr.Equivalent()] = v
68+
ival := uint64(value)
69+
// This case is where the value is an int, or if it is a whole-numbered float.
70+
if float64(ival) == float64(value) {
71+
atomic.AddUint64(&v.nInt, ival)
72+
return
73+
}
74+
75+
// Value must be a float below.
76+
for {
77+
oldBits := atomic.LoadUint64(&v.nFloatBits)
78+
newBits := math.Float64bits(math.Float64frombits(oldBits) + float64(value))
79+
if atomic.CompareAndSwapUint64(&v.nFloatBits, oldBits, newBits) {
80+
return
81+
}
82+
}
5283
}
5384

5485
// newSum returns an aggregator that summarizes a set of measurements as their
@@ -92,7 +123,7 @@ func (s *sum[N]) delta(
92123
dPts[i].Attributes = val.attrs
93124
dPts[i].StartTime = s.start
94125
dPts[i].Time = t
95-
dPts[i].Value = val.n
126+
dPts[i].Value = val.val()
96127
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
97128
i++
98129
}
@@ -129,7 +160,7 @@ func (s *sum[N]) cumulative(
129160
dPts[i].Attributes = value.attrs
130161
dPts[i].StartTime = s.start
131162
dPts[i].Time = t
132-
dPts[i].Value = value.n
163+
dPts[i].Value = value.val()
133164
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
134165
// TODO (#3006): This will use an unbounded amount of memory if there
135166
// are unbounded number of attribute sets being aggregated. Attribute
@@ -189,15 +220,15 @@ func (s *precomputedSum[N]) delta(
189220

190221
var i int
191222
for key, value := range s.values {
192-
delta := value.n - s.reported[key]
223+
delta := value.val() - s.reported[key]
193224

194225
dPts[i].Attributes = value.attrs
195226
dPts[i].StartTime = s.start
196227
dPts[i].Time = t
197228
dPts[i].Value = delta
198229
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
199230

200-
newReported[key] = value.n
231+
newReported[key] = value.val()
201232
i++
202233
}
203234
// Unused attribute sets do not report.
@@ -234,7 +265,7 @@ func (s *precomputedSum[N]) cumulative(
234265
dPts[i].Attributes = val.attrs
235266
dPts[i].StartTime = s.start
236267
dPts[i].Time = t
237-
dPts[i].Value = val.n
268+
dPts[i].Value = val.val()
238269
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
239270

240271
i++

0 commit comments

Comments
 (0)