@@ -7,79 +7,41 @@ use crate::metrics::data::HistogramDataPoint;
77use crate :: metrics:: data:: { self , Aggregation , Temporality } ;
88use opentelemetry:: KeyValue ;
99
10- use super :: Number ;
11- use super :: { AtomicTracker , AtomicallyUpdate , Operation , ValueMap } ;
10+ use super :: ValueMap ;
11+ use super :: { Aggregator , Number } ;
1212
13- struct HistogramUpdate ;
14-
15- impl Operation for HistogramUpdate {
16- fn update_tracker < T : Default , AT : AtomicTracker < T > > ( tracker : & AT , value : T , index : usize ) {
17- tracker. update_histogram ( index, value) ;
18- }
19- }
20-
21- struct HistogramTracker < T > {
22- buckets : Mutex < Buckets < T > > ,
23- }
24-
25- impl < T : Number > AtomicTracker < T > for HistogramTracker < T > {
26- fn update_histogram ( & self , index : usize , value : T ) {
27- let mut buckets = match self . buckets . lock ( ) {
28- Ok ( guard) => guard,
29- Err ( _) => return ,
30- } ;
31-
32- buckets. bin ( index, value) ;
33- buckets. sum ( value) ;
34- }
35- }
36-
37- impl < T : Number > AtomicallyUpdate < T > for HistogramTracker < T > {
38- type AtomicTracker = HistogramTracker < T > ;
39-
40- fn new_atomic_tracker ( buckets_count : Option < usize > ) -> Self :: AtomicTracker {
41- let count = buckets_count. unwrap ( ) ;
42- HistogramTracker {
43- buckets : Mutex :: new ( Buckets :: < T > :: new ( count) ) ,
44- }
45- }
13+ struct BucketsConfig {
14+ bounds : Vec < f64 > ,
15+ record_min_max : bool ,
16+ record_sum : bool ,
4617}
4718
48- #[ derive( Default ) ]
49- struct Buckets < T > {
19+ #[ derive( Default , Debug , Clone ) ]
20+ struct BucketsData < T > {
5021 counts : Vec < u64 > ,
5122 count : u64 ,
5223 total : T ,
5324 min : T ,
5425 max : T ,
5526}
5627
57- impl < T : Number > Buckets < T > {
58- /// returns buckets with `n` bins.
59- fn new ( n : usize ) -> Buckets < T > {
60- Buckets {
61- counts : vec ! [ 0 ; n] ,
28+ struct Buckets < T > {
29+ data : Mutex < BucketsData < T > > ,
30+ }
31+
32+ impl < T > BucketsData < T >
33+ where
34+ T : Number ,
35+ {
36+ fn new ( size : usize ) -> Self {
37+ Self {
38+ counts : vec ! [ 0 ; size] ,
6239 min : T :: max ( ) ,
6340 max : T :: min ( ) ,
6441 ..Default :: default ( )
6542 }
6643 }
6744
68- fn sum ( & mut self , value : T ) {
69- self . total += value;
70- }
71-
72- fn bin ( & mut self , idx : usize , value : T ) {
73- self . counts [ idx] += 1 ;
74- self . count += 1 ;
75- if value < self . min {
76- self . min = value;
77- }
78- if value > self . max {
79- self . max = value
80- }
81- }
82-
8345 fn reset ( & mut self ) {
8446 for item in & mut self . counts {
8547 * item = 0 ;
@@ -91,45 +53,71 @@ impl<T: Number> Buckets<T> {
9153 }
9254}
9355
56+ impl < T > Aggregator < T > for Buckets < T >
57+ where
58+ T : Number ,
59+ {
60+ type Config = BucketsConfig ;
61+
62+ fn create ( config : & BucketsConfig ) -> Self {
63+ let size = config. bounds . len ( ) + 1 ;
64+ Buckets {
65+ data : Mutex :: new ( BucketsData :: new ( size) ) ,
66+ }
67+ }
68+
69+ fn update ( & self , config : & BucketsConfig , measurement : T ) {
70+ let f_value = measurement. into_float ( ) ;
71+ // Ignore NaN and infinity.
72+ if f_value. is_infinite ( ) || f_value. is_nan ( ) {
73+ return ;
74+ }
75+ // This search will return an index in the range `[0, bounds.len()]`, where
76+ // it will return `bounds.len()` if value is greater than the last element
77+ // of `bounds`. This aligns with the buckets in that the length of buckets
78+ // is `bounds.len()+1`, with the last bucket representing:
79+ // `(bounds[bounds.len()-1], +∞)`.
80+ let idx = config. bounds . partition_point ( |& x| x < f_value) ;
81+ if let Ok ( mut data) = self . data . lock ( ) {
82+ data. counts [ idx] += 1 ;
83+ data. count += 1 ;
84+ if config. record_min_max {
85+ if measurement < data. min {
86+ data. min = measurement;
87+ }
88+ if measurement > data. max {
89+ data. max = measurement
90+ }
91+ }
92+ // it's very cheap to update it, even if it is not configured to record_sum
93+ data. total += measurement;
94+ }
95+ }
96+ }
97+
9498/// Summarizes a set of measurements as a histogram with explicitly defined
9599/// buckets.
96100pub ( crate ) struct Histogram < T : Number > {
97- value_map : ValueMap < HistogramTracker < T > , T , HistogramUpdate > ,
98- bounds : Vec < f64 > ,
99- record_min_max : bool ,
100- record_sum : bool ,
101+ value_map : ValueMap < T , Buckets < T > > ,
101102 start : Mutex < SystemTime > ,
102103}
103104
104105impl < T : Number > Histogram < T > {
105- pub ( crate ) fn new ( boundaries : Vec < f64 > , record_min_max : bool , record_sum : bool ) -> Self {
106- let buckets_count = boundaries. len ( ) + 1 ;
107- let mut histogram = Histogram {
108- value_map : ValueMap :: new_with_buckets_count ( buckets_count) ,
109- bounds : boundaries,
110- record_min_max,
111- record_sum,
106+ pub ( crate ) fn new ( mut bounds : Vec < f64 > , record_min_max : bool , record_sum : bool ) -> Self {
107+ bounds. retain ( |v| !v. is_nan ( ) ) ;
108+ bounds. sort_by ( |a, b| a. partial_cmp ( b) . expect ( "NaNs filtered out" ) ) ;
109+ Self {
110+ value_map : ValueMap :: new ( BucketsConfig {
111+ record_min_max,
112+ record_sum,
113+ bounds,
114+ } ) ,
112115 start : Mutex :: new ( SystemTime :: now ( ) ) ,
113- } ;
114-
115- histogram. bounds . retain ( |v| !v. is_nan ( ) ) ;
116- histogram
117- . bounds
118- . sort_by ( |a, b| a. partial_cmp ( b) . expect ( "NaNs filtered out" ) ) ;
119-
120- histogram
116+ }
121117 }
122118
123119 pub ( crate ) fn measure ( & self , measurement : T , attrs : & [ KeyValue ] ) {
124- let f = measurement. into_float ( ) ;
125-
126- // This search will return an index in the range `[0, bounds.len()]`, where
127- // it will return `bounds.len()` if value is greater than the last element
128- // of `bounds`. This aligns with the buckets in that the length of buckets
129- // is `bounds.len()+1`, with the last bucket representing:
130- // `(bounds[bounds.len()-1], +∞)`.
131- let index = self . bounds . partition_point ( |& x| x < f) ;
132- self . value_map . measure ( measurement, attrs, index) ;
120+ self . value_map . measure ( measurement, attrs) ;
133121 }
134122
135123 pub ( crate ) fn delta (
@@ -167,25 +155,25 @@ impl<T: Number> Histogram<T> {
167155 . has_no_attribute_value
168156 . swap ( false , Ordering :: AcqRel )
169157 {
170- if let Ok ( ref mut b) = self . value_map . no_attribute_tracker . buckets . lock ( ) {
158+ if let Ok ( ref mut b) = self . value_map . no_attribute_tracker . data . lock ( ) {
171159 h. data_points . push ( HistogramDataPoint {
172160 attributes : vec ! [ ] ,
173161 start_time : start,
174162 time : t,
175163 count : b. count ,
176- bounds : self . bounds . clone ( ) ,
164+ bounds : self . value_map . config . bounds . clone ( ) ,
177165 bucket_counts : b. counts . clone ( ) ,
178- sum : if self . record_sum {
166+ sum : if self . value_map . config . record_sum {
179167 b. total
180168 } else {
181169 T :: default ( )
182170 } ,
183- min : if self . record_min_max {
171+ min : if self . value_map . config . record_min_max {
184172 Some ( b. min )
185173 } else {
186174 None
187175 } ,
188- max : if self . record_min_max {
176+ max : if self . value_map . config . record_min_max {
189177 Some ( b. max )
190178 } else {
191179 None
@@ -205,25 +193,25 @@ impl<T: Number> Histogram<T> {
205193 let mut seen = HashSet :: new ( ) ;
206194 for ( attrs, tracker) in trackers. drain ( ) {
207195 if seen. insert ( Arc :: as_ptr ( & tracker) ) {
208- if let Ok ( b) = tracker. buckets . lock ( ) {
196+ if let Ok ( b) = tracker. data . lock ( ) {
209197 h. data_points . push ( HistogramDataPoint {
210198 attributes : attrs. clone ( ) ,
211199 start_time : start,
212200 time : t,
213201 count : b. count ,
214- bounds : self . bounds . clone ( ) ,
202+ bounds : self . value_map . config . bounds . clone ( ) ,
215203 bucket_counts : b. counts . clone ( ) ,
216- sum : if self . record_sum {
204+ sum : if self . value_map . config . record_sum {
217205 b. total
218206 } else {
219207 T :: default ( )
220208 } ,
221- min : if self . record_min_max {
209+ min : if self . value_map . config . record_min_max {
222210 Some ( b. min )
223211 } else {
224212 None
225213 } ,
226- max : if self . record_min_max {
214+ max : if self . value_map . config . record_min_max {
227215 Some ( b. max )
228216 } else {
229217 None
@@ -278,25 +266,25 @@ impl<T: Number> Histogram<T> {
278266 . has_no_attribute_value
279267 . load ( Ordering :: Acquire )
280268 {
281- if let Ok ( b) = & self . value_map . no_attribute_tracker . buckets . lock ( ) {
269+ if let Ok ( b) = & self . value_map . no_attribute_tracker . data . lock ( ) {
282270 h. data_points . push ( HistogramDataPoint {
283271 attributes : vec ! [ ] ,
284272 start_time : start,
285273 time : t,
286274 count : b. count ,
287- bounds : self . bounds . clone ( ) ,
275+ bounds : self . value_map . config . bounds . clone ( ) ,
288276 bucket_counts : b. counts . clone ( ) ,
289- sum : if self . record_sum {
277+ sum : if self . value_map . config . record_sum {
290278 b. total
291279 } else {
292280 T :: default ( )
293281 } ,
294- min : if self . record_min_max {
282+ min : if self . value_map . config . record_min_max {
295283 Some ( b. min )
296284 } else {
297285 None
298286 } ,
299- max : if self . record_min_max {
287+ max : if self . value_map . config . record_min_max {
300288 Some ( b. max )
301289 } else {
302290 None
@@ -318,25 +306,25 @@ impl<T: Number> Histogram<T> {
318306 let mut seen = HashSet :: new ( ) ;
319307 for ( attrs, tracker) in trackers. iter ( ) {
320308 if seen. insert ( Arc :: as_ptr ( tracker) ) {
321- if let Ok ( b) = tracker. buckets . lock ( ) {
309+ if let Ok ( b) = tracker. data . lock ( ) {
322310 h. data_points . push ( HistogramDataPoint {
323311 attributes : attrs. clone ( ) ,
324312 start_time : start,
325313 time : t,
326314 count : b. count ,
327- bounds : self . bounds . clone ( ) ,
315+ bounds : self . value_map . config . bounds . clone ( ) ,
328316 bucket_counts : b. counts . clone ( ) ,
329- sum : if self . record_sum {
317+ sum : if self . value_map . config . record_sum {
330318 b. total
331319 } else {
332320 T :: default ( )
333321 } ,
334- min : if self . record_min_max {
322+ min : if self . value_map . config . record_min_max {
335323 Some ( b. min )
336324 } else {
337325 None
338326 } ,
339- max : if self . record_min_max {
327+ max : if self . value_map . config . record_min_max {
340328 Some ( b. max )
341329 } else {
342330 None
0 commit comments