Skip to content

Commit 502a208

Browse files
committed
merged value-map hash once
2 parents 2c1393d + 1e4053f commit 502a208

File tree

3 files changed

+170
-34
lines changed

3 files changed

+170
-34
lines changed
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
use std::{
2+
borrow::{Borrow, Cow},
3+
hash::{BuildHasher, DefaultHasher, Hash, Hasher},
4+
ops::Deref,
5+
};
6+
7+
/// A value paired with its hash, which is computed exactly once at construction.
///
/// The value lives in a [`Cow`], so the wrapper works both for borrowed data
/// (allocation-free lookups) and for owned data (keys stored in a map).
/// [`Hash`] only replays the cached hash, while equality always compares the
/// wrapped values themselves.
pub(crate) struct Hashed<'a, T>
where
    T: ToOwned + ?Sized,
{
    value: Cow<'a, T>,
    hash: u64,
}

impl<'a, T> Hashed<'a, T>
where
    T: ToOwned + Hash + ?Sized,
{
    /// Wraps a borrowed value, hashing it once without cloning it.
    pub(crate) fn from_borrowed(value: &'a T) -> Self {
        // `&T` hashes exactly like `T`, so there is no need to add another
        // level of reference before handing the value to `calc_hash`.
        let hash = calc_hash(value);
        Self {
            value: Cow::Borrowed(value),
            hash,
        }
    }

    /// Wraps an owned value, hashing its borrowed `&T` form.
    ///
    /// Hashing through `&T` keeps the result in line with
    /// [`Hashed::from_borrowed`], which hashes a `&T` as well.
    pub(crate) fn from_owned(value: <T as ToOwned>::Owned) -> Self {
        // Spell out the target type so `borrow()` unambiguously yields `&T`.
        let borrowed: &T = value.borrow();
        let hash = calc_hash(borrowed);
        Self {
            value: Cow::Owned(value),
            hash,
        }
    }

    /// Converts into a wrapper that owns its value, reusing the cached hash.
    ///
    /// The value is cloned only if it is currently borrowed.
    pub(crate) fn into_owned(self) -> Hashed<'static, T> {
        let value = self.value.into_owned();
        Hashed {
            value: Cow::Owned(value),
            hash: self.hash,
        }
    }

    /// Consumes the wrapper and returns the owned value, dropping the cached hash.
    ///
    /// The value is cloned only if it is currently borrowed.
    pub(crate) fn into_inner_owned(self) -> T::Owned {
        self.value.into_owned()
    }
}

/// Hashes `value` with a freshly created [`DefaultHasher`].
///
/// Every hasher obtained from `DefaultHasher::default()` starts in the same
/// state, so the same input always produces the same hash.
fn calc_hash<T>(value: T) -> u64
where
    T: Hash,
{
    let mut hasher = DefaultHasher::default();
    value.hash(&mut hasher);
    hasher.finish()
}

impl<T> Clone for Hashed<'_, T>
where
    T: ToOwned + ?Sized,
{
    /// Clones the wrapped `Cow` and copies the cached hash; nothing is rehashed.
    fn clone(&self) -> Self {
        Self {
            value: self.value.clone(),
            hash: self.hash,
        }
    }

    /// Delegates to `Cow::clone_from` for the value and copies the cached hash.
    fn clone_from(&mut self, source: &Self) {
        self.value.clone_from(&source.value);
        self.hash = source.hash;
    }
}

impl<T> Hash for Hashed<'_, T>
where
    T: ToOwned + Hash + ?Sized,
{
    /// Feeds only the cached hash into `state`, as a single `write_u64` call.
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_u64(self.hash);
    }
}

impl<T> PartialEq for Hashed<'_, T>
where
    T: ToOwned + PartialEq + ?Sized,
{
    /// Compares the wrapped values; the cached hash takes no part in equality.
    fn eq(&self, other: &Self) -> bool {
        self.value.as_ref() == other.value.as_ref()
    }
}

impl<T> Eq for Hashed<'_, T> where T: ToOwned + Eq + ?Sized {}

impl<T> Deref for Hashed<'_, T>
where
    T: ToOwned + ?Sized,
{
    type Target = T;

    /// Gives read-only access to the wrapped value.
    fn deref(&self) -> &Self::Target {
        self.value.deref()
    }
}
105+
106+
/// Pass-through hasher, which also acts as its own [`BuildHasher`], used to make
/// hashing of [`Hashed`] keys a no-op in [`HashMap`](std::collections::HashMap)
/// or [`HashSet`](std::collections::HashSet).
///
/// The `u64` received through [`Hasher::write_u64`] is stored and returned
/// unchanged by [`Hasher::finish`].
///
/// # Panics
///
/// Only `write_u64` is overridden. Any key whose `Hash` implementation ends up
/// in [`Hasher::write`] panics, so this works with [`Hashed`] (and plain
/// [`u64`]) keys only.
#[derive(Debug, Default, Clone)]
pub(crate) struct HashedNoOpBuilder {
    hashed: u64,
}

impl Hasher for HashedNoOpBuilder {
    /// Returns the last value passed to `write_u64` (`0` if there was none).
    fn finish(&self) -> u64 {
        self.hashed
    }

    /// Rejects raw byte input: this hasher only accepts pre-computed hashes.
    fn write(&mut self, _bytes: &[u8]) {
        panic!("Only works with `Hashed` value")
    }

    /// Stores `i` as the final hash, replacing any previously written value.
    fn write_u64(&mut self, i: u64) {
        self.hashed = i;
    }
}

impl BuildHasher for HashedNoOpBuilder {
    type Hasher = HashedNoOpBuilder;

    /// Creates a fresh hasher whose stored value starts at zero.
    fn build_hasher(&self) -> Self::Hasher {
        Self::default()
    }
}

opentelemetry-sdk/src/metrics/internal/mod.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod aggregate;
22
mod exponential_histogram;
3+
mod hashed;
34
mod histogram;
45
mod last_value;
56
mod precomputed_sum;
@@ -16,13 +17,14 @@ use std::sync::{Arc, Mutex, RwLock};
1617
use aggregate::is_under_cardinality_limit;
1718
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
1819
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
20+
use hashed::{Hashed, HashedNoOpBuilder};
1921
use once_cell::sync::Lazy;
2022
use opentelemetry::{otel_warn, KeyValue};
2123

22-
use crate::metrics::AttributeSet;
24+
use super::sort_and_dedup;
2325

24-
pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
25-
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);
26+
pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Hashed<'static, [KeyValue]>> =
27+
Lazy::new(|| Hashed::from_owned(vec![KeyValue::new("otel.metric.overflow", "true")]));
2628

2729
pub(crate) trait Aggregator {
2830
/// A static configuration that is needed in order to initialize aggregator.
@@ -60,10 +62,10 @@ where
6062
// for performance reasons, no_attribs tracker
6163
no_attribs: NoAttribs<A>,
6264
// for performance reasons, to handle attributes in the provided order
63-
all_attribs: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
65+
all_attribs: RwLock<HashMap<Hashed<'static, [KeyValue]>, Arc<A>, HashedNoOpBuilder>>,
6466
// different order of attribute keys should still map to same tracker instance
6567
// this helps to achieve that and also enables implementing collection efficiently
66-
sorted_attribs: Mutex<HashMap<Vec<KeyValue>, Arc<A>>>,
68+
sorted_attribs: Mutex<HashMap<Hashed<'static, [KeyValue]>, Arc<A>, HashedNoOpBuilder>>,
6769
/// Configuration for an Aggregator
6870
config: A::InitConfig,
6971
}
@@ -78,8 +80,8 @@ where
7880
tracker: A::create(&config),
7981
is_set: AtomicBool::new(false),
8082
},
81-
all_attribs: RwLock::new(HashMap::new()),
82-
sorted_attribs: Mutex::new(HashMap::new()),
83+
all_attribs: RwLock::new(HashMap::default()),
84+
sorted_attribs: Mutex::new(HashMap::default()),
8385
config,
8486
}
8587
}
@@ -91,10 +93,12 @@ where
9193
return;
9294
}
9395

96+
let attributes = Hashed::from_borrowed(attributes);
97+
9498
// Try to retrieve and update the tracker with the attributes in the provided order first
9599
match self.all_attribs.read() {
96100
Ok(trackers) => {
97-
if let Some(tracker) = trackers.get(attributes) {
101+
if let Some(tracker) = trackers.get(&attributes) {
98102
tracker.update(value);
99103
return;
100104
}
@@ -103,7 +107,7 @@ where
103107
};
104108

105109
// Get or create a tracker
106-
let sorted_attrs = AttributeSet::from(attributes).into_vec();
110+
let sorted_attrs = Hashed::from_owned(sort_and_dedup(&attributes));
107111
let Ok(mut sorted_trackers) = self.sorted_attribs.lock() else {
108112
return;
109113
};
@@ -135,7 +139,7 @@ where
135139
let Ok(mut all_trackers) = self.all_attribs.write() else {
136140
return;
137141
};
138-
all_trackers.insert(attributes.to_vec(), new_tracker);
142+
all_trackers.insert(attributes.into_owned(), new_tracker);
139143
}
140144

141145
/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
@@ -160,7 +164,7 @@ where
160164
}
161165

162166
for (attrs, tracker) in trackers.into_iter() {
163-
dest.push(map_fn(attrs, &tracker));
167+
dest.push(map_fn(attrs.clone().into_inner_owned(), &tracker));
164168
}
165169
}
166170

@@ -173,7 +177,7 @@ where
173177
// reset sorted trackers so new attributes set will be written into new hashmap
174178
let trackers = match self.sorted_attribs.lock() {
175179
Ok(mut trackers) => {
176-
let new = HashMap::with_capacity(trackers.len());
180+
let new = HashMap::with_capacity_and_hasher(trackers.len(), HashedNoOpBuilder::default());
177181
replace(trackers.deref_mut(), new)
178182
}
179183
Err(_) => return,
@@ -195,7 +199,7 @@ where
195199

196200
for (attrs, tracker) in trackers.into_iter() {
197201
let tracker = Arc::into_inner(tracker).expect("the only instance");
198-
dest.push(map_fn(attrs, tracker));
202+
dest.push(map_fn(attrs.into_inner_owned(), tracker));
199203
}
200204
}
201205
}
@@ -403,6 +407,7 @@ impl AtomicallyUpdate<f64> for f64 {
403407

404408
#[cfg(test)]
405409
mod tests {
410+
406411
use super::*;
407412

408413
#[test]

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -115,23 +115,27 @@ pub(crate) struct AttributeSet(Vec<KeyValue>, u64);
115115

116116
impl From<&[KeyValue]> for AttributeSet {
117117
fn from(values: &[KeyValue]) -> Self {
118-
let mut seen_keys = HashSet::with_capacity(values.len());
119-
let vec = values
120-
.iter()
121-
.rev()
122-
.filter_map(|kv| {
123-
if seen_keys.insert(kv.key.clone()) {
124-
Some(kv.clone())
125-
} else {
126-
None
127-
}
128-
})
129-
.collect::<Vec<_>>();
130-
131-
AttributeSet::new(vec)
118+
AttributeSet::new(sort_and_dedup(values))
132119
}
133120
}
134121

122+
/// Returns a copy of `values` with duplicate keys removed and the remaining
/// entries sorted by key.
///
/// When a key occurs more than once, the entry that appears last in `values`
/// is the one that is kept.
pub(crate) fn sort_and_dedup(values: &[KeyValue]) -> Vec<KeyValue> {
    // Track keys by reference: membership testing does not need owned keys,
    // so nothing is cloned for entries that end up being discarded.
    let mut seen_keys = HashSet::with_capacity(values.len());
    let mut vec = values
        .iter()
        // Walk backwards so the last occurrence of each key is seen first.
        .rev()
        .filter_map(|kv| seen_keys.insert(&kv.key).then(|| kv.clone()))
        .collect::<Vec<_>>();
    // Keys are unique at this point, so an unstable sort is deterministic.
    vec.sort_unstable_by(|a, b| a.key.cmp(&b.key));
    vec
}
138+
135139
fn calculate_hash(values: &[KeyValue]) -> u64 {
136140
let mut hasher = DefaultHasher::new();
137141
values.iter().fold(&mut hasher, |mut hasher, item| {
@@ -142,8 +146,7 @@ fn calculate_hash(values: &[KeyValue]) -> u64 {
142146
}
143147

144148
impl AttributeSet {
145-
fn new(mut values: Vec<KeyValue>) -> Self {
146-
values.sort_unstable_by(|a, b| a.key.cmp(&b.key));
149+
fn new(values: Vec<KeyValue>) -> Self {
147150
let hash = calculate_hash(&values);
148151
AttributeSet(values, hash)
149152
}
@@ -152,11 +155,6 @@ impl AttributeSet {
152155
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Key, &Value)> {
153156
self.0.iter().map(|kv| (&kv.key, &kv.value))
154157
}
155-
156-
/// Returns the underlying Vec of KeyValue pairs
157-
pub(crate) fn into_vec(self) -> Vec<KeyValue> {
158-
self.0
159-
}
160158
}
161159

162160
impl Hash for AttributeSet {

0 commit comments

Comments
 (0)