@@ -10,7 +10,7 @@ use std::collections::{HashMap, HashSet};
1010use std:: mem:: swap;
1111use std:: ops:: { Add , AddAssign , DerefMut , Sub } ;
1212use std:: sync:: atomic:: { AtomicBool , AtomicI64 , AtomicU64 , AtomicUsize , Ordering } ;
13- use std:: sync:: { Arc , RwLock } ;
13+ use std:: sync:: { Arc , OnceLock , RwLock } ;
1414
1515use aggregate:: { is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT } ;
1616pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
5757 /// Trackers store the values associated with different attribute sets.
5858 trackers : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
5959
60- /// Used by collect exclusively. The data type must match the one used in
61- /// `trackers` to allow mem::swap.
62- trackers_for_collect : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
60+ /// Used ONLY by Delta collect. The data type must match the one used in
61+ /// `trackers` to allow mem::swap. Wrapping the type in `OnceLock` to
62+ /// avoid this allocation for Cumulative aggregation.
63+ trackers_for_collect : OnceLock < RwLock < HashMap < Vec < KeyValue > , Arc < A > > > > ,
6364
6465 /// Number of different attribute set stored in the `trackers` map.
6566 count : AtomicUsize ,
@@ -78,16 +79,20 @@ where
7879 fn new ( config : A :: InitConfig ) -> Self {
7980 ValueMap {
8081 trackers : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
81- // TODO: For cumulative, this is not required, so avoid this
82- // pre-allocation.
83- trackers_for_collect : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
82+ trackers_for_collect : OnceLock :: new ( ) ,
8483 has_no_attribute_value : AtomicBool :: new ( false ) ,
8584 no_attribute_tracker : A :: create ( & config) ,
8685 count : AtomicUsize :: new ( 0 ) ,
8786 config,
8887 }
8988 }
9089
90+ #[ inline]
91+ fn trackers_for_collect ( & self ) -> & RwLock < HashMap < Vec < KeyValue > , Arc < A > > > {
92+ self . trackers_for_collect
93+ . get_or_init ( || RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) )
94+ }
95+
9196 fn measure ( & self , value : A :: PreComputedValue , attributes : & [ KeyValue ] ) {
9297 if attributes. is_empty ( ) {
9398 self . no_attribute_tracker . update ( value) ;
@@ -183,7 +188,7 @@ where
183188 ) ) ;
184189 }
185190
186- if let Ok ( mut trackers_collect) = self . trackers_for_collect . write ( ) {
191+ if let Ok ( mut trackers_collect) = self . trackers_for_collect ( ) . write ( ) {
187192 if let Ok ( mut trackers_current) = self . trackers . write ( ) {
188193 swap ( trackers_collect. deref_mut ( ) , trackers_current. deref_mut ( ) ) ;
189194 self . count . store ( 0 , Ordering :: SeqCst ) ;
0 commit comments