Skip to content

Commit 7ecfb2d

Browse files
Support OTLP bytes -> OTAP for summary data points (open-telemetry#1408)
part of open-telemetry#387
1 parent 183c1a7 commit 7ecfb2d

File tree

5 files changed

+419
-28
lines changed

5 files changed

+419
-28
lines changed

rust/otap-dataflow/crates/otap/src/encoder.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -976,9 +976,8 @@ mod test {
976976
use arrow::array::{
977977
Array, ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray, DictionaryArray,
978978
DurationNanosecondArray, FixedSizeBinaryArray, Float64Array, Int32Array, Int64Array,
979-
LargeListArray, LargeListBuilder, ListBuilder, PrimitiveBuilder, RecordBatch, StringArray,
980-
StructArray, StructBuilder, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array,
981-
UInt64Array,
979+
ListArray, ListBuilder, PrimitiveBuilder, RecordBatch, StringArray, StructArray,
980+
StructBuilder, TimestampNanosecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
982981
};
983982
use arrow::buffer::NullBuffer;
984983
use arrow::datatypes::{
@@ -1734,8 +1733,8 @@ mod test {
17341733
let sdp = otap_batch.get(ArrowPayloadType::SummaryDataPoints).unwrap();
17351734
let expected_sdp_batch = RecordBatch::try_new(
17361735
Arc::new(Schema::new(vec![
1737-
Field::new("id", DataType::UInt32, false),
1738-
Field::new("parent_id", DataType::UInt16, false),
1736+
Field::new("id", DataType::UInt32, false).with_plain_encoding(),
1737+
Field::new("parent_id", DataType::UInt16, false).with_plain_encoding(),
17391738
Field::new(
17401739
"start_time_unix_nano",
17411740
DataType::Timestamp(TimeUnit::Nanosecond, None),
@@ -1750,7 +1749,7 @@ mod test {
17501749
Field::new("sum", DataType::Float64, false),
17511750
Field::new(
17521751
"quantile",
1753-
DataType::LargeList(Arc::new(Field::new(
1752+
DataType::List(Arc::new(Field::new(
17541753
"item",
17551754
DataType::Struct(Fields::from(vec![
17561755
Field::new("quantile", DataType::Float64, false),
@@ -2340,8 +2339,8 @@ mod test {
23402339

23412340
/// A tiny helper function to deal with the messiness of ListOf(StructOf()) construction; it
23422341
/// generates a single list.
2343-
fn make_quantile_value_list(quantiles: &[f64], values: &[f64]) -> LargeListArray {
2344-
let mut lists = LargeListBuilder::new(StructBuilder::from_fields(
2342+
fn make_quantile_value_list(quantiles: &[f64], values: &[f64]) -> ListArray {
2343+
let mut lists = ListBuilder::new(StructBuilder::from_fields(
23452344
vec![
23462345
Field::new(consts::SUMMARY_QUANTILE, DataType::Float64, false),
23472346
Field::new(consts::SUMMARY_VALUE, DataType::Float64, false),

rust/otap-dataflow/crates/otap/src/pdata.rs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -725,8 +725,9 @@ mod test {
725725
metrics::v1::{
726726
AggregationTemporality, ExponentialHistogram, ExponentialHistogramDataPoint, Gauge,
727727
Histogram, HistogramDataPoint, Metric, NumberDataPoint, ResourceMetrics,
728-
ScopeMetrics, Sum, exponential_histogram_data_point::Buckets, metric::Data,
729-
number_data_point::Value,
728+
ScopeMetrics, Sum, Summary, SummaryDataPoint,
729+
exponential_histogram_data_point::Buckets, metric::Data, number_data_point::Value,
730+
summary_data_point::ValueAtQuantile,
730731
},
731732
resource::v1::Resource,
732733
trace::v1::{
@@ -1417,6 +1418,69 @@ mod test {
14171418
],
14181419
})),
14191420
},
1421+
Metric {
1422+
name: "metric6".into(),
1423+
description: "metric 6 desc".into(),
1424+
unit: "metric6 unit".into(),
1425+
metadata: vec![
1426+
KeyValue::new("attr1", AnyValue::new_string("val99")),
1427+
KeyValue::new("attr2", AnyValue::new_string("val007")),
1428+
],
1429+
data: Some(Data::Summary(Summary {
1430+
data_points: vec![
1431+
SummaryDataPoint {
1432+
count: 1,
1433+
sum: 2.0,
1434+
attributes: vec![
1435+
KeyValue::new("dp_attr1", AnyValue::new_string("val99")),
1436+
KeyValue::new("dp_attr2", AnyValue::new_string("val007")),
1437+
],
1438+
start_time_unix_nano: 8383,
1439+
time_unix_nano: 9873,
1440+
quantile_values: vec![
1441+
ValueAtQuantile {
1442+
quantile: 1.0,
1443+
value: 2.0,
1444+
},
1445+
ValueAtQuantile {
1446+
quantile: 8.0,
1447+
value: 4.0,
1448+
},
1449+
ValueAtQuantile {
1450+
quantile: 9.0,
1451+
value: 5.0,
1452+
},
1453+
],
1454+
flags: 256,
1455+
},
1456+
SummaryDataPoint {
1457+
count: 11,
1458+
sum: 21.0,
1459+
attributes: vec![KeyValue::new(
1460+
"dp_attr11",
1461+
AnyValue::new_string("val99"),
1462+
)],
1463+
start_time_unix_nano: 333,
1464+
time_unix_nano: 444,
1465+
quantile_values: vec![
1466+
ValueAtQuantile {
1467+
quantile: 11.0,
1468+
value: 20.0,
1469+
},
1470+
ValueAtQuantile {
1471+
quantile: 81.0,
1472+
value: 40.0,
1473+
},
1474+
ValueAtQuantile {
1475+
quantile: 91.0,
1476+
value: 59.0,
1477+
},
1478+
],
1479+
flags: 200,
1480+
},
1481+
],
1482+
})),
1483+
},
14201484
],
14211485
}],
14221486
}]);

rust/otap-dataflow/crates/pdata/src/encode/record/metrics.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@ use std::sync::Arc;
77

88
use arrow::{
99
array::{
10-
Array, LargeListArray, LargeListBuilder, ListBuilder, PrimitiveBuilder, RecordBatch,
11-
StructArray, StructBuilder,
10+
Array, ListArray, ListBuilder, PrimitiveBuilder, RecordBatch, StructArray, StructBuilder,
1211
},
1312
datatypes::{DataType, Field, Fields, Float64Type, Schema, UInt64Type},
1413
error::ArrowError,
@@ -609,15 +608,15 @@ impl ExemplarsRecordBatchBuilder {
609608
///
610609
/// Ultimately, what we want is a ListOf(StructOf(quantile, value)).
611610
pub struct QuantileRecordBatchBuilder {
612-
lists: LargeListBuilder<StructBuilder>,
611+
lists: ListBuilder<StructBuilder>,
613612
}
614613

615614
impl QuantileRecordBatchBuilder {
616615
/// Create a new instance of `Quantile`
617616
#[must_use]
618617
pub fn new() -> Self {
619618
Self {
620-
lists: LargeListBuilder::new(StructBuilder::from_fields(
619+
lists: ListBuilder::new(StructBuilder::from_fields(
621620
vec![
622621
Field::new(consts::SUMMARY_QUANTILE, DataType::Float64, false),
623622
Field::new(consts::SUMMARY_VALUE, DataType::Float64, false),
@@ -669,7 +668,7 @@ impl QuantileRecordBatchBuilder {
669668
}
670669

671670
/// Construct an OTAP Quantile `StructArray` from the builders.
672-
pub fn finish(&mut self) -> Option<LargeListArray> {
671+
pub fn finish(&mut self) -> Option<ListArray> {
673672
let array = self.lists.finish();
674673
(!array.is_empty()).then_some(array)
675674
}
@@ -774,7 +773,9 @@ impl SummaryDataPointsRecordBatchBuilder {
774773
let mut columns = Vec::with_capacity(8);
775774

776775
if let Some(array) = self.id.finish() {
777-
fields.push(Field::new(consts::ID, array.data_type().clone(), false));
776+
fields.push(
777+
Field::new(consts::ID, array.data_type().clone(), false).with_plain_encoding(),
778+
);
778779
columns.push(array);
779780
}
780781

@@ -784,11 +785,9 @@ impl SummaryDataPointsRecordBatchBuilder {
784785
.parent_id
785786
.finish()
786787
.expect("finish returns `Some(array)`");
787-
fields.push(Field::new(
788-
consts::PARENT_ID,
789-
array.data_type().clone(),
790-
false,
791-
));
788+
fields.push(
789+
Field::new(consts::PARENT_ID, array.data_type().clone(), false).with_plain_encoding(),
790+
);
792791
columns.push(array);
793792

794793
if let Some(array) = self.start_time_unix_nano.finish() {

rust/otap-dataflow/crates/pdata/src/proto/consts.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ pub mod field_num {
145145
pub const SUMMARY_DP_COUNT: u64 = 4;
146146
pub const SUMMARY_DP_SUM: u64 = 5;
147147
pub const SUMMARY_DP_QUANTILE_VALUES: u64 = 6;
148-
pub const SUMMARY_DP_FLAGS: u64 = 6;
148+
pub const SUMMARY_DP_FLAGS: u64 = 8;
149149

150150
pub const VALUE_AT_QUANTILE_QUANTILE: u64 = 1;
151151
pub const VALUE_AT_QUANTILE_VALUE: u64 = 2;

0 commit comments

Comments
 (0)