@@ -5,8 +5,6 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
55
66import (
77 "context"
8- "sync"
9- "sync/atomic"
108 "time"
119
1210 "go.opentelemetry.io/otel/attribute"
@@ -20,36 +18,22 @@ type sumValue[N int64 | float64] struct {
2018}
2119
2220type valueMap [N int64 | float64 ] struct {
23- values sync. Map
24- len atomic. Int64
21+ values limitedSyncMap
22+ newRes func (attribute. Set ) FilteredExemplarReservoir [ N ]
2523}
2624
2725func (s * valueMap [N ]) measure (
2826 ctx context.Context ,
2927 value N ,
3028 fltrAttr attribute.Set ,
3129 droppedAttr []attribute.KeyValue ,
32- newRes func (attribute.Set ) FilteredExemplarReservoir [N ],
33- aggLimit int ,
3430) {
35- v , ok := s .values .Load (fltrAttr .Equivalent ())
36- if ! ok {
37- // It is possible to exceed the attribute limit if it races with other
38- // new attribute sets. This is an accepted tradeoff to avoid locking
39- // for writes.
40- if aggLimit > 0 && s .len .Load () >= int64 (aggLimit - 1 ) {
41- fltrAttr = overflowSet
31+ sv := s .values .LoadOrStoreAttr (fltrAttr , func (attr attribute.Set ) any {
32+ return & sumValue [N ]{
33+ res : s .newRes (attr ),
34+ attrs : attr ,
4235 }
43- var loaded bool
44- v , loaded = s .values .LoadOrStore (fltrAttr .Equivalent (), & sumValue [N ]{
45- res : newRes (fltrAttr ),
46- attrs : fltrAttr ,
47- })
48- if ! loaded {
49- s .len .Add (1 )
50- }
51- }
52- sv := v .(* sumValue [N ])
36+ }).(* sumValue [N ])
5337 sv .n .add (value )
5438 // It is possible for collection to race with measurement and observe the
5539 // exemplar in the batch of metrics after the add() for cumulative sums.
@@ -66,18 +50,23 @@ func newDeltaSum[N int64 | float64](
6650 r func (attribute.Set ) FilteredExemplarReservoir [N ],
6751) * deltaSum [N ] {
6852 return & deltaSum [N ]{
69- newRes : r ,
70- aggLimit : limit ,
7153 monotonic : monotonic ,
7254 start : now (),
55+ hotColdValMap : [2 ]valueMap [N ]{
56+ {
57+ values : limitedSyncMap {aggLimit : limit },
58+ newRes : r ,
59+ },
60+ {
61+ values : limitedSyncMap {aggLimit : limit },
62+ newRes : r ,
63+ },
64+ },
7365 }
7466}
7567
7668// deltaSum is the storage for sums which resets every collection interval.
7769type deltaSum [N int64 | float64 ] struct {
78- newRes func (attribute.Set ) FilteredExemplarReservoir [N ]
79- aggLimit int
80-
8170 monotonic bool
8271 start time.Time
8372
@@ -88,7 +77,7 @@ type deltaSum[N int64 | float64] struct {
8877func (s * deltaSum [N ]) measure (ctx context.Context , value N , fltrAttr attribute.Set , droppedAttr []attribute.KeyValue ) {
8978 hotIdx := s .hcwg .start ()
9079 defer s .hcwg .done (hotIdx )
91- s .hotColdValMap [hotIdx ].measure (ctx , value , fltrAttr , droppedAttr , s . newRes , s . aggLimit )
80+ s .hotColdValMap [hotIdx ].measure (ctx , value , fltrAttr , droppedAttr )
9281}
9382
9483func (s * deltaSum [N ]) delta (
@@ -106,7 +95,7 @@ func (s *deltaSum[N]) delta(
10695 readIdx := s .hcwg .swapHotAndWait ()
10796 // The len will not change while we iterate over values, since we waited
10897 // for all writes to finish to the cold values and len.
109- n := int ( s .hotColdValMap [readIdx ].len . Load () )
98+ n := s .hotColdValMap [readIdx ].values . Len ( )
11099 dPts := reset (sData .DataPoints , n , n )
111100
112101 var i int
@@ -121,7 +110,6 @@ func (s *deltaSum[N]) delta(
121110 return true
122111 })
123112 s .hotColdValMap [readIdx ].values .Clear ()
124- s .hotColdValMap [readIdx ].len .Store (0 )
125113 // The delta collection cycle resets.
126114 s .start = t
127115
@@ -140,31 +128,21 @@ func newCumulativeSum[N int64 | float64](
140128 r func (attribute.Set ) FilteredExemplarReservoir [N ],
141129) * cumulativeSum [N ] {
142130 return & cumulativeSum [N ]{
143- newRes : r ,
144- aggLimit : limit ,
145131 monotonic : monotonic ,
146132 start : now (),
133+ valueMap : valueMap [N ]{
134+ values : limitedSyncMap {aggLimit : limit },
135+ newRes : r ,
136+ },
147137 }
148138}
149139
150140// deltaSum is the storage for sums which never reset.
151141type cumulativeSum [N int64 | float64 ] struct {
152- newRes func (attribute.Set ) FilteredExemplarReservoir [N ]
153- aggLimit int
154-
155142 monotonic bool
156143 start time.Time
157144
158- valMap valueMap [N ]
159- }
160-
161- func (s * cumulativeSum [N ]) measure (
162- ctx context.Context ,
163- value N ,
164- fltrAttr attribute.Set ,
165- droppedAttr []attribute.KeyValue ,
166- ) {
167- s .valMap .measure (ctx , value , fltrAttr , droppedAttr , s .newRes , s .aggLimit )
145+ valueMap [N ]
168146}
169147
170148func (s * cumulativeSum [N ]) cumulative (
@@ -180,10 +158,10 @@ func (s *cumulativeSum[N]) cumulative(
180158
181159 // Values are being concurrently written while we iterate, so only use the
182160 // current length for capacity.
183- dPts := reset (sData .DataPoints , 0 , int ( s . valMap . len . Load () ))
161+ dPts := reset (sData .DataPoints , 0 , s . valueMap . values . Len ( ))
184162
185163 var i int
186- s .valMap . values .Range (func (_ , value any ) bool {
164+ s .values .Range (func (_ , value any ) bool {
187165 val := value .(* sumValue [N ])
188166 newPt := metricdata.DataPoint [N ]{
189167 Attributes : val .attrs ,
@@ -243,7 +221,7 @@ func (s *precomputedSum[N]) delta(
243221 readIdx := s .hcwg .swapHotAndWait ()
244222 // The len will not change while we iterate over values, since we waited
245223 // for all writes to finish to the cold values and len.
246- n := int ( s .hotColdValMap [readIdx ].len . Load () )
224+ n := s .hotColdValMap [readIdx ].values . Len ( )
247225 dPts := reset (sData .DataPoints , n , n )
248226
249227 var i int
@@ -262,7 +240,6 @@ func (s *precomputedSum[N]) delta(
262240 return true
263241 })
264242 s .hotColdValMap [readIdx ].values .Clear ()
265- s .hotColdValMap [readIdx ].len .Store (0 )
266243 s .reported = newReported
267244 // The delta collection cycle resets.
268245 s .start = t
@@ -288,7 +265,7 @@ func (s *precomputedSum[N]) cumulative(
288265 readIdx := s .hcwg .swapHotAndWait ()
289266 // The len will not change while we iterate over values, since we waited
290267 // for all writes to finish to the cold values and len.
291- n := int ( s .hotColdValMap [readIdx ].len . Load () )
268+ n := s .hotColdValMap [readIdx ].values . Len ( )
292269 dPts := reset (sData .DataPoints , n , n )
293270
294271 var i int
@@ -303,7 +280,6 @@ func (s *precomputedSum[N]) cumulative(
303280 return true
304281 })
305282 s .hotColdValMap [readIdx ].values .Clear ()
306- s .hotColdValMap [readIdx ].len .Store (0 )
307283
308284 sData .DataPoints = dPts
309285 * dest = sData
0 commit comments