@@ -10,7 +10,7 @@ use std::collections::{HashMap, HashSet};
1010use std:: mem:: swap;
1111use std:: ops:: { Add , AddAssign , DerefMut , Sub } ;
1212use std:: sync:: atomic:: { AtomicBool , AtomicI64 , AtomicU64 , AtomicUsize , Ordering } ;
13- use std:: sync:: { Arc , RwLock } ;
13+ use std:: sync:: { Arc , OnceLock , RwLock } ;
1414
1515use aggregate:: { is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT } ;
1616pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
5252 /// Trackers store the values associated with different attribute sets.
5353 trackers : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
5454
55- /// Used by collect exclusively. The data type must match the one used in
56- /// `trackers` to allow mem::swap.
57- trackers_for_collect : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
55+ /// Used ONLY by Delta collect. The data type must match the one used in
56+ /// `trackers` to allow mem::swap. Wrapping the type in `OnceLock` to
57+ /// avoid this allocation for Cumulative aggregation.
58+ trackers_for_collect : OnceLock < RwLock < HashMap < Vec < KeyValue > , Arc < A > > > > ,
5859
5960 /// Number of different attribute set stored in the `trackers` map.
6061 count : AtomicUsize ,
@@ -73,16 +74,20 @@ where
7374 fn new ( config : A :: InitConfig ) -> Self {
7475 ValueMap {
7576 trackers : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
76- // TODO: For cumulative, this is not required, so avoid this
77- // pre-allocation.
78- trackers_for_collect : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
77+ trackers_for_collect : OnceLock :: new ( ) ,
7978 has_no_attribute_value : AtomicBool :: new ( false ) ,
8079 no_attribute_tracker : A :: create ( & config) ,
8180 count : AtomicUsize :: new ( 0 ) ,
8281 config,
8382 }
8483 }
8584
85+ #[ inline]
86+ fn trackers_for_collect ( & self ) -> & RwLock < HashMap < Vec < KeyValue > , Arc < A > > > {
87+ self . trackers_for_collect
88+ . get_or_init ( || RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) )
89+ }
90+
8691 fn measure ( & self , value : A :: PreComputedValue , attributes : & [ KeyValue ] ) {
8792 if attributes. is_empty ( ) {
8893 self . no_attribute_tracker . update ( value) ;
@@ -178,7 +183,7 @@ where
178183 ) ) ;
179184 }
180185
181- if let Ok ( mut trackers_collect) = self . trackers_for_collect . write ( ) {
186+ if let Ok ( mut trackers_collect) = self . trackers_for_collect ( ) . write ( ) {
182187 if let Ok ( mut trackers_current) = self . trackers . write ( ) {
183188 swap ( trackers_collect. deref_mut ( ) , trackers_current. deref_mut ( ) ) ;
184189 self . count . store ( 0 , Ordering :: SeqCst ) ;
0 commit comments