@@ -6,11 +6,12 @@ mod precomputed_sum;
 mod sum;
 
 use core::fmt;
-use std::collections::{HashMap, HashSet};
-use std::mem::take;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::mem::replace;
 use std::ops::{Add, AddAssign, DerefMut, Sub};
-use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
 
 use aggregate::is_under_cardinality_limit;
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -43,6 +44,11 @@ pub(crate) trait Aggregator {
     fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
 }
 
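+/// Tracker (plus an is-set flag) for measurements recorded with an empty attribute set.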
+struct NoAttribs<A> {
+    tracker: A,
+    is_set: AtomicBool,
+}
+
 /// The storage for sums.
 ///
 /// This structure is parametrized by an `Operation` that indicates how
@@ -51,14 +57,13 @@ pub(crate) struct ValueMap<A>
 where
     A: Aggregator,
 {
-    /// Trackers store the values associated with different attribute sets.
-    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
-    /// Number of different attribute set stored in the `trackers` map.
-    count: AtomicUsize,
-    /// Indicates whether a value with no attributes has been stored.
-    has_no_attribute_value: AtomicBool,
-    /// Tracker for values with no attributes attached.
-    no_attribute_tracker: A,
+    // For performance reasons, the tracker for values with no attributes is kept separately.
+    no_attribs: NoAttribs<A>,
+    // For performance reasons, trackers are first looked up with the attributes in the provided order.
+    all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
+    // Different orderings of the same attribute keys should still map to the same tracker instance;
+    // this map achieves that and also enables implementing collection efficiently.
+    sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
     /// Configuration for an Aggregator
     config: A::InitConfig,
 }
@@ -69,70 +74,68 @@ where
 {
     fn new(config: A::InitConfig) -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::new()),
-            has_no_attribute_value: AtomicBool::new(false),
-            no_attribute_tracker: A::create(&config),
-            count: AtomicUsize::new(0),
+            no_attribs: NoAttribs {
+                tracker: A::create(&config),
+                is_set: AtomicBool::new(false),
+            },
+            all_attribs: RwLock::new(HashMap::new()),
+            sorted_attribs: Mutex::new(HashMap::new()),
             config,
         }
     }
 
     fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
         if attributes.is_empty() {
-            self.no_attribute_tracker.update(value);
-            self.has_no_attribute_value.store(true, Ordering::Release);
+            self.no_attribs.tracker.update(value);
+            self.no_attribs.is_set.store(true, Ordering::Release);
             return;
         }
 
-        let Ok(trackers) = self.trackers.read() else {
-            return;
-        };
-
         // Try to retrieve and update the tracker with the attributes in the provided order first
-        if let Some(tracker) = trackers.get(attributes) {
-            tracker.update(value);
-            return;
-        }
+        match self.all_attribs.read() {
+            Ok(trackers) => {
+                if let Some(tracker) = trackers.get(attributes) {
+                    tracker.update(value);
+                    return;
+                }
+            }
+            Err(_) => return,
+        };
 
-        // Try to retrieve and update the tracker with the attributes sorted.
+        // Get or create a tracker keyed by the sorted attribute set.
         let sorted_attrs = AttributeSet::from(attributes).into_vec();
-        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            tracker.update(value);
+        let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
             return;
-        }
+        };
+
+        let sorted_count = sorted_trackers.len();
+        let new_tracker = match sorted_trackers.entry(sorted_attrs) {
+            Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
+            Entry::Vacant(vacant_entry) => {
+                if !is_under_cardinality_limit(sorted_count) {
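+                    // Cardinality limit reached: aggregate into the shared overflow entry instead of adding a new attribute set.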
+                    sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
+                        .or_insert_with(|| {
+                            otel_warn!(name: "ValueMap.measure",
+                                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
+                            );
+                            Arc::new(A::create(&self.config))
+                        })
+                        .update(value);
+                    return;
+                }
+                let new_tracker = Arc::new(A::create(&self.config));
+                vacant_entry.insert(new_tracker).clone()
+            }
+        };
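+        // Release the sorted_attribs lock before updating the tracker and taking the all_attribs write lock.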
+        drop(sorted_trackers);
 
-        // Give up the read lock before acquiring the write lock.
-        drop(trackers);
+        new_tracker.update(value);
 
-        let Ok(mut trackers) = self.trackers.write() else {
+        // Insert the new tracker into all_attribs so the fast path can find it next time.
+        let Ok(mut all_trackers) = self.all_attribs.write() else {
             return;
         };
-
-        // Recheck both the provided and sorted orders after acquiring the write lock
-        // in case another thread has pushed an update in the meantime.
-        if let Some(tracker) = trackers.get(attributes) {
-            tracker.update(value);
-        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            tracker.update(value);
-        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
-            let new_tracker = Arc::new(A::create(&self.config));
-            new_tracker.update(value);
-
-            // Insert tracker with the attributes in the provided and sorted orders
-            trackers.insert(attributes.to_vec(), new_tracker.clone());
-            trackers.insert(sorted_attrs, new_tracker);
-
-            self.count.fetch_add(1, Ordering::SeqCst);
-        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
-            overflow_value.update(value);
-        } else {
-            let new_tracker = A::create(&self.config);
-            new_tracker.update(value);
-            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
-            otel_warn!(name: "ValueMap.measure",
-                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
-            );
-        }
+        all_trackers.insert(attributes.to_vec(), new_tracker);
     }
 
     /// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -141,20 +144,23 @@ where
     where
         MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
     {
-        prepare_data(dest, self.count.load(Ordering::SeqCst));
-        if self.has_no_attribute_value.load(Ordering::Acquire) {
-            dest.push(map_fn(vec![], &self.no_attribute_tracker));
-        }
-
-        let Ok(trackers) = self.trackers.read() else {
-            return;
+        let trackers = match self.sorted_attribs.lock() {
+            Ok(trackers) => {
+                // it's important to release lock as fast as possible,
+                // so we don't block insertion of new attribute sets
+                trackers.clone()
+            }
+            Err(_) => return,
         };
 
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                dest.push(map_fn(attrs.clone(), tracker));
-            }
+        prepare_data(dest, trackers.len());
+
+        if self.no_attribs.is_set.load(Ordering::Acquire) {
+            dest.push(map_fn(vec![], &self.no_attribs.tracker));
+        }
+
+        for (attrs, tracker) in trackers.into_iter() {
+            dest.push(map_fn(attrs, &tracker));
         }
     }
 
@@ -164,35 +170,40 @@ where
     where
         MapFn: FnMut(Vec<KeyValue>, A) -> Res,
     {
-        prepare_data(dest, self.count.load(Ordering::SeqCst));
-        if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
+        // Reset the sorted trackers so new attribute sets are written into a fresh hashmap.
+        let trackers = match self.sorted_attribs.lock() {
+            Ok(mut trackers) => {
+                let new = HashMap::with_capacity(trackers.len());
+                replace(trackers.deref_mut(), new)
+            }
+            Err(_) => return,
+        };
+        // Clear all_attribs so every attribute set starts using the new hashmap.
+        match self.all_attribs.write() {
+            Ok(mut all_trackers) => all_trackers.clear(),
+            Err(_) => return,
+        };
+
+        prepare_data(dest, trackers.len());
+
+        if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
             dest.push(map_fn(
                 vec![],
-                self.no_attribute_tracker.clone_and_reset(&self.config),
+                self.no_attribs.tracker.clone_and_reset(&self.config),
             ));
         }
 
-        let trackers = match self.trackers.write() {
-            Ok(mut trackers) => {
-                self.count.store(0, Ordering::SeqCst);
-                take(trackers.deref_mut())
-            }
-            Err(_) => todo!(),
-        };
-
-        let mut seen = HashSet::new();
         for (attrs, tracker) in trackers.into_iter() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
-            }
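+            // sorted_attribs was swapped out and all_attribs was cleared above, so this is expected to be the only remaining reference.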
+            let tracker = Arc::into_inner(tracker).expect("the only instance");
+            dest.push(map_fn(attrs, tracker));
         }
     }
 }
 
 /// Clear and allocate exactly required amount of space for all attribute-sets
 fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
     data.clear();
-    let total_len = list_len + 2; // to account for no_attributes case + overflow state
+    let total_len = list_len + 1; // to account for no_attributes case
     if total_len > data.capacity() {
         data.reserve_exact(total_len - data.capacity());
     }