Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion opentelemetry-proto/src/transform/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@
.iter()
.map(|dp| TonicNumberDataPoint {
attributes: dp.attributes.iter().map(Into::into).collect(),
start_time_unix_nano: to_nanos(dp.start_time),
start_time_unix_nano: dp.start_time.map(to_nanos).unwrap_or_default(),

Check warning on line 322 in opentelemetry-proto/src/transform/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-proto/src/transform/metrics.rs#L322

Added line #L322 was not covered by tests
time_unix_nano: to_nanos(dp.time),
exemplars: dp.exemplars.iter().map(Into::into).collect(),
flags: TonicDataPointFlags::default() as u32,
Expand Down
76 changes: 52 additions & 24 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,42 @@ pub trait Aggregation: fmt::Debug + any::Any + Send + Sync {
fn as_mut(&mut self) -> &mut dyn any::Any;
}

/// A measurement of the current value of an instrument.
#[derive(Debug)]
pub struct Gauge<T> {
/// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<DataPoint<T>>,
/// DataPoint is a single data point in a time series.
#[derive(Debug, PartialEq)]
pub struct GaugeDataPoint<T> {
/// Attributes is the set of key value pairs that uniquely identify the
/// time series.
pub attributes: Vec<KeyValue>,
/// The time when the time series was started.
pub start_time: Option<SystemTime>,
/// The time when the time series was recorded.
pub time: SystemTime,
/// The value of this data point.
pub value: T,
/// The sampled [Exemplar]s collected during the time series.
pub exemplars: Vec<Exemplar<T>>,
}

impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> {
fn as_any(&self) -> &dyn any::Any {
self
}
fn as_mut(&mut self) -> &mut dyn any::Any {
self
impl<T: Copy> Clone for GaugeDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
start_time: self.start_time,
time: self.time,
value: self.value,
exemplars: self.exemplars.clone(),
}
}
}

/// Represents the sum of all measurements of values from an instrument.
/// A measurement of the current value of an instrument.
#[derive(Debug)]
pub struct Sum<T> {
pub struct Gauge<T> {
/// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<DataPoint<T>>,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
/// Whether this aggregation only increases or decreases.
pub is_monotonic: bool,
pub data_points: Vec<GaugeDataPoint<T>>,
}

impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {
impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Gauge<T> {
fn as_any(&self) -> &dyn any::Any {
self
}
Expand All @@ -92,7 +99,7 @@ impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {

/// DataPoint is a single data point in a time series.
#[derive(Debug, PartialEq)]
pub struct DataPoint<T> {
pub struct SumDataPoint<T> {
/// Attributes is the set of key value pairs that uniquely identify the
/// time series.
pub attributes: Vec<KeyValue>,
Expand All @@ -106,7 +113,7 @@ pub struct DataPoint<T> {
pub exemplars: Vec<Exemplar<T>>,
}

impl<T: Copy> Clone for DataPoint<T> {
impl<T: Copy> Clone for SumDataPoint<T> {
fn clone(&self) -> Self {
Self {
attributes: self.attributes.clone(),
Expand All @@ -118,6 +125,27 @@ impl<T: Copy> Clone for DataPoint<T> {
}
}

/// Represents the sum of all measurements of values from an instrument.
#[derive(Debug)]
pub struct Sum<T> {
/// Represents individual aggregated measurements with unique attributes.
pub data_points: Vec<SumDataPoint<T>>,
/// Describes if the aggregation is reported as the change from the last report
/// time, or the cumulative changes since a fixed start time.
pub temporality: Temporality,
/// Whether this aggregation only increases or decreases.
pub is_monotonic: bool,
}

impl<T: fmt::Debug + Send + Sync + 'static> Aggregation for Sum<T> {
fn as_any(&self) -> &dyn any::Any {
self
}
fn as_mut(&mut self) -> &mut dyn any::Any {
self
}
}

/// Represents the histogram of all measurements of values from an instrument.
#[derive(Debug)]
pub struct Histogram<T> {
Expand Down Expand Up @@ -330,13 +358,13 @@ impl<T: Copy> Clone for Exemplar<T> {
#[cfg(test)]
mod tests {

use super::{DataPoint, Exemplar, ExponentialHistogramDataPoint, HistogramDataPoint};
use super::{Exemplar, ExponentialHistogramDataPoint, HistogramDataPoint, SumDataPoint};

use opentelemetry::KeyValue;

#[test]
fn validate_cloning_data_points() {
let data_type = DataPoint {
let data_type = SumDataPoint {
attributes: vec![KeyValue::new("key", "value")],
start_time: std::time::SystemTime::now(),
time: std::time::SystemTime::now(),
Expand Down
16 changes: 8 additions & 8 deletions opentelemetry-sdk/src/metrics/internal/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ impl<T: Number> AggregateBuilder<T> {
#[cfg(test)]
mod tests {
use crate::metrics::data::{
DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint,
Histogram, HistogramDataPoint, Sum,
ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, GaugeDataPoint,
Histogram, HistogramDataPoint, Sum, SumDataPoint,
};
use std::{time::SystemTime, vec};

Expand All @@ -222,9 +222,9 @@ mod tests {
fn last_value_aggregation() {
let (measure, agg) = AggregateBuilder::<u64>::new(None, None).last_value();
let mut a = Gauge {
data_points: vec![DataPoint {
data_points: vec![GaugeDataPoint {
attributes: vec![KeyValue::new("a", 1)],
start_time: SystemTime::now(),
start_time: Some(SystemTime::now()),
time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
Expand All @@ -249,14 +249,14 @@ mod tests {
AggregateBuilder::<u64>::new(Some(temporality), None).precomputed_sum(true);
let mut a = Sum {
data_points: vec![
DataPoint {
SumDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
},
DataPoint {
SumDataPoint {
attributes: vec![KeyValue::new("a2", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
Expand Down Expand Up @@ -292,14 +292,14 @@ mod tests {
let (measure, agg) = AggregateBuilder::<u64>::new(Some(temporality), None).sum(true);
let mut a = Sum {
data_points: vec![
DataPoint {
SumDataPoint {
attributes: vec![KeyValue::new("a1", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
value: 1u64,
exemplars: vec![],
},
DataPoint {
SumDataPoint {
attributes: vec![KeyValue::new("a2", 1)],
start_time: SystemTime::now(),
time: SystemTime::now(),
Expand Down
34 changes: 29 additions & 5 deletions opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1467,7 +1467,7 @@
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_data_points_eq(
assert_gauge_data_points_eq(

Check warning on line 1470 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1470

Added line #L1470 was not covered by tests
a,
b,
ignore_timestamp,
Expand All @@ -1494,7 +1494,7 @@
test_name
);
for (a, b) in a.data_points.iter().zip(b.data_points.iter()) {
assert_data_points_eq(
assert_sum_data_points_eq(

Check warning on line 1497 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1497

Added line #L1497 was not covered by tests
a,
b,
ignore_timestamp,
Expand Down Expand Up @@ -1554,9 +1554,33 @@
}
}

fn assert_data_points_eq<T: Number>(
a: &data::DataPoint<T>,
b: &data::DataPoint<T>,
fn assert_sum_data_points_eq<T: Number>(
a: &data::SumDataPoint<T>,
b: &data::SumDataPoint<T>,
ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
) {
assert_eq!(

Check warning on line 1564 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1557-L1564

Added lines #L1557 - L1564 were not covered by tests
a.attributes, b.attributes,
"{}: {} attributes",

Check warning on line 1566 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1566

Added line #L1566 was not covered by tests
test_name, message
);
assert_eq!(a.value, b.value, "{}: {} value", test_name, message);

Check warning on line 1569 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1569

Added line #L1569 was not covered by tests

if !ignore_timestamp {
assert_eq!(

Check warning on line 1572 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1571-L1572

Added lines #L1571 - L1572 were not covered by tests
a.start_time, b.start_time,
"{}: {} start time",

Check warning on line 1574 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1574

Added line #L1574 was not covered by tests
test_name, message
);
assert_eq!(a.time, b.time, "{}: {} time", test_name, message);
}
}

Check warning on line 1579 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1577-L1579

Added lines #L1577 - L1579 were not covered by tests

fn assert_gauge_data_points_eq<T: Number>(
a: &data::GaugeDataPoint<T>,
b: &data::GaugeDataPoint<T>,

Check warning on line 1583 in opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs#L1581-L1583

Added lines #L1581 - L1583 were not covered by tests
ignore_timestamp: bool,
message: &'static str,
test_name: &'static str,
Expand Down
14 changes: 7 additions & 7 deletions opentelemetry-sdk/src/metrics/internal/last_value.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};

use crate::metrics::data::DataPoint;
use crate::metrics::data::GaugeDataPoint;
use opentelemetry::KeyValue;

use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};
Expand Down Expand Up @@ -56,30 +56,30 @@ impl<T: Number> LastValue<T> {
self.value_map.measure(measurement, attrs);
}

pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<GaugeDataPoint<T>>) {
let t = SystemTime::now();
let prev_start = self
.start
.lock()
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
self.value_map
.collect_and_reset(dest, |attributes, aggr| DataPoint {
.collect_and_reset(dest, |attributes, aggr| GaugeDataPoint {
attributes,
start_time: prev_start,
start_time: Some(prev_start),
time: t,
value: aggr.value.get_value(),
exemplars: vec![],
});
}

pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<DataPoint<T>>) {
pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<GaugeDataPoint<T>>) {
let t = SystemTime::now();
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
self.value_map
.collect_readonly(dest, |attributes, aggr| DataPoint {
.collect_readonly(dest, |attributes, aggr| GaugeDataPoint {
attributes,
start_time: prev_start,
start_time: Some(prev_start),
time: t,
value: aggr.value.get_value(),
exemplars: vec![],
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use opentelemetry::KeyValue;

use crate::metrics::data::{self, Aggregation, DataPoint};
use crate::metrics::data::{self, Aggregation, SumDataPoint};
use crate::metrics::Temporality;

use super::{last_value::Assign, AtomicTracker, Number, ValueMap};
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<T: Number> PrecomputedSum<T> {
let value = aggr.value.get_value();
new_reported.insert(attributes.clone(), value);
let delta = value - *reported.get(&attributes).unwrap_or(&T::default());
DataPoint {
SumDataPoint {
attributes,
start_time: prev_start,
time: t,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl<T: Number> PrecomputedSum<T> {
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint {
.collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes,
start_time: prev_start,
time: t,
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/metrics/internal/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ops::DerefMut;
use std::vec;
use std::{sync::Mutex, time::SystemTime};

use crate::metrics::data::{self, Aggregation, DataPoint};
use crate::metrics::data::{self, Aggregation, SumDataPoint};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;

Expand Down Expand Up @@ -93,7 +93,7 @@ impl<T: Number> Sum<T> {
.map(|mut start| replace(start.deref_mut(), t))
.unwrap_or(t);
self.value_map
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| DataPoint {
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes,
start_time: prev_start,
time: t,
Expand Down Expand Up @@ -130,7 +130,7 @@ impl<T: Number> Sum<T> {
let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);

self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| DataPoint {
.collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes,
start_time: prev_start,
time: t,
Expand Down
Loading
Loading