11mod aggregate;
22mod exponential_histogram;
3+ mod hashed;
34mod histogram;
45mod last_value;
56mod precomputed_sum;
@@ -15,13 +16,12 @@ use std::sync::{Arc, RwLock};
1516use aggregate:: is_under_cardinality_limit;
1617pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
1718pub ( crate ) use exponential_histogram:: { EXPO_MAX_SCALE , EXPO_MIN_SCALE } ;
19+ use hashed:: { Hashed , HashedNoOpBuilder } ;
1820use once_cell:: sync:: Lazy ;
1921use opentelemetry:: { otel_warn, KeyValue } ;
2022
21- use crate :: metrics:: AttributeSet ;
22-
23- pub ( crate ) static STREAM_OVERFLOW_ATTRIBUTES : Lazy < Vec < KeyValue > > =
24- Lazy :: new ( || vec ! [ KeyValue :: new( "otel.metric.overflow" , "true" ) ] ) ;
/// Pre-hashed attribute set holding the single pair `otel.metric.overflow = "true"`.
/// The update path in this file looks up the tracker stored under this key when
/// `is_under_cardinality_limit` reports that no new attribute set may be added.
23+ pub ( crate ) static STREAM_OVERFLOW_ATTRIBUTES : Lazy < Hashed < ' static , [ KeyValue ] > > =
24+ Lazy :: new ( || Hashed :: from_owned ( vec ! [ KeyValue :: new( "otel.metric.overflow" , "true" ) ] ) ) ;
2525
2626pub ( crate ) trait Aggregator {
2727 /// A static configuration that is needed in order to initialize aggregator.
5252 A : Aggregator ,
5353{
5454 /// Trackers store the values associated with different attribute sets.
55- trackers : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
55+ trackers : RwLock < HashMap < Hashed < ' static , [ KeyValue ] > , Arc < A > , HashedNoOpBuilder > > ,
5656 /// Number of different attribute set stored in the `trackers` map.
5757 count : AtomicUsize ,
5858 /// Indicates whether a value with no attributes has been stored.
6969{
7070 fn new ( config : A :: InitConfig ) -> Self {
7171 ValueMap {
72- trackers : RwLock :: new ( HashMap :: new ( ) ) ,
72+ trackers : RwLock :: new ( HashMap :: default ( ) ) ,
7373 has_no_attribute_value : AtomicBool :: new ( false ) ,
7474 no_attribute_tracker : A :: create ( & config) ,
7575 count : AtomicUsize :: new ( 0 ) ,
@@ -84,19 +84,25 @@ where
8484 return ;
8585 }
8686
87+ let attributes = Hashed :: from_borrowed ( attributes) ;
88+
8789 let Ok ( trackers) = self . trackers . read ( ) else {
8890 return ;
8991 } ;
9092
9193 // Try to retrieve and update the tracker with the attributes in the provided order first
92- if let Some ( tracker) = trackers. get ( attributes) {
94+ if let Some ( tracker) = trackers. get ( & attributes) {
9395 tracker. update ( value) ;
9496 return ;
9597 }
9698
9799 // Try to retrieve and update the tracker with the attributes sorted.
98- let sorted_attrs = AttributeSet :: from ( attributes) . into_vec ( ) ;
99- if let Some ( tracker) = trackers. get ( sorted_attrs. as_slice ( ) ) {
100+ let sorted_attrs = attributes. clone ( ) . mutate ( |list| {
101+ // use stable sort
102+ list. sort_by ( |a, b| a. key . cmp ( & b. key ) ) ;
103+ dedup_remove_first ( list, |a, b| a. key == b. key ) ;
104+ } ) ;
105+ if let Some ( tracker) = trackers. get ( & sorted_attrs) {
100106 tracker. update ( value) ;
101107 return ;
102108 }
@@ -110,20 +116,20 @@ where
110116
111117 // Recheck both the provided and sorted orders after acquiring the write lock
112118 // in case another thread has pushed an update in the meantime.
113- if let Some ( tracker) = trackers. get ( attributes) {
119+ if let Some ( tracker) = trackers. get ( & attributes) {
114120 tracker. update ( value) ;
115- } else if let Some ( tracker) = trackers. get ( sorted_attrs. as_slice ( ) ) {
121+ } else if let Some ( tracker) = trackers. get ( & sorted_attrs) {
116122 tracker. update ( value) ;
117123 } else if is_under_cardinality_limit ( self . count . load ( Ordering :: SeqCst ) ) {
118124 let new_tracker = Arc :: new ( A :: create ( & self . config ) ) ;
119125 new_tracker. update ( value) ;
120126
121127 // Insert tracker with the attributes in the provided and sorted orders
122- trackers. insert ( attributes. to_vec ( ) , new_tracker. clone ( ) ) ;
128+ trackers. insert ( attributes. into_owned ( ) , new_tracker. clone ( ) ) ;
123129 trackers. insert ( sorted_attrs, new_tracker) ;
124130
125131 self . count . fetch_add ( 1 , Ordering :: SeqCst ) ;
126- } else if let Some ( overflow_value) = trackers. get ( STREAM_OVERFLOW_ATTRIBUTES . as_slice ( ) ) {
132+ } else if let Some ( overflow_value) = trackers. get ( & STREAM_OVERFLOW_ATTRIBUTES ) {
127133 overflow_value. update ( value) ;
128134 } else {
129135 let new_tracker = A :: create ( & self . config ) ;
@@ -153,7 +159,7 @@ where
153159 let mut seen = HashSet :: new ( ) ;
154160 for ( attrs, tracker) in trackers. iter ( ) {
155161 if seen. insert ( Arc :: as_ptr ( tracker) ) {
156- dest. push ( map_fn ( attrs. clone ( ) , tracker) ) ;
162+ dest. push ( map_fn ( attrs. clone ( ) . into_inner_owned ( ) , tracker) ) ;
157163 }
158164 }
159165 }
@@ -183,8 +189,25 @@ where
183189 let mut seen = HashSet :: new ( ) ;
184190 for ( attrs, tracker) in trackers. into_iter ( ) {
185191 if seen. insert ( Arc :: as_ptr ( & tracker) ) {
186- dest. push ( map_fn ( attrs, tracker. clone_and_reset ( & self . config ) ) ) ;
192+ dest. push ( map_fn (
193+ attrs. into_inner_owned ( ) ,
194+ tracker. clone_and_reset ( & self . config ) ,
195+ ) ) ;
196+ }
197+ }
198+ }
199+ }
200+
/// Collapses every run of consecutive duplicates in `values` down to its **last** element.
///
/// Two neighbouring elements belong to the same run when `is_eq(earlier, later)` returns
/// `true`, where `earlier` is the candidate for removal and `later` is the element that
/// has been kept so far. The relative order of the surviving elements is preserved.
///
/// `Vec::dedup_by` on its own keeps the *first* element of each run, which is the opposite
/// of what is needed here. Reversing the vector before and after the call flips that
/// behaviour while keeping the whole operation safe and linear in `values.len()`.
fn dedup_remove_first<T>(values: &mut Vec<T>, is_eq: impl Fn(&T, &T) -> bool) {
    if values.len() < 2 {
        // Nothing to compare against; empty and single-element inputs are already deduped.
        return;
    }

    values.reverse();
    // In the reversed vector `candidate` comes *before* `kept` in the original order, so
    // passing them as (candidate, kept) keeps the `is_eq(earlier, later)` argument order.
    values.dedup_by(|candidate, kept| is_eq(&*candidate, &*kept));
    values.reverse();
}
@@ -392,8 +415,45 @@ impl AtomicallyUpdate<f64> for f64 {
392415
393416#[ cfg( test) ]
394417mod tests {
418+ use std:: usize;
419+
395420 use super :: * ;
396421
422+ fn assert_deduped < const N : usize , const M : usize > (
423+ input : [ ( i32 , bool ) ; N ] ,
424+ expect : [ ( i32 , bool ) ; M ] ,
425+ ) {
426+ let mut list: Vec < ( i32 , bool ) > = Vec :: from ( input) ;
427+ dedup_remove_first ( & mut list, |a, b| a. 0 == b. 0 ) ;
428+ assert_eq ! ( list, expect) ;
429+ }
430+
/// Checks that, for key-sorted input, only the last entry of each key survives.
#[test]
fn deduplicate_by_removing_first_element_from_sorted_array() {
    // Inputs with nothing to deduplicate come back unchanged.
    assert_deduped([], []);
    assert_deduped([(1, true)], [(1, true)]);

    // A single run of one key collapses to its final entry.
    let single_run = [(1, false), (1, false), (1, true)];
    assert_deduped(single_run, [(1, true)]);

    // A run at the end of the list.
    let trailing_run = [(1, true), (2, false), (2, false), (2, true)];
    assert_deduped(trailing_run, [(1, true), (2, true)]);

    // A run at the start of the list.
    let leading_run = [(1, true), (1, false), (1, true), (2, true)];
    assert_deduped(leading_run, [(1, true), (2, true)]);

    // Several back-to-back runs, one per key.
    let every_key_twice = [
        (1, false),
        (1, true),
        (2, false),
        (2, true),
        (3, false),
        (3, true),
    ];
    assert_deduped(every_key_twice, [(1, true), (2, true), (3, true)]);
}
456+
397457 #[ test]
398458 fn can_store_u64_atomic_value ( ) {
399459 let atomic = u64:: new_atomic_tracker ( 0 ) ;
0 commit comments