@@ -19,57 +19,34 @@ type sumValue[N int64 | float64] struct {
1919 attrs attribute.Set
2020}
2121
22- // valueMap is the storage for sums.
2322type valueMap [N int64 | float64 ] struct {
24- newRes func (attribute.Set ) FilteredExemplarReservoir [N ]
25- aggLimit int
26-
27- // cumulative sums do not reset values during collection, so in that case
28- // clearValuesOnCollection is false, hcwg is unused, and only values[0]
29- // and len[0] are used. All other aggregations reset on collection, so we
30- // use hcwg to swap between the hot and cold maps and len so measurements
31- // can continue without blocking on collection.
32- //
33- // see hotColdWaitGroup for how this works.
34- clearValuesOnCollection bool
35- hcwg hotColdWaitGroup
36- values [2 ]sync.Map
37- len [2 ]atomic.Int64
38- }
39-
40- func newValueMap [N int64 | float64 ](
41- limit int ,
42- r func (attribute.Set ) FilteredExemplarReservoir [N ],
43- clearValuesOnCollection bool ,
44- ) * valueMap [N ] {
45- return & valueMap [N ]{
46- newRes : r ,
47- aggLimit : limit ,
48- clearValuesOnCollection : clearValuesOnCollection ,
49- }
23+ values sync.Map
24+ len atomic.Int64
5025}
5126
52- func (s * valueMap [N ]) measure (ctx context.Context , value N , fltrAttr attribute.Set , droppedAttr []attribute.KeyValue ) {
53- hotIdx := uint64 (0 )
54- if s .clearValuesOnCollection {
55- hotIdx = s .hcwg .start ()
56- defer s .hcwg .done (hotIdx )
57- }
58- v , ok := s .values [hotIdx ].Load (fltrAttr .Equivalent ())
27+ func (s * valueMap [N ]) measure (
28+ ctx context.Context ,
29+ value N ,
30+ fltrAttr attribute.Set ,
31+ droppedAttr []attribute.KeyValue ,
32+ newRes func (attribute.Set ) FilteredExemplarReservoir [N ],
33+ aggLimit int ,
34+ ) {
35+ v , ok := s .values .Load (fltrAttr .Equivalent ())
5936 if ! ok {
6037 // It is possible to exceed the attribute limit if it races with other
6138 // new attribute sets. This is an accepted tradeoff to avoid locking
6239 // for writes.
63- if s . aggLimit > 0 && s .len [ hotIdx ] .Load () >= int64 (s . aggLimit - 1 ) {
40+ if aggLimit > 0 && s .len .Load () >= int64 (aggLimit - 1 ) {
6441 fltrAttr = overflowSet
6542 }
6643 var loaded bool
67- v , loaded = s .values [ hotIdx ] .LoadOrStore (fltrAttr .Equivalent (), & sumValue [N ]{
68- res : s . newRes (fltrAttr ),
44+ v , loaded = s .values .LoadOrStore (fltrAttr .Equivalent (), & sumValue [N ]{
45+ res : newRes (fltrAttr ),
6946 attrs : fltrAttr ,
7047 })
7148 if ! loaded {
72- s .len [ hotIdx ] .Add (1 )
49+ s .len .Add (1 )
7350 }
7451 }
7552 sv := v .(* sumValue [N ])
@@ -80,32 +57,41 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
8057 sv .res .Offer (ctx , value , droppedAttr )
8158}
8259
83- // newSum returns an aggregator that summarizes a set of measurements as their
84- // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
85- // the measurements were made in.
86- func newSum [N int64 | float64 ](
60+ // newDeltaSum returns an aggregator that summarizes a set of measurements as
61+ // their arithmetic sum. Each sum is scoped by attributes and the aggregation
62+ // cycle the measurements were made in.
63+ func newDeltaSum [N int64 | float64 ](
8764 monotonic bool ,
88- temporality metricdata.Temporality ,
8965 limit int ,
9066 r func (attribute.Set ) FilteredExemplarReservoir [N ],
91- ) * sum [N ] {
92- clearValuesOnCollection := temporality == metricdata . DeltaTemporality
93- return & sum [ N ]{
94- valueMap : newValueMap [ N ]( limit , r , clearValuesOnCollection ) ,
67+ ) * deltaSum [N ] {
68+ return & deltaSum [ N ]{
69+ newRes : r ,
70+ aggLimit : limit ,
9571 monotonic : monotonic ,
9672 start : now (),
9773 }
9874}
9975
100- // sum summarizes a set of measurements made as their arithmetic sum.
101- type sum [N int64 | float64 ] struct {
102- * valueMap [N ]
76+ // deltaSum is the storage for sums which resets every collection interval.
77+ type deltaSum [N int64 | float64 ] struct {
78+ newRes func (attribute.Set ) FilteredExemplarReservoir [N ]
79+ aggLimit int
10380
10481 monotonic bool
10582 start time.Time
83+
84+ hcwg hotColdWaitGroup
85+ hotColdValMap [2 ]valueMap [N ]
10686}
10787
108- func (s * sum [N ]) delta (
88+ func (s * deltaSum [N ]) measure (ctx context.Context , value N , fltrAttr attribute.Set , droppedAttr []attribute.KeyValue ) {
89+ hotIdx := s .hcwg .start ()
90+ defer s .hcwg .done (hotIdx )
91+ s .hotColdValMap [hotIdx ].measure (ctx , value , fltrAttr , droppedAttr , s .newRes , s .aggLimit )
92+ }
93+
94+ func (s * deltaSum [N ]) delta (
10995 dest * metricdata.Aggregation , //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
11096) int {
11197 t := now ()
@@ -120,11 +106,11 @@ func (s *sum[N]) delta(
120106 readIdx := s .hcwg .swapHotAndWait ()
121107 // The len will not change while we iterate over values, since we waited
122108 // for all writes to finish to the cold values and len.
123- n := int (s .len [readIdx ].Load ())
109+ n := int (s .hotColdValMap [readIdx ]. len .Load ())
124110 dPts := reset (sData .DataPoints , n , n )
125111
126112 var i int
127- s .values [readIdx ].Range (func (_ , value any ) bool {
113+ s .hotColdValMap [readIdx ]. values .Range (func (_ , value any ) bool {
128114 val := value .(* sumValue [N ])
129115 collectExemplars (& dPts [i ].Exemplars , val .res .Collect )
130116 dPts [i ].Attributes = val .attrs
@@ -134,8 +120,8 @@ func (s *sum[N]) delta(
134120 i ++
135121 return true
136122 })
137- s .values [readIdx ].Clear ()
138- s .len [readIdx ].Store (0 )
123+ s .hotColdValMap [readIdx ]. values .Clear ()
124+ s .hotColdValMap [readIdx ]. len .Store (0 )
139125 // The delta collection cycle resets.
140126 s .start = t
141127
@@ -145,7 +131,38 @@ func (s *sum[N]) delta(
145131 return i
146132}
147133
148- func (s * sum [N ]) cumulative (
134+ // newCumulativeSum returns an aggregator that summarizes a set of measurements
135+ // as their arithmetic sum. Each sum is scoped by attributes and the
136+ // aggregation cycle the measurements were made in.
137+ func newCumulativeSum [N int64 | float64 ](
138+ monotonic bool ,
139+ limit int ,
140+ r func (attribute.Set ) FilteredExemplarReservoir [N ],
141+ ) * cumulativeSum [N ] {
142+ return & cumulativeSum [N ]{
143+ newRes : r ,
144+ aggLimit : limit ,
145+ monotonic : monotonic ,
146+ start : now (),
147+ }
148+ }
149+
150+ // deltaSum is the storage for sums which never reset.
151+ type cumulativeSum [N int64 | float64 ] struct {
152+ newRes func (attribute.Set ) FilteredExemplarReservoir [N ]
153+ aggLimit int
154+
155+ monotonic bool
156+ start time.Time
157+
158+ valMap valueMap [N ]
159+ }
160+
161+ func (s * cumulativeSum [N ]) measure (ctx context.Context , value N , fltrAttr attribute.Set , droppedAttr []attribute.KeyValue ) {
162+ s .valMap .measure (ctx , value , fltrAttr , droppedAttr , s .newRes , s .aggLimit )
163+ }
164+
165+ func (s * cumulativeSum [N ]) cumulative (
149166 dest * metricdata.Aggregation , //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
150167) int {
151168 t := now ()
@@ -156,13 +173,12 @@ func (s *sum[N]) cumulative(
156173 sData .Temporality = metricdata .CumulativeTemporality
157174 sData .IsMonotonic = s .monotonic
158175
159- readIdx := 0
160176 // Values are being concurrently written while we iterate, so only use the
161177 // current length for capacity.
162- dPts := reset (sData .DataPoints , 0 , int (s .len [ readIdx ] .Load ()))
178+ dPts := reset (sData .DataPoints , 0 , int (s .valMap . len .Load ()))
163179
164180 var i int
165- s .values [ readIdx ] .Range (func (_ , value any ) bool {
181+ s .valMap . values .Range (func (_ , value any ) bool {
166182 val := value .(* sumValue [N ])
167183 newPt := metricdata.DataPoint [N ]{
168184 Attributes : val .attrs ,
@@ -195,18 +211,13 @@ func newPrecomputedSum[N int64 | float64](
195211 r func (attribute.Set ) FilteredExemplarReservoir [N ],
196212) * precomputedSum [N ] {
197213 return & precomputedSum [N ]{
198- valueMap : newValueMap [N ](limit , r , true ),
199- monotonic : monotonic ,
200- start : now (),
214+ deltaSum : newDeltaSum (monotonic , limit , r ),
201215 }
202216}
203217
204218// precomputedSum summarizes a set of observations as their arithmetic sum.
205219type precomputedSum [N int64 | float64 ] struct {
206- * valueMap [N ]
207-
208- monotonic bool
209- start time.Time
220+ * deltaSum [N ]
210221
211222 reported map [any ]N
212223}
@@ -227,11 +238,11 @@ func (s *precomputedSum[N]) delta(
227238 readIdx := s .hcwg .swapHotAndWait ()
228239 // The len will not change while we iterate over values, since we waited
229240 // for all writes to finish to the cold values and len.
230- n := int (s .len [readIdx ].Load ())
241+ n := int (s .hotColdValMap [readIdx ]. len .Load ())
231242 dPts := reset (sData .DataPoints , n , n )
232243
233244 var i int
234- s .values [readIdx ].Range (func (key , value any ) bool {
245+ s .hotColdValMap [readIdx ]. values .Range (func (key , value any ) bool {
235246 val := value .(* sumValue [N ])
236247 n := val .n .load ()
237248
@@ -245,8 +256,8 @@ func (s *precomputedSum[N]) delta(
245256 i ++
246257 return true
247258 })
248- s .values [readIdx ].Clear ()
249- s .len [readIdx ].Store (0 )
259+ s .hotColdValMap [readIdx ]. values .Clear ()
260+ s .hotColdValMap [readIdx ]. len .Store (0 )
250261 s .reported = newReported
251262 // The delta collection cycle resets.
252263 s .start = t
@@ -272,11 +283,11 @@ func (s *precomputedSum[N]) cumulative(
272283 readIdx := s .hcwg .swapHotAndWait ()
273284 // The len will not change while we iterate over values, since we waited
274285 // for all writes to finish to the cold values and len.
275- n := int (s .len [readIdx ].Load ())
286+ n := int (s .hotColdValMap [readIdx ]. len .Load ())
276287 dPts := reset (sData .DataPoints , n , n )
277288
278289 var i int
279- s .values [readIdx ].Range (func (_ , value any ) bool {
290+ s .hotColdValMap [readIdx ]. values .Range (func (_ , value any ) bool {
280291 val := value .(* sumValue [N ])
281292 collectExemplars (& dPts [i ].Exemplars , val .res .Collect )
282293 dPts [i ].Attributes = val .attrs
@@ -286,8 +297,8 @@ func (s *precomputedSum[N]) cumulative(
286297 i ++
287298 return true
288299 })
289- s .values [readIdx ].Clear ()
290- s .len [readIdx ].Store (0 )
300+ s .hotColdValMap [readIdx ]. values .Clear ()
301+ s .hotColdValMap [readIdx ]. len .Store (0 )
291302
292303 sData .DataPoints = dPts
293304 * dest = sData
0 commit comments