Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions opentelemetry-proto/src/transform/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@
.iter()
.map(|dp| TonicHistogramDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time),
time_unix_nano: to_nanos(dp.time),
start_time_unix_nano: to_nanos(hist.start_time),
time_unix_nano: to_nanos(hist.time),

Check warning on line 234 in opentelemetry-proto/src/transform/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/metrics.rs#L233-L234

Added lines #L233 - L234 were not covered by tests
count: dp.count,
sum: Some(dp.sum.into_f64()),
bucket_counts: dp.bucket_counts.clone(),
Expand All @@ -258,8 +258,8 @@
.iter()
.map(|dp| TonicExponentialHistogramDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time),
time_unix_nano: to_nanos(dp.time),
start_time_unix_nano: to_nanos(hist.start_time),
time_unix_nano: to_nanos(hist.time),

Check warning on line 262 in opentelemetry-proto/src/transform/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/metrics.rs#L261-L262

Added lines #L261 - L262 were not covered by tests
count: dp.count as u64,
sum: Some(dp.sum.into_f64()),
scale: dp.scale.into(),
Expand Down
26 changes: 8 additions & 18 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {
pub struct Histogram<T> {
/// Individual aggregated measurements with unique attributes.
pub data_points: Vec<HistogramDataPoint<T>>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
Expand All @@ -166,11 +170,6 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Histogram<T> {
pub struct HistogramDataPoint<T> {
/// The set of key value pairs that uniquely identify the time series.
pub attributes: Vec<KeyValue>,
/// The time when the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,

/// The number of updates this histogram has been calculated with.
pub count: u64,
/// The upper bounds of the buckets of the histogram.
Expand All @@ -195,8 +194,6 @@ impl<T: Copy> Clone for HistogramDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
count: self.count,
bounds: self.bounds.clone(),
bucket_counts: self.bucket_counts.clone(),
Expand All @@ -213,7 +210,10 @@ impl<T: Copy> Clone for HistogramDataPoint<T> {
pub struct ExponentialHistogram<T> {
/// The individual aggregated measurements with unique attributes.
pub data_points: Vec<ExponentialHistogramDataPoint<T>>,

/// When the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
Expand All @@ -233,10 +233,6 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for ExponentialHistogram
pub struct ExponentialHistogramDataPoint<T> {
/// The set of key value pairs that uniquely identify the time series.
pub attributes: Vec<KeyValue>,
/// When the time series was started.
pub start_time: SystemTime,
/// The time when the time series was recorded.
pub time: SystemTime,

/// The number of updates this histogram has been calculated with.
pub count: usize,
Expand Down Expand Up @@ -281,8 +277,6 @@ impl<T: Copy> Clone for ExponentialHistogramDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
count: self.count,
min: self.min,
max: self.max,
Expand Down Expand Up @@ -375,8 +369,6 @@ mod tests {

let histogram_data_point = HistogramDataPoint {
attributes: vec![KeyValue::new("key", "value")],
start_time: std::time::SystemTime::now(),
time: std::time::SystemTime::now(),
count: 0,
bounds: vec![],
bucket_counts: vec![],
Expand All @@ -395,8 +387,6 @@ mod tests {

let exponential_histogram_data_point = ExponentialHistogramDataPoint {
attributes: vec![KeyValue::new("key", "value")],
start_time: std::time::SystemTime::now(),
time: std::time::SystemTime::now(),
count: 0,
min: None,
max: None,
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,6 @@ mod tests {
let mut a = Histogram {
data_points: vec![HistogramDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
count: 2,
bounds: vec![1.0, 2.0],
bucket_counts: vec![0, 1, 1],
Expand All @@ -324,6 +322,8 @@ mod tests {
sum: 3u64,
exemplars: vec![],
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Expand Down Expand Up @@ -357,8 +357,6 @@ mod tests {
let mut a = ExponentialHistogram {
data_points: vec![ExponentialHistogramDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
count: 2,
min: None,
max: None,
Expand All @@ -376,6 +374,8 @@ mod tests {
zero_threshold: 1.0,
exemplars: vec![],
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: if temporality == Temporality::Delta {
Temporality::Cumulative
} else {
Expand Down
56 changes: 30 additions & 26 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,33 +389,34 @@
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time,
time,

Check warning on line 404 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L403-L404

Added lines #L403 - L404 were not covered by tests
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;

let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
h.start_time = start_time;
h.time = time;

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, attr| {
let b = attr.into_inner().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
min: if self.record_min_max {
Some(b.min)
Expand Down Expand Up @@ -450,33 +451,34 @@
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::ExponentialHistogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::ExponentialHistogram {
data_points: vec![],
start_time,
time,

Check warning on line 466 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L465-L466

Added lines #L465 - L466 were not covered by tests
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;

let prev_start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
h.start_time = start_time;
h.time = time;

self.value_map
.collect_readonly(&mut h.data_points, |attributes, attr| {
let b = attr.lock().unwrap_or_else(|err| err.into_inner());
data::ExponentialHistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
min: if self.record_min_max {
Some(b.min)
Expand Down Expand Up @@ -1270,8 +1272,6 @@
min: Some(1.into()),
max: Some(16.into()),
sum: 31.into(),
start_time: SystemTime::now(),
time: SystemTime::now(),
scale: -1,
positive_bucket: data::ExponentialBucket {
offset: -1,
Expand All @@ -1285,6 +1285,8 @@
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand Down Expand Up @@ -1318,8 +1320,6 @@
offset: -1,
counts: vec![1, 4, 1],
},
start_time: SystemTime::now(),
time: SystemTime::now(),
negative_bucket: data::ExponentialBucket {
offset: 0,
counts: vec![],
Expand All @@ -1328,6 +1328,8 @@
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand Down Expand Up @@ -1364,8 +1366,6 @@
offset: -1,
counts: vec![1, 4, 1],
},
start_time: SystemTime::now(),
time: SystemTime::now(),
negative_bucket: data::ExponentialBucket {
offset: 0,
counts: vec![],
Expand All @@ -1374,6 +1374,8 @@
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand Down Expand Up @@ -1410,8 +1412,6 @@
counts: vec![1, 6, 2],
},
attributes: vec![],
start_time: SystemTime::now(),
time: SystemTime::now(),
negative_bucket: data::ExponentialBucket {
offset: 0,
counts: vec![],
Expand All @@ -1420,6 +1420,8 @@
zero_threshold: 0.0,
zero_count: 0,
}],
start_time: SystemTime::now(),
time: SystemTime::now(),
},
want_count: 1,
},
Expand All @@ -1430,6 +1432,8 @@

let mut got: Box<dyn data::Aggregation> = Box::new(data::ExponentialHistogram::<T> {
data_points: vec![],
start_time: SystemTime::now(),
time: SystemTime::now(),
temporality: Temporality::Delta,
});
let mut count = 0;
Expand Down
39 changes: 21 additions & 18 deletions opentelemetry-sdk/src/metrics/internal/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,33 @@ impl<T: Number> Histogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), time))
.unwrap_or(time);
let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time,
time,
temporality: Temporality::Delta,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Delta;

let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
h.start_time = start_time;
h.time = time;

self.value_map
.collect_and_reset(&mut h.data_points, |attributes, aggr| {
let b = aggr.into_inner().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts,
Expand Down Expand Up @@ -164,32 +165,34 @@ impl<T: Number> Histogram<T> {
&self,
dest: Option<&mut dyn Aggregation>,
) -> (usize, Option<Box<dyn Aggregation>>) {
let t = SystemTime::now();
let time = SystemTime::now();
let start_time = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());

let h = dest.and_then(|d| d.as_mut().downcast_mut::<data::Histogram<T>>());
let mut new_agg = if h.is_none() {
Some(data::Histogram {
data_points: vec![],
start_time,
time,
temporality: Temporality::Cumulative,
})
} else {
None
};
let h = h.unwrap_or_else(|| new_agg.as_mut().expect("present if h is none"));
h.temporality = Temporality::Cumulative;

let prev_start = self
.start
.lock()
.map(|s| *s)
.unwrap_or_else(|_| SystemTime::now());
h.start_time = start_time;
h.time = time;

self.value_map
.collect_readonly(&mut h.data_points, |attributes, aggr| {
let b = aggr.lock().unwrap_or_else(|err| err.into_inner());
HistogramDataPoint {
attributes,
start_time: prev_start,
time: t,
count: b.count,
bounds: self.bounds.clone(),
bucket_counts: b.counts.clone(),
Expand Down
Loading
Loading