Skip to content

Commit 2d49406

Browse files
authored
Use bytes::Bytes for OTLP payloads across pdata/otap (open-telemetry#1473)
Switch `OtlpProtoBytes` to use `bytes::Bytes` and wire it through pdata and otap (part of open-telemetry#1461). All OTAP components now consume/produce Bytes: exporter, batch/filter/debug processors, fake data generator, fixtures, OTLP gRPC server, and Azure/Geneva exporters. I didn't observe any performance regression. These changes will be leverage in future PRs. No perf regression observed -> https://github.com/open-telemetry/otel-arrow/actions/runs/19655923074/job/56292300977
1 parent 0dc05d9 commit 2d49406

File tree

18 files changed

+120
-79
lines changed

18 files changed

+120
-79
lines changed

rust/otap-dataflow/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ async-stream = "0.3.6"
5454
async-trait = "0.1.88"
5555
async-unsync = "0.3.0"
5656
axum = "0.8.4"
57-
bitflags = "2.9"
58-
bytemuck = "1.23"
57+
bitflags = "2.10"
58+
bytes = "1.11"
59+
bytemuck = "1.24"
5960
chrono = { version = "0.4", features = ["serde"] }
6061
ciborium = "0.2.2"
6162
clap = { version = "4.5.42", features = ["derive"] }

rust/otap-dataflow/crates/otap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ tonic-prost = { workspace = true }
3939
prost = { workspace = true }
4040
smallvec = { workspace = true }
4141
bitflags = { workspace = true }
42+
bytes = { workspace = true }
4243

4344
otap-df-engine = { path = "../engine" }
4445
otap-df-engine-macros = { path = "../engine-macros" }

rust/otap-dataflow/crates/otap/src/attributes_processor.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ mod payload_sets {
531531
mod tests {
532532
use super::*;
533533
use crate::pdata::OtapPdata;
534+
use bytes::BytesMut;
534535
use otap_df_engine::context::ControllerContext;
535536
use otap_df_engine::message::Message;
536537
use otap_df_engine::testing::{node::test_node, processor::TestRuntime};
@@ -626,8 +627,9 @@ mod tests {
626627

627628
phase
628629
.run_test(|mut ctx| async move {
629-
let mut bytes = Vec::new();
630+
let mut bytes = BytesMut::new();
630631
input.encode(&mut bytes).expect("encode");
632+
let bytes = bytes.freeze();
631633
let pdata_in =
632634
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
633635
ctx.process(Message::PData(pdata_in))
@@ -644,7 +646,7 @@ mod tests {
644646
OtlpProtoBytes::ExportLogsRequest(b) => b,
645647
_ => panic!("unexpected otlp variant"),
646648
};
647-
let decoded = ExportLogsServiceRequest::decode(bytes.as_slice()).expect("decode");
649+
let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");
648650

649651
// Resource should still have key "a"
650652
let res_attrs = &decoded.resource_logs[0]
@@ -707,9 +709,11 @@ mod tests {
707709

708710
phase
709711
.run_test(|mut ctx| async move {
710-
let mut bytes = Vec::new();
712+
let mut bytes = BytesMut::new();
711713
input.encode(&mut bytes).expect("encode");
712-
let pdata_in = OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
714+
let bytes = bytes.freeze();
715+
let pdata_in =
716+
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
713717
ctx.process(Message::PData(pdata_in))
714718
.await
715719
.expect("process");
@@ -722,7 +726,7 @@ mod tests {
722726
OtlpProtoBytes::ExportLogsRequest(b) => b,
723727
_ => panic!("unexpected otlp variant"),
724728
};
725-
let decoded = ExportLogsServiceRequest::decode(bytes.as_slice()).expect("decode");
729+
let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");
726730

727731
// Resource should still have key "a"
728732
let res_attrs = &decoded.resource_logs[0]
@@ -790,8 +794,9 @@ mod tests {
790794

791795
phase
792796
.run_test(|mut ctx| async move {
793-
let mut bytes = Vec::new();
797+
let mut bytes = BytesMut::new();
794798
input.encode(&mut bytes).expect("encode");
799+
let bytes = bytes.freeze();
795800
let pdata_in =
796801
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
797802
ctx.process(Message::PData(pdata_in))
@@ -805,7 +810,7 @@ mod tests {
805810
OtlpProtoBytes::ExportLogsRequest(b) => b,
806811
_ => panic!("unexpected otlp variant"),
807812
};
808-
let decoded = ExportLogsServiceRequest::decode(bytes.as_slice()).expect("decode");
813+
let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");
809814

810815
// Resource 'a' should be deleted
811816
let res_attrs = &decoded.resource_logs[0]
@@ -865,8 +870,9 @@ mod tests {
865870

866871
phase
867872
.run_test(|mut ctx| async move {
868-
let mut bytes = Vec::new();
873+
let mut bytes = BytesMut::new();
869874
input.encode(&mut bytes).expect("encode");
875+
let bytes = bytes.freeze();
870876
let pdata_in =
871877
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
872878
ctx.process(Message::PData(pdata_in))
@@ -880,7 +886,7 @@ mod tests {
880886
OtlpProtoBytes::ExportLogsRequest(b) => b,
881887
_ => panic!("unexpected otlp variant"),
882888
};
883-
let decoded = ExportLogsServiceRequest::decode(bytes.as_slice()).expect("decode");
889+
let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");
884890

885891
// Resource 'a' should remain
886892
let res_attrs = &decoded.resource_logs[0]
@@ -944,8 +950,9 @@ mod tests {
944950

945951
phase
946952
.run_test(|mut ctx| async move {
947-
let mut bytes = Vec::new();
953+
let mut bytes = BytesMut::new();
948954
input.encode(&mut bytes).expect("encode");
955+
let bytes = bytes.freeze();
949956
let pdata_in =
950957
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
951958
ctx.process(Message::PData(pdata_in))
@@ -959,7 +966,7 @@ mod tests {
959966
OtlpProtoBytes::ExportLogsRequest(b) => b,
960967
_ => panic!("unexpected otlp variant"),
961968
};
962-
let decoded = ExportLogsServiceRequest::decode(bytes.as_slice()).expect("decode");
969+
let decoded = ExportLogsServiceRequest::decode(bytes.as_ref()).expect("decode");
963970

964971
// Resource 'a' should be deleted; 'r' should remain
965972
let res_attrs = &decoded.resource_logs[0]
@@ -992,6 +999,7 @@ mod tests {
992999
mod telemetry_tests {
9931000
use super::*;
9941001
use crate::pdata::OtapPdata;
1002+
use bytes::BytesMut;
9951003
use otap_df_engine::config::ProcessorConfig;
9961004
use otap_df_engine::context::ControllerContext;
9971005
use otap_df_engine::control::NodeControlMsg;
@@ -1073,9 +1081,9 @@ mod telemetry_tests {
10731081
..Default::default()
10741082
}],
10751083
};
1076-
let mut bytes = Vec::new();
1084+
let mut bytes = BytesMut::new();
10771085
req.encode(&mut bytes).expect("encode");
1078-
bytes
1086+
bytes.freeze()
10791087
};
10801088

10811089
// 5) Drive processor with TestRuntime; start collector inside and then request a telemetry snapshot

rust/otap-dataflow/crates/otap/src/batch_processor.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use crate::OTAP_PROCESSOR_FACTORIES;
88
use crate::pdata::{Context, OtapPdata};
99
use async_trait::async_trait;
10+
use bytes::Bytes;
1011
use linkme::distributed_slice;
1112
use otap_df_config::SignalType;
1213
use otap_df_config::error::Error as ConfigError;
@@ -562,9 +563,9 @@ impl SignalBuffer {
562563
// have a bunch of timers pending, when we want a timer we can
563564
// cancel.
564565
let wakeup: OtapPayload = match signal {
565-
SignalType::Logs => OtlpProtoBytes::ExportLogsRequest(vec![]),
566-
SignalType::Metrics => OtlpProtoBytes::ExportMetricsRequest(vec![]),
567-
SignalType::Traces => OtlpProtoBytes::ExportTracesRequest(vec![]),
566+
SignalType::Logs => OtlpProtoBytes::ExportLogsRequest(Bytes::new()),
567+
SignalType::Metrics => OtlpProtoBytes::ExportMetricsRequest(Bytes::new()),
568+
SignalType::Traces => OtlpProtoBytes::ExportTracesRequest(Bytes::new()),
568569
}
569570
.into();
570571

rust/otap-dataflow/crates/otap/src/debug_processor.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ impl local::Processor<OtapPdata> for DebugProcessor {
289289
match otlp_bytes {
290290
OtlpProtoBytes::ExportLogsRequest(bytes) => {
291291
if active_signals.contains(&SignalActive::Logs) {
292-
let req = LogsData::decode(bytes.as_slice()).map_err(|e| {
292+
let req = LogsData::decode(bytes.as_ref()).map_err(|e| {
293293
Error::PdataConversionError {
294294
error: format!("error decoding proto bytes: {e}"),
295295
}
@@ -300,7 +300,7 @@ impl local::Processor<OtapPdata> for DebugProcessor {
300300
}
301301
OtlpProtoBytes::ExportMetricsRequest(bytes) => {
302302
if active_signals.contains(&SignalActive::Metrics) {
303-
let req = MetricsData::decode(bytes.as_slice()).map_err(|e| {
303+
let req = MetricsData::decode(bytes.as_ref()).map_err(|e| {
304304
Error::PdataConversionError {
305305
error: format!("error decoding proto bytes: {e}"),
306306
}
@@ -311,7 +311,7 @@ impl local::Processor<OtapPdata> for DebugProcessor {
311311
}
312312
OtlpProtoBytes::ExportTracesRequest(bytes) => {
313313
if active_signals.contains(&SignalActive::Spans) {
314-
let req = TracesData::decode(bytes.as_slice()).map_err(|e| {
314+
let req = TracesData::decode(bytes.as_ref()).map_err(|e| {
315315
Error::PdataConversionError {
316316
error: format!("error decoding proto bytes: {e}"),
317317
}
@@ -504,6 +504,7 @@ mod tests {
504504
use crate::debug_processor::sampling::SamplingConfig;
505505
use crate::debug_processor::{DEBUG_PROCESSOR_URN, DebugProcessor};
506506
use crate::pdata::OtapPdata;
507+
use bytes::BytesMut;
507508
use otap_df_config::node::NodeUserConfig;
508509
use otap_df_engine::context::ControllerContext;
509510
use otap_df_engine::message::Message;
@@ -612,12 +613,13 @@ mod tests {
612613
)]);
613614

614615
//convert logsdata to otappdata
615-
let mut bytes = vec![];
616+
let mut log_bytes = BytesMut::new();
616617
logs_data
617-
.encode(&mut bytes)
618+
.encode(&mut log_bytes)
618619
.expect("failed to encode log data into bytes");
620+
let log_bytes = log_bytes.freeze();
619621
let otlp_logs_bytes =
620-
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
622+
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(log_bytes).into());
621623
ctx.process(Message::PData(otlp_logs_bytes))
622624
.await
623625
.expect("failed to process");
@@ -667,12 +669,14 @@ mod tests {
667669
)],
668670
)]);
669671

670-
bytes = vec![];
672+
let mut metric_bytes = BytesMut::new();
671673
metrics_data
672-
.encode(&mut bytes)
674+
.encode(&mut metric_bytes)
673675
.expect("failed to encode log data into bytes");
674-
let otlp_metrics_bytes =
675-
OtapPdata::new_default(OtlpProtoBytes::ExportMetricsRequest(bytes).into());
676+
let metric_bytes = metric_bytes.freeze();
677+
let otlp_metrics_bytes = OtapPdata::new_default(
678+
OtlpProtoBytes::ExportMetricsRequest(metric_bytes).into(),
679+
);
676680
ctx.process(Message::PData(otlp_metrics_bytes))
677681
.await
678682
.expect("failed to process");
@@ -731,12 +735,13 @@ mod tests {
731735
)],
732736
)]);
733737

734-
bytes = vec![];
738+
let mut trace_bytes = BytesMut::new();
735739
traces_data
736-
.encode(&mut bytes)
740+
.encode(&mut trace_bytes)
737741
.expect("failed to encode log data into bytes");
742+
let trace_bytes = trace_bytes.freeze();
738743
let otlp_traces_bytes =
739-
OtapPdata::new_default(OtlpProtoBytes::ExportTracesRequest(bytes).into());
744+
OtapPdata::new_default(OtlpProtoBytes::ExportTracesRequest(trace_bytes).into());
740745
ctx.process(Message::PData(otlp_traces_bytes))
741746
.await
742747
.expect("failed to process");

rust/otap-dataflow/crates/otap/src/experimental/azure_monitor_exporter/exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ impl AzureMonitorExporter {
6969

7070
match otlp_bytes {
7171
OtlpProtoBytes::ExportLogsRequest(bytes) => {
72-
let request = ExportLogsServiceRequest::decode(bytes.as_slice()).map_err(|e| {
72+
let request = ExportLogsServiceRequest::decode(bytes.as_ref()).map_err(|e| {
7373
Error::PDataError {
7474
reason: format!("Failed to decode OTLP logs request: {e}"),
7575
}

rust/otap-dataflow/crates/otap/src/experimental/geneva_exporter/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ impl GenevaExporter {
256256
.await;
257257

258258
// Decode OTLP bytes to ResourceLogs
259-
let logs_request = ExportLogsServiceRequest::decode(&bytes[..])
259+
let logs_request = ExportLogsServiceRequest::decode(bytes.as_ref())
260260
.map_err(|e| format!("Failed to decode logs request: {}", e))?;
261261

262262
// Encode and compress using Geneva client
@@ -288,7 +288,7 @@ impl GenevaExporter {
288288
.await;
289289

290290
// Decode OTLP bytes to ResourceSpans
291-
let traces_request = ExportTraceServiceRequest::decode(&bytes[..])
291+
let traces_request = ExportTraceServiceRequest::decode(bytes.as_ref())
292292
.map_err(|e| format!("Failed to decode traces request: {}", e))?;
293293

294294
// Encode and compress using Geneva client

rust/otap-dataflow/crates/otap/src/fake_data_generator.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::fake_data_generator::fake_signal::{
1212
};
1313
use crate::pdata::OtapPdata;
1414
use async_trait::async_trait;
15+
use bytes::BytesMut;
1516
use linkme::distributed_slice;
1617
use metrics::FakeSignalReceiverMetrics;
1718
use otap_df_config::node::NodeUserConfig;
@@ -406,19 +407,25 @@ impl TryFrom<OtlpProtoMessage> for OtapPdata {
406407
type Error = Error;
407408

408409
fn try_from(value: OtlpProtoMessage) -> Result<Self, Self::Error> {
409-
let mut bytes = vec![];
410+
let mut bytes = BytesMut::new();
410411
Ok(match value {
411412
OtlpProtoMessage::Logs(logs_data) => {
412413
logs_data.encode(&mut bytes)?;
413-
OtapPdata::new_todo_context(OtlpProtoBytes::ExportLogsRequest(bytes).into())
414+
OtapPdata::new_todo_context(
415+
OtlpProtoBytes::ExportLogsRequest(bytes.freeze()).into(),
416+
)
414417
}
415418
OtlpProtoMessage::Metrics(metrics_data) => {
416419
metrics_data.encode(&mut bytes)?;
417-
OtapPdata::new_todo_context(OtlpProtoBytes::ExportMetricsRequest(bytes).into())
420+
OtapPdata::new_todo_context(
421+
OtlpProtoBytes::ExportMetricsRequest(bytes.freeze()).into(),
422+
)
418423
}
419424
OtlpProtoMessage::Traces(trace_data) => {
420425
trace_data.encode(&mut bytes)?;
421-
OtapPdata::new_todo_context(OtlpProtoBytes::ExportTracesRequest(bytes).into())
426+
OtapPdata::new_todo_context(
427+
OtlpProtoBytes::ExportTracesRequest(bytes.freeze()).into(),
428+
)
422429
}
423430
})
424431
}

rust/otap-dataflow/crates/otap/src/filter_processor.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ impl local::Processor<OtapPdata> for FilterProcessor {
181181
mod tests {
182182
use crate::filter_processor::{FILTER_PROCESSOR_URN, FilterProcessor, config::Config};
183183
use crate::pdata::OtapPdata;
184+
use bytes::BytesMut;
184185
use otap_df_config::node::NodeUserConfig;
185186
use otap_df_engine::context::ControllerContext;
186187
use otap_df_engine::message::Message;
@@ -622,9 +623,10 @@ mod tests {
622623
move |mut ctx| {
623624
Box::pin(async move {
624625
//convert logsdata to otappdata
625-
let mut bytes = vec![];
626+
let mut bytes = BytesMut::new();
626627
sent.encode(&mut bytes)
627628
.expect("failed to encode log data into bytes");
629+
let bytes = bytes.freeze();
628630
let otlp_logs_bytes =
629631
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
630632
ctx.process(Message::PData(otlp_logs_bytes))
@@ -638,7 +640,7 @@ mod tests {
638640
.try_into()
639641
.expect("failed to convert to OtlpProtoBytes");
640642
let received_logs_data = match otlp_bytes {
641-
OtlpProtoBytes::ExportLogsRequest(bytes) => LogsData::decode(bytes.as_slice())
643+
OtlpProtoBytes::ExportLogsRequest(bytes) => LogsData::decode(bytes.as_ref())
642644
.expect("failed to decode logs into logsdata"),
643645
_ => panic!("expected logs type"),
644646
};
@@ -656,10 +658,11 @@ mod tests {
656658
Box::pin(async move {
657659
//convert tracesdata to otappdata
658660
let traces_data = build_traces();
659-
let mut bytes = vec![];
661+
let mut bytes = BytesMut::new();
660662
traces_data
661663
.encode(&mut bytes)
662664
.expect("failed to encode trace data into bytes");
665+
let bytes = bytes.freeze();
663666
let otlp_traces_bytes =
664667
OtapPdata::new_default(OtlpProtoBytes::ExportTracesRequest(bytes).into());
665668
ctx.process(Message::PData(otlp_traces_bytes))
@@ -674,7 +677,7 @@ mod tests {
674677
.expect("failed to convert to OtlpProtoBytes");
675678
let received_traces_data = match otlp_bytes {
676679
OtlpProtoBytes::ExportTracesRequest(bytes) => {
677-
TracesData::decode(bytes.as_slice())
680+
TracesData::decode(bytes.as_ref())
678681
.expect("failed to decode traces into tracesdata")
679682
}
680683
_ => panic!("expected traces type"),

rust/otap-dataflow/crates/otap/src/fixtures.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use arrow::array::{
88
};
99
use arrow::datatypes::{DataType, Field, Schema, TimeUnit, UInt16Type, UInt32Type};
1010
use arrow_ipc::writer::StreamWriter;
11+
use bytes::BytesMut;
1112
use fluke_hpack::Encoder;
1213
use otap_df_pdata::OtlpProtoBytes;
1314
use otap_df_pdata::otlp::attributes::AttributeValueType;
@@ -124,9 +125,9 @@ pub fn create_single_logs_pdata_with_attrs(attributes: Vec<KeyValue>) -> OtapPda
124125
..Default::default()
125126
}],
126127
};
127-
let mut bytes = vec![];
128+
let mut bytes = BytesMut::new();
128129
log_req_1.encode(&mut bytes).unwrap();
129-
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into())
130+
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes.freeze()).into())
130131
}
131132

132133
pub fn create_simple_logs_arrow_record_batches(options: SimpleDataGenOptions) -> BatchArrowRecords {

0 commit comments

Comments
 (0)