8 changes: 4 additions & 4 deletions opentelemetry-proto/src/transform/metrics.rs
@@ -295,8 +295,8 @@
.iter()
.map(|dp| TonicNumberDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
- start_time_unix_nano: to_nanos(dp.start_time),
- time_unix_nano: to_nanos(dp.time),
+ start_time_unix_nano: to_nanos(sum.start_time),
+ time_unix_nano: to_nanos(sum.time),
exemplars: dp.exemplars.iter().map(Into::into).collect(),
flags: TonicDataPointFlags::default() as u32,
value: Some(dp.value.into()),
@@ -319,8 +319,8 @@
.iter()
.map(|dp| TonicNumberDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
- start_time_unix_nano: dp.start_time.map(to_nanos).unwrap_or_default(),
- time_unix_nano: to_nanos(dp.time),
+ start_time_unix_nano: gauge.start_time.map(to_nanos).unwrap_or_default(),
+ time_unix_nano: to_nanos(gauge.time),
exemplars: dp.exemplars.iter().map(Into::into).collect(),
flags: TonicDataPointFlags::default() as u32,
value: Some(dp.value.into()),
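Net effect of this hunk: every exported `TonicNumberDataPoint` now takes its timestamps from the enclosing aggregation rather than from the data point itself. A minimal standalone sketch of that mapping; `SdkSum`, `SdkSumPoint`, `ProtoPoint`, and the local `to_nanos` are illustrative stand-ins, not the crate's real types:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative stand-ins for the SDK aggregation and the OTLP proto point.
struct SdkSumPoint {
    value: u64,
}
struct SdkSum {
    start_time: SystemTime,
    time: SystemTime,
    data_points: Vec<SdkSumPoint>,
}
struct ProtoPoint {
    start_time_unix_nano: u64,
    time_unix_nano: u64,
    value: u64,
}

// Local stand-in for the crate's `to_nanos` helper, assumed to mean
// nanoseconds since the Unix epoch.
fn to_nanos(t: SystemTime) -> u64 {
    t.duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos() as u64)
        .unwrap_or_default()
}

fn transform(sum: &SdkSum) -> Vec<ProtoPoint> {
    sum.data_points
        .iter()
        .map(|dp| ProtoPoint {
            // Timestamps come from the aggregation, not the individual point.
            start_time_unix_nano: to_nanos(sum.start_time),
            time_unix_nano: to_nanos(sum.time),
            value: dp.value,
        })
        .collect()
}

fn main() {
    let now = SystemTime::now();
    let sum = SdkSum {
        start_time: now,
        time: now,
        data_points: vec![SdkSumPoint { value: 7 }],
    };
    assert_eq!(transform(&sum)[0].value, 7);
}
```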
22 changes: 8 additions & 14 deletions opentelemetry-sdk/src/metrics/data/mod.rs
@@ -59,10 +59,6 @@ pub struct GaugeDataPoint<T> {
/// Attributes is the set of key value pairs that uniquely identify the
/// time series.
pub attributes: Vec<KeyValue>,
- /// The time when the time series was started.
- pub start_time: Option<SystemTime>,
- /// The time when the time series was recorded.
- pub time: SystemTime,
/// The value of this data point.
pub value: T,
/// The sampled [Exemplar]s collected during the time series.
@@ -73,8 +69,6 @@ impl<T: Copy> Clone for GaugeDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
- start_time: self.start_time,
- time: self.time,
value: self.value,
exemplars: self.exemplars.clone(),
}
@@ -86,6 +80,10 @@ impl<T: Copy> Clone for GaugeDataPoint<T> {
pub struct Gauge<T> {
/// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<GaugeDataPoint<T>>,
+ /// The time when the time series was started.
+ pub start_time: Option<SystemTime>,
+ /// The time when the time series was recorded.
+ pub time: SystemTime,
}

impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> {
@@ -103,10 +101,6 @@ pub struct SumDataPoint<T> {
/// Attributes is the set of key value pairs that uniquely identify the
/// time series.
pub attributes: Vec<KeyValue>,
- /// The time when the time series was started.
- pub start_time: SystemTime,
- /// The time when the time series was recorded.
- pub time: SystemTime,
/// The value of this data point.
pub value: T,
/// The sampled [Exemplar]s collected during the time series.
@@ -117,8 +111,6 @@ impl<T: Copy> Clone for SumDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
- start_time: self.start_time,
- time: self.time,
value: self.value,
exemplars: self.exemplars.clone(),
}
@@ -130,6 +122,10 @@ impl<T: Copy> Clone for SumDataPoint<T> {
pub struct Sum<T> {
/// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<SumDataPoint<T>>,
+ /// The time when the time series was started.
+ pub start_time: SystemTime,
+ /// The time when the time series was recorded.
+ pub time: SystemTime,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
@@ -366,8 +362,6 @@ mod tests {
fn validate_cloning_data_points() {
let data_type = SumDataPoint {
attributes: vec![KeyValue::new("key", "value")],
- start_time: std::time::SystemTime::now(),
- time: std::time::SystemTime::now(),
value: 0u32,
exemplars: vec![Exemplar {
filtered_attributes: vec![],
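For reference, a sketch of constructing the reshaped `Gauge` and `Sum` from outside the SDK, mirroring the test code later in this PR. Paths and field sets follow this diff against the opentelemetry_sdk crate at this revision; `is_monotonic` on `Sum` does not appear in the hunks above and is assumed from the existing definition:

```rust
use std::time::SystemTime;

use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::data::{Gauge, GaugeDataPoint, Sum, SumDataPoint};
use opentelemetry_sdk::metrics::Temporality;

fn build_example() -> (Gauge<u64>, Sum<u64>) {
    let now = SystemTime::now();

    // Data points now carry only attributes, value and exemplars.
    let gauge = Gauge {
        data_points: vec![GaugeDataPoint {
            attributes: vec![KeyValue::new("key", "value")],
            value: 1u64,
            exemplars: vec![],
        }],
        // Timestamps live on the aggregation; a gauge's start time stays optional.
        start_time: Some(now),
        time: now,
    };

    let sum = Sum {
        data_points: vec![SumDataPoint {
            attributes: vec![KeyValue::new("key", "value")],
            value: 2u64,
            exemplars: vec![],
        }],
        start_time: now,
        time: now,
        temporality: Temporality::Cumulative,
        is_monotonic: true, // assumed field, untouched by this PR
    };

    (gauge, sum)
}
```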
53 changes: 15 additions & 38 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
@@ -2,10 +2,7 @@ use std::{marker, sync::Arc};

use opentelemetry::KeyValue;

- use crate::metrics::{
- data::{Aggregation, Gauge},
- Temporality,
- };
+ use crate::metrics::{data::Aggregation, Temporality};

use super::{
exponential_histogram::ExpoHistogram, histogram::Histogram, last_value::LastValue,
@@ -99,31 +96,15 @@ impl<T: Number> AggregateBuilder<T> {

/// Builds a last-value aggregate function input and output.
pub(crate) fn last_value(&self) -> (impl Measure<T>, impl ComputeAggregation) {
- let lv_filter = Arc::new(LastValue::new());
- let lv_agg = Arc::clone(&lv_filter);
+ let lv = Arc::new(LastValue::new());
+ let agg_lv = Arc::clone(&lv);
let t = self.temporality;

(
- self.filter(move |n, a: &[KeyValue]| lv_filter.measure(n, a)),
- move |dest: Option<&mut dyn Aggregation>| {
- let g = dest.and_then(|d| d.as_mut().downcast_mut::<Gauge<T>>());
- let mut new_agg = if g.is_none() {
- Some(Gauge {
- data_points: vec![],
- })
- } else {
- None
- };
- let g = g.unwrap_or_else(|| new_agg.as_mut().expect("present if g is none"));
-
- match t {
- Some(Temporality::Delta) => {
- lv_agg.compute_aggregation_delta(&mut g.data_points)
- }
- _ => lv_agg.compute_aggregation_cumulative(&mut g.data_points),
- }
-
- (g.data_points.len(), new_agg.map(|a| Box::new(a) as Box<_>))
+ self.filter(move |n, a: &[KeyValue]| lv.measure(n, a)),
+ move |dest: Option<&mut dyn Aggregation>| match t {
+ Some(Temporality::Delta) => agg_lv.delta(dest),
+ _ => agg_lv.cumulative(dest),
},
)
}
@@ -211,8 +192,8 @@ impl<T: Number> AggregateBuilder<T> {
#[cfg(test)]
mod tests {
use crate::metrics::data::{
- ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, GaugeDataPoint,
- Histogram, HistogramDataPoint, Sum, SumDataPoint,
+ ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
+ GaugeDataPoint, Histogram, HistogramDataPoint, Sum, SumDataPoint,
};
use std::{time::SystemTime, vec};

@@ -224,11 +205,11 @@ mod tests {
let mut a = Gauge {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
- start_time: Some(SystemTime::now()),
- time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
}],
+ start_time: Some(SystemTime::now()),
+ time: SystemTime::now(),
};
let new_attributes = [KeyValue::new("b", 2)];
measure.call(2, &new_attributes[..]);
@@ -251,19 +232,17 @@ mod tests {
data_points: vec![
SumDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
- start_time: SystemTime::now(),
- time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
},
SumDataPoint {
attributes: vec![KeyValue::new("a2", 1)],
- start_time: SystemTime::now(),
- time: SystemTime::now(),
value: 2u64,
exemplars: vec![],
},
],
+ start_time: SystemTime::now(),
+ time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
@@ -294,19 +273,17 @@ mod tests {
data_points: vec![
SumDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
- start_time: SystemTime::now(),
- time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
},
SumDataPoint {
attributes: vec![KeyValue::new("a2", 1)],
- start_time: SystemTime::now(),
- time: SystemTime::now(),
value: 2u64,
exemplars: vec![],
},
],
+ start_time: SystemTime::now(),
+ time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
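The downcast-or-create bookkeeping that used to live inline in `last_value()` now sits behind `LastValue::delta`/`LastValue::cumulative`, which this diff calls but does not show. A rough sketch of the presumed shape of those helpers, using simplified stand-in types (`Aggregation`, `Gauge`, and the point handling below are illustrative, not the crate's actual implementation):

```rust
use std::any::Any;

// Illustrative stand-ins; the real trait and structs live in opentelemetry_sdk::metrics::data.
trait Aggregation {
    fn as_mut_any(&mut self) -> &mut dyn Any;
}

#[derive(Default)]
struct Gauge {
    data_points: Vec<u64>,
}

impl Aggregation for Gauge {
    fn as_mut_any(&mut self) -> &mut dyn Any {
        self
    }
}

struct LastValue;

impl LastValue {
    // Presumed shape of the new helper: reuse the caller's aggregation when it
    // downcasts to the expected type, otherwise allocate a fresh one, then report
    // the point count and any newly created aggregation back to the caller.
    fn cumulative(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let existing = dest.and_then(|d| d.as_mut_any().downcast_mut::<Gauge>());
        let mut created = if existing.is_none() {
            Some(Gauge::default())
        } else {
            None
        };
        let gauge = existing.unwrap_or_else(|| created.as_mut().expect("just created"));

        gauge.data_points.clear();
        gauge.data_points.push(42); // stand-in for copying out the tracked last values

        (
            gauge.data_points.len(),
            created.map(|g| Box::new(g) as Box<dyn Aggregation>),
        )
    }
}

fn main() {
    // No destination supplied: the helper creates a Gauge and hands it back.
    let (count, created) = LastValue.cumulative(None);
    assert_eq!(count, 1);
    assert!(created.is_some());
}
```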
68 changes: 4 additions & 64 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
@@ -1440,15 +1440,14 @@
count = out_fn.call(Some(got.as_mut())).0
}

- assert_aggregation_eq::<T>(Box::new(test.want), got, true, test.name);
+ assert_aggregation_eq::<T>(Box::new(test.want), got, test.name);
assert_eq!(test.want_count, count, "{}", test.name);
}
}

fn assert_aggregation_eq<T: Number + PartialEq>(
a: Box<dyn Aggregation>,
b: Box<dyn Aggregation>,
- ignore_timestamp: bool,
test_name: &'static str,
) {
assert_eq!(
@@ -1467,13 +1466,7 @@
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
- assert_gauge_data_points_eq(
- a,
- b,
- ignore_timestamp,
- "mismatching gauge data points",
- test_name,
- );
+ assert_gauge_data_points_eq(a, b, "mismatching gauge data points", test_name);
}
} else if let Some(a) = a.as_any().downcast_ref::<data::Sum<T>>() {
let b = b.as_any().downcast_ref::<data::Sum<T>>().unwrap();
@@ -1494,13 +1487,7 @@
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
- assert_sum_data_points_eq(
- a,
- b,
- ignore_timestamp,
- "mismatching sum data points",
- test_name,
- );
+ assert_sum_data_points_eq(a, b, "mismatching sum data points", test_name);
}
} else if let Some(a) = a.as_any().downcast_ref::<data::Histogram<T>>() {
let b = b.as_any().downcast_ref::<data::Histogram<T>>().unwrap();
@@ -1516,13 +1503,7 @@
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
- assert_hist_data_points_eq(
- a,
- b,
- ignore_timestamp,
- "mismatching hist data points",
- test_name,
- );
+ assert_hist_data_points_eq(a, b, "mismatching hist data points", test_name);
}
} else if let Some(a) = a.as_any().downcast_ref::<data::ExponentialHistogram<T>>() {
let b = b
@@ -1544,7 +1525,6 @@
assert_exponential_hist_data_points_eq(
a,
b,
- ignore_timestamp,
"mismatching hist data points",
test_name,
);
@@ -1557,7 +1537,6 @@
fn assert_sum_data_points_eq<T: Number>(
a: &data::SumDataPoint<T>,
b: &data::SumDataPoint<T>,
- ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
@@ -1567,21 +1546,11 @@
test_name, message
);
assert_eq!(a.value, b.value, "{}: {} value", test_name, message);
-
- if !ignore_timestamp {
- assert_eq!(
- a.start_time, b.start_time,
- "{}: {} start time",
- test_name, message
- );
- assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
- }
}

fn assert_gauge_data_points_eq<T: Number>(
a: &data::GaugeDataPoint<T>,
b: &data::GaugeDataPoint<T>,
- ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
@@ -1591,21 +1560,11 @@
test_name, message
);
assert_eq!(a.value, b.value, "{}: {} value", test_name, message);
-
- if !ignore_timestamp {
- assert_eq!(
- a.start_time, b.start_time,
- "{}: {} start time",
- test_name, message
- );
- assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
- }
}

fn assert_hist_data_points_eq<T: Number>(
a: &data::HistogramDataPoint<T>,
b: &data::HistogramDataPoint<T>,
- ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
@@ -1624,21 +1583,11 @@
assert_eq!(a.min, b.min, "{}: {} min", test_name, message);
assert_eq!(a.max, b.max, "{}: {} max", test_name, message);
assert_eq!(a.sum, b.sum, "{}: {} sum", test_name, message);
-
- if !ignore_timestamp {
- assert_eq!(
- a.start_time, b.start_time,
- "{}: {} start time",
- test_name, message
- );
- assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
- }
}

fn assert_exponential_hist_data_points_eq<T: Number>(
a: &data::ExponentialHistogramDataPoint<T>,
b: &data::ExponentialHistogramDataPoint<T>,
- ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
@@ -1669,14 +1618,5 @@
"{}: {} neg",
test_name, message
);
-
- if !ignore_timestamp {
- assert_eq!(
- a.start_time, b.start_time,
- "{}: {} start time",
- test_name, message
- );
- assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
- }
}
}
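With timestamps gone from the individual data points, the gauge and sum helpers above now compare only attributes and values. A tiny generic sketch of that reduced contract (the `Point` trait here is illustrative, not part of the crate):

```rust
use opentelemetry::KeyValue;

// Illustrative: the fields the slimmed-down gauge/sum assertions still compare.
trait Point {
    type Value: PartialEq + std::fmt::Debug;
    fn attributes(&self) -> &[KeyValue];
    fn value(&self) -> &Self::Value;
}

fn assert_points_eq<P: Point>(a: &P, b: &P, message: &str, test_name: &str) {
    assert_eq!(
        a.attributes(),
        b.attributes(),
        "{}: {} attributes",
        test_name,
        message
    );
    assert_eq!(a.value(), b.value(), "{}: {} value", test_name, message);
}
```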