@@ -8,54 +8,66 @@ import (
88 "slices"
99 "sort"
1010 "sync"
11+ "sync/atomic"
1112 "time"
1213
1314 "go.opentelemetry.io/otel/attribute"
1415 "go.opentelemetry.io/otel/sdk/metric/metricdata"
1516)
1617
1718type buckets [N int64 | float64 ] struct {
19+ count uint64
20+ counts []uint64
21+ min , max atomic.Value
22+ total * counter [N ]
23+
1824 attrs attribute.Set
1925 res FilteredExemplarReservoir [N ]
20-
21- counts []uint64
22- count uint64
23- total N
24- min , max N
2526}
2627
2728// newBuckets returns buckets with n bins.
2829func newBuckets [N int64 | float64 ](attrs attribute.Set , n int ) * buckets [N ] {
29- return & buckets [N ]{attrs : attrs , counts : make ([]uint64 , n )}
30+ return & buckets [N ]{attrs : attrs , counts : make ([]uint64 , n ), total : & counter [ N ]{} }
3031}
3132
32- func (b * buckets [N ]) sum (value N ) { b .total += value }
33-
3433func (b * buckets [N ]) bin (idx int , value N ) {
35- b .counts [idx ]++
36- b .count ++
37- if value < b .min {
38- b .min = value
39- } else if value > b .max {
40- b .max = value
34+ atomic .AddUint64 (& b .counts [idx ], 1 )
35+ atomic .AddUint64 (& b .count , 1 )
36+ }
37+
38+ func (b * buckets [N ]) minMax (value N ) {
39+ for {
40+ minLoaded := b .min .Load ()
41+ if value < minLoaded .(N ) && ! b .min .CompareAndSwap (minLoaded , value ) {
42+ // We got a new min value, but lost the race. Try again.
43+ continue
44+ }
45+ maxLoaded := b .max .Load ()
46+ if value > maxLoaded .(N ) && ! b .max .CompareAndSwap (maxLoaded , value ) {
47+ // We got a new max value, but lost the race. Try again.
48+ continue
49+ }
50+ return
4151 }
4252}
4353
4454// histValues summarizes a set of measurements as an histValues with
4555// explicitly defined buckets.
4656type histValues [N int64 | float64 ] struct {
47- noSum bool
48- bounds []float64
57+ noSum bool
58+ noMinMax bool
59+ bounds []float64
4960
5061 newRes func (attribute.Set ) FilteredExemplarReservoir [N ]
51- limit limiter [* buckets [N ]]
62+ limit limiter [buckets [N ]]
5263 values map [attribute.Distinct ]* buckets [N ]
53- valuesMu sync.Mutex
64+ valuesMu sync.RWMutex
5465}
5566
5667func newHistValues [N int64 | float64 ](
5768 bounds []float64 ,
5869 noSum bool ,
70+ noMinMax bool ,
5971 limit int ,
6072 r func (attribute.Set ) FilteredExemplarReservoir [N ],
6173) * histValues [N ] {
@@ -66,11 +78,12 @@ func newHistValues[N int64 | float64](
6678 b := slices .Clone (bounds )
6779 slices .Sort (b )
6880 return & histValues [N ]{
69- noSum : noSum ,
70- bounds : b ,
71- newRes : r ,
72- limit : newLimiter [* buckets [N ]](limit ),
73- values : make (map [attribute.Distinct ]* buckets [N ]),
81+ noSum : noSum ,
82+ noMinMax : noMinMax ,
83+ bounds : b ,
84+ newRes : r ,
85+ limit : newLimiter [buckets [N ]](limit ),
86+ values : make (map [attribute.Distinct ]* buckets [N ]),
7487 }
7588}
7689
@@ -89,11 +102,11 @@ func (s *histValues[N]) measure(
89102 // (s.bounds[len(s.bounds)-1], +∞).
90103 idx := sort .SearchFloat64s (s .bounds , float64 (value ))
91104
92- s .valuesMu .Lock ()
93- defer s .valuesMu .Unlock ()
105+ s .valuesMu .RLock ()
94106
95107 attr := s .limit .Attributes (fltrAttr , s .values )
96108 b , ok := s .values [attr .Equivalent ()]
109+ s .valuesMu .RUnlock ()
97110 if ! ok {
98111 // N+1 buckets. For example:
99112 //
@@ -106,12 +119,20 @@ func (s *histValues[N]) measure(
106119 b .res = s .newRes (attr )
107120
108121 // Ensure min and max are recorded values (not zero), for new buckets.
109- b .min , b .max = value , value
122+ if ! s .noMinMax {
123+ b .min .Store (value )
124+ b .max .Store (value )
125+ }
126+ s .valuesMu .Lock ()
110127 s .values [attr .Equivalent ()] = b
128+ s .valuesMu .Unlock ()
111129 }
112130 b .bin (idx , value )
131+ if ! s .noMinMax {
132+ b .minMax (value )
133+ }
113134 if ! s .noSum {
114- b .sum (value )
135+ b .total . add (value )
115136 }
116137 b .res .Offer (ctx , value , droppedAttr )
117138}
@@ -125,8 +146,7 @@ func newHistogram[N int64 | float64](
125146 r func (attribute.Set ) FilteredExemplarReservoir [N ],
126147) * histogram [N ] {
127148 return & histogram [N ]{
128- histValues : newHistValues [N ](boundaries , noSum , limit , r ),
129- noMinMax : noMinMax ,
149+ histValues : newHistValues [N ](boundaries , noSum , noMinMax , limit , r ),
130150 start : now (),
131151 }
132152}
@@ -136,8 +156,7 @@ func newHistogram[N int64 | float64](
136156type histogram [N int64 | float64 ] struct {
137157 * histValues [N ]
138158
139- noMinMax bool
140- start time.Time
159+ start time.Time
141160}
142161
143162func (s * histogram [N ]) delta (
@@ -169,12 +188,12 @@ func (s *histogram[N]) delta(
169188 hDPts [i ].BucketCounts = val .counts
170189
171190 if ! s .noSum {
172- hDPts [i ].Sum = val .total
191+ hDPts [i ].Sum = val .total . value ()
173192 }
174193
175194 if ! s .noMinMax {
176- hDPts [i ].Min = metricdata .NewExtrema (val .min )
177- hDPts [i ].Max = metricdata .NewExtrema (val .max )
195+ hDPts [i ].Min = metricdata .NewExtrema (val .min . Load ().( N ) )
196+ hDPts [i ].Max = metricdata .NewExtrema (val .max . Load ().( N ) )
178197 }
179198
180199 collectExemplars (& hDPts [i ].Exemplars , val .res .Collect )
@@ -227,12 +246,12 @@ func (s *histogram[N]) cumulative(
227246 hDPts [i ].BucketCounts = slices .Clone (val .counts )
228247
229248 if ! s .noSum {
230- hDPts [i ].Sum = val .total
249+ hDPts [i ].Sum = val .total . value ()
231250 }
232251
233252 if ! s .noMinMax {
234- hDPts [i ].Min = metricdata .NewExtrema (val .min )
235- hDPts [i ].Max = metricdata .NewExtrema (val .max )
253+ hDPts [i ].Min = metricdata .NewExtrema (val .min . Load ().( N ) )
254+ hDPts [i ].Max = metricdata .NewExtrema (val .max . Load ().( N ) )
236255 }
237256
238257 collectExemplars (& hDPts [i ].Exemplars , val .res .Collect )
0 commit comments