@@ -6,11 +6,10 @@ mod precomputed_sum;
66mod sum;
77
88use core:: fmt;
9- use std:: collections:: { HashMap , HashSet } ;
10- use std:: mem:: take;
11- use std:: ops:: { Add , AddAssign , DerefMut , Sub } ;
9+ use std:: collections:: HashSet ;
10+ use std:: ops:: { Add , AddAssign , Sub } ;
1211use std:: sync:: atomic:: { AtomicBool , AtomicI64 , AtomicU64 , AtomicUsize , Ordering } ;
13- use std:: sync:: { Arc , RwLock } ;
12+ use std:: sync:: Arc ;
1413
1514use aggregate:: is_under_cardinality_limit;
1615pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
@@ -49,10 +48,12 @@ pub(crate) trait Aggregator {
4948/// updates to the underlying value trackers should be performed.
5049pub ( crate ) struct ValueMap < A >
5150where
52- A : Aggregator ,
51+ A : Aggregator + Send + Sync ,
5352{
5453 /// Trackers store the values associated with different attribute sets.
55- trackers : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
54+ trackers : flurry:: HashMap < Vec < KeyValue > , Arc < A > > ,
55+ /// Lock to ensure that only one writer can write to the `trackers` map at a time.
56+ write_lock : std:: sync:: Mutex < ( ) > ,
5657 /// Number of different attribute set stored in the `trackers` map.
5758 count : AtomicUsize ,
5859 /// Indicates whether a value with no attributes has been stored.
@@ -65,11 +66,12 @@ where
6566
6667impl < A > ValueMap < A >
6768where
68- A : Aggregator ,
69+ A : Aggregator + Send + Sync ,
6970{
7071 fn new ( config : A :: InitConfig ) -> Self {
7172 ValueMap {
72- trackers : RwLock :: new ( HashMap :: new ( ) ) ,
73+ trackers : flurry:: HashMap :: new ( ) ,
74+ write_lock : std:: sync:: Mutex :: new ( ( ) ) ,
7375 has_no_attribute_value : AtomicBool :: new ( false ) ,
7476 no_attribute_tracker : A :: create ( & config) ,
7577 count : AtomicUsize :: new ( 0 ) ,
@@ -84,51 +86,46 @@ where
8486 return ;
8587 }
8688
87- let Ok ( trackers) = self . trackers . read ( ) else {
88- return ;
89- } ;
90-
9189 // Try to retrieve and update the tracker with the attributes in the provided order first
92- if let Some ( tracker) = trackers. get ( attributes) {
90+ let hashmap_ref = self . trackers . pin ( ) ;
91+ if let Some ( tracker) = hashmap_ref. get ( attributes) {
9392 tracker. update ( value) ;
9493 return ;
9594 }
9695
9796 // Try to retrieve and update the tracker with the attributes sorted.
9897 let sorted_attrs = AttributeSet :: from ( attributes) . into_vec ( ) ;
99- if let Some ( tracker) = trackers . get ( sorted_attrs. as_slice ( ) ) {
98+ if let Some ( tracker) = hashmap_ref . get ( sorted_attrs. as_slice ( ) ) {
10099 tracker. update ( value) ;
101100 return ;
102101 }
103102
104- // Give up the read lock before acquiring the write lock.
105- drop ( trackers) ;
106-
107- let Ok ( mut trackers) = self . trackers . write ( ) else {
103+ let Ok ( _write_lock) = self . write_lock . lock ( ) else {
108104 return ;
109105 } ;
110106
111107 // Recheck both the provided and sorted orders after acquiring the write lock
112108 // in case another thread has pushed an update in the meantime.
113- if let Some ( tracker) = trackers . get ( attributes) {
109+ if let Some ( tracker) = hashmap_ref . get ( attributes) {
114110 tracker. update ( value) ;
115- } else if let Some ( tracker) = trackers . get ( sorted_attrs. as_slice ( ) ) {
111+ } else if let Some ( tracker) = hashmap_ref . get ( sorted_attrs. as_slice ( ) ) {
116112 tracker. update ( value) ;
117113 } else if is_under_cardinality_limit ( self . count . load ( Ordering :: SeqCst ) ) {
118114 let new_tracker = Arc :: new ( A :: create ( & self . config ) ) ;
119115 new_tracker. update ( value) ;
120116
121117 // Insert tracker with the attributes in the provided and sorted orders
122- trackers . insert ( attributes. to_vec ( ) , new_tracker. clone ( ) ) ;
123- trackers . insert ( sorted_attrs, new_tracker) ;
118+ hashmap_ref . insert ( attributes. to_vec ( ) , new_tracker. clone ( ) ) ;
119+ hashmap_ref . insert ( sorted_attrs, new_tracker) ;
124120
125121 self . count . fetch_add ( 1 , Ordering :: SeqCst ) ;
126- } else if let Some ( overflow_value) = trackers. get ( STREAM_OVERFLOW_ATTRIBUTES . as_slice ( ) ) {
122+ } else if let Some ( overflow_value) = hashmap_ref. get ( STREAM_OVERFLOW_ATTRIBUTES . as_slice ( ) )
123+ {
127124 overflow_value. update ( value) ;
128125 } else {
129126 let new_tracker = A :: create ( & self . config ) ;
130127 new_tracker. update ( value) ;
131- trackers . insert ( STREAM_OVERFLOW_ATTRIBUTES . clone ( ) , Arc :: new ( new_tracker) ) ;
128+ hashmap_ref . insert ( STREAM_OVERFLOW_ATTRIBUTES . clone ( ) , Arc :: new ( new_tracker) ) ;
132129 otel_warn ! ( name: "ValueMap.measure" ,
133130 message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
134131 ) ;
@@ -146,12 +143,9 @@ where
146143 dest. push ( map_fn ( vec ! [ ] , & self . no_attribute_tracker ) ) ;
147144 }
148145
149- let Ok ( trackers) = self . trackers . read ( ) else {
150- return ;
151- } ;
152-
153146 let mut seen = HashSet :: new ( ) ;
154- for ( attrs, tracker) in trackers. iter ( ) {
147+ let hashmap_ref = self . trackers . pin ( ) ;
148+ for ( attrs, tracker) in hashmap_ref. iter ( ) {
155149 if seen. insert ( Arc :: as_ptr ( tracker) ) {
156150 dest. push ( map_fn ( attrs. clone ( ) , tracker) ) ;
157151 }
@@ -172,18 +166,23 @@ where
172166 ) ) ;
173167 }
174168
175- let trackers = match self . trackers . write ( ) {
176- Ok ( mut trackers) => {
177- self . count . store ( 0 , Ordering :: SeqCst ) ;
178- take ( trackers. deref_mut ( ) )
179- }
180- Err ( _) => todo ! ( ) ,
169+ let Ok ( _write_lock) = self . write_lock . lock ( ) else {
170+ return ;
181171 } ;
182172
173+ self . count . store ( 0 , Ordering :: SeqCst ) ;
174+ let trackers = self . trackers . clone ( ) ;
175+ self . trackers . pin ( ) . clear ( ) ;
176+
177+ drop ( _write_lock) ;
178+
183179 let mut seen = HashSet :: new ( ) ;
184- for ( attrs, tracker) in trackers. into_iter ( ) {
185- if seen. insert ( Arc :: as_ptr ( & tracker) ) {
186- dest. push ( map_fn ( attrs, tracker. clone_and_reset ( & self . config ) ) ) ;
180+ for ( attrs, tracker) in trackers. pin ( ) . into_iter ( ) {
181+ if seen. insert ( Arc :: as_ptr ( tracker) ) {
182+ dest. push ( map_fn (
183+ attrs. to_vec ( ) ,
184+ tracker. clone_and_reset ( & self . config ) ,
185+ ) ) ;
187186 }
188187 }
189188 }
0 commit comments