Skip to content

Commit 97c47ab

Browse files
committed
fix: metrics view bug fix
1 parent d52dcef commit 97c47ab

File tree

4 files changed

+178
-24
lines changed

4 files changed

+178
-24
lines changed

opentelemetry-sdk/src/metrics/instrument.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,11 @@ impl Instrument {
190190
#[allow(unreachable_pub)]
191191
pub struct Stream {
192192
/// The human-readable identifier of the stream.
193-
pub name: Cow<'static, str>,
193+
pub name: Option<Cow<'static, str>>,
194194
/// Describes the purpose of the data.
195-
pub description: Cow<'static, str>,
195+
pub description: Option<Cow<'static, str>>,
196196
/// the unit of measurement recorded.
197-
pub unit: Cow<'static, str>,
197+
pub unit: Option<Cow<'static, str>>,
198198
/// Aggregation the stream uses for an instrument.
199199
pub aggregation: Option<Aggregation>,
200200
/// An allow-list of attribute keys that will be preserved for the stream.
@@ -217,19 +217,19 @@ impl Stream {
217217

218218
/// Set the stream name.
219219
pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
220-
self.name = name.into();
220+
self.name = Some(name.into());
221221
self
222222
}
223223

224224
/// Set the stream description.
225225
pub fn description(mut self, description: impl Into<Cow<'static, str>>) -> Self {
226-
self.description = description.into();
226+
self.description = Some(description.into());
227227
self
228228
}
229229

230230
/// Set the stream unit.
231231
pub fn unit(mut self, unit: impl Into<Cow<'static, str>>) -> Self {
232-
self.unit = unit.into();
232+
self.unit = Some(unit.into());
233233
self
234234
}
235235

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1463,6 +1463,150 @@ mod tests {
14631463
);
14641464
}
14651465

1466+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1467+
async fn view_test_rename() {
1468+
// Run this test with stdout enabled to see output.
1469+
// cargo test view_test_rename --all-features -- --nocapture
1470+
1471+
// Arrange
1472+
let view_rename = |i: &Instrument| {
1473+
if i.name == "my_counter" {
1474+
Some(Stream::new().name("my_counter_renamed"))
1475+
} else {
1476+
None
1477+
}
1478+
};
1479+
1480+
let exporter = InMemoryMetricExporter::default();
1481+
let meter_provider = SdkMeterProvider::builder()
1482+
.with_periodic_exporter(exporter.clone())
1483+
.with_view(view_rename)
1484+
.build();
1485+
1486+
// Act
1487+
let meter = meter_provider.meter("test");
1488+
let counter = meter
1489+
.f64_counter("my_counter")
1490+
.with_unit("my_unit")
1491+
.with_description("my_description")
1492+
.build();
1493+
1494+
counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1495+
meter_provider.force_flush().unwrap();
1496+
1497+
// Assert
1498+
let resource_metrics = exporter
1499+
.get_finished_metrics()
1500+
.expect("metrics are expected to be exported.");
1501+
assert!(!resource_metrics.is_empty());
1502+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1503+
assert_eq!(
1504+
metric.name, "my_counter_renamed",
1505+
"my_counter_renamed is expected."
1506+
);
1507+
assert_eq!(metric.unit, "my_unit", "Original unit should be retained.");
1508+
assert_eq!(
1509+
metric.description, "my_description",
1510+
"Original description should be retained."
1511+
);
1512+
}
1513+
1514+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1515+
async fn view_test_change_unit() {
1516+
// Run this test with stdout enabled to see output.
1517+
// cargo test view_test_change_unit --all-features -- --nocapture
1518+
1519+
// Arrange
1520+
let view_rename = |i: &Instrument| {
1521+
if i.name == "my_counter" {
1522+
Some(Stream::new().unit("my_unit_new"))
1523+
} else {
1524+
None
1525+
}
1526+
};
1527+
1528+
let exporter = InMemoryMetricExporter::default();
1529+
let meter_provider = SdkMeterProvider::builder()
1530+
.with_periodic_exporter(exporter.clone())
1531+
.with_view(view_rename)
1532+
.build();
1533+
1534+
// Act
1535+
let meter = meter_provider.meter("test");
1536+
let counter = meter
1537+
.f64_counter("my_counter")
1538+
.with_unit("my_unit")
1539+
.with_description("my_description")
1540+
.build();
1541+
1542+
counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1543+
meter_provider.force_flush().unwrap();
1544+
1545+
// Assert
1546+
let resource_metrics = exporter
1547+
.get_finished_metrics()
1548+
.expect("metrics are expected to be exported.");
1549+
assert!(!resource_metrics.is_empty());
1550+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1551+
assert_eq!(
1552+
metric.name, "my_counter",
1553+
"original name is expected."
1554+
);
1555+
assert_eq!(metric.unit, "my_unit_new", "unit should be updated.");
1556+
assert_eq!(
1557+
metric.description, "my_description",
1558+
"Original description should be retained."
1559+
);
1560+
}
1561+
1562+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1563+
async fn view_test_change_description() {
1564+
// Run this test with stdout enabled to see output.
1565+
// cargo test view_test_change_description --all-features -- --nocapture
1566+
1567+
// Arrange
1568+
let view_rename = |i: &Instrument| {
1569+
if i.name == "my_counter" {
1570+
Some(Stream::new().description("my_description_new"))
1571+
} else {
1572+
None
1573+
}
1574+
};
1575+
1576+
let exporter = InMemoryMetricExporter::default();
1577+
let meter_provider = SdkMeterProvider::builder()
1578+
.with_periodic_exporter(exporter.clone())
1579+
.with_view(view_rename)
1580+
.build();
1581+
1582+
// Act
1583+
let meter = meter_provider.meter("test");
1584+
let counter = meter
1585+
.f64_counter("my_counter")
1586+
.with_unit("my_unit")
1587+
.with_description("my_description")
1588+
.build();
1589+
1590+
counter.add(1.5, &[KeyValue::new("key1", "value1")]);
1591+
meter_provider.force_flush().unwrap();
1592+
1593+
// Assert
1594+
let resource_metrics = exporter
1595+
.get_finished_metrics()
1596+
.expect("metrics are expected to be exported.");
1597+
assert!(!resource_metrics.is_empty());
1598+
let metric = &resource_metrics[0].scope_metrics[0].metrics[0];
1599+
assert_eq!(
1600+
metric.name, "my_counter",
1601+
"original name is expected."
1602+
);
1603+
assert_eq!(metric.unit, "my_unit", "Original unit should be retained.");
1604+
assert_eq!(
1605+
metric.description, "my_description_new",
1606+
"description should be updated."
1607+
);
1608+
}
1609+
14661610
fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
14671611
instrument_name: &'static str,
14681612
should_not_emit: bool,

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,12 +267,22 @@ where
267267
// The cache will return the same Aggregator instance. Use stream ids to de duplicate.
268268
let mut seen = HashSet::new();
269269
for v in &self.pipeline.views {
270-
let stream = match v.match_inst(&inst) {
270+
let mut stream = match v.match_inst(&inst) {
271271
Some(stream) => stream,
272272
None => continue,
273273
};
274274
matched = true;
275275

276+
if stream.name.is_none() {
277+
stream.name = Some(inst.name.clone());
278+
}
279+
if stream.description.is_none() {
280+
stream.description = Some(inst.description.clone());
281+
}
282+
if stream.unit.is_none() {
283+
stream.unit = Some(inst.unit.clone());
284+
}
285+
276286
let id = self.inst_id(kind, &stream);
277287
if seen.contains(&id) {
278288
continue; // This aggregator has already been added
@@ -300,9 +310,9 @@ where
300310

301311
// Apply implicit default view if no explicit matched.
302312
let mut stream = Stream {
303-
name: inst.name,
304-
description: inst.description,
305-
unit: inst.unit,
313+
name: Some(inst.name),
314+
description: Some(inst.description),
315+
unit: Some(inst.unit),
306316
aggregation: None,
307317
allowed_attribute_keys: None,
308318
cardinality_limit,
@@ -403,16 +413,16 @@ where
403413

404414
otel_debug!(
405415
name : "Metrics.InstrumentCreated",
406-
instrument_name = stream.name.as_ref(),
416+
instrument_name = stream.name.clone().unwrap_or_default().as_ref(),
407417
cardinality_limit = cardinality_limit,
408418
);
409419

410420
self.pipeline.add_sync(
411421
scope.clone(),
412422
InstrumentSync {
413-
name: stream.name,
414-
description: stream.description,
415-
unit: stream.unit,
423+
name: stream.name.unwrap_or_default(),
424+
description: stream.description.unwrap_or_default(),
425+
unit: stream.unit.unwrap_or_default(),
416426
comp_agg: collect,
417427
},
418428
);
@@ -453,10 +463,10 @@ where
453463

454464
fn inst_id(&self, kind: InstrumentKind, stream: &Stream) -> InstrumentId {
455465
InstrumentId {
456-
name: stream.name.clone(),
457-
description: stream.description.clone(),
466+
name: stream.name.clone().unwrap_or_default(),
467+
description: stream.description.clone().unwrap_or_default(),
458468
kind,
459-
unit: stream.unit.clone(),
469+
unit: stream.unit.clone().unwrap_or_default(),
460470
number: Cow::Borrowed(std::any::type_name::<T>()),
461471
}
462472
}

opentelemetry-sdk/src/metrics/view.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View
110110
let contains_wildcard = criteria.name.contains(['*', '?']);
111111

112112
let match_fn: Box<dyn Fn(&Instrument) -> bool + Send + Sync> = if contains_wildcard {
113-
if !mask.name.is_empty() {
113+
if !mask.name.is_none() {
114114
// TODO - The error is getting lost here. Need to return or log.
115115
return Ok(Box::new(empty_view));
116116
}
@@ -144,20 +144,20 @@ pub fn new_view(criteria: Instrument, mask: Stream) -> MetricResult<Box<dyn View
144144
Ok(Box::new(move |i: &Instrument| -> Option<Stream> {
145145
if match_fn(i) {
146146
Some(Stream {
147-
name: if !mask.name.is_empty() {
147+
name: if !mask.name.is_none() {
148148
mask.name.clone()
149149
} else {
150-
i.name.clone()
150+
Some(i.name.clone())
151151
},
152-
description: if !mask.description.is_empty() {
152+
description: if !mask.description.is_none() {
153153
mask.description.clone()
154154
} else {
155-
i.description.clone()
155+
Some(i.description.clone())
156156
},
157-
unit: if !mask.unit.is_empty() {
157+
unit: if !mask.unit.is_none() {
158158
mask.unit.clone()
159159
} else {
160-
i.unit.clone()
160+
Some(i.unit.clone())
161161
},
162162
aggregation: agg.clone(),
163163
allowed_attribute_keys: mask.allowed_attribute_keys.clone(),

0 commit comments

Comments
 (0)