Skip to content

Commit cc93ead

Browse files
authored
fix: Metrics Views - fix a bug that causes unit, description to be lost when applying views that influence other aspects (#2981)
1 parent d52dcef commit cc93ead

File tree

5 files changed

+236
-27
lines changed

5 files changed

+236
-27
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ also modified to suppress telemetry before invoking exporters.
4242
behind feature flag "experimental_metrics_custom_reader".
4343
[#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)
4444

45+
- TODO: Placeholder for View related changelog. Polish this after all
46+
changes are done.
47+
Hide public fields from `Stream` struct.
48+
4549
- *Breaking* `Aggregation` enum moved behind feature flag
4650
"spec_unstable_metrics_views". This was only required when using Views.
4751
[#2928](https://github.com/open-telemetry/opentelemetry-rust/pull/2928)

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -190,22 +190,22 @@ impl Instrument {
190190
#[allow(unreachable_pub)]
191191
pub struct Stream {
192192
/// The human-readable identifier of the stream.
193-
pub name: Cow<'static, str>,
193+
pub(crate) name: Option<Cow<'static, str>>,
194194
/// Describes the purpose of the data.
195-
pub description: Cow<'static, str>,
195+
pub(crate) description: Option<Cow<'static, str>>,
196196
/// the unit of measurement recorded.
197-
pub unit: Cow<'static, str>,
197+
pub(crate) unit: Option<Cow<'static, str>>,
198198
/// Aggregation the stream uses for an instrument.
199-
pub aggregation: Option<Aggregation>,
199+
pub(crate) aggregation: Option<Aggregation>,
200200
/// An allow-list of attribute keys that will be preserved for the stream.
201201
///
202202
/// Any attribute recorded for the stream with a key not in this set will be
203203
/// dropped. If the set is empty, all attributes will be dropped, if `None` all
204204
/// attributes will be kept.
205-
pub allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
205+
pub(crate) allowed_attribute_keys: Option<Arc<HashSet<Key>>>,
206206

207207
/// Cardinality limit for the stream.
208-
pub cardinality_limit: Option<usize>,
208+
pub(crate) cardinality_limit: Option<usize>,
209209
}
210210

211211
#[cfg(feature = "spec_unstable_metrics_views")]
@@ -217,19 +217,19 @@ impl Stream {
217217

218218
/// Set the stream name.
219219
pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
220-
self.name = name.into();
220+
self.name = Some(name.into());
221221
self
222222
}
223223

224224
/// Set the stream description.
225225
pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
226-
self.description = description.into();
226+
self.description = Some(description.into());
227227
self
228228
}
229229

230230
/// Set the stream unit.
231231
pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
232-
self.unit = unit.into();
232+
self.unit = Some(unit.into());
233233
self
234234
}
235235

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,6 +1463,201 @@ mod tests {
14631463
);
14641464
}
14651465

1466+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1467+
async fn view_test_rename() {
1468+
test_view_customization(
1469+
|i| {
1470+
if i.name == "my_counter" {
1471+
Some(Stream::new().name("my_counter_renamed"))
1472+
} else {
1473+
None
1474+
}
1475+
},
1476+
"my_counter_renamed",
1477+
"my_unit",
1478+
"my_description",
1479+
)
1480+
.await;
1481+
}
1482+
1483+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1484+
async fn view_test_change_unit() {
1485+
test_view_customization(
1486+
|i| {
1487+
if i.name == "my_counter" {
1488+
Some(Stream::new().unit("my_unit_new"))
1489+
} else {
1490+
None
1491+
}
1492+
},
1493+
"my_counter",
1494+
"my_unit_new",
1495+
"my_description",
1496+
)
1497+
.await;
1498+
}
1499+
1500+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1501+
async fn view_test_change_description() {
1502+
test_view_customization(
1503+
|i| {
1504+
if i.name == "my_counter" {
1505+
Some(Stream::new().description("my_description_new"))
1506+
} else {
1507+
None
1508+
}
1509+
},
1510+
"my_counter",
1511+
"my_unit",
1512+
"my_description_new",
1513+
)
1514+
.await;
1515+
}
1516+
1517+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1518+
async fn view_test_change_name_unit() {
1519+
test_view_customization(
1520+
|i| {
1521+
if i.name == "my_counter" {
1522+
Some(Stream::new().name("my_counter_renamed").unit("my_unit_new"))
1523+
} else {
1524+
None
1525+
}
1526+
},
1527+
"my_counter_renamed",
1528+
"my_unit_new",
1529+
"my_description",
1530+
)
1531+
.await;
1532+
}
1533+
1534+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1535+
async fn view_test_change_name_unit_desc() {
1536+
test_view_customization(
1537+
|i| {
1538+
if i.name == "my_counter" {
1539+
Some(
1540+
Stream::new()
1541+
.name("my_counter_renamed")
1542+
.unit("my_unit_new")
1543+
.description("my_description_new"),
1544+
)
1545+
} else {
1546+
None
1547+
}
1548+
},
1549+
"my_counter_renamed",
1550+
"my_unit_new",
1551+
"my_description_new",
1552+
)
1553+
.await;
1554+
}
1555+
1556+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1557+
async fn view_test_match_unit() {
1558+
test_view_customization(
1559+
|i| {
1560+
if i.unit == "my_unit" {
1561+
Some(Stream::new().unit("my_unit_new"))
1562+
} else {
1563+
None
1564+
}
1565+
},
1566+
"my_counter",
1567+
"my_unit_new",
1568+
"my_description",
1569+
)
1570+
.await;
1571+
}
1572+
1573+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1574+
async fn view_test_match_none() {
1575+
test_view_customization(
1576+
|i| {
1577+
if i.name == "not_expected_to_match" {
1578+
Some(Stream::new())
1579+
} else {
1580+
None
1581+
}
1582+
},
1583+
"my_counter",
1584+
"my_unit",
1585+
"my_description",
1586+
)
1587+
.await;
1588+
}
1589+
1590+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1591+
async fn view_test_match_multiple() {
1592+
test_view_customization(
1593+
|i| {
1594+
if i.name == "my_counter" && i.unit == "my_unit" {
1595+
Some(Stream::new().name("my_counter_renamed"))
1596+
} else {
1597+
None
1598+
}
1599+
},
1600+
"my_counter_renamed",
1601+
"my_unit",
1602+
"my_description",
1603+
)
1604+
.await;
1605+
}
1606+
1607+
/// Helper function to test view customizations
1608+
async fn test_view_customization<F>(
1609+
view_function: F,
1610+
expected_name: &str,
1611+
expected_unit: &str,
1612+
expected_description: &str,
1613+
) where
1614+
F: Fn(&Instrument) -> Option<Stream> + Send + Sync + 'static,
1615+
{
1616+
// Run this test with stdout enabled to see output.
1617+
// cargo test view_test_* --all-features -- --nocapture
1618+
1619+
// Arrange
1620+
let exporter = InMemoryMetricExporter::default();
1621+
let meter_provider = SdkMeterProvider::builder()
1622+
.with_periodic_exporter(exporter.clone())
1623+
.with_view(view_function)
1624+
.build();
1625+
1626+
// Act
1627+
let meter = meter_provider.meter("test");
1628+
let counter = meter
1629+
.f64_counter("my_counter")
1630+
.with_unit("my_unit")
1631+
.with_description("my_description")
1632+
.build();
1633+
1634+
counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1635+
meter_provider.force_flush().unwrap();
1636+
1637+
// Assert
1638+
let resource_metrics = exporter
1639+
.get_finished_metrics()
1640+
.expect("metrics are expected to be exported.");
1641+
assert_eq!(resource_metrics.len(), 1);
1642+
assert_eq!(resource_metrics[0].scope_metrics.len(), 1);
1643+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1644+
assert_eq!(
1645+
metric.name, expected_name,
1646+
"Expected name: {}.",
1647+
expected_name
1648+
);
1649+
assert_eq!(
1650+
metric.unit, expected_unit,
1651+
"Expected unit: {}.",
1652+
expected_unit
1653+
);
1654+
assert_eq!(
1655+
metric.description, expected_description,
1656+
"Expected description: {}.",
1657+
expected_description
1658+
);
1659+
}
1660+
14661661
fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
14671662
instrument_name: &'static str,
14681663
should_not_emit: bool,

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,22 @@ where
267267
// The cache will return the same Aggregator instance. Use stream ids to de duplicate.
268268
let mut seen = HashSet::new();
269269
for v in &self.pipeline.views {
270-
let stream = match v.match_inst(&inst) {
270+
let mut stream = match v.match_inst(&inst) {
271271
Some(stream) => stream,
272272
None => continue,
273273
};
274274
matched = true;
275275

276+
if stream.name.is_none() {
277+
stream.name = Some(inst.name.clone());
278+
}
279+
if stream.description.is_none() {
280+
stream.description = Some(inst.description.clone());
281+
}
282+
if stream.unit.is_none() {
283+
stream.unit = Some(inst.unit.clone());
284+
}
285+
276286
let id = self.inst_id(kind, &stream);
277287
if seen.contains(&id) {
278288
continue; // This aggregator has already been added
@@ -300,9 +310,9 @@ where
300310

301311
// Apply implicit default view if no explicit matched.
302312
let mut stream = Stream {
303-
name: inst.name,
304-
description: inst.description,
305-
unit: inst.unit,
313+
name: Some(inst.name),
314+
description: Some(inst.description),
315+
unit: Some(inst.unit),
306316
aggregation: None,
307317
allowed_attribute_keys: None,
308318
cardinality_limit,
@@ -403,16 +413,16 @@ where
403413

404414
otel_debug!(
405415
name : "Metrics.InstrumentCreated",
406-
instrument_name = stream.name.as_ref(),
416+
instrument_name = stream.name.clone().unwrap_or_default().as_ref(),
407417
cardinality_limit = cardinality_limit,
408418
);
409419

410420
self.pipeline.add_sync(
411421
scope.clone(),
412422
InstrumentSync {
413-
name: stream.name,
414-
description: stream.description,
415-
unit: stream.unit,
423+
name: stream.name.unwrap_or_default(),
424+
description: stream.description.unwrap_or_default(),
425+
unit: stream.unit.unwrap_or_default(),
416426
comp_agg: collect,
417427
},
418428
);
@@ -453,10 +463,10 @@ where
453463

454464
fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
455465
InstrumentId {
456-
name: stream.name.clone(),
457-
description: stream.description.clone(),
466+
name: stream.name.clone().unwrap_or_default(),
467+
description: stream.description.clone().unwrap_or_default(),
458468
kind,
459-
unit: stream.unit.clone(),
469+
unit: stream.unit.clone().unwrap_or_default(),
460470
number: Cow::Borrowed(std::any::type_name::<T>()),
461471
}
462472
}

opentelemetry-sdk/src/metrics/view.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View
110110
let contains_wildcard = criteria.name.contains(['*', '?']);
111111

112112
let match_fn: Box<dyn Fn(&Instrument) -> bool + Send + Sync> = if contains_wildcard {
113-
if !mask.name.is_empty() {
113+
if mask.name.is_some() {
114114
// TODO - The error is getting lost here. Need to return or log.
115115
return Ok(Box::new(empty_view));
116116
}
@@ -144,20 +144,20 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View
144144
Ok(Box::new(move |i: &Instrument| -> Option<Stream> {
145145
if match_fn(i) {
146146
Some(Stream {
147-
name: if !mask.name.is_empty() {
147+
name: if mask.name.is_some() {
148148
mask.name.clone()
149149
} else {
150-
i.name.clone()
150+
Some(i.name.clone())
151151
},
152-
description: if !mask.description.is_empty() {
152+
description: if mask.description.is_some() {
153153
mask.description.clone()
154154
} else {
155-
i.description.clone()
155+
Some(i.description.clone())
156156
},
157-
unit: if !mask.unit.is_empty() {
157+
unit: if mask.unit.is_some() {
158158
mask.unit.clone()
159159
} else {
160-
i.unit.clone()
160+
Some(i.unit.clone())
161161
},
162162
aggregation: agg.clone(),
163163
allowed_attribute_keys: mask.allowed_attribute_keys.clone(),

0 commit comments

Comments
 (0)