Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-sdk/benches/metrics_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
|--------------------------------|-------------|
| Counter_Add_Sorted | 172 ns |
| Counter_Add_Unsorted | 183 ns |
| Counter_Overflow | 898 ns |
| Counter_Overflow | 562 ns |
| ThreadLocal_Random_Generator_5 | 37 ns |
*/

Expand Down
14 changes: 11 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::{otel_warn, KeyValue};

use crate::metrics::AttributeSet;

pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);

Expand Down Expand Up @@ -95,7 +93,7 @@ where
}

// Try to retrieve and update the tracker with the attributes sorted.
let sorted_attrs = AttributeSet::from(attributes).into_vec();
let sorted_attrs = sort_and_dedup(attributes);
if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
tracker.update(value);
return;
Expand Down Expand Up @@ -198,6 +196,16 @@ fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
}
}

fn sort_and_dedup(attributes: &[KeyValue]) -> Vec<KeyValue> {
// Use newly allocated vec here as incoming attributes are immutable so
// cannot sort/de-dup in-place. TODO: This allocation can be avoided by
// leveraging a ThreadLocal vec.
let mut sorted = attributes.to_vec();
sorted.sort_unstable_by(|a, b| a.key.cmp(&b.key));
sorted.dedup_by(|a, b| a.key == b.key);
Copy link
Member

@lalitb lalitb Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a nit change in behavior of dedup it seems - earlier with (AttributeSet) last occurrence of key was retained (because of rev call), now the first occurrence is retained.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For an API like SetAttribute, it totally makes sense to overwrite the existing one and retain the new one. Spec also recommends that. But for an existing collection - dont know if one is better than the other. This clearly indicates an issue at instrumentation/user side, so I think we don't need to make any promise here about which one wins.
Also prometheus deals with this by taking all values and concatenating then with comma separation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spotted same thing:) Now it actually is undefined, because sort order is unstable.
I tried to find the answer in the spec, and I couldnt find any restrictions... it says:

The enforcement of uniqueness may be performed in a variety of ways as it best fits the limitations of the particular implementation.

So probably this works just fine.
However, I have wrote this impl, which is also free of allocations.

        vec.sort_by(|a, b| a.key.cmp(&b.key));

        // we cannot use vec.dedup_by because it will remove last duplicate not first
        if vec.len() > 1 {
            let mut i = vec.len() - 1;
            while i != 0 {
                let is_same = unsafe { vec.get_unchecked(i - 1).key == vec.get_unchecked(i).key };
                if is_same {
                    vec.remove(i - 1);
                }
                i -= 1;
            }
        }

Copy link
Member Author

@cijothomas cijothomas Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, I have wrote this impl, which is also free of allocations.

Allocation is not in the sorting/de-dup logic, but just outside of that in the vec! creation! I briefly tried thread-local, and it can solve that issue also.

sorted
}

/// Marks a type that can have a value added and retrieved atomically. Required since
/// different types have different backing atomic mechanisms
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
Expand Down
60 changes: 1 addition & 59 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ pub use view::*;
// #[cfg(not(feature = "spec_unstable_metrics_views"))]
// pub(crate) use view::*;

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

use opentelemetry::KeyValue;
use std::hash::Hash;

/// Defines the window that an aggregation was calculated over.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, Hash)]
Expand All @@ -106,60 +102,6 @@ pub enum Temporality {
LowMemory,
}

/// A unique set of attributes that can be used as instrument identifiers.
///
/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as
/// HashMap keys and other de-duplication methods.
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub(crate) struct AttributeSet(Vec<KeyValue>, u64);

impl From<&[KeyValue]> for AttributeSet {
fn from(values: &[KeyValue]) -> Self {
let mut seen_keys = HashSet::with_capacity(values.len());
let vec = values
.iter()
.rev()
.filter_map(|kv| {
if seen_keys.insert(kv.key.clone()) {
Some(kv.clone())
} else {
None
}
})
.collect::<Vec<_>>();

AttributeSet::new(vec)
}
}

fn calculate_hash(values: &[KeyValue]) -> u64 {
let mut hasher = DefaultHasher::new();
values.iter().fold(&mut hasher, |mut hasher, item| {
item.hash(&mut hasher);
hasher
});
hasher.finish()
}

impl AttributeSet {
fn new(mut values: Vec<KeyValue>) -> Self {
values.sort_unstable_by(|a, b| a.key.cmp(&b.key));
let hash = calculate_hash(&values);
AttributeSet(values, hash)
}

/// Returns the underlying Vec of KeyValue pairs
pub(crate) fn into_vec(self) -> Vec<KeyValue> {
self.0
}
}

impl Hash for AttributeSet {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u64(self.1)
}
}

#[cfg(all(test, feature = "testing"))]
mod tests {
use self::data::{DataPoint, HistogramDataPoint, ScopeMetrics};
Expand Down
Loading