@@ -7,12 +7,12 @@ mod sum;
77
88use core:: fmt;
99use std:: collections:: { HashMap , HashSet } ;
10- use std:: mem:: take ;
10+ use std:: mem:: swap ;
1111use std:: ops:: { Add , AddAssign , DerefMut , Sub } ;
1212use std:: sync:: atomic:: { AtomicBool , AtomicI64 , AtomicU64 , AtomicUsize , Ordering } ;
1313use std:: sync:: { Arc , RwLock } ;
1414
15- use aggregate:: is_under_cardinality_limit;
15+ use aggregate:: { is_under_cardinality_limit, STREAM_CARDINALITY_LIMIT } ;
1616pub ( crate ) use aggregate:: { AggregateBuilder , ComputeAggregation , Measure } ;
1717pub ( crate ) use exponential_histogram:: { EXPO_MAX_SCALE , EXPO_MIN_SCALE } ;
1818use once_cell:: sync:: Lazy ;
5151{
5252 /// Trackers store the values associated with different attribute sets.
5353 trackers : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
54+
55+ /// Used by collect exclusively. The data type must match the one used in
56+ /// `trackers` to allow mem::swap.
57+ trackers_for_collect : RwLock < HashMap < Vec < KeyValue > , Arc < A > > > ,
58+
5459 /// Number of different attribute set stored in the `trackers` map.
5560 count : AtomicUsize ,
5661 /// Indicates whether a value with no attributes has been stored.
6772{
6873 fn new ( config : A :: InitConfig ) -> Self {
6974 ValueMap {
70- trackers : RwLock :: new ( HashMap :: new ( ) ) ,
75+ trackers : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
76+ // TODO: For cumulative, this is not required, so avoid this
77+ // pre-allocation.
78+ trackers_for_collect : RwLock :: new ( HashMap :: with_capacity ( 1 + STREAM_CARDINALITY_LIMIT ) ) ,
7179 has_no_attribute_value : AtomicBool :: new ( false ) ,
7280 no_attribute_tracker : A :: create ( & config) ,
7381 count : AtomicUsize :: new ( 0 ) ,
@@ -170,19 +178,23 @@ where
170178 ) ) ;
171179 }
172180
173- let trackers = match self . trackers . write ( ) {
174- Ok ( mut trackers) => {
181+ if let Ok ( mut trackers_collect) = self . trackers_for_collect . write ( ) {
182+ if let Ok ( mut trackers_current) = self . trackers . write ( ) {
183+ swap ( trackers_collect. deref_mut ( ) , trackers_current. deref_mut ( ) ) ;
175184 self . count . store ( 0 , Ordering :: SeqCst ) ;
176- take ( trackers. deref_mut ( ) )
185+ } else {
186+ otel_warn ! ( name: "MeterProvider.InternalError" , message = "Metric collection failed. Report this issue in OpenTelemetry repo." , details ="ValueMap trackers lock poisoned" ) ;
187+ return ;
177188 }
178- Err ( _) => todo ! ( ) ,
179- } ;
180189
181- let mut seen = HashSet :: new ( ) ;
182- for ( attrs, tracker) in trackers. into_iter ( ) {
183- if seen. insert ( Arc :: as_ptr ( & tracker) ) {
184- dest. push ( map_fn ( attrs, tracker. clone_and_reset ( & self . config ) ) ) ;
190+ let mut seen = HashSet :: new ( ) ;
191+ for ( attrs, tracker) in trackers_collect. drain ( ) {
192+ if seen. insert ( Arc :: as_ptr ( & tracker) ) {
193+ dest. push ( map_fn ( attrs, tracker. clone_and_reset ( & self . config ) ) ) ;
194+ }
185195 }
196+ } else {
197+ otel_warn ! ( name: "MeterProvider.InternalError" , message = "Metric collection failed. Report this issue in OpenTelemetry repo." , details ="ValueMap trackers for collect lock poisoned" ) ;
186198 }
187199 }
188200}
0 commit comments