Skip to content

Commit ecafada

Browse files
OTLP bytes -> OTAP encoding for Metrics Sum and Histogram data (open-telemetry#1398)
part of open-telemetry#387 Will handle ExpHistogram and Exemplars in a followup PR
1 parent 99deb08 commit ecafada

File tree

10 files changed

+671
-75
lines changed

10 files changed

+671
-75
lines changed

rust/otap-dataflow/crates/otap/src/encoder.rs

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -839,8 +839,8 @@ where
839839
hdp.append_start_time_unix_nano(hdp_view.start_time_unix_nano() as i64);
840840
hdp.append_time_unix_nano(hdp_view.time_unix_nano() as i64);
841841
hdp.append_count(hdp_view.count());
842-
hdp.append_bucket_counts(hdp_view.bucket_counts().copied());
843-
hdp.append_explicit_bounds(hdp_view.explicit_bounds().copied());
842+
hdp.append_bucket_counts(hdp_view.bucket_counts());
843+
hdp.append_explicit_bounds(hdp_view.explicit_bounds());
844844
hdp.append_sum(hdp_view.sum());
845845
hdp.append_flags(hdp_view.flags().into_inner());
846846
hdp.append_min(hdp_view.min());
@@ -976,8 +976,9 @@ mod test {
976976
use arrow::array::{
977977
Array, ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray, DictionaryArray,
978978
DurationNanosecondArray, FixedSizeBinaryArray, Float64Array, Int32Array, Int64Array,
979-
LargeListArray, LargeListBuilder, PrimitiveBuilder, RecordBatch, StringArray, StructArray,
980-
StructBuilder, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
979+
LargeListArray, LargeListBuilder, ListBuilder, PrimitiveBuilder, RecordBatch, StringArray,
980+
StructArray, StructBuilder, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array,
981+
UInt64Array,
981982
};
982983
use arrow::buffer::NullBuffer;
983984
use arrow::datatypes::{
@@ -1836,8 +1837,8 @@ mod test {
18361837
.unwrap();
18371838
let expected_hdp_batch = RecordBatch::try_new(
18381839
Arc::new(Schema::new(vec![
1839-
Field::new("id", DataType::UInt32, false),
1840-
Field::new("parent_id", DataType::UInt16, false),
1840+
Field::new("id", DataType::UInt32, false).with_plain_encoding(),
1841+
Field::new("parent_id", DataType::UInt16, false).with_plain_encoding(),
18411842
Field::new(
18421843
"start_time_unix_nano",
18431844
DataType::Timestamp(TimeUnit::Nanosecond, None),
@@ -1849,12 +1850,12 @@ mod test {
18491850
false,
18501851
),
18511852
Field::new("count", DataType::UInt64, false),
1852-
Field::new_large_list(
1853+
Field::new_list(
18531854
"bucket_counts",
18541855
Field::new_list_field(DataType::UInt64, false),
18551856
false,
18561857
),
1857-
Field::new_large_list(
1858+
Field::new_list(
18581859
"explicit_bounds",
18591860
Field::new_list_field(DataType::Float64, false),
18601861
false,
@@ -2080,11 +2081,7 @@ mod test {
20802081
Field::new("offset", DataType::Int32, false),
20812082
Field::new(
20822083
"bucket_counts",
2083-
DataType::LargeList(Arc::new(Field::new(
2084-
"item",
2085-
DataType::UInt64,
2086-
false,
2087-
))),
2084+
DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
20882085
false,
20892086
),
20902087
])),
@@ -2096,11 +2093,7 @@ mod test {
20962093
Field::new("offset", DataType::Int32, false),
20972094
Field::new(
20982095
"bucket_counts",
2099-
DataType::LargeList(Arc::new(Field::new(
2100-
"item",
2101-
DataType::UInt64,
2102-
false,
2103-
))),
2096+
DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
21042097
false,
21052098
),
21062099
])),
@@ -2314,16 +2307,16 @@ mod test {
23142307
fn make_bucket(offset_counts: Option<(i32, &[u64])>) -> Arc<dyn Array> {
23152308
let (offset, counts) = offset_counts.unwrap_or((0, &[]));
23162309
let offset = Int32Array::from_value(offset, 1);
2317-
let mut counts_builder: LargeListBuilder<PrimitiveBuilder<UInt64Type>> =
2318-
LargeListBuilder::new(PrimitiveBuilder::new());
2310+
let mut counts_builder: ListBuilder<PrimitiveBuilder<UInt64Type>> =
2311+
ListBuilder::new(PrimitiveBuilder::new());
23192312
counts_builder.append_value(counts.iter().copied().map(Some));
23202313

23212314
Arc::new(StructArray::new(
23222315
Fields::from(vec![
23232316
Field::new("offset", DataType::Int32, false),
23242317
Field::new(
23252318
"bucket_counts",
2326-
DataType::LargeList(Arc::new(Field::new("item", DataType::UInt64, false))),
2319+
DataType::List(Arc::new(Field::new("item", DataType::UInt64, false))),
23272320
false,
23282321
),
23292322
]),
@@ -2339,8 +2332,8 @@ mod test {
23392332
where
23402333
ArrowType: ArrowPrimitiveType,
23412334
{
2342-
let mut builder: LargeListBuilder<PrimitiveBuilder<ArrowType>> =
2343-
LargeListBuilder::new(PrimitiveBuilder::new());
2335+
let mut builder: ListBuilder<PrimitiveBuilder<ArrowType>> =
2336+
ListBuilder::new(PrimitiveBuilder::new());
23442337
builder.append_value(data.iter().copied().map(Some));
23452338
Arc::new(no_nulls(builder.finish()))
23462339
}

rust/otap-dataflow/crates/otap/src/pdata.rs

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,8 @@ mod test {
723723
common::v1::{AnyValue, InstrumentationScope, KeyValue},
724724
logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
725725
metrics::v1::{
726-
Gauge, Metric, NumberDataPoint, ResourceMetrics, ScopeMetrics, metric::Data,
726+
AggregationTemporality, Gauge, Histogram, HistogramDataPoint, Metric,
727+
NumberDataPoint, ResourceMetrics, ScopeMetrics, Sum, metric::Data,
727728
number_data_point::Value,
728729
},
729730
resource::v1::Resource,
@@ -1276,6 +1277,90 @@ mod test {
12761277
KeyValue::new("met_attr2", AnyValue::new_string("met_val2")),
12771278
],
12781279
},
1280+
Metric {
1281+
name: "metric3".into(),
1282+
description: "metric3 desc".into(),
1283+
unit: "m3 unit".into(),
1284+
metadata: vec![KeyValue::new(
1285+
"met_attr2",
1286+
AnyValue::new_string("met_val1"),
1287+
)],
1288+
data: Some(Data::Sum(Sum {
1289+
aggregation_temporality: AggregationTemporality::Cumulative as i32,
1290+
is_monotonic: true,
1291+
data_points: vec![
1292+
NumberDataPoint {
1293+
attributes: vec![
1294+
KeyValue::new("attr2", AnyValue::new_string("val1")),
1295+
KeyValue::new("attr4", AnyValue::new_string("val1")),
1296+
],
1297+
start_time_unix_nano: 16,
1298+
time_unix_nano: 18,
1299+
exemplars: vec![],
1300+
flags: 19,
1301+
value: Some(Value::AsInt(14)),
1302+
},
1303+
NumberDataPoint {
1304+
attributes: vec![KeyValue::new(
1305+
"attr",
1306+
AnyValue::new_string("val1"),
1307+
)],
1308+
start_time_unix_nano: 17,
1309+
time_unix_nano: 18,
1310+
exemplars: vec![],
1311+
flags: 0,
1312+
value: Some(Value::AsInt(14)),
1313+
},
1314+
],
1315+
})),
1316+
},
1317+
Metric {
1318+
name: "metric4".into(),
1319+
description: "metric4 desc".into(),
1320+
unit: "m4 unit".into(),
1321+
metadata: vec![
1322+
KeyValue::new("met_attr1", AnyValue::new_string("met_val2")),
1323+
KeyValue::new("met_attr2", AnyValue::new_string("met_val2")),
1324+
KeyValue::new("met_attr3", AnyValue::new_string("met_val1")),
1325+
],
1326+
data: Some(Data::Histogram(Histogram {
1327+
aggregation_temporality: AggregationTemporality::Delta as i32,
1328+
data_points: vec![
1329+
HistogramDataPoint {
1330+
time_unix_nano: 1,
1331+
start_time_unix_nano: 2,
1332+
attributes: vec![
1333+
KeyValue::new("attr1", AnyValue::new_string("val1")),
1334+
KeyValue::new("attr2", AnyValue::new_string("val2")),
1335+
],
1336+
count: 3,
1337+
sum: Some(4.0),
1338+
bucket_counts: vec![1, 2],
1339+
exemplars: vec![],
1340+
explicit_bounds: vec![3.0, 4.0, 5.0],
1341+
flags: 6,
1342+
min: Some(7.0),
1343+
max: Some(8.0),
1344+
},
1345+
HistogramDataPoint {
1346+
time_unix_nano: 3,
1347+
start_time_unix_nano: 4,
1348+
attributes: vec![KeyValue::new(
1349+
"attr1",
1350+
AnyValue::new_string("val1"),
1351+
)],
1352+
count: 2,
1353+
sum: Some(5.0),
1354+
bucket_counts: vec![6, 7, 8],
1355+
exemplars: vec![], // TODO
1356+
explicit_bounds: vec![9.0, 10.0],
1357+
flags: 16,
1358+
min: Some(17.0),
1359+
max: Some(18.0),
1360+
},
1361+
],
1362+
})),
1363+
},
12791364
],
12801365
}],
12811366
}]);

rust/otap-dataflow/crates/pdata/src/encode/record/metrics.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use std::sync::Arc;
77

88
use arrow::{
99
array::{
10-
Array, LargeListArray, LargeListBuilder, PrimitiveBuilder, RecordBatch, StructArray,
11-
StructBuilder,
10+
Array, LargeListArray, LargeListBuilder, ListBuilder, PrimitiveBuilder, RecordBatch,
11+
StructArray, StructBuilder,
1212
},
1313
datatypes::{DataType, Field, Fields, Float64Type, Schema, UInt64Type},
1414
error::ArrowError,
@@ -852,8 +852,8 @@ pub struct HistogramDataPointsRecordBatchBuilder {
852852
start_time_unix_nano: TimestampNanosecondArrayBuilder,
853853
time_unix_nano: TimestampNanosecondArrayBuilder,
854854
count: UInt64ArrayBuilder,
855-
bucket_counts: LargeListBuilder<PrimitiveBuilder<UInt64Type>>,
856-
explicit_bounds: LargeListBuilder<PrimitiveBuilder<Float64Type>>,
855+
bucket_counts: ListBuilder<PrimitiveBuilder<UInt64Type>>,
856+
explicit_bounds: ListBuilder<PrimitiveBuilder<Float64Type>>,
857857
sum: Float64ArrayBuilder,
858858
flags: UInt32ArrayBuilder,
859859
min: Float64ArrayBuilder,
@@ -890,8 +890,8 @@ impl HistogramDataPointsRecordBatchBuilder {
890890
dictionary_options: None,
891891
..Default::default()
892892
}),
893-
bucket_counts: LargeListBuilder::new(PrimitiveBuilder::new()),
894-
explicit_bounds: LargeListBuilder::new(PrimitiveBuilder::new()),
893+
bucket_counts: ListBuilder::new(PrimitiveBuilder::new()),
894+
explicit_bounds: ListBuilder::new(PrimitiveBuilder::new()),
895895
sum: Float64ArrayBuilder::new(ArrayOptions {
896896
optional: true,
897897
dictionary_options: None,
@@ -985,7 +985,9 @@ impl HistogramDataPointsRecordBatchBuilder {
985985
let mut columns = Vec::with_capacity(11);
986986

987987
if let Some(array) = self.id.finish() {
988-
fields.push(Field::new(consts::ID, array.data_type().clone(), false));
988+
fields.push(
989+
Field::new(consts::ID, array.data_type().clone(), false).with_plain_encoding(),
990+
);
989991
columns.push(array);
990992
}
991993

@@ -995,11 +997,9 @@ impl HistogramDataPointsRecordBatchBuilder {
995997
.parent_id
996998
.finish()
997999
.expect("finish returns `Some(array)`");
998-
fields.push(Field::new(
999-
consts::PARENT_ID,
1000-
array.data_type().clone(),
1001-
false,
1002-
));
1000+
fields.push(
1001+
Field::new(consts::PARENT_ID, array.data_type().clone(), false).with_plain_encoding(),
1002+
);
10031003
columns.push(array);
10041004

10051005
if let Some(array) = self.start_time_unix_nano.finish() {
@@ -1031,7 +1031,7 @@ impl HistogramDataPointsRecordBatchBuilder {
10311031

10321032
let array = no_nulls(self.bucket_counts.finish());
10331033
if !array.is_empty() {
1034-
fields.push(Field::new_large_list(
1034+
fields.push(Field::new_list(
10351035
consts::HISTOGRAM_BUCKET_COUNTS,
10361036
Field::new_list_field(DataType::UInt64, false),
10371037
false,
@@ -1041,7 +1041,7 @@ impl HistogramDataPointsRecordBatchBuilder {
10411041

10421042
let array = no_nulls(self.explicit_bounds.finish());
10431043
if !array.is_empty() {
1044-
fields.push(Field::new_large_list(
1044+
fields.push(Field::new_list(
10451045
consts::HISTOGRAM_EXPLICIT_BOUNDS,
10461046
Field::new("item", DataType::Float64, false),
10471047
false,
@@ -1382,7 +1382,7 @@ impl ExponentialHistogramDataPointsRecordBatchBuilder {
13821382
/// offset of zero and an empty counts slice.
13831383
pub struct BucketsRecordBatchBuilder {
13841384
offset: Int32ArrayBuilder,
1385-
bucket_counts: LargeListBuilder<PrimitiveBuilder<UInt64Type>>,
1385+
bucket_counts: ListBuilder<PrimitiveBuilder<UInt64Type>>,
13861386
}
13871387

13881388
impl BucketsRecordBatchBuilder {
@@ -1395,7 +1395,7 @@ impl BucketsRecordBatchBuilder {
13951395
dictionary_options: None,
13961396
default_values_optional: false,
13971397
}),
1398-
bucket_counts: LargeListBuilder::new(PrimitiveBuilder::new()),
1398+
bucket_counts: ListBuilder::new(PrimitiveBuilder::new()),
13991399
}
14001400
}
14011401

rust/otap-dataflow/crates/pdata/src/otlp/metrics/data_points/exp_histogram.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub struct PositiveNegativeArrayAccess<'a> {
100100
impl<'a> PositiveNegativeArrayAccess<'a> {
101101
fn bucket_counts_data_type() -> DataType {
102102
DataType::List(
103-
FieldRef::new(Field::new("", DataType::UInt64, true)), //todo: find the inner name here
103+
FieldRef::new(Field::new("item", DataType::UInt64, true)), //todo: find the inner name here
104104
)
105105
}
106106

rust/otap-dataflow/crates/pdata/src/otlp/metrics/data_points/histogram.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,17 @@ where
9494
{
9595
#[allow(missing_docs)]
9696
pub fn try_new(list: &'a ArrayRef) -> Result<Self> {
97-
let list = list.as_any().downcast_ref::<ListArray>().ok_or_else(|| {
98-
Error::InvalidListArray {
99-
//todo: maybe set the field name here.
100-
expect_oneof: vec![DataType::List(FieldRef::new(Field::new(
101-
"",
102-
T::DATA_TYPE,
103-
true,
104-
)))],
105-
actual: list.data_type().clone(),
106-
}
107-
})?;
97+
let list =
98+
list.as_any()
99+
.downcast_ref::<ListArray>()
100+
.ok_or_else(|| Error::InvalidListArray {
101+
expect_oneof: vec![DataType::List(FieldRef::new(Field::new(
102+
"item",
103+
T::DATA_TYPE,
104+
true,
105+
)))],
106+
actual: list.data_type().clone(),
107+
})?;
108108
Self::try_new_from_list(list)
109109
}
110110

rust/otap-dataflow/crates/pdata/src/schema.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// TODO write documentation for this crate
77
#![allow(missing_docs)]
88

9-
use arrow::array::{LargeListArray, RecordBatch};
9+
use arrow::array::{ListArray, RecordBatch};
1010
use arrow::datatypes::{DataType, Field, Fields, Schema};
1111
use std::sync::Arc;
1212

@@ -104,18 +104,18 @@ pub fn get_field_metadata<'a>(
104104
field_metadata.get(key).map(|s| s.as_str())
105105
}
106106

107-
/// Make a `LargeListArray` into an array whose item field is not nullable.
107+
/// Make a `ListArray` into an array whose item field is not nullable.
108108
///
109109
/// When you use `GenericListBuilder`, you'll get a list array where list elements are
110-
/// nullable. This is often not what we want, so this little function converts `LargeListArray`s
110+
/// nullable. This is often not what we want, so this little function converts `ListArray`s
111111
/// that don't have any nulls into an equivalent form whose item field type is not nullable. This
112112
/// function panics if the input contains any nulls at all.
113113
#[must_use]
114-
pub fn no_nulls(values: LargeListArray) -> LargeListArray {
114+
pub fn no_nulls(values: ListArray) -> ListArray {
115115
let (mut field, offsets, values, nulls) = values.into_parts();
116116
assert_eq!(0, nulls.map(|n| n.null_count()).unwrap_or(0));
117117
Arc::make_mut(&mut field).set_nullable(false);
118-
LargeListArray::new(field, offsets, values, None)
118+
ListArray::new(field, offsets, values, None)
119119
}
120120

121121
/// Checks the Arrow schema field metadata to determine if the "id" field in this record batch is

0 commit comments

Comments
 (0)