diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md
index d09c8dd855..c813c74625 100644
--- a/opentelemetry-sdk/CHANGELOG.md
+++ b/opentelemetry-sdk/CHANGELOG.md
@@ -42,6 +42,10 @@ also modified to suppress telemetry before invoking exporters.
   behind feature flag "experimental_metrics_custom_reader".
   [#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)
 
+- TODO: Placeholder for View related changelog. Polish this after all
+  changes are done.
+  Hide public fields from `Stream` struct.
+
 - *Breaking* `Aggregation` enum moved behind feature flag "spec_unstable_metrics_views".
   This was only required when using Views.
   [#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)
diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs
index 559e9c5328..63464fa5f1 100644
--- a/opentelemetry-sdk/src/metrics/instrument.rs
+++ b/opentelemetry-sdk/src/metrics/instrument.rs
@@ -190,22 +190,22 @@ impl Instrument {
 #[allow(unreachable_pub)]
 pub struct Stream {
     /// The human-readable identifier of the stream.
-    pub name: Cow<'static, str>,
+    pub(crate) name: Option<Cow<'static, str>>,
     /// Describes the purpose of the data.
-    pub description: Cow<'static, str>,
+    pub(crate) description: Option<Cow<'static, str>>,
     /// the unit of measurement recorded.
-    pub unit: Cow<'static, str>,
+    pub(crate) unit: Option<Cow<'static, str>>,
     /// Aggregation the stream uses for an instrument.
-    pub aggregation: Option<Aggregation>,
+    pub(crate) aggregation: Option<Aggregation>,
     /// An allow-list of attribute keys that will be preserved for the stream.
     ///
     /// Any attribute recorded for the stream with a key not in this set will be
     /// dropped. If the set is empty, all attributes will be dropped, if `None` all
     /// attributes will be kept.
-    pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
+    pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
     /// Cardinality limit for the stream.
-    pub cardinality_limit: Option<usize>,
+    pub(crate) cardinality_limit: Option<usize>,
 }
 
 #[cfg(feature = "spec_unstable_metrics_views")]
@@ -217,19 +217,19 @@ impl Stream {
 
     /// Set the stream name.
     pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
-        self.name = name.into();
+        self.name = Some(name.into());
         self
     }
 
     /// Set the stream description.
     pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
-        self.description = description.into();
+        self.description = Some(description.into());
        self
    }
 
     /// Set the stream unit.
     pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
-        self.unit = unit.into();
+        self.unit = Some(unit.into());
         self
     }
 
diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs
index 285ef39a73..c98d7affa8 100644
--- a/opentelemetry-sdk/src/metrics/mod.rs
+++ b/opentelemetry-sdk/src/metrics/mod.rs
@@ -1463,6 +1463,201 @@ mod tests {
         );
     }
 
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_rename() {
+        test_view_customization(
+            |i| {
+                if i.name == "my_counter" {
+                    Some(Stream::new().name("my_counter_renamed"))
+                } else {
+                    None
+                }
+            },
+            "my_counter_renamed",
+            "my_unit",
+            "my_description",
+        )
+        .await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_change_unit() {
+        test_view_customization(
+            |i| {
+                if i.name == "my_counter" {
+                    Some(Stream::new().unit("my_unit_new"))
+                } else {
+                    None
+                }
+            },
+            "my_counter",
+            "my_unit_new",
+            "my_description",
+        )
+        .await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_change_description() {
+        test_view_customization(
+            |i| {
+                if i.name == "my_counter" {
+                    Some(Stream::new().description("my_description_new"))
+                } else {
+                    None
+                }
+            },
+            "my_counter",
+            "my_unit",
+            "my_description_new",
+        )
+        .await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_change_name_unit() {
+        test_view_customization(
+            |i| {
+                if i.name == "my_counter" {
+                    Some(Stream::new().name("my_counter_renamed").unit("my_unit_new"))
+                } else {
+                    None
+                }
+            },
+            "my_counter_renamed",
+            "my_unit_new",
+            "my_description",
+        )
+        .await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_change_name_unit_desc() {
+        test_view_customization(
+            |i| {
+                if i.name == "my_counter" {
+                    Some(
+                        Stream::new()
+                            .name("my_counter_renamed")
+                            .unit("my_unit_new")
+                            .description("my_description_new"),
+                    )
+                } else {
+                    None
+                }
+            },
+            "my_counter_renamed",
+            "my_unit_new",
+            "my_description_new",
+        )
+        .await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_match_unit() {
+        test_view_customization(
+            |i| {
+                if i.unit == "my_unit" {
+                    Some(Stream::new().unit("my_unit_new"))
+                } else {
+                    None
+                }
+            },
+            "my_counter",
+            "my_unit_new",
+            "my_description",
+        )
+        .await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_match_none() {
+        test_view_customization(
+            |i| {
+                if i.name == "not_expected_to_match" {
+                    Some(Stream::new())
+                } else {
+                    None
+                }
+            },
+            "my_counter",
+            "my_unit",
+            "my_description",
+        )
+        .await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+    async fn view_test_match_multiple() {
+        test_view_customization(
+            |i| {
+                if i.name == "my_counter" && i.unit == "my_unit" {
+                    Some(Stream::new().name("my_counter_renamed"))
+                } else {
+                    None
+                }
+            },
+            "my_counter_renamed",
+            "my_unit",
+            "my_description",
+        )
+        .await;
+    }
+
+    /// Helper function to test view customizations
+    async fn test_view_customization<F>(
+        view_function: F,
+        expected_name: &str,
+        expected_unit: &str,
+        expected_description: &str,
+    ) where
+        F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
+    {
+        // Run this test with stdout enabled to see output.
+        // cargo test view_test_* --all-features -- --nocapture
+
+        // Arrange
+        let exporter = InMemoryMetricExporter::default();
+        let meter_provider = SdkMeterProvider::builder()
+            .with_periodic_exporter(exporter.clone())
+            .with_view(view_function)
+            .build();
+
+        // Act
+        let meter = meter_provider.meter("test");
+        let counter = meter
+            .f64_counter("my_counter")
+            .with_unit("my_unit")
+            .with_description("my_description")
+            .build();
+
+        counter.add(1.5, &[KeyValue::new("key1", "value1")]);
+        meter_provider.force_flush().unwrap();
+
+        // Assert
+        let resource_metrics = exporter
+            .get_finished_metrics()
+            .expect("metrics are expected to be exported.");
+        assert_eq!(resource_metrics.len(), 1);
+        assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
+        let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
+        assert_eq!(
+            metric.name, expected_name,
+            "Expected name: {}.",
+            expected_name
+        );
+        assert_eq!(
+            metric.unit, expected_unit,
+            "Expected unit: {}.",
+            expected_unit
+        );
+        assert_eq!(
+            metric.description, expected_description,
+            "Expected description: {}.",
+            expected_description
+        );
+    }
+
     fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
         instrument_name: &'static str,
         should_not_emit: bool,
diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs
index 05d2861807..e4d62b3955 100644
--- a/opentelemetry-sdk/src/metrics/pipeline.rs
+++ b/opentelemetry-sdk/src/metrics/pipeline.rs
@@ -267,12 +267,22 @@ where
         // The cache will return the same Aggregator instance. Use stream ids to de duplicate.
         let mut seen = HashSet::new();
         for v in &self.pipeline.views {
-            let stream = match v.match_inst(&inst) {
+            let mut stream = match v.match_inst(&inst) {
                 Some(stream) => stream,
                 None => continue,
             };
             matched = true;
 
+            if stream.name.is_none() {
+                stream.name = Some(inst.name.clone());
+            }
+            if stream.description.is_none() {
+                stream.description = Some(inst.description.clone());
+            }
+            if stream.unit.is_none() {
+                stream.unit = Some(inst.unit.clone());
+            }
+
             let id = self.inst_id(kind, &stream);
             if seen.contains(&id) {
                 continue; // This aggregator has already been added
@@ -300,9 +310,9 @@ where
 
         // Apply implicit default view if no explicit matched.
         let mut stream = Stream {
-            name: inst.name,
-            description: inst.description,
-            unit: inst.unit,
+            name: Some(inst.name),
+            description: Some(inst.description),
+            unit: Some(inst.unit),
             aggregation: None,
             allowed_attribute_keys: None,
             cardinality_limit,
@@ -403,16 +413,16 @@ where
 
         otel_debug!(
             name : "Metrics.InstrumentCreated",
-            instrument_name = stream.name.as_ref(),
+            instrument_name = stream.name.clone().unwrap_or_default().as_ref(),
             cardinality_limit = cardinality_limit,
         );
 
         self.pipeline.add_sync(
             scope.clone(),
             InstrumentSync {
-                name: stream.name,
-                description: stream.description,
-                unit: stream.unit,
+                name: stream.name.unwrap_or_default(),
+                description: stream.description.unwrap_or_default(),
+                unit: stream.unit.unwrap_or_default(),
                 comp_agg: collect,
             },
         );
@@ -453,10 +463,10 @@ where
 
     fn inst_id<T>(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
         InstrumentId {
-            name: stream.name.clone(),
-            description: stream.description.clone(),
+            name: stream.name.clone().unwrap_or_default(),
+            description: stream.description.clone().unwrap_or_default(),
             kind,
-            unit: stream.unit.clone(),
+            unit: stream.unit.clone().unwrap_or_default(),
             number: Cow::Borrowed(std::any::type_name::<T>()),
         }
     }
diff --git a/opentelemetry-sdk/src/metrics/view.rs b/opentelemetry-sdk/src/metrics/view.rs
index 9ec02717f5..eab2916664 100644
--- a/opentelemetry-sdk/src/metrics/view.rs
+++ b/opentelemetry-sdk/src/metrics/view.rs
@@ -110,7 +110,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn Vie
 
     let match_fn: Box<dyn Fn(&Instrument) -> bool + Send + Sync> = if contains_wildcard {
-        if !mask.name.is_empty() {
+        if mask.name.is_some() {
             // TODO - The error is getting lost here. Need to return or log.
             return Ok(Box::new(empty_view));
         }
@@ -144,20 +144,20 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn Vie
     Ok(Box::new(move |i: &Instrument| -> Option<Stream> {
         if match_fn(i) {
             Some(Stream {
-                name: if !mask.name.is_empty() {
+                name: if mask.name.is_some() {
                     mask.name.clone()
                 } else {
-                    i.name.clone()
+                    Some(i.name.clone())
                 },
-                description: if !mask.description.is_empty() {
+                description: if mask.description.is_some() {
                     mask.description.clone()
                 } else {
-                    i.description.clone()
+                    Some(i.description.clone())
                 },
-                unit: if !mask.unit.is_empty() {
+                unit: if mask.unit.is_some() {
                     mask.unit.clone()
                 } else {
-                    i.unit.clone()
+                    Some(i.unit.clone())
                 },
                 aggregation: agg.clone(),
                 allowed_attribute_keys: mask.allowed_attribute_keys.clone(),
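
Usage sketch (not part of the patch): the `test_view_customization` helper above already exercises the optional `Stream` fields end to end; the snippet below shows the same flow outside of `mod tests`, as a minimal sketch. It assumes the SDK is built with the "spec_unstable_metrics_views" feature (needed for `Stream::new()` and `with_view`); the import paths, the `main` wrapper, and the exporter location are illustrative assumptions, not taken from the diff.

// Assumed imports; exact paths may differ by opentelemetry-sdk version.
use opentelemetry::{metrics::MeterProvider, KeyValue};
use opentelemetry_sdk::metrics::in_memory_exporter::InMemoryMetricExporter;
use opentelemetry_sdk::metrics::{Instrument, SdkMeterProvider, Stream};

fn main() {
    let exporter = InMemoryMetricExporter::default();

    // A view that renames "my_counter"; returning None leaves other
    // instruments on the default stream (mirrors `view_test_rename`).
    let meter_provider = SdkMeterProvider::builder()
        .with_periodic_exporter(exporter.clone())
        .with_view(|i: &Instrument| {
            if i.name == "my_counter" {
                Some(Stream::new().name("my_counter_renamed"))
            } else {
                None
            }
        })
        .build();

    let meter = meter_provider.meter("example");
    let counter = meter
        .f64_counter("my_counter")
        .with_unit("my_unit")
        .with_description("my_description")
        .build();
    counter.add(1.5, &[KeyValue::new("key1", "value1")]);
    meter_provider.force_flush().unwrap();

    // The exported metric carries the renamed stream name.
    let exported = exporter.get_finished_metrics().unwrap();
    println!("{exported:#?}");
}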