From 442b0ac410de78cd8834eb4402124eb300d76aa1 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 21 May 2025 09:44:38 -0700 Subject: [PATCH 1/5] Feat: View cleanups --- examples/metrics-advanced/src/main.rs | 4 +- opentelemetry-sdk/CHANGELOG.md | 4 + opentelemetry-sdk/Cargo.toml | 3 +- opentelemetry-sdk/src/metrics/error.rs | 21 -- opentelemetry-sdk/src/metrics/instrument.rs | 143 ++++--------- opentelemetry-sdk/src/metrics/meter.rs | 2 +- .../src/metrics/meter_provider.rs | 6 +- opentelemetry-sdk/src/metrics/mod.rs | 6 - opentelemetry-sdk/src/metrics/pipeline.rs | 5 +- opentelemetry-sdk/src/metrics/view.rs | 193 ------------------ 10 files changed, 50 insertions(+), 337 deletions(-) diff --git a/examples/metrics-advanced/src/main.rs b/examples/metrics-advanced/src/main.rs index 287ec2ca3b..104e75b501 100644 --- a/examples/metrics-advanced/src/main.rs +++ b/examples/metrics-advanced/src/main.rs @@ -7,7 +7,7 @@ use std::error::Error; fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { // for example 1 let my_view_rename_and_unit = |i: &Instrument| { - if i.name == "my_histogram" { + if i.name() == "my_histogram" { Some( Stream::builder() .with_name("my_histogram_renamed") @@ -22,7 +22,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { // for example 2 let my_view_change_cardinality = |i: &Instrument| { - if i.name == "my_second_histogram" { + if i.name() == "my_second_histogram" { // Note: If Stream is invalid, build() will return an error. By // calling `.ok()`, any such error is ignored and treated as if the // view does not match the instrument. If this is not the desired diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 1cd4213839..82f92c4758 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -55,6 +55,10 @@ also modified to suppress telemetry before invoking exporters. - Added `Stream::builder()` method that returns a new `StreamBuilder` - `StreamBuilder::build()` returns `Result>` enabling proper validation + - Removed `new_view()` on `View`. Views can be instead added by passing anything + that implements `View` trait to `with_view` method on `MeterProviderBuilder`. + `View` is implemented for `Fn(&Instrument) -> Option`, so this can be + used to add views. - *Breaking* `Aggregation` enum moved behind feature flag "spec_unstable_metrics_views". This was only required when using Views. diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 171296e3ef..120acec442 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -18,7 +18,6 @@ futures-executor = { workspace = true } futures-util = { workspace = true, features = ["std", "sink", "async-await-macro"] } percent-encoding = { workspace = true, optional = true } rand = { workspace = true, features = ["std", "std_rng", "small_rng", "os_rng", "thread_rng"], optional = true } -glob = { workspace = true, optional = true } serde = { workspace = true, features = ["derive", "rc"], optional = true } serde_json = { workspace = true, optional = true } thiserror = { workspace = true } @@ -45,7 +44,7 @@ trace = ["opentelemetry/trace", "rand", "percent-encoding"] jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url", "experimental_async_runtime"] logs = ["opentelemetry/logs", "serde_json"] spec_unstable_logs_enabled = ["logs", "opentelemetry/spec_unstable_logs_enabled"] -metrics = ["opentelemetry/metrics", "glob"] +metrics = ["opentelemetry/metrics"] testing = ["opentelemetry/testing", "trace", "metrics", "logs", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"] experimental_async_runtime = [] rt-tokio = ["tokio", "tokio-stream", "experimental_async_runtime"] diff --git a/opentelemetry-sdk/src/metrics/error.rs b/opentelemetry-sdk/src/metrics/error.rs index 7d7f2bd19b..cb2e082aa3 100644 --- a/opentelemetry-sdk/src/metrics/error.rs +++ b/opentelemetry-sdk/src/metrics/error.rs @@ -3,30 +3,9 @@ use std::sync::PoisonError; use thiserror::Error; /// A specialized `Result` type for metric operations. -#[cfg(feature = "spec_unstable_metrics_views")] -pub type MetricResult = result::Result; -#[cfg(not(feature = "spec_unstable_metrics_views"))] pub(crate) type MetricResult = result::Result; /// Errors returned by the metrics API. -#[cfg(feature = "spec_unstable_metrics_views")] -#[derive(Error, Debug)] -#[non_exhaustive] -pub enum MetricError { - /// Other errors not covered by specific cases. - #[error("Metrics error: {0}")] - Other(String), - /// Invalid configuration - #[error("Config error {0}")] - Config(String), - /// Invalid instrument configuration such invalid instrument name, invalid instrument description, invalid instrument unit, etc. - /// See [spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#general-characteristics) - /// for full list of requirements. - #[error("Invalid instrument configuration: {0}")] - InvalidInstrumentConfiguration(&'static str), -} - -#[cfg(not(feature = "spec_unstable_metrics_views"))] #[derive(Error, Debug)] pub(crate) enum MetricError { /// Other errors not covered by specific cases. diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index 259460ef67..fd5d02751d 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -65,110 +65,62 @@ impl InstrumentKind { } } -/// Describes properties an instrument is created with, also used for filtering -/// in [View](crate::metrics::View)s. +/// Describes properties an instrument is created with, used for filtering in +/// [View](crate::metrics::View)s. +/// +/// A reference to `Instrument` is provided to the `view` to select the +/// instrument(s) for which the [Stream] should be applied. /// /// # Example /// -/// Instruments can be used as criteria for views. +/// ```rust +/// use opentelemetry_sdk::metrics::{Instrument, Stream}; /// +/// let my_view_change_cardinality = |i: &Instrument| { +/// if i.name() == "my_second_histogram" { +/// // Note: If Stream is invalid, build() will return an error. By +/// // calling `.ok()`, any such error is ignored and treated as if the +/// // view does not match the instrument. If this is not the desired +/// // behavior, consider handling the error explicitly. +/// Stream::builder().with_cardinality_limit(2).build().ok() +/// } else { +/// None +/// } +/// }; /// ``` -/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream, StreamBuilder}; -/// -/// let criteria = Instrument::new().name("counter_*"); -/// let mask = Stream::builder() -/// .with_aggregation(Aggregation::Sum) -/// .build() -/// .unwrap(); -/// -/// let view = new_view(criteria, mask); -/// # drop(view); -/// ``` -#[derive(Clone, Default, Debug, PartialEq)] -#[non_exhaustive] +#[derive(Clone, Debug, PartialEq)] pub struct Instrument { /// The human-readable identifier of the instrument. - pub name: Cow<'static, str>, + pub(crate) name: Cow<'static, str>, /// describes the purpose of the instrument. - pub description: Cow<'static, str>, + pub(crate) description: Cow<'static, str>, /// The functional group of the instrument. - pub kind: Option, + pub(crate) kind: InstrumentKind, /// Unit is the unit of measurement recorded by the instrument. - pub unit: Cow<'static, str>, + pub(crate) unit: Cow<'static, str>, /// The instrumentation that created the instrument. - pub scope: InstrumentationScope, + pub(crate) scope: InstrumentationScope, } -#[cfg(feature = "spec_unstable_metrics_views")] impl Instrument { - /// Create a new instrument with default values - pub fn new() -> Self { - Instrument::default() - } - - /// Set the instrument name. - pub fn name(mut self, name: impl Into>) -> Self { - self.name = name.into(); - self - } - - /// Set the instrument description. - pub fn description(mut self, description: impl Into>) -> Self { - self.description = description.into(); - self - } - - /// Set the instrument unit. - pub fn unit(mut self, unit: impl Into>) -> Self { - self.unit = unit.into(); - self - } - - /// Set the instrument scope. - pub fn scope(mut self, scope: InstrumentationScope) -> Self { - self.scope = scope; - self - } - - /// empty returns if all fields of i are their default-value. - pub(crate) fn is_empty(&self) -> bool { - self.name.is_empty() - && self.description.is_empty() - && self.kind.is_none() - && self.unit.is_empty() - && self.scope == InstrumentationScope::default() - } - - pub(crate) fn matches(&self, other: &Instrument) -> bool { - self.matches_name(other) - && self.matches_description(other) - && self.matches_kind(other) - && self.matches_unit(other) - && self.matches_scope(other) + /// Instrument name. + pub fn name(&self) -> &str { + self.name.as_ref() } - pub(crate) fn matches_name(&self, other: &Instrument) -> bool { - self.name.is_empty() || self.name.as_ref() == other.name.as_ref() + /// Instrument kind. + pub fn kind(&self) -> InstrumentKind { + self.kind } - pub(crate) fn matches_description(&self, other: &Instrument) -> bool { - self.description.is_empty() || self.description.as_ref() == other.description.as_ref() + /// Instrument unit. + pub fn unit(&self) -> &str { + self.unit.as_ref() } - pub(crate) fn matches_kind(&self, other: &Instrument) -> bool { - self.kind.is_none() || self.kind == other.kind - } - - pub(crate) fn matches_unit(&self, other: &Instrument) -> bool { - self.unit.is_empty() || self.unit.as_ref() == other.unit.as_ref() - } - - pub(crate) fn matches_scope(&self, other: &Instrument) -> bool { - (self.scope.name().is_empty() || self.scope.name() == other.scope.name()) - && (self.scope.version().is_none() - || self.scope.version().as_ref() == other.scope.version().as_ref()) - && (self.scope.schema_url().is_none() - || self.scope.schema_url().as_ref() == other.scope.schema_url().as_ref()) + /// Instrument scope. + pub fn scope(&self) -> &InstrumentationScope { + &self.scope } } @@ -188,7 +140,6 @@ impl Instrument { /// .unwrap(); /// ``` #[derive(Default, Debug)] -#[non_exhaustive] pub struct StreamBuilder { name: Option>, description: Option>, @@ -312,27 +263,9 @@ fn validate_bucket_boundaries(boundaries: &[f64]) -> Result<(), String> { Ok(()) } -/// Describes the stream of data an instrument produces. -/// -/// # Example -/// -/// Streams can be used as masks in views. -/// -/// ``` -/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream}; -/// -/// let criteria = Instrument::new().name("counter_*"); -/// let mask = Stream::builder() -/// .with_aggregation(Aggregation::Sum) -/// .build() -/// .unwrap(); -/// -/// let view = new_view(criteria, mask); -/// # drop(view); -/// ``` +/// Describes the stream of data an instrument produces. Used in +/// [View](crate::metrics::View)s to customize the metric output. #[derive(Default, Debug)] -#[non_exhaustive] -#[allow(unreachable_pub)] pub struct Stream { /// The human-readable identifier of the stream. pub(crate) name: Option>, diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index 3950cbce55..0ec960c313 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -683,7 +683,7 @@ where name, description: description.unwrap_or_default(), unit: unit.unwrap_or_default(), - kind: Some(kind), + kind: kind, scope: self.meter.scope.clone(), }; diff --git a/opentelemetry-sdk/src/metrics/meter_provider.rs b/opentelemetry-sdk/src/metrics/meter_provider.rs index 73cc9c23d1..90ca980140 100644 --- a/opentelemetry-sdk/src/metrics/meter_provider.rs +++ b/opentelemetry-sdk/src/metrics/meter_provider.rs @@ -308,7 +308,7 @@ impl MeterProviderBuilder { /// ``` /// # use opentelemetry_sdk::metrics::{Stream, Instrument}; /// let view_rename = |i: &Instrument| { - /// if i.name == "my_counter" { + /// if i.name() == "my_counter" { /// Some(Stream::builder().with_name("my_counter_renamed").build().expect("Stream should be valid")) /// } else { /// None @@ -324,7 +324,7 @@ impl MeterProviderBuilder { /// ``` /// # use opentelemetry_sdk::metrics::{Stream, Instrument}; /// let view_change_cardinality = |i: &Instrument| { - /// if i.name == "my_counter" { + /// if i.name() == "my_counter" { /// Some( /// Stream::builder() /// .with_cardinality_limit(100).build().expect("Stream should be valid"), @@ -343,7 +343,7 @@ impl MeterProviderBuilder { /// ``` /// # use opentelemetry_sdk::metrics::{Stream, Instrument}; /// let my_view_change_cardinality = |i: &Instrument| { - /// if i.name == "my_second_histogram" { + /// if i.name() == "my_second_histogram" { /// // Note: If Stream is invalid, build() will return `Error` variant. /// // By calling `.ok()`, any such error is ignored and treated as if the view does not match /// // the instrument. diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index fc20f88f19..e92eaf6378 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -73,8 +73,6 @@ pub use in_memory_exporter::{InMemoryMetricExporter, InMemoryMetricExporterBuild #[cfg(feature = "spec_unstable_metrics_views")] pub use aggregation::*; -#[cfg(feature = "spec_unstable_metrics_views")] -pub use error::{MetricError, MetricResult}; #[cfg(feature = "experimental_metrics_custom_reader")] pub use manual_reader::*; pub use meter_provider::*; @@ -83,10 +81,6 @@ pub use periodic_reader::*; pub use pipeline::Pipeline; pub use instrument::{Instrument, InstrumentKind, Stream, StreamBuilder}; - -#[cfg(feature = "spec_unstable_metrics_views")] -pub use view::new_view; - pub use view::View; use std::hash::Hash; diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index e012211ffc..68c48b5387 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -259,10 +259,7 @@ where let mut matched = false; let mut measures = vec![]; let mut errs = vec![]; - let kind = match inst.kind { - Some(kind) => kind, - None => return Err(MetricError::Other("instrument must have a kind".into())), - }; + let kind = inst.kind; // The cache will return the same Aggregator instance. Use stream ids to de duplicate. let mut seen = HashSet::new(); diff --git a/opentelemetry-sdk/src/metrics/view.rs b/opentelemetry-sdk/src/metrics/view.rs index ac2e0493d9..046e56d298 100644 --- a/opentelemetry-sdk/src/metrics/view.rs +++ b/opentelemetry-sdk/src/metrics/view.rs @@ -1,13 +1,4 @@ use super::instrument::{Instrument, Stream}; -#[cfg(feature = "spec_unstable_metrics_views")] -use crate::metrics::{MetricError, MetricResult}; -#[cfg(feature = "spec_unstable_metrics_views")] -use glob::Pattern; - -#[cfg(feature = "spec_unstable_metrics_views")] -fn empty_view(_inst: &Instrument) -> Option { - None -} /// Used to customize the metrics that are output by the SDK. /// @@ -67,187 +58,3 @@ impl View for Box { (**self).match_inst(inst) } } - -#[cfg(feature = "spec_unstable_metrics_views")] -/// Creates a [View] that applies the [Stream] mask for all instruments that -/// match criteria. -/// -/// The returned [View] will only apply the mask if all non-empty fields of -/// criteria match the corresponding [Instrument] passed to the view. If all -/// fields of the criteria are their default values, a view that matches no -/// instruments is returned. If you need to match an empty-value field, create a -/// [View] directly. -/// -/// The [Instrument::name] field of criteria supports wildcard pattern matching. -/// The wildcard `*` is recognized as matching zero or more characters, and `?` -/// is recognized as matching exactly one character. For example, a pattern of -/// `*` will match all instrument names. -/// -/// The [Stream] mask only applies updates for non-empty fields. By default, the -/// [Instrument] the [View] matches against will be use for the name, -/// description, and unit of the returned [Stream] and no `aggregation` or -/// `allowed_attribute_keys` are set. All non-empty fields of mask are used -/// instead of the default. If you need to set a an empty value in the returned -/// stream, create a custom [View] directly. -/// -/// # Example -/// -/// ``` -/// use opentelemetry_sdk::metrics::{new_view, Aggregation, Instrument, Stream}; -/// -/// let criteria = Instrument::new().name("counter_*"); -/// let mask = Stream::builder() -/// .with_aggregation(Aggregation::Sum) -/// .build() -/// .unwrap(); -/// -/// let view = new_view(criteria, mask); -/// # drop(view); -/// ``` -pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult> { - if criteria.is_empty() { - // TODO - The error is getting lost here. Need to return or log. - return Ok(Box::new(empty_view)); - } - let contains_wildcard = criteria.name.contains(['*', '?']); - - let match_fn: Box bool + Send + Sync> = if contains_wildcard { - if mask.name.is_some() { - // TODO - The error is getting lost here. Need to return or log. - return Ok(Box::new(empty_view)); - } - - let pattern = criteria.name.clone(); - let glob_pattern = - Pattern::new(&pattern).map_err(|e| MetricError::Config(e.to_string()))?; - - Box::new(move |i| { - glob_pattern.matches(&i.name) - && criteria.matches_description(i) - && criteria.matches_kind(i) - && criteria.matches_unit(i) - && criteria.matches_scope(i) - }) - } else { - Box::new(move |i| criteria.matches(i)) - }; - - let mut agg = None; - if let Some(ma) = &mask.aggregation { - match ma.validate() { - Ok(_) => agg = Some(ma.clone()), - Err(_) => { - // TODO - The error is getting lost here. Need to return or log. - return Ok(Box::new(empty_view)); - } - } - } - - Ok(Box::new(move |i: &Instrument| -> Option { - if match_fn(i) { - Some(Stream { - name: if mask.name.is_some() { - mask.name.clone() - } else { - Some(i.name.clone()) - }, - description: if mask.description.is_some() { - mask.description.clone() - } else { - Some(i.description.clone()) - }, - unit: if mask.unit.is_some() { - mask.unit.clone() - } else { - Some(i.unit.clone()) - }, - aggregation: agg.clone(), - allowed_attribute_keys: mask.allowed_attribute_keys.clone(), - cardinality_limit: mask.cardinality_limit, - }) - } else { - None - } - })) -} - -#[cfg(test)] -#[cfg(feature = "spec_unstable_metrics_views")] -mod tests { - use super::*; - #[test] - fn test_new_view_matching_all() { - let criteria = Instrument::new().name("*"); - let mask = Stream::builder().build().unwrap(); - - let view = new_view(criteria, mask).expect("Expected to create a new view"); - - let test_instrument = Instrument::new().name("test_instrument"); - assert!( - view.match_inst(&test_instrument).is_some(), - "Expected to match all instruments with * pattern" - ); - } - - #[test] - fn test_new_view_exact_match() { - let criteria = Instrument::new().name("counter_exact_match"); - let mask = Stream::builder().build().unwrap(); - - let view = new_view(criteria, mask).expect("Expected to create a new view"); - - let matching_instrument = Instrument::new().name("counter_exact_match"); - assert!( - view.match_inst(&matching_instrument).is_some(), - "Expected to match instrument with exact name" - ); - - let non_matching_instrument = Instrument::new().name("counter_non_exact_match"); - assert!( - view.match_inst(&non_matching_instrument).is_none(), - "Expected not to match instrument with different name" - ); - } - - #[test] - fn test_new_view_with_wildcard_pattern() { - let criteria = Instrument::new().name("prefix_*"); - let mask = Stream::builder().build().unwrap(); - - let view = new_view(criteria, mask).expect("Expected to create a new view"); - - let matching_instrument = Instrument::new().name("prefix_counter"); - assert!( - view.match_inst(&matching_instrument).is_some(), - "Expected to match instrument with matching prefix" - ); - - let non_matching_instrument = Instrument::new().name("nonprefix_counter"); - assert!( - view.match_inst(&non_matching_instrument).is_none(), - "Expected not to match instrument with different prefix" - ); - } - - #[test] - fn test_new_view_wildcard_question_mark() { - let criteria = Instrument::new().name("test_?"); - let mask = Stream::builder().build().unwrap(); - - let view = new_view(criteria, mask).expect("Expected to create a new view"); - - // Instrument name that should match the pattern "test_?". - let matching_instrument = Instrument::new().name("test_1"); - assert!( - view.match_inst(&matching_instrument).is_some(), - "Expected to match instrument with test_? pattern" - ); - - // Instrument name that should not match the pattern "test_?". - let non_matching_instrument = Instrument::new().name("test_12"); - assert!( - view.match_inst(&non_matching_instrument).is_none(), - "Expected not to match instrument with test_? pattern" - ); - } -} From e7c5ba2c1dde3f07bf454aefbfc2c1ab149b04f3 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 21 May 2025 10:39:40 -0700 Subject: [PATCH 2/5] TODO comment --- opentelemetry-sdk/src/metrics/view.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/opentelemetry-sdk/src/metrics/view.rs b/opentelemetry-sdk/src/metrics/view.rs index 046e56d298..1d28426a12 100644 --- a/opentelemetry-sdk/src/metrics/view.rs +++ b/opentelemetry-sdk/src/metrics/view.rs @@ -36,6 +36,8 @@ use super::instrument::{Instrument, Stream}; /// let provider = SdkMeterProvider::builder().with_view(my_view).build(); /// # drop(provider) /// ``` +// TODO: This trait need not be public, if we modify MeterProvider to take a +// Fn(&Instrument) -> Option instead of View. pub trait View: Send + Sync + 'static { /// Defines how data should be collected for certain instruments. /// From 0c5e7c1465bf9315f48a0414c3932b6fc7a23979 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 21 May 2025 12:05:28 -0700 Subject: [PATCH 3/5] add some coverage for advanced views --- opentelemetry-sdk/src/metrics/meter.rs | 2 +- opentelemetry-sdk/src/metrics/mod.rs | 102 +++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/metrics/meter.rs b/opentelemetry-sdk/src/metrics/meter.rs index 0ec960c313..ea0545bb3a 100644 --- a/opentelemetry-sdk/src/metrics/meter.rs +++ b/opentelemetry-sdk/src/metrics/meter.rs @@ -683,7 +683,7 @@ where name, description: description.unwrap_or_default(), unit: unit.unwrap_or_default(), - kind: kind, + kind, scope: self.meter.scope.clone(), }; diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index e92eaf6378..6bf9ac732d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1687,6 +1687,108 @@ mod tests { ); } + // Following are just a basic set of advanced View tests - Views bring a lot + // of permutations and combinations, and we need + // to expand coverage for more scenarios in future. + // It is best to first split this file into multiple files + // based on scenarios (eg: regular aggregation, cardinality, views, view_advanced, etc) + // and then add more tests for each of the scenarios. + #[test] + fn test_view_single_instrument_multiple_stream() { + // Run this test with stdout enabled to see output. + // cargo test test_view_multiple_stream --all-features + + // Each of the views match the instrument name "my_counter" and create a + // new stream with a different name. In other words, View can be used to + // create multiple streams for the same instrument. + + let view1 = |i: &Instrument| { + if i.name() == "my_counter" { + Some(Stream::builder().with_name("my_counter_1").build().unwrap()) + } else { + None + } + }; + + let view2 = |i: &Instrument| { + if i.name() == "my_counter" { + Some(Stream::builder().with_name("my_counter_2").build().unwrap()) + } else { + None + } + }; + + // Arrange + let exporter = InMemoryMetricExporter::default(); + let meter_provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter.clone()) + .with_view(view1) + .with_view(view2) + .build(); + + // Act + let meter = meter_provider.meter("test"); + let counter = meter.f64_counter("my_counter").build(); + + counter.add(1.5, &[KeyValue::new("key1", "value1")]); + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert_eq!(resource_metrics.len(), 1); + assert_eq!(resource_metrics[0].scope_metrics.len(), 1); + let metrics = &resource_metrics[0].scope_metrics[0].metrics; + assert_eq!(metrics.len(), 2); + assert_eq!(metrics[0].name, "my_counter_1"); + assert_eq!(metrics[1].name, "my_counter_2"); + } + + #[test] + fn test_view_multiple_instrument_single_stream() { + // Run this test with stdout enabled to see output. + // cargo test test_view_multiple_instrument_single_stream --all-features + + // The view matches the instrument name "my_counter1" and "my_counter1" + // and create a single new stream for both. In other words, View can be used to + // "merge" multiple instruments into a single stream. + let view = |i: &Instrument| { + if i.name() == "my_counter1" || i.name() == "my_counter2" { + Some(Stream::builder().with_name("my_counter").build().unwrap()) + } else { + None + } + }; + + // Arrange + let exporter = InMemoryMetricExporter::default(); + let meter_provider = SdkMeterProvider::builder() + .with_periodic_exporter(exporter.clone()) + .with_view(view) + .build(); + + // Act + let meter = meter_provider.meter("test"); + let counter1 = meter.f64_counter("my_counter1").build(); + let counter2 = meter.f64_counter("my_counter2").build(); + + counter1.add(1.5, &[KeyValue::new("key1", "value1")]); + counter2.add(1.5, &[KeyValue::new("key1", "value1")]); + meter_provider.force_flush().unwrap(); + + // Assert + let resource_metrics = exporter + .get_finished_metrics() + .expect("metrics are expected to be exported."); + assert_eq!(resource_metrics.len(), 1); + assert_eq!(resource_metrics[0].scope_metrics.len(), 1); + let metrics = &resource_metrics[0].scope_metrics[0].metrics; + assert_eq!(metrics.len(), 1); + assert_eq!(metrics[0].name, "my_counter"); + // TODO: Assert that the data points are aggregated correctly. + } + fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( instrument_name: &'static str, should_not_emit: bool, From 48ae7fd60edbb4c76f182097a0e06566432701f1 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 21 May 2025 13:19:17 -0700 Subject: [PATCH 4/5] fix benches --- opentelemetry-sdk/benches/metric.rs | 53 +++++++++++++---------------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 54eceb2116..8ad8e91fe5 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -6,8 +6,8 @@ use opentelemetry::{ use opentelemetry_sdk::{ error::OTelSdkResult, metrics::{ - data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument, - InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View, + data::ResourceMetrics, reader::MetricReader, Aggregation, Instrument, InstrumentKind, + ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View, }, }; use rand::Rng; @@ -220,16 +220,12 @@ fn counters(c: &mut Criterion) { }); let (_, cntr) = bench_counter( - Some( - new_view( - Instrument::new().name("*"), - Stream::builder() - .with_allowed_attribute_keys([Key::new("K")]) - .build() - .unwrap(), - ) - .unwrap(), - ), + Some(Box::new(|_i: &Instrument| { + Stream::builder() + .with_allowed_attribute_keys([Key::new("K")]) + .build() + .ok() + })), "cumulative", ); @@ -273,25 +269,24 @@ fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram) { for i in 0..bounds.len() { bounds[i] = i * MAX_BOUND / bound_count } - let view = Some( - new_view( - Instrument::new().name("histogram_*"), - Stream::builder() - .with_aggregation(Aggregation::ExplicitBucketHistogram { - boundaries: bounds.iter().map(|&x| x as f64).collect(), - record_min_max: true, - }) - .build() - .unwrap(), - ) - .unwrap(), - ); let r = SharedReader(Arc::new(ManualReader::default())); - let mut builder = SdkMeterProvider::builder().with_reader(r.clone()); - if let Some(view) = view { - builder = builder.with_view(view); - } + let builder = SdkMeterProvider::builder() + .with_reader(r.clone()) + .with_view(Box::new(move |i: &Instrument| { + if i.name() == "histogram_*" { + Stream::builder() + .with_aggregation(Aggregation::ExplicitBucketHistogram { + boundaries: bounds.iter().map(|&x| x as f64).collect(), + record_min_max: true, + }) + .build() + .ok() + } else { + None + } + })); + let mtr = builder.build().meter("test_meter"); let hist = mtr .u64_histogram(format!("histogram_{}", bound_count)) From a33760c79b838f2a59ebdc2d8c3929890215b8ce Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Wed, 21 May 2025 15:47:21 -0700 Subject: [PATCH 5/5] make bench do same as before --- opentelemetry-sdk/benches/metric.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/benches/metric.rs b/opentelemetry-sdk/benches/metric.rs index 8ad8e91fe5..e0efc9b21b 100644 --- a/opentelemetry-sdk/benches/metric.rs +++ b/opentelemetry-sdk/benches/metric.rs @@ -274,7 +274,7 @@ fn bench_histogram(bound_count: usize) -> (SharedReader, Histogram) { let builder = SdkMeterProvider::builder() .with_reader(r.clone()) .with_view(Box::new(move |i: &Instrument| { - if i.name() == "histogram_*" { + if i.name().starts_with("histogram_") { Stream::builder() .with_aggregation(Aggregation::ExplicitBucketHistogram { boundaries: bounds.iter().map(|&x| x as f64).collect(),