diff --git a/opentelemetry-sdk/src/metrics/internal/hashed.rs b/opentelemetry-sdk/src/metrics/internal/hashed.rs new file mode 100644 index 0000000000..d66d8f898e --- /dev/null +++ b/opentelemetry-sdk/src/metrics/internal/hashed.rs @@ -0,0 +1,133 @@ +use std::{ + borrow::{Borrow, Cow}, + hash::{BuildHasher, DefaultHasher, Hash, Hasher}, + ops::Deref, +}; + +/// Hash value only once, works with references and owned types. +pub(crate) struct Hashed<'a, T> +where + T: ToOwned + ?Sized, +{ + value: Cow<'a, T>, + hash: u64, +} + +impl<'a, T> Hashed<'a, T> +where + T: ToOwned + Hash + ?Sized, +{ + pub(crate) fn from_borrowed(value: &'a T) -> Self { + let hash = calc_hash(&value); + Self { + value: Cow::Borrowed(value), + hash, + } + } + + pub(crate) fn from_owned(value: ::Owned) -> Self { + let hash = calc_hash(value.borrow()); + Self { + value: Cow::Owned(value), + hash, + } + } + + pub(crate) fn into_owned(self) -> Hashed<'static, T> { + let value = self.value.into_owned(); + Hashed { + value: Cow::Owned(value), + hash: self.hash, + } + } + + pub(crate) fn into_inner_owned(self) -> T::Owned { + self.value.into_owned() + } +} + +fn calc_hash(value: T) -> u64 +where + T: Hash, +{ + let mut hasher = DefaultHasher::default(); + value.hash(&mut hasher); + hasher.finish() +} + +impl Clone for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + fn clone(&self) -> Self { + Self { + value: self.value.clone(), + hash: self.hash, + } + } + + fn clone_from(&mut self, source: &Self) { + self.value.clone_from(&source.value); + self.hash = source.hash; + } +} + +impl Hash for Hashed<'_, T> +where + T: ToOwned + Hash + ?Sized, +{ + fn hash(&self, state: &mut H) { + state.write_u64(self.hash); + } +} + +impl PartialEq for Hashed<'_, T> +where + T: ToOwned + PartialEq + ?Sized, +{ + fn eq(&self, other: &Self) -> bool { + self.value.as_ref() == other.value.as_ref() + } +} + +impl Eq for Hashed<'_, T> where T: ToOwned + Eq + ?Sized {} + +impl Deref for Hashed<'_, T> +where + T: ToOwned + ?Sized, +{ + type Target = T; + + fn deref(&self) -> &Self::Target { + self.value.deref() + } +} + +/// Used to make [`Hashed`] values no-op in [`HashMap`](std::collections::HashMap) or [`HashSet`](std::collections::HashSet). +/// For all other keys types (except for [`u64`]) it will panic. +#[derive(Default, Clone)] +pub(crate) struct HashedNoOpBuilder { + hashed: u64, +} + +impl Hasher for HashedNoOpBuilder { + fn finish(&self) -> u64 { + self.hashed + } + + fn write(&mut self, _bytes: &[u8]) { + panic!("Only works with `Hashed` value") + } + + fn write_u64(&mut self, i: u64) { + self.hashed = i; + } +} + +impl BuildHasher for HashedNoOpBuilder { + type Hasher = HashedNoOpBuilder; + + fn build_hasher(&self) -> Self::Hasher { + HashedNoOpBuilder::default() + } +} diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 8b6136d7ce..6f5ac6acae 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -1,5 +1,6 @@ mod aggregate; mod exponential_histogram; +mod hashed; mod histogram; mod last_value; mod precomputed_sum; @@ -15,13 +16,14 @@ use std::sync::{Arc, RwLock}; use aggregate::is_under_cardinality_limit; pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure}; pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE}; +use hashed::{Hashed, HashedNoOpBuilder}; use once_cell::sync::Lazy; use opentelemetry::{otel_warn, KeyValue}; -use crate::metrics::AttributeSet; +use super::sort_and_dedup; -pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = - Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]); +pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy> = + Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")])); pub(crate) trait Aggregator { /// A static configuration that is needed in order to initialize aggregator. @@ -52,7 +54,7 @@ where A: Aggregator, { /// Trackers store the values associated with different attribute sets. - trackers: RwLock, Arc>>, + trackers: RwLock, Arc, HashedNoOpBuilder>>, /// Number of different attribute set stored in the `trackers` map. count: AtomicUsize, /// Indicates whether a value with no attributes has been stored. @@ -69,7 +71,7 @@ where { fn new(config: A::InitConfig) -> Self { ValueMap { - trackers: RwLock::new(HashMap::new()), + trackers: RwLock::new(HashMap::default()), has_no_attribute_value: AtomicBool::new(false), no_attribute_tracker: A::create(&config), count: AtomicUsize::new(0), @@ -84,19 +86,21 @@ where return; } + let attributes = Hashed::from_borrowed(attributes); + let Ok(trackers) = self.trackers.read() else { return; }; // Try to retrieve and update the tracker with the attributes in the provided order first - if let Some(tracker) = trackers.get(attributes) { + if let Some(tracker) = trackers.get(&attributes) { tracker.update(value); return; } // Try to retrieve and update the tracker with the attributes sorted. - let sorted_attrs = AttributeSet::from(attributes).into_vec(); - if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + let sorted_attrs = Hashed::from_owned(sort_and_dedup(&attributes)); + if let Some(tracker) = trackers.get(&sorted_attrs) { tracker.update(value); return; } @@ -110,20 +114,20 @@ where // Recheck both the provided and sorted orders after acquiring the write lock // in case another thread has pushed an update in the meantime. - if let Some(tracker) = trackers.get(attributes) { + if let Some(tracker) = trackers.get(&attributes) { tracker.update(value); - } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) { + } else if let Some(tracker) = trackers.get(&sorted_attrs) { tracker.update(value); } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) { let new_tracker = Arc::new(A::create(&self.config)); new_tracker.update(value); // Insert tracker with the attributes in the provided and sorted orders - trackers.insert(attributes.to_vec(), new_tracker.clone()); + trackers.insert(attributes.into_owned(), new_tracker.clone()); trackers.insert(sorted_attrs, new_tracker); self.count.fetch_add(1, Ordering::SeqCst); - } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) { + } else if let Some(overflow_value) = trackers.get(&STREAM_OVERFLOW_ATTRIBUTES) { overflow_value.update(value); } else { let new_tracker = A::create(&self.config); @@ -153,7 +157,7 @@ where let mut seen = HashSet::new(); for (attrs, tracker) in trackers.iter() { if seen.insert(Arc::as_ptr(tracker)) { - dest.push(map_fn(attrs.clone(), tracker)); + dest.push(map_fn(attrs.clone().into_inner_owned(), tracker)); } } } @@ -183,7 +187,10 @@ where let mut seen = HashSet::new(); for (attrs, tracker) in trackers.into_iter() { if seen.insert(Arc::as_ptr(&tracker)) { - dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config))); + dest.push(map_fn( + attrs.into_inner_owned(), + tracker.clone_and_reset(&self.config), + )); } } } @@ -392,6 +399,7 @@ impl AtomicallyUpdate for f64 { #[cfg(test)] mod tests { + use super::*; #[test] diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 7db8a63ec2..54a606243b 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -115,23 +115,27 @@ pub(crate) struct AttributeSet(Vec, u64); impl From<&[KeyValue]> for AttributeSet { fn from(values: &[KeyValue]) -> Self { - let mut seen_keys = HashSet::with_capacity(values.len()); - let vec = values - .iter() - .rev() - .filter_map(|kv| { - if seen_keys.insert(kv.key.clone()) { - Some(kv.clone()) - } else { - None - } - }) - .collect::>(); - - AttributeSet::new(vec) + AttributeSet::new(sort_and_dedup(values)) } } +pub(crate) fn sort_and_dedup(values: &[KeyValue]) -> Vec { + let mut seen_keys = HashSet::with_capacity(values.len()); + let mut vec = values + .iter() + .rev() + .filter_map(|kv| { + if seen_keys.insert(kv.key.clone()) { + Some(kv.clone()) + } else { + None + } + }) + .collect::>(); + vec.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + vec +} + fn calculate_hash(values: &[KeyValue]) -> u64 { let mut hasher = DefaultHasher::new(); values.iter().fold(&mut hasher, |mut hasher, item| { @@ -142,8 +146,7 @@ fn calculate_hash(values: &[KeyValue]) -> u64 { } impl AttributeSet { - fn new(mut values: Vec) -> Self { - values.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + fn new(values: Vec) -> Self { let hash = calculate_hash(&values); AttributeSet(values, hash) } @@ -152,11 +155,6 @@ impl AttributeSet { pub(crate) fn iter(&self) -> impl Iterator { self.0.iter().map(|kv| (&kv.key, &kv.value)) } - - /// Returns the underlying Vec of KeyValue pairs - pub(crate) fn into_vec(self) -> Vec { - self.0 - } } impl Hash for AttributeSet {