Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ also modified to suppress telemetry before invoking exporters.
behind feature flag "experimental_metrics_custom_reader".
[#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)

- TODO: Placeholder for View related changelog. Polish this after all
changes are done.
Hide public fields from `Stream` struct.

- *Breaking* `Aggregation` enum moved behind feature flag
"spec_unstable_metrics_views". This was only required when using Views.
[#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)
Expand Down
18 changes: 9 additions & 9 deletions opentelemetry-sdk/src/metrics/instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,22 +190,22 @@ impl Instrument {
#[allow(unreachable_pub)]
pub struct Stream {
/// The human-readable identifier of the stream.
pub name: Cow<'static, str>,
pub(crate) name: Option<Cow<'static, str>>,
/// Describes the purpose of the data.
pub description: Cow<'static, str>,
pub(crate) description: Option<Cow<'static, str>>,
/// the unit of measurement recorded.
pub unit: Cow<'static, str>,
pub(crate) unit: Option<Cow<'static, str>>,
/// Aggregation the stream uses for an instrument.
pub aggregation: Option<Aggregation>,
pub(crate) aggregation: Option<Aggregation>,
/// An allow-list of attribute keys that will be preserved for the stream.
///
/// Any attribute recorded for the stream with a key not in this set will be
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
/// attributes will be kept.
pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,

/// Cardinality limit for the stream.
pub cardinality_limit: Option<usize>,
pub(crate) cardinality_limit: Option<usize>,
}

#[cfg(feature = "spec_unstable_metrics_views")]
Expand All @@ -217,19 +217,19 @@ impl Stream {

/// Set the stream name.
pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
self.name = name.into();
self.name = Some(name.into());
self
}

/// Set the stream description.
pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
self.description = description.into();
self.description = Some(description.into());
self
}

/// Set the stream unit.
pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
self.unit = unit.into();
self.unit = Some(unit.into());
self
}

Expand Down
138 changes: 138 additions & 0 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,144 @@
);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn view_test_rename() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests and the helper method need not be async.

test_view_customization(
|i| {
if i.name == "my_counter" {
Some(Stream::new().name("my_counter_renamed"))
} else {
None
}
},
"my_counter_renamed",
"my_unit",
"my_description",
)
.await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn view_test_change_unit() {
test_view_customization(
|i| {
if i.name == "my_counter" {
Some(Stream::new().unit("my_unit_new"))
} else {
None
}
},
"my_counter",
"my_unit_new",
"my_description",
)
.await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn view_test_change_description() {
test_view_customization(
|i| {
if i.name == "my_counter" {
Some(Stream::new().description("my_description_new"))
} else {
None
}
},
"my_counter",
"my_unit",
"my_description_new",
)
.await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn view_test_match_unit() {
test_view_customization(
|i| {
if i.unit == "my_unit" {
Some(Stream::new().unit("my_unit_new"))
} else {
None
}
},
"my_counter",
"my_unit_new",
"my_description",
)
.await;
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn view_test_match_none() {
test_view_customization(
|i| {
if i.name == "not_expected_to_match" {
Some(Stream::new())
} else {
None
}
},
"my_counter",
"my_unit",
"my_description",
)
.await;
}

/// Helper function to test view customizations
async fn test_view_customization<F>(
view_function: F,
expected_name: &str,
expected_unit: &str,
expected_description: &str,
) where
F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
{
// Run this test with stdout enabled to see output.
// cargo test view_test_* --all-features -- --nocapture

// Arrange
let exporter = InMemoryMetricExporter::default();
let meter_provider = SdkMeterProvider::builder()
.with_periodic_exporter(exporter.clone())
.with_view(view_function)
.build();

// Act
let meter = meter_provider.meter("test");
let counter = meter
.f64_counter("my_counter")
.with_unit("my_unit")
.with_description("my_description")
.build();

counter.add(1.5, &[KeyValue::new("key1", "value1")]);
meter_provider.force_flush().unwrap();

// Assert
let resource_metrics = exporter
.get_finished_metrics()
.expect("metrics are expected to be exported.");
assert!(!resource_metrics.is_empty());
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
assert_eq!(
metric.name, expected_name,
"Expected name: {}.",

Check warning on line 1589 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L1589

Added line #L1589 was not covered by tests
expected_name
);
assert_eq!(
metric.unit, expected_unit,
"Expected unit: {}.",

Check warning on line 1594 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L1594

Added line #L1594 was not covered by tests
expected_unit
);
assert_eq!(
metric.description, expected_description,
"Expected description: {}.",

Check warning on line 1599 in opentelemetry-sdk/src/metrics/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/mod.rs#L1599

Added line #L1599 was not covered by tests
expected_description
);
}

fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
instrument_name: &'static str,
should_not_emit: bool,
Expand Down
32 changes: 21 additions & 11 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,22 @@
// The cache will return the same Aggregator instance. Use stream ids to de duplicate.
let mut seen = HashSet::new();
for v in &self.pipeline.views {
let stream = match v.match_inst(&inst) {
let mut stream = match v.match_inst(&inst) {
Some(stream) => stream,
None => continue,
};
matched = true;

if stream.name.is_none() {
stream.name = Some(inst.name.clone());
}
if stream.description.is_none() {
stream.description = Some(inst.description.clone());
}
if stream.unit.is_none() {
stream.unit = Some(inst.unit.clone());
}

let id = self.inst_id(kind, &stream);
if seen.contains(&id) {
continue; // This aggregator has already been added
Expand Down Expand Up @@ -300,9 +310,9 @@

// Apply implicit default view if no explicit matched.
let mut stream = Stream {
name: inst.name,
description: inst.description,
unit: inst.unit,
name: Some(inst.name),
description: Some(inst.description),
unit: Some(inst.unit),
aggregation: None,
allowed_attribute_keys: None,
cardinality_limit,
Expand Down Expand Up @@ -403,16 +413,16 @@

otel_debug!(
name : "Metrics.InstrumentCreated",
instrument_name = stream.name.as_ref(),
instrument_name = stream.name.clone().unwrap_or_default().as_ref(),

Check warning on line 416 in opentelemetry-sdk/src/metrics/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/pipeline.rs#L416

Added line #L416 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are being forced to introduce a lot of unwrap_or_default() method calls even though we know that the stream provided to cached_aggregator method would always have some name, unit, and description.

We should ideally create another internal struct that would have the same fields as Stream but they wouldn't be optional. Using such a struct instead of Stream would allow to get rid of these unwrap_or_default() calls which have weakened the contract for cached_aggregator method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point! we can modify this separately, as we use the unwrap_default for buckets, attribute_keys too.

cardinality_limit = cardinality_limit,
);

self.pipeline.add_sync(
scope.clone(),
InstrumentSync {
name: stream.name,
description: stream.description,
unit: stream.unit,
name: stream.name.unwrap_or_default(),
description: stream.description.unwrap_or_default(),
unit: stream.unit.unwrap_or_default(),

Check warning on line 425 in opentelemetry-sdk/src/metrics/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/pipeline.rs#L423-L425

Added lines #L423 - L425 were not covered by tests
comp_agg: collect,
},
);
Expand Down Expand Up @@ -453,10 +463,10 @@

fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
InstrumentId {
name: stream.name.clone(),
description: stream.description.clone(),
name: stream.name.clone().unwrap_or_default(),
description: stream.description.clone().unwrap_or_default(),
kind,
unit: stream.unit.clone(),
unit: stream.unit.clone().unwrap_or_default(),
number: Cow::Borrowed(std::any::type_name::<T>()),
}
}
Expand Down
14 changes: 7 additions & 7 deletions opentelemetry-sdk/src/metrics/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View
let contains_wildcard = criteria.name.contains(['*', '?']);

let match_fn: Box<dyn Fn(&Instrument) -> bool + Send + Sync> = if contains_wildcard {
if !mask.name.is_empty() {
if mask.name.is_some() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still might have to check for empty string. A user could still provide an empty string as Some(String::new()).

// TODO - The error is getting lost here. Need to return or log.
return Ok(Box::new(empty_view));
}
Expand Down Expand Up @@ -144,20 +144,20 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View
Ok(Box::new(move |i: &Instrument| -> Option<Stream> {
if match_fn(i) {
Some(Stream {
name: if !mask.name.is_empty() {
name: if mask.name.is_some() {
mask.name.clone()
} else {
i.name.clone()
Some(i.name.clone())
},
description: if !mask.description.is_empty() {
description: if mask.description.is_some() {
mask.description.clone()
} else {
i.description.clone()
Some(i.description.clone())
},
unit: if !mask.unit.is_empty() {
unit: if mask.unit.is_some() {
mask.unit.clone()
} else {
i.unit.clone()
Some(i.unit.clone())
},
aggregation: agg.clone(),
allowed_attribute_keys: mask.allowed_attribute_keys.clone(),
Expand Down