@@ -6,11 +6,12 @@ mod precomputed_sum;
 mod sum;
 
 use core::fmt;
-use std::collections::{HashMap, HashSet};
-use std::mem::take;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::mem::replace;
 use std::ops::{Add, AddAssign, DerefMut, Sub};
-use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
 
 use aggregate::is_under_cardinality_limit;
 pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
@@ -41,6 +42,11 @@ pub(crate) trait Aggregator {
     fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
 }
 
+struct NoAttribs<A> {
+    tracker: A,
+    is_set: AtomicBool,
+}
+
 /// The storage for sums.
 ///
 /// This structure is parametrized by an `Operation` that indicates how
@@ -49,14 +55,13 @@ pub(crate) struct ValueMap<A>
 where
     A: Aggregator,
 {
-    /// Trackers store the values associated with different attribute sets.
-    trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
-    /// Number of different attribute set stored in the `trackers` map.
-    count: AtomicUsize,
-    /// Indicates whether a value with no attributes has been stored.
-    has_no_attribute_value: AtomicBool,
-    /// Tracker for values with no attributes attached.
-    no_attribute_tracker: A,
+    // Fast path: tracker for measurements recorded with no attributes.
+    no_attribs: NoAttribs<A>,
+    // Fast path: trackers keyed by attributes in the order the caller provided them.
+    all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
+    // Trackers keyed by sorted attributes, so different orderings of the same attribute
+    // set map to the same tracker instance; this also makes collection efficient.
+    sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
     /// Configuration for an Aggregator
     config: A::InitConfig,
 }
@@ -67,70 +72,68 @@ where
 {
     fn new(config: A::InitConfig) -> Self {
         ValueMap {
-            trackers: RwLock::new(HashMap::new()),
-            has_no_attribute_value: AtomicBool::new(false),
-            no_attribute_tracker: A::create(&config),
-            count: AtomicUsize::new(0),
+            no_attribs: NoAttribs {
+                tracker: A::create(&config),
+                is_set: AtomicBool::new(false),
+            },
+            all_attribs: RwLock::new(HashMap::new()),
+            sorted_attribs: Mutex::new(HashMap::new()),
             config,
         }
     }
 
     fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
         if attributes.is_empty() {
-            self.no_attribute_tracker.update(value);
-            self.has_no_attribute_value.store(true, Ordering::Release);
+            self.no_attribs.tracker.update(value);
+            self.no_attribs.is_set.store(true, Ordering::Release);
             return;
         }
 
-        let Ok(trackers) = self.trackers.read() else {
-            return;
-        };
-
         // Try to retrieve and update the tracker with the attributes in the provided order first
-        if let Some(tracker) = trackers.get(attributes) {
-            tracker.update(value);
-            return;
-        }
+        match self.all_attribs.read() {
+            Ok(trackers) => {
+                if let Some(tracker) = trackers.get(attributes) {
+                    tracker.update(value);
+                    return;
+                }
+            }
+            Err(_) => return,
+        };
 
-        // Try to retrieve and update the tracker with the attributes sorted.
+        // Get or create a tracker
         let sorted_attrs = sort_and_dedup(attributes);
-        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            tracker.update(value);
+        let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
             return;
-        }
+        };
+
+        let sorted_count = sorted_trackers.len();
+        let new_tracker = match sorted_trackers.entry(sorted_attrs) {
+            Entry::Occupied(occupied_entry) => occupied_entry.get().clone(),
+            Entry::Vacant(vacant_entry) => {
+                if !is_under_cardinality_limit(sorted_count) {
+                    sorted_trackers.entry(STREAM_OVERFLOW_ATTRIBUTES.clone())
+                        .or_insert_with(|| {
+                            otel_warn!(name: "ValueMap.measure",
+                                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
+                            );
+                            Arc::new(A::create(&self.config))
+                        })
+                        .update(value);
+                    return;
+                }
+                let new_tracker = Arc::new(A::create(&self.config));
+                vacant_entry.insert(new_tracker).clone()
+            }
+        };
+        drop(sorted_trackers);
 
-        // Give up the read lock before acquiring the write lock.
-        drop(trackers);
+        new_tracker.update(value);
 
-        let Ok(mut trackers) = self.trackers.write() else {
+        // Insert the new tracker under the provided order, so we can find it next time
+        let Ok(mut all_trackers) = self.all_attribs.write() else {
             return;
         };
-
-        // Recheck both the provided and sorted orders after acquiring the write lock
-        // in case another thread has pushed an update in the meantime.
-        if let Some(tracker) = trackers.get(attributes) {
-            tracker.update(value);
-        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
-            tracker.update(value);
-        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
-            let new_tracker = Arc::new(A::create(&self.config));
-            new_tracker.update(value);
-
-            // Insert tracker with the attributes in the provided and sorted orders
-            trackers.insert(attributes.to_vec(), new_tracker.clone());
-            trackers.insert(sorted_attrs, new_tracker);
-
-            self.count.fetch_add(1, Ordering::SeqCst);
-        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
-            overflow_value.update(value);
-        } else {
-            let new_tracker = A::create(&self.config);
-            new_tracker.update(value);
-            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
-            otel_warn!(name: "ValueMap.measure",
-                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
-            );
-        }
+        all_trackers.insert(attributes.to_vec(), new_tracker);
     }
 
     /// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -139,20 +142,23 @@ where
     where
         MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
     {
-        prepare_data(dest, self.count.load(Ordering::SeqCst));
-        if self.has_no_attribute_value.load(Ordering::Acquire) {
-            dest.push(map_fn(vec![], &self.no_attribute_tracker));
-        }
-
-        let Ok(trackers) = self.trackers.read() else {
-            return;
+        let trackers = match self.sorted_attribs.lock() {
+            Ok(trackers) => {
+                // It's important to release the lock as fast as possible,
+                // so we don't block insertion of new attribute sets.
+                trackers.clone()
+            }
+            Err(_) => return,
         };
 
-        let mut seen = HashSet::new();
-        for (attrs, tracker) in trackers.iter() {
-            if seen.insert(Arc::as_ptr(tracker)) {
-                dest.push(map_fn(attrs.clone(), tracker));
-            }
+        prepare_data(dest, trackers.len());
+
+        if self.no_attribs.is_set.load(Ordering::Acquire) {
+            dest.push(map_fn(vec![], &self.no_attribs.tracker));
+        }
+
+        for (attrs, tracker) in trackers.into_iter() {
+            dest.push(map_fn(attrs, &tracker));
         }
     }
 
@@ -162,35 +168,40 @@ where
     where
         MapFn: FnMut(Vec<KeyValue>, A) -> Res,
     {
-        prepare_data(dest, self.count.load(Ordering::SeqCst));
-        if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
+        // Swap out the sorted trackers, so new attribute sets are written into a fresh hashmap.
+        let trackers = match self.sorted_attribs.lock() {
+            Ok(mut trackers) => {
+                let new = HashMap::with_capacity(trackers.len());
+                replace(trackers.deref_mut(), new)
+            }
+            Err(_) => return,
+        };
+        // Clear the provided-order map, so all attribute sets start using the new hashmap.
+        match self.all_attribs.write() {
+            Ok(mut all_trackers) => all_trackers.clear(),
+            Err(_) => return,
+        };
+
+        prepare_data(dest, trackers.len());
+
+        if self.no_attribs.is_set.swap(false, Ordering::AcqRel) {
             dest.push(map_fn(
                 vec![],
-                self.no_attribute_tracker.clone_and_reset(&self.config),
+                self.no_attribs.tracker.clone_and_reset(&self.config),
             ));
         }
 
-        let trackers = match self.trackers.write() {
-            Ok(mut trackers) => {
-                self.count.store(0, Ordering::SeqCst);
-                take(trackers.deref_mut())
-            }
-            Err(_) => todo!(),
-        };
-
-        let mut seen = HashSet::new();
         for (attrs, tracker) in trackers.into_iter() {
-            if seen.insert(Arc::as_ptr(&tracker)) {
-                dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
-            }
+            let tracker = Arc::into_inner(tracker).expect("the only instance");
+            dest.push(map_fn(attrs, tracker));
         }
     }
 }
 
 /// Clear and allocate exactly required amount of space for all attribute-sets
 fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
     data.clear();
-    let total_len = list_len + 2; // to account for no_attributes case + overflow state
+    let total_len = list_len + 1; // to account for no_attributes case
     if total_len > data.capacity() {
         data.reserve_exact(total_len - data.capacity());
     }
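
For context on the locking scheme this diff introduces, here is a minimal, self-contained sketch of the same two-map pattern: a `Mutex`-guarded map keyed by sorted attributes as the source of truth, a `RwLock`-guarded map keyed by the caller-provided attribute order as the fast path, and a collect-and-reset step that swaps the sorted map out with `mem::replace`. The names `SimpleValueMap`, `Counter`, and `CARDINALITY_LIMIT` are illustrative stand-ins rather than the crate's real types, and the overflow logging and no-attributes fast path are elided; this is a sketch under those assumptions, not the PR's actual implementation.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem::replace;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock};

/// Illustrative cardinality limit (the real crate uses its own constant).
const CARDINALITY_LIMIT: usize = 2000;

/// Simplified stand-in for the aggregator tracker: a plain atomic counter.
#[derive(Default)]
struct Counter(AtomicU64);

impl Counter {
    fn add(&self, v: u64) {
        self.0.fetch_add(v, Ordering::Relaxed);
    }
}

/// Two-map layout mirroring the PR: a Mutex-guarded source of truth keyed by
/// sorted attributes, plus a RwLock-guarded fast-path map keyed by attributes
/// in whatever order the caller provided them.
struct SimpleValueMap {
    sorted: Mutex<HashMap<Vec<(String, String)>, Arc<Counter>>>,
    as_provided: RwLock<HashMap<Vec<(String, String)>, Arc<Counter>>>,
}

impl SimpleValueMap {
    fn measure(&self, value: u64, attrs: &[(String, String)]) {
        // Fast path: shared read lock, attributes in caller-provided order.
        if let Ok(map) = self.as_provided.read() {
            if let Some(tracker) = map.get(attrs) {
                tracker.add(value);
                return;
            }
        }

        // Slow path: look up (or create) the tracker under the sorted-key map.
        let mut sorted_key = attrs.to_vec();
        sorted_key.sort();
        let tracker = {
            let Ok(mut sorted) = self.sorted.lock() else { return };
            let len = sorted.len();
            match sorted.entry(sorted_key) {
                Entry::Occupied(e) => e.get().clone(),
                Entry::Vacant(e) if len < CARDINALITY_LIMIT => {
                    e.insert(Arc::new(Counter::default())).clone()
                }
                Entry::Vacant(_) => return, // overflow handling elided
            }
        };
        tracker.add(value);

        // Publish under the provided-order key so the next call takes the fast path.
        if let Ok(mut map) = self.as_provided.write() {
            map.insert(attrs.to_vec(), tracker);
        }
    }

    /// Collect-and-reset: swap out the sorted map, clear the fast-path map,
    /// then drain the old map with no locks held while mapping.
    fn collect_and_reset(&self) -> Vec<(Vec<(String, String)>, u64)> {
        let old = match self.sorted.lock() {
            Ok(mut sorted) => replace(&mut *sorted, HashMap::new()),
            Err(_) => return Vec::new(),
        };
        if let Ok(mut map) = self.as_provided.write() {
            map.clear();
        }
        old.into_iter()
            .map(|(attrs, tracker)| (attrs, tracker.0.load(Ordering::Relaxed)))
            .collect()
    }
}

fn main() {
    let vm = SimpleValueMap {
        sorted: Mutex::new(HashMap::new()),
        as_provided: RwLock::new(HashMap::new()),
    };
    let attrs = vec![
        ("b".to_string(), "2".to_string()),
        ("a".to_string(), "1".to_string()),
    ];
    vm.measure(5, &attrs);
    vm.measure(3, &attrs);
    for (attrs, total) in vm.collect_and_reset() {
        println!("{attrs:?} => {total}");
    }
}
```

The design point being illustrated: steady-state measurements only take the read side of the `RwLock`, while collection swaps the sorted map under a short `Mutex` hold instead of holding a write lock for the duration of the data-point mapping.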