@@ -16,6 +16,7 @@ use std::sync::{Arc, Mutex, RwLock};
 
 use aggregate::is_under_cardinality_limit;
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
+use crossbeam_utils::CachePadded;
 pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
 use hashed::{Hashed, HashedNoOpBuilder};
 use once_cell::sync::Lazy;
@@ -59,10 +60,12 @@ pub(crate) struct ValueMap<A>
 where
     A: Aggregator,
 {
+    shards_count: usize,
     // for performance reasons, no_attribs tracker
     no_attribs: NoAttribs<A>,
     // for performance reasons, to handle attributes in the provided order
-    all_attribs: RwLock<HashMap<Hashed<'static, [KeyValue]>, Arc<A>, HashedNoOpBuilder>>,
+    all_attribs:
+        Vec<CachePadded<RwLock<HashMap<Hashed<'static, [KeyValue]>, Arc<A>, HashedNoOpBuilder>>>>,
     // different order of attribute keys should still map to same tracker instance
     // this helps to achieve that and also enables implementing collection efficiently
     sorted_attribs: Mutex<HashMap<Hashed<'static, [KeyValue]>, Arc<A>, HashedNoOpBuilder>>,
@@ -75,12 +78,21 @@ where
     A: Aggregator,
 {
     fn new(config: A::InitConfig) -> Self {
+        let shards_count = std::thread::available_parallelism()
+            .map(|v| v.get())
+            .unwrap_or(1)
+            * 4;
+        let mut all_attribs = Vec::with_capacity(shards_count);
+        all_attribs.resize_with(shards_count, || {
+            CachePadded::new(RwLock::new(HashMap::default()))
+        });
         ValueMap {
+            shards_count,
             no_attribs: NoAttribs {
                 tracker: A::create(&config),
                 is_set: AtomicBool::new(false),
             },
-            all_attribs: RwLock::new(HashMap::default()),
+            all_attribs,
             sorted_attribs: Mutex::new(HashMap::default()),
             config,
         }
@@ -95,8 +107,10 @@ where
 
         let attributes = Hashed::from_borrowed(attributes);
 
+        let shard = (attributes.hash_value() % self.shards_count as u64) as usize;
+
         // Try to retrieve and update the tracker with the attributes in the provided order first
-        match self.all_attribs.read() {
+        match self.all_attribs[shard].read() {
             Ok(trackers) => {
                 if let Some(tracker) = trackers.get(&attributes) {
                     tracker.update(value);
@@ -136,7 +150,7 @@ where
         new_tracker.update(value);
 
         // Insert new tracker, so we could find it next time
-        let Ok(mut all_trackers) = self.all_attribs.write() else {
+        let Ok(mut all_trackers) = self.all_attribs[shard].write() else {
             return;
         };
         all_trackers.insert(attributes.into_owned(), new_tracker);
@@ -176,17 +190,20 @@ where
     {
         // reset sorted trackers so new attributes set will be written into new hashmap
         let trackers = match self.sorted_attribs.lock() {
-            Ok(mut trackers) => {
-                let new = HashMap::with_capacity_and_hasher(trackers.len(), HashedNoOpBuilder::default());
+            Ok(mut trackers) => {
+                let new =
+                    HashMap::with_capacity_and_hasher(trackers.len(), HashedNoOpBuilder::default());
                 replace(trackers.deref_mut(), new)
             }
             Err(_) => return,
         };
         // reset all trackers, so all attribute sets will start using new hashmap
-        match self.all_attribs.write() {
-            Ok(mut all_trackers) => all_trackers.clear(),
-            Err(_) => return,
-        };
+        for shard in 0..self.shards_count {
+            match self.all_attribs[shard].write() {
+                Ok(mut all_trackers) => all_trackers.clear(),
+                Err(_) => return,
+            };
+        }
 
         prepare_data(dest, trackers.len());
 
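Not part of the diff above: a minimal standalone sketch of the same sharding pattern it introduces (shard index = key hash modulo shard count, each shard on its own cache line), assuming the `crossbeam-utils` crate is available. The `ShardedMap` type and the `u64` keys/values are illustrative only, not the crate's API.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

use crossbeam_utils::CachePadded;

/// Sharded map: each shard is padded to a full cache line so writers that
/// land on different shards avoid both lock contention and false sharing.
struct ShardedMap {
    shards: Vec<CachePadded<RwLock<HashMap<u64, u64>>>>,
}

impl ShardedMap {
    fn new() -> Self {
        // Same heuristic as the change above: a small multiple of the core count.
        let shard_count = std::thread::available_parallelism()
            .map(|v| v.get())
            .unwrap_or(1)
            * 4;
        let mut shards = Vec::with_capacity(shard_count);
        shards.resize_with(shard_count, || CachePadded::new(RwLock::new(HashMap::new())));
        ShardedMap { shards }
    }

    fn insert(&self, key_hash: u64, value: u64) {
        // The key's hash picks the shard; only that shard's lock is taken,
        // so unrelated keys can be updated concurrently.
        let shard = (key_hash % self.shards.len() as u64) as usize;
        if let Ok(mut map) = self.shards[shard].write() {
            map.insert(key_hash, value);
        }
    }
}

fn main() {
    let map = ShardedMap::new();
    map.insert(42, 7);
}
```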