Skip to content

Commit 6ab1a4d

Browse files
committed
refactor into cumulative and delta sum types
1 parent 749a696 commit 6ab1a4d

File tree

2 files changed

+92
-75
lines changed

2 files changed

+92
-75
lines changed

sdk/metric/internal/aggregate/aggregate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,12 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati
110110

111111
// Sum returns a sum aggregate function input and output.
112112
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
113-
s := newSum[N](monotonic, b.Temporality, b.AggregationLimit, b.resFunc())
114113
switch b.Temporality {
115114
case metricdata.DeltaTemporality:
115+
s := newDeltaSum[N](monotonic, b.AggregationLimit, b.resFunc())
116116
return b.filter(s.measure), s.delta
117117
default:
118+
s := newCumulativeSum[N](monotonic, b.AggregationLimit, b.resFunc())
118119
return b.filter(s.measure), s.cumulative
119120
}
120121
}

sdk/metric/internal/aggregate/sum.go

Lines changed: 90 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -19,57 +19,34 @@ type sumValue[N int64 | float64] struct {
1919
attrs attribute.Set
2020
}
2121

22-
// valueMap is the storage for sums.
2322
type valueMap[N int64 | float64] struct {
24-
newRes func(attribute.Set) FilteredExemplarReservoir[N]
25-
aggLimit int
26-
27-
// cumulative sums do not reset values during collection, so in that case
28-
// clearValuesOnCollection is false, hcwg is unused, and only values[0]
29-
// and len[0] are used. All other aggregations reset on collection, so we
30-
// use hcwg to swap between the hot and cold maps and len so measurements
31-
// can continue without blocking on collection.
32-
//
33-
// see hotColdWaitGroup for how this works.
34-
clearValuesOnCollection bool
35-
hcwg hotColdWaitGroup
36-
values [2]sync.Map
37-
len [2]atomic.Int64
38-
}
39-
40-
func newValueMap[N int64 | float64](
41-
limit int,
42-
r func(attribute.Set) FilteredExemplarReservoir[N],
43-
clearValuesOnCollection bool,
44-
) *valueMap[N] {
45-
return &valueMap[N]{
46-
newRes: r,
47-
aggLimit: limit,
48-
clearValuesOnCollection: clearValuesOnCollection,
49-
}
23+
values sync.Map
24+
len atomic.Int64
5025
}
5126

52-
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
53-
hotIdx := uint64(0)
54-
if s.clearValuesOnCollection {
55-
hotIdx = s.hcwg.start()
56-
defer s.hcwg.done(hotIdx)
57-
}
58-
v, ok := s.values[hotIdx].Load(fltrAttr.Equivalent())
27+
func (s *valueMap[N]) measure(
28+
ctx context.Context,
29+
value N,
30+
fltrAttr attribute.Set,
31+
droppedAttr []attribute.KeyValue,
32+
newRes func(attribute.Set) FilteredExemplarReservoir[N],
33+
aggLimit int,
34+
) {
35+
v, ok := s.values.Load(fltrAttr.Equivalent())
5936
if !ok {
6037
// It is possible to exceed the attribute limit if it races with other
6138
// new attribute sets. This is an accepted tradeoff to avoid locking
6239
// for writes.
63-
if s.aggLimit > 0 && s.len[hotIdx].Load() >= int64(s.aggLimit-1) {
40+
if aggLimit > 0 && s.len.Load() >= int64(aggLimit-1) {
6441
fltrAttr = overflowSet
6542
}
6643
var loaded bool
67-
v, loaded = s.values[hotIdx].LoadOrStore(fltrAttr.Equivalent(), &sumValue[N]{
68-
res: s.newRes(fltrAttr),
44+
v, loaded = s.values.LoadOrStore(fltrAttr.Equivalent(), &sumValue[N]{
45+
res: newRes(fltrAttr),
6946
attrs: fltrAttr,
7047
})
7148
if !loaded {
72-
s.len[hotIdx].Add(1)
49+
s.len.Add(1)
7350
}
7451
}
7552
sv := v.(*sumValue[N])
@@ -80,32 +57,41 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
8057
sv.res.Offer(ctx, value, droppedAttr)
8158
}
8259

83-
// newSum returns an aggregator that summarizes a set of measurements as their
84-
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
85-
// the measurements were made in.
86-
func newSum[N int64 | float64](
60+
// newDeltaSum returns an aggregator that summarizes a set of measurements as
61+
// their arithmetic sum. Each sum is scoped by attributes and the aggregation
62+
// cycle the measurements were made in.
63+
func newDeltaSum[N int64 | float64](
8764
monotonic bool,
88-
temporality metricdata.Temporality,
8965
limit int,
9066
r func(attribute.Set) FilteredExemplarReservoir[N],
91-
) *sum[N] {
92-
clearValuesOnCollection := temporality == metricdata.DeltaTemporality
93-
return &sum[N]{
94-
valueMap: newValueMap[N](limit, r, clearValuesOnCollection),
67+
) *deltaSum[N] {
68+
return &deltaSum[N]{
69+
newRes: r,
70+
aggLimit: limit,
9571
monotonic: monotonic,
9672
start: now(),
9773
}
9874
}
9975

100-
// sum summarizes a set of measurements made as their arithmetic sum.
101-
type sum[N int64 | float64] struct {
102-
*valueMap[N]
76+
// deltaSum is the storage for sums which resets every collection interval.
77+
type deltaSum[N int64 | float64] struct {
78+
newRes func(attribute.Set) FilteredExemplarReservoir[N]
79+
aggLimit int
10380

10481
monotonic bool
10582
start time.Time
83+
84+
hcwg hotColdWaitGroup
85+
hotColdValMap [2]valueMap[N]
10686
}
10787

108-
func (s *sum[N]) delta(
88+
func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
89+
hotIdx := s.hcwg.start()
90+
defer s.hcwg.done(hotIdx)
91+
s.hotColdValMap[hotIdx].measure(ctx, value, fltrAttr, droppedAttr, s.newRes, s.aggLimit)
92+
}
93+
94+
func (s *deltaSum[N]) delta(
10995
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
11096
) int {
11197
t := now()
@@ -120,11 +106,11 @@ func (s *sum[N]) delta(
120106
readIdx := s.hcwg.swapHotAndWait()
121107
// The len will not change while we iterate over values, since we waited
122108
// for all writes to finish to the cold values and len.
123-
n := int(s.len[readIdx].Load())
109+
n := int(s.hotColdValMap[readIdx].len.Load())
124110
dPts := reset(sData.DataPoints, n, n)
125111

126112
var i int
127-
s.values[readIdx].Range(func(_, value any) bool {
113+
s.hotColdValMap[readIdx].values.Range(func(_, value any) bool {
128114
val := value.(*sumValue[N])
129115
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
130116
dPts[i].Attributes = val.attrs
@@ -134,8 +120,8 @@ func (s *sum[N]) delta(
134120
i++
135121
return true
136122
})
137-
s.values[readIdx].Clear()
138-
s.len[readIdx].Store(0)
123+
s.hotColdValMap[readIdx].values.Clear()
124+
s.hotColdValMap[readIdx].len.Store(0)
139125
// The delta collection cycle resets.
140126
s.start = t
141127

@@ -145,7 +131,43 @@ func (s *sum[N]) delta(
145131
return i
146132
}
147133

148-
func (s *sum[N]) cumulative(
134+
// newCumulativeSum returns an aggregator that summarizes a set of measurements
135+
// as their arithmetic sum. Each sum is scoped by attributes and the
136+
// aggregation cycle the measurements were made in.
137+
func newCumulativeSum[N int64 | float64](
138+
monotonic bool,
139+
limit int,
140+
r func(attribute.Set) FilteredExemplarReservoir[N],
141+
) *cumulativeSum[N] {
142+
return &cumulativeSum[N]{
143+
newRes: r,
144+
aggLimit: limit,
145+
monotonic: monotonic,
146+
start: now(),
147+
}
148+
}
149+
150+
// cumulativeSum is the storage for sums which never reset.
151+
type cumulativeSum[N int64 | float64] struct {
152+
newRes func(attribute.Set) FilteredExemplarReservoir[N]
153+
aggLimit int
154+
155+
monotonic bool
156+
start time.Time
157+
158+
valMap valueMap[N]
159+
}
160+
161+
func (s *cumulativeSum[N]) measure(
162+
ctx context.Context,
163+
value N,
164+
fltrAttr attribute.Set,
165+
droppedAttr []attribute.KeyValue,
166+
) {
167+
s.valMap.measure(ctx, value, fltrAttr, droppedAttr, s.newRes, s.aggLimit)
168+
}
169+
170+
func (s *cumulativeSum[N]) cumulative(
149171
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
150172
) int {
151173
t := now()
@@ -156,13 +178,12 @@ func (s *sum[N]) cumulative(
156178
sData.Temporality = metricdata.CumulativeTemporality
157179
sData.IsMonotonic = s.monotonic
158180

159-
readIdx := 0
160181
// Values are being concurrently written while we iterate, so only use the
161182
// current length for capacity.
162-
dPts := reset(sData.DataPoints, 0, int(s.len[readIdx].Load()))
183+
dPts := reset(sData.DataPoints, 0, int(s.valMap.len.Load()))
163184

164185
var i int
165-
s.values[readIdx].Range(func(_, value any) bool {
186+
s.valMap.values.Range(func(_, value any) bool {
166187
val := value.(*sumValue[N])
167188
newPt := metricdata.DataPoint[N]{
168189
Attributes: val.attrs,
@@ -195,18 +216,13 @@ func newPrecomputedSum[N int64 | float64](
195216
r func(attribute.Set) FilteredExemplarReservoir[N],
196217
) *precomputedSum[N] {
197218
return &precomputedSum[N]{
198-
valueMap: newValueMap[N](limit, r, true),
199-
monotonic: monotonic,
200-
start: now(),
219+
deltaSum: newDeltaSum(monotonic, limit, r),
201220
}
202221
}
203222

204223
// precomputedSum summarizes a set of observations as their arithmetic sum.
205224
type precomputedSum[N int64 | float64] struct {
206-
*valueMap[N]
207-
208-
monotonic bool
209-
start time.Time
225+
*deltaSum[N]
210226

211227
reported map[any]N
212228
}
@@ -227,11 +243,11 @@ func (s *precomputedSum[N]) delta(
227243
readIdx := s.hcwg.swapHotAndWait()
228244
// The len will not change while we iterate over values, since we waited
229245
// for all writes to finish to the cold values and len.
230-
n := int(s.len[readIdx].Load())
246+
n := int(s.hotColdValMap[readIdx].len.Load())
231247
dPts := reset(sData.DataPoints, n, n)
232248

233249
var i int
234-
s.values[readIdx].Range(func(key, value any) bool {
250+
s.hotColdValMap[readIdx].values.Range(func(key, value any) bool {
235251
val := value.(*sumValue[N])
236252
n := val.n.load()
237253

@@ -245,8 +261,8 @@ func (s *precomputedSum[N]) delta(
245261
i++
246262
return true
247263
})
248-
s.values[readIdx].Clear()
249-
s.len[readIdx].Store(0)
264+
s.hotColdValMap[readIdx].values.Clear()
265+
s.hotColdValMap[readIdx].len.Store(0)
250266
s.reported = newReported
251267
// The delta collection cycle resets.
252268
s.start = t
@@ -272,11 +288,11 @@ func (s *precomputedSum[N]) cumulative(
272288
readIdx := s.hcwg.swapHotAndWait()
273289
// The len will not change while we iterate over values, since we waited
274290
// for all writes to finish to the cold values and len.
275-
n := int(s.len[readIdx].Load())
291+
n := int(s.hotColdValMap[readIdx].len.Load())
276292
dPts := reset(sData.DataPoints, n, n)
277293

278294
var i int
279-
s.values[readIdx].Range(func(_, value any) bool {
295+
s.hotColdValMap[readIdx].values.Range(func(_, value any) bool {
280296
val := value.(*sumValue[N])
281297
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
282298
dPts[i].Attributes = val.attrs
@@ -286,8 +302,8 @@ func (s *precomputedSum[N]) cumulative(
286302
i++
287303
return true
288304
})
289-
s.values[readIdx].Clear()
290-
s.len[readIdx].Store(0)
305+
s.hotColdValMap[readIdx].values.Clear()
306+
s.hotColdValMap[readIdx].len.Store(0)
291307

292308
sData.DataPoints = dPts
293309
*dest = sData

0 commit comments

Comments
 (0)