Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions examples/metrics-advanced/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
let my_view_rename_and_unit = |i: &Instrument| {
if i.name == "my_histogram" {
Some(
Stream::new()
.name("my_histogram_renamed")
.unit("milliseconds"),
Stream::builder()
.with_name("my_histogram_renamed")
.with_unit("milliseconds")
.build()
.unwrap(),
)
} else {
None
Expand All @@ -21,7 +23,11 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
// for example 2
let my_view_change_cardinality = |i: &Instrument| {
if i.name == "my_second_histogram" {
Some(Stream::new().cardinality_limit(2))
// Note: If Stream is invalid, build() will return an error. By
// calling `.ok()`, any such error is ignored and treated as if the
// view does not match the instrument. If this is not the desired
// behavior, consider handling the error explicitly.
Stream::builder().with_cardinality_limit(2).build().ok()
} else {
None
}
Expand Down
9 changes: 6 additions & 3 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,17 @@ also modified to suppress telemetry before invoking exporters.
behind feature flag "experimental_metrics_custom_reader".
[#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)

- TODO: Placeholder for View related changelog. Polish this after all
- The `Stream` struct now has its public fields hidden.
- TODO: Placeholder for View related changelog. Polish this after all changs done
- Core view functionality is now available by default—users can change the
name, unit, description, and cardinality limit of a metric via views without
enabling the `spec_unstable_metrics_views` feature flag. Advanced view
features, such as custom aggregation or attribute filtering, still require
the `spec_unstable_metrics_views` feature.
- TODO: Add Stream::builder() pattern change, validation when done.
- Introduced a builder pattern for `Stream` creation to use with "Views".
- Added `StreamBuilder` struct with methods to configure stream properties
- Added `Stream::builder()` method that returns a new `StreamBuilder`
- `StreamBuilder::build()` returns `Result<Stream, Box<dyn Error>>` enabling
proper validation

- *Breaking* `Aggregation` enum moved behind feature flag
"spec_unstable_metrics_views". This was only required when using Views.
Expand Down
16 changes: 11 additions & 5 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,10 @@ fn counters(c: &mut Criterion) {
Some(
new_view(
Instrument::new().name("*"),
Stream::new().allowed_attribute_keys([Key::new("K")]),
Stream::builder()
.with_allowed_attribute_keys([Key::new("K")])
.build()
.unwrap(),
)
.unwrap(),
),
Expand Down Expand Up @@ -273,10 +276,13 @@ fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram<u64>) {
let view = Some(
new_view(
Instrument::new().name("histogram_*"),
Stream::new().aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: bounds.iter().map(|&x| x as f64).collect(),
record_min_max: true,
}),
Stream::builder()
.with_aggregation(Aggregation::ExplicitBucketHistogram {
boundaries: bounds.iter().map(|&x| x as f64).collect(),
record_min_max: true,
})
.build()
.unwrap(),
)
.unwrap(),
);
Expand Down
200 changes: 152 additions & 48 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{borrow::Cow, collections::HashSet, sync::Arc};
use std::{borrow::Cow, collections::HashSet, error::Error, sync::Arc};

use opentelemetry::{
metrics::{AsyncInstrument, SyncInstrument},
Expand Down Expand Up @@ -73,10 +73,13 @@
/// Instruments can be used as criteria for views.
///
/// ```
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream, StreamBuilder};
///
/// let criteria = Instrument::new().name("counter_*");
/// let mask = Stream::new().aggregation(Aggregation::Sum);
/// let mask = Stream::builder()
/// .with_aggregation(Aggregation::Sum)
/// .build()
/// .unwrap();
///
/// let view = new_view(criteria, mask);
/// # drop(view);
Expand Down Expand Up @@ -169,71 +172,60 @@
}
}

/// Describes the stream of data an instrument produces.
/// A builder for creating Stream objects.
///
/// # Example
///
/// Streams can be used as masks in views.
///
/// ```
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
///
/// let criteria = Instrument::new().name("counter_*");
/// let mask = Stream::new().aggregation(Aggregation::Sum);
/// use opentelemetry_sdk::metrics::{Aggregation, Stream};
/// use opentelemetry::Key;
///
/// let view = new_view(criteria, mask);
/// # drop(view);
/// let stream = Stream::builder()
/// .with_name("my_stream")
/// .with_aggregation(Aggregation::Sum)
/// .with_cardinality_limit(100)
/// .build()
/// .unwrap();
/// ```
#[derive(Default, Debug)]
#[non_exhaustive]
#[allow(unreachable_pub)]
pub struct Stream {
/// The human-readable identifier of the stream.
pub(crate) name: Option<Cow<'static, str>>,
/// Describes the purpose of the data.
pub(crate) description: Option<Cow<'static, str>>,
/// the unit of measurement recorded.
pub(crate) unit: Option<Cow<'static, str>>,
/// Aggregation the stream uses for an instrument.
pub(crate) aggregation: Option<Aggregation>,
/// An allow-list of attribute keys that will be preserved for the stream.
///
/// Any attribute recorded for the stream with a key not in this set will be
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
/// attributes will be kept.
pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,

/// Cardinality limit for the stream.
pub(crate) cardinality_limit: Option<usize>,
pub struct StreamBuilder {
name: Option<Cow<'static, str>>,
description: Option<Cow<'static, str>>,
unit: Option<Cow<'static, str>>,
aggregation: Option<Aggregation>,
allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
cardinality_limit: Option<usize>,
}

impl Stream {
/// Create a new stream with empty values.
pub fn new() -> Self {
Stream::default()
impl StreamBuilder {
/// Create a new stream builder with default values.
pub(crate) fn new() -> Self {
StreamBuilder::default()
}

/// Set the stream name.
pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
/// Set the stream name. If this is not set, name provide while creating the instrument will be used.
pub fn with_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
self.name = Some(name.into());
self
}

/// Set the stream description.
pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
/// Set the stream description. If this is not set, description provided while creating the instrument will be used.
pub fn with_description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
self.description = Some(description.into());
self
}

/// Set the stream unit.
pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
/// Set the stream unit. If this is not set, unit provided while creating the instrument will be used.
pub fn with_unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
self.unit = Some(unit.into());
self
}

#[cfg(feature = "spec_unstable_metrics_views")]
/// Set the stream aggregation.
pub fn aggregation(mut self, aggregation: Aggregation) -> Self {
/// Set the stream aggregation. This is used to customize the aggregation.
/// If not set, the default aggregation based on the instrument kind will be used.
pub fn with_aggregation(mut self, aggregation: Aggregation) -> Self {
self.aggregation = Some(aggregation);
self
}
Expand All @@ -242,18 +234,130 @@
/// Set the stream allowed attribute keys.
///
/// Any attribute recorded for the stream with a key not in this set will be
/// dropped. If this set is empty all attributes will be dropped.
pub fn allowed_attribute_keys(mut self, attribute_keys: impl IntoIterator<Item = Key>) -> Self {
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
/// attributes will be kept.
pub fn with_allowed_attribute_keys(
mut self,
attribute_keys: impl IntoIterator<Item = Key>,
) -> Self {
self.allowed_attribute_keys = Some(Arc::new(attribute_keys.into_iter().collect()));

self
}

/// Set the stream cardinality limit.
pub fn cardinality_limit(mut self, limit: usize) -> Self {
/// Set the stream cardinality limit. If this is not set, the default limit of 2000 will be used.
pub fn with_cardinality_limit(mut self, limit: usize) -> Self {
self.cardinality_limit = Some(limit);
self
}

/// Build a new Stream instance using the configuration in this builder.
///
/// # Returns
///
/// A Result containing the new Stream instance or an error if the build failed.
pub fn build(self) -> Result<Stream, Box<dyn Error>> {
// TODO: Add same validation as already done while
// creating instruments. It is better to move validation logic
// to a common helper and call it from both places.
// The current implementations does a basic validation
// only to close the overall API design.

// if name is provided, it must not be empty
if let Some(name) = &self.name {
if name.is_empty() {
return Err("Stream name must not be empty".into());

Check warning on line 268 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L268

Added line #L268 was not covered by tests
}
}

// if cardinality limit is provided, it must be greater than 0
if let Some(limit) = self.cardinality_limit {
if limit == 0 {
return Err("Cardinality limit must be greater than 0".into());

Check warning on line 275 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L275

Added line #L275 was not covered by tests
}
}

// If the aggregation is set to ExplicitBucketHistogram, validate the bucket boundaries.
if let Some(Aggregation::ExplicitBucketHistogram { boundaries, .. }) = &self.aggregation {
validate_bucket_boundaries(boundaries)?;
}

Ok(Stream {
name: self.name,
description: self.description,
unit: self.unit,
aggregation: self.aggregation,
allowed_attribute_keys: self.allowed_attribute_keys,
cardinality_limit: self.cardinality_limit,
})
}
}

fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> {
// Validate boundaries do not contain f64::NAN, f64::INFINITY, or f64::NEG_INFINITY
for boundary in boundaries {
if boundary.is_nan() || boundary.is_infinite() {
return Err(
"Bucket boundaries must not contain NaN, Infinity, or -Infinity".to_string(),
);

Check warning on line 301 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L299-L301

Added lines #L299 - L301 were not covered by tests
}
}

// validate that buckets are sorted and non-duplicate
for i in 1..boundaries.len() {
if boundaries[i] <= boundaries[i - 1] {
return Err("Bucket boundaries must be sorted and non-duplicate".to_string());
}
}

Ok(())

Check warning on line 312 in opentelemetry-sdk/src/metrics/instrument.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/instrument.rs#L312

Added line #L312 was not covered by tests
}

/// Describes the stream of data an instrument produces.
///
/// # Example
///
/// Streams can be used as masks in views.
///
/// ```
/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream};
///
/// let criteria = Instrument::new().name("counter_*");
/// let mask = Stream::builder()
/// .with_aggregation(Aggregation::Sum)
/// .build()
/// .unwrap();
///
/// let view = new_view(criteria, mask);
/// # drop(view);
/// ```
#[derive(Default, Debug)]
#[non_exhaustive]
#[allow(unreachable_pub)]
pub struct Stream {
/// The human-readable identifier of the stream.
pub(crate) name: Option<Cow<'static, str>>,
/// Describes the purpose of the data.
pub(crate) description: Option<Cow<'static, str>>,
/// the unit of measurement recorded.
pub(crate) unit: Option<Cow<'static, str>>,
/// Aggregation the stream uses for an instrument.
pub(crate) aggregation: Option<Aggregation>,
/// An allow-list of attribute keys that will be preserved for the stream.
///
/// Any attribute recorded for the stream with a key not in this set will be
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
/// attributes will be kept.
pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,

/// Cardinality limit for the stream.
pub(crate) cardinality_limit: Option<usize>,
}

impl Stream {
/// Create a new stream builder with default values.
pub fn builder() -> StreamBuilder {
StreamBuilder::new()
}
}

/// The identifying properties of an instrument.
Expand Down
Loading