@@ -7,45 +7,9 @@ use crate::metrics::data::HistogramDataPoint;
77use crate :: metrics:: data:: { self , Aggregation , Temporality } ;
88use opentelemetry:: KeyValue ;
99
10- use super :: Number ;
11- use super :: { AtomicTracker , AtomicallyUpdate , Operation , ValueMap } ;
10+ use super :: ValueMap ;
11+ use super :: { Aggregator , Number } ;
1212
13- struct HistogramUpdate ;
14-
15- impl Operation for HistogramUpdate {
16- fn update_tracker < T : Default , AT : AtomicTracker < T > > ( tracker : & AT , value : T , index : usize ) {
17- tracker. update_histogram ( index, value) ;
18- }
19- }
20-
21- struct HistogramTracker < T > {
22- buckets : Mutex < Buckets < T > > ,
23- }
24-
25- impl < T : Number > AtomicTracker < T > for HistogramTracker < T > {
26- fn update_histogram ( & self , index : usize , value : T ) {
27- let mut buckets = match self . buckets . lock ( ) {
28- Ok ( guard) => guard,
29- Err ( _) => return ,
30- } ;
31-
32- buckets. bin ( index, value) ;
33- buckets. sum ( value) ;
34- }
35- }
36-
37- impl < T : Number > AtomicallyUpdate < T > for HistogramTracker < T > {
38- type AtomicTracker = HistogramTracker < T > ;
39-
40- fn new_atomic_tracker ( buckets_count : Option < usize > ) -> Self :: AtomicTracker {
41- let count = buckets_count. unwrap ( ) ;
42- HistogramTracker {
43- buckets : Mutex :: new ( Buckets :: < T > :: new ( count) ) ,
44- }
45- }
46- }
47-
48- #[ derive( Default ) ]
4913struct Buckets < T > {
5014 counts : Vec < u64 > ,
5115 count : u64 ,
@@ -54,29 +18,17 @@ struct Buckets<T> {
5418 max : T ,
5519}
5620
57- impl < T : Number > Buckets < T > {
58- /// returns buckets with `n` bins.
59- fn new ( n : usize ) -> Buckets < T > {
60- Buckets {
61- counts : vec ! [ 0 ; n] ,
21+ impl < T > Buckets < T >
22+ where
23+ T : Number ,
24+ {
25+ fn new ( size : usize ) -> Self {
26+ Self {
27+ counts : vec ! [ 0 ; size] ,
28+ count : 0 ,
29+ total : T :: default ( ) ,
6230 min : T :: max ( ) ,
6331 max : T :: min ( ) ,
64- ..Default :: default ( )
65- }
66- }
67-
68- fn sum ( & mut self , value : T ) {
69- self . total += value;
70- }
71-
72- fn bin ( & mut self , idx : usize , value : T ) {
73- self . counts [ idx] += 1 ;
74- self . count += 1 ;
75- if value < self . min {
76- self . min = value;
77- }
78- if value > self . max {
79- self . max = value
8032 }
8133 }
8234
@@ -91,45 +43,72 @@ impl<T: Number> Buckets<T> {
9143 }
9244}
9345
46+ impl < T > Aggregator < T > for Mutex < Buckets < T > >
47+ where
48+ T : Number ,
49+ {
50+ type InitConfig = usize ;
51+ /// Value and bucket index
52+ type PreComputedValue = ( T , usize ) ;
53+
54+ fn create ( size : & usize ) -> Self {
55+ Mutex :: new ( Buckets :: new ( * size) )
56+ }
57+
58+ fn update ( & self , ( value, idx) : ( T , usize ) ) {
59+ if let Ok ( mut this) = self . lock ( ) {
60+ this. counts [ idx] += 1 ;
61+ this. count += 1 ;
62+ if value < this. min {
63+ this. min = value;
64+ }
65+ if value > this. max {
66+ this. max = value
67+ }
68+ this. total += value;
69+ }
70+ }
71+ }
72+
9473/// Summarizes a set of measurements as a histogram with explicitly defined
9574/// buckets.
9675pub ( crate ) struct Histogram < T : Number > {
97- value_map : ValueMap < HistogramTracker < T > , T , HistogramUpdate > ,
76+ value_map : ValueMap < T , Mutex < Buckets < T > > > ,
9877 bounds : Vec < f64 > ,
9978 record_min_max : bool ,
10079 record_sum : bool ,
10180 start : Mutex < SystemTime > ,
10281}
10382
10483impl < T : Number > Histogram < T > {
105- pub ( crate ) fn new ( boundaries : Vec < f64 > , record_min_max : bool , record_sum : bool ) -> Self {
106- let buckets_count = boundaries. len ( ) + 1 ;
107- let mut histogram = Histogram {
108- value_map : ValueMap :: new_with_buckets_count ( buckets_count) ,
109- bounds : boundaries,
84+ pub ( crate ) fn new ( mut bounds : Vec < f64 > , record_min_max : bool , record_sum : bool ) -> Self {
85+ bounds. retain ( |v| !v. is_nan ( ) ) ;
86+ bounds. sort_by ( |a, b| a. partial_cmp ( b) . expect ( "NaNs filtered out" ) ) ;
87+ let buckets_count = bounds. len ( ) + 1 ;
88+ Self {
89+ value_map : ValueMap :: new ( buckets_count) ,
90+ bounds,
11091 record_min_max,
11192 record_sum,
11293 start : Mutex :: new ( SystemTime :: now ( ) ) ,
113- } ;
114-
115- histogram. bounds . retain ( |v| !v. is_nan ( ) ) ;
116- histogram
117- . bounds
118- . sort_by ( |a, b| a. partial_cmp ( b) . expect ( "NaNs filtered out" ) ) ;
119-
120- histogram
94+ }
12195 }
12296
12397 pub ( crate ) fn measure ( & self , measurement : T , attrs : & [ KeyValue ] ) {
12498 let f = measurement. into_float ( ) ;
125-
99+ // Ignore NaN and infinity.
100+ // Only makes sense if T is f64, maybe this could be no-op for other cases?
101+ if f. is_infinite ( ) || f. is_nan ( ) {
102+ return ;
103+ }
126104 // This search will return an index in the range `[0, bounds.len()]`, where
127105 // it will return `bounds.len()` if value is greater than the last element
128106 // of `bounds`. This aligns with the buckets in that the length of buckets
129107 // is `bounds.len()+1`, with the last bucket representing:
130108 // `(bounds[bounds.len()-1], +∞)`.
131109 let index = self . bounds . partition_point ( |& x| x < f) ;
132- self . value_map . measure ( measurement, attrs, index) ;
110+
111+ self . value_map . measure ( ( measurement, index) , attrs) ;
133112 }
134113
135114 pub ( crate ) fn delta (
@@ -167,7 +146,7 @@ impl<T: Number> Histogram<T> {
167146 . has_no_attribute_value
168147 . swap ( false , Ordering :: AcqRel )
169148 {
170- if let Ok ( ref mut b) = self . value_map . no_attribute_tracker . buckets . lock ( ) {
149+ if let Ok ( ref mut b) = self . value_map . no_attribute_tracker . lock ( ) {
171150 h. data_points . push ( HistogramDataPoint {
172151 attributes : vec ! [ ] ,
173152 start_time : start,
@@ -205,7 +184,7 @@ impl<T: Number> Histogram<T> {
205184 let mut seen = HashSet :: new ( ) ;
206185 for ( attrs, tracker) in trackers. drain ( ) {
207186 if seen. insert ( Arc :: as_ptr ( & tracker) ) {
208- if let Ok ( b) = tracker. buckets . lock ( ) {
187+ if let Ok ( b) = tracker. lock ( ) {
209188 h. data_points . push ( HistogramDataPoint {
210189 attributes : attrs. clone ( ) ,
211190 start_time : start,
@@ -278,7 +257,7 @@ impl<T: Number> Histogram<T> {
278257 . has_no_attribute_value
279258 . load ( Ordering :: Acquire )
280259 {
281- if let Ok ( b) = & self . value_map . no_attribute_tracker . buckets . lock ( ) {
260+ if let Ok ( b) = & self . value_map . no_attribute_tracker . lock ( ) {
282261 h. data_points . push ( HistogramDataPoint {
283262 attributes : vec ! [ ] ,
284263 start_time : start,
@@ -318,7 +297,7 @@ impl<T: Number> Histogram<T> {
318297 let mut seen = HashSet :: new ( ) ;
319298 for ( attrs, tracker) in trackers. iter ( ) {
320299 if seen. insert ( Arc :: as_ptr ( tracker) ) {
321- if let Ok ( b) = tracker. buckets . lock ( ) {
300+ if let Ok ( b) = tracker. lock ( ) {
322301 h. data_points . push ( HistogramDataPoint {
323302 attributes : attrs. clone ( ) ,
324303 start_time : start,
@@ -350,3 +329,68 @@ impl<T: Number> Histogram<T> {
350329 ( h. data_points . len ( ) , new_agg. map ( |a| Box :: new ( a) as Box < _ > ) )
351330 }
352331}
332+
333+ #[ cfg( test) ]
334+ mod tests {
335+
336+ use super :: * ;
337+
338+ #[ test]
339+ fn when_f64_is_nan_or_infinity_then_ignore ( ) {
340+ struct Expected {
341+ min : f64 ,
342+ max : f64 ,
343+ sum : f64 ,
344+ count : u64 ,
345+ }
346+ impl Expected {
347+ fn new ( min : f64 , max : f64 , sum : f64 , count : u64 ) -> Self {
348+ Expected {
349+ min,
350+ max,
351+ sum,
352+ count,
353+ }
354+ }
355+ }
356+ struct TestCase {
357+ values : Vec < f64 > ,
358+ expected : Expected ,
359+ }
360+
361+ let test_cases = vec ! [
362+ TestCase {
363+ values: vec![ 2.0 , 4.0 , 1.0 ] ,
364+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
365+ } ,
366+ TestCase {
367+ values: vec![ 2.0 , 4.0 , 1.0 , f64 :: INFINITY ] ,
368+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
369+ } ,
370+ TestCase {
371+ values: vec![ 2.0 , 4.0 , 1.0 , -f64 :: INFINITY ] ,
372+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
373+ } ,
374+ TestCase {
375+ values: vec![ 2.0 , f64 :: NAN , 4.0 , 1.0 ] ,
376+ expected: Expected :: new( 1.0 , 4.0 , 7.0 , 3 ) ,
377+ } ,
378+ TestCase {
379+ values: vec![ 4.0 , 4.0 , 4.0 , 2.0 , 16.0 , 1.0 ] ,
380+ expected: Expected :: new( 1.0 , 16.0 , 31.0 , 6 ) ,
381+ } ,
382+ ] ;
383+
384+ for test in test_cases {
385+ let h = Histogram :: new ( vec ! [ ] , true , true ) ;
386+ for v in test. values {
387+ h. measure ( v, & [ ] ) ;
388+ }
389+ let res = h. value_map . no_attribute_tracker . lock ( ) . unwrap ( ) ;
390+ assert_eq ! ( test. expected. max, res. max) ;
391+ assert_eq ! ( test. expected. min, res. min) ;
392+ assert_eq ! ( test. expected. sum, res. total) ;
393+ assert_eq ! ( test. expected. count, res. count) ;
394+ }
395+ }
396+ }
0 commit comments