Skip to content

Commit b626bce

Browse files
authored
[otap-df-pdata]Add zero-copy view for OTAP logs format (open-telemetry#1467)
Fixes: open-telemetry#1441 Implements `OtapLogsView` - a zero-copy view over Arrow RecordBatch data for OTAP logs. ## Use Cases This view is useful for exporters that need to iterate over the OTAP log data: - Geneva Exporter: Enables the exporter to iterate directly over Arrow data to generate custom payloads, avoiding the costly round-trip of converting to OTLP bytes and decoding them back to objects. - Log Analytics: Allows efficient, zero-copy access to log fields for serialization to JSON. - General Data Access: Provides a standard, efficient way for any component to read the hierarchical OTLP structure (Resource → Scope → Log) without needing to understand the underlying Arrow complexity. Without views: OTAP → OTLP bytes → decode → iterate (slower) With views: OTAP → view → iterate (faster, zero-copy) ## Design - Stores column references and accessors (fixed overhead) - Pre-computes indices for resource/scope/log hierarchy - Actual telemetry data remains in Arrow RecordBatch (no copying) ## Benchmark Results System: Apple M4 Pro, 12 cores, 24 GB RAM | Logs | Zero-Copy View | OTAP→OTLP→Process | |------|----------------|-------------------| | 10 | 1.48 µs | 6.43 µs | | 100 | 9.86 µs | 40.40 µs | | 1000 | 121.42 µs | 383.11 µs | OTAP→OTLP→Process = full cost without views (OTAP→OTLP conversion + OTLP decode + iterate) ## Changes - Added `OtapLogsView` with hierarchical iteration (Resource → Scope → LogRecord → Attributes) - Added column caching to avoid repeated schema lookups - Updated Geneva exporter: - Added example view-based code (commented out) to demonstrate intended usage - Currently uses OTAP→OTLP fallback conversion until geneva-uploader 0.4.0+ supports view API - All pipelines (OTLP bytes and OTAP Arrow) continue to work
1 parent 261dd53 commit b626bce

File tree

7 files changed

+2774
-99
lines changed

7 files changed

+2774
-99
lines changed

rust/otap-dataflow/benchmarks/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ harness = false
7777
name = "item_count"
7878
harness = false
7979

80+
[[bench]]
81+
name = "otap_logs_view"
82+
harness = false
83+
8084
[profile.bench]
8185
opt-level = 3
8286
debug = false
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Benchmarks for OtapLogsView (zero-copy) vs OTLP conversion
5+
//!
6+
//! ## Benchmark Results
7+
//!
8+
//! System: Apple M4 Pro, 24 GB RAM, 12 cores
9+
//!
10+
//! | Logs | Zero-Copy View | OTAP→OTLP→Process | Direct Arrow |
11+
//! |------|----------------|-------------------|--------------|
12+
//! | 10 | 1.48 µs | 6.43 µs | 24.41 ns |
13+
//! | 100 | 9.86 µs | 40.40 µs | 233.67 ns |
14+
//! | 1000 | 121.42 µs | 383.11 µs | 2.35 µs |
15+
//!
16+
//! OTAP→OTLP→Process includes: OTAP→OTLP conversion + OTLP decode + iterate.
17+
//! Direct Arrow is fastest but unrealistic (baseline only, no hierarchical access).
18+
19+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
20+
use otap_df_pdata::otap::OtapArrowRecords;
21+
use otap_df_pdata::otlp::{ProtoBuffer, ProtoBytesEncoder, logs::LogsProtoBytesEncoder};
22+
use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType;
23+
use otap_df_pdata::proto::opentelemetry::collector::logs::v1::ExportLogsServiceRequest;
24+
use otap_df_pdata::views::common::AttributeView;
25+
use otap_df_pdata::views::logs::{LogRecordView, LogsDataView, ResourceLogsView, ScopeLogsView};
26+
use otap_df_pdata::views::otap::OtapLogsView;
27+
use prost::Message;
28+
29+
use arrow::array::Array;
30+
use arrow::array::DictionaryArray;
31+
use arrow::datatypes::UInt8Type;
32+
use arrow::error::ArrowError;
33+
use otap_df_pdata::encode::record::attributes::StrKeysAttributesRecordBatchBuilder;
34+
use otap_df_pdata::encode::record::logs::LogsRecordBatchBuilder;
35+
36+
// Helper to create a simple OTAP logs RecordBatch for testing
37+
fn create_test_logs_batch(count: usize) -> Result<OtapArrowRecords, ArrowError> {
38+
let mut builder = LogsRecordBatchBuilder::new();
39+
40+
for i in 0..count {
41+
// Log Record fields
42+
builder.append_time_unix_nano(100);
43+
builder.append_observed_time_unix_nano(110);
44+
builder.append_severity_number(Some(1));
45+
builder.append_severity_text(Some(b"INFO"));
46+
builder.body.append_str(b"test log body");
47+
builder.append_dropped_attributes_count(0);
48+
builder.append_flags(Some(1));
49+
// Append unique ID for attribute linking (1-based to avoid 0 which is sometimes treated as null/default)
50+
builder.append_id(Some((i + 1) as u16));
51+
52+
// Resource fields (repeated for simplicity in this test)
53+
builder.resource.append_id(Some(1));
54+
builder.resource.append_schema_url(None);
55+
builder.resource.append_dropped_attributes_count(0);
56+
57+
// Scope fields (repeated for simplicity in this test)
58+
builder.scope.append_id(Some(10));
59+
builder.scope.append_name(Some(b"test_scope"));
60+
builder.scope.append_version(Some(b"1.0"));
61+
builder.scope.append_dropped_attributes_count(0);
62+
}
63+
64+
let mut otap_records = OtapArrowRecords::Logs(Default::default());
65+
otap_records.set(ArrowPayloadType::Logs, builder.finish()?);
66+
67+
// Create Resource Attributes (Parent ID = 1)
68+
let mut res_attrs = StrKeysAttributesRecordBatchBuilder::<u16>::new();
69+
res_attrs.append_parent_id(&1);
70+
res_attrs.append_key("service.name");
71+
res_attrs.any_values_builder.append_str(b"test_service");
72+
res_attrs.append_parent_id(&1);
73+
res_attrs.append_key("host.name");
74+
res_attrs.any_values_builder.append_str(b"localhost");
75+
otap_records.set(ArrowPayloadType::ResourceAttrs, res_attrs.finish()?);
76+
77+
// Create Scope Attributes (Parent ID = 10)
78+
let mut scope_attrs = StrKeysAttributesRecordBatchBuilder::<u16>::new();
79+
scope_attrs.append_parent_id(&10);
80+
scope_attrs.append_key("scope.attr");
81+
scope_attrs.any_values_builder.append_str(b"scope_value");
82+
otap_records.set(ArrowPayloadType::ScopeAttrs, scope_attrs.finish()?);
83+
84+
// Create Log Attributes (Parent ID = 1..count)
85+
let mut log_attrs = StrKeysAttributesRecordBatchBuilder::<u16>::new();
86+
for i in 0..count {
87+
let log_id = (i + 1) as u16;
88+
// Add 3 attributes per log
89+
log_attrs.append_parent_id(&log_id);
90+
log_attrs.append_key("log.attr.1");
91+
log_attrs.any_values_builder.append_str(b"value1");
92+
93+
log_attrs.append_parent_id(&log_id);
94+
log_attrs.append_key("log.attr.2");
95+
log_attrs.any_values_builder.append_int(42);
96+
97+
log_attrs.append_parent_id(&log_id);
98+
log_attrs.append_key("log.attr.3");
99+
log_attrs.any_values_builder.append_bool(true);
100+
}
101+
otap_records.set(ArrowPayloadType::LogAttrs, log_attrs.finish()?);
102+
103+
Ok(otap_records)
104+
}
105+
106+
/// Benchmark entry point
107+
fn bench_otap_logs_view(c: &mut Criterion) {
108+
let mut group = c.benchmark_group("otap_logs_view");
109+
110+
for size in [10, 100, 1000].iter() {
111+
let input = create_test_logs_batch(*size).expect("Failed to create test logs batch");
112+
113+
let _ = group.bench_with_input(
114+
BenchmarkId::new("zero_copy_view", size),
115+
&input,
116+
|b, input| {
117+
b.iter(|| {
118+
let view =
119+
OtapLogsView::try_from(input).expect("Failed to create OtapLogsView");
120+
let mut count = 0;
121+
for resource in view.resources() {
122+
for scope in resource.scopes() {
123+
for log in scope.log_records() {
124+
// Access body
125+
let _ = std::hint::black_box(log.body());
126+
// Access attributes
127+
for attr in log.attributes() {
128+
let _ = std::hint::black_box(attr.value());
129+
}
130+
count += 1;
131+
}
132+
}
133+
}
134+
assert_eq!(count, *size);
135+
})
136+
},
137+
);
138+
139+
let _ = group.bench_with_input(
140+
BenchmarkId::new("otlp_bytes_processing", size),
141+
&input,
142+
|b, input: &OtapArrowRecords| {
143+
b.iter(|| {
144+
// 1. Convert OTAP -> OTLP bytes
145+
let mut logs_encoder = LogsProtoBytesEncoder::new();
146+
let mut buffer = ProtoBuffer::new();
147+
let mut input_clone = input.clone();
148+
logs_encoder
149+
.encode(&mut input_clone, &mut buffer)
150+
.expect("Failed to encode logs to OTLP");
151+
let otlp_bytes = buffer.as_ref();
152+
153+
// 2. Decode OTLP bytes -> OTLP Structs
154+
let request = ExportLogsServiceRequest::decode(otlp_bytes)
155+
.expect("Failed to decode OTLP logs");
156+
157+
// 3. Iterate
158+
let mut count = 0;
159+
for resource_logs in request.resource_logs {
160+
for scope_logs in resource_logs.scope_logs {
161+
for _log in scope_logs.log_records {
162+
count += 1;
163+
}
164+
}
165+
}
166+
assert_eq!(count, *size);
167+
})
168+
},
169+
);
170+
171+
let _ = group.bench_with_input(
172+
BenchmarkId::new("direct_arrow_iteration", size),
173+
&input,
174+
|b, input: &OtapArrowRecords| {
175+
let logs_batch = input
176+
.get(ArrowPayloadType::Logs)
177+
.expect("Logs batch not found");
178+
let resource_attrs = input
179+
.get(ArrowPayloadType::ResourceAttrs)
180+
.expect("Resource attrs not found");
181+
let scope_attrs = input
182+
.get(ArrowPayloadType::ScopeAttrs)
183+
.expect("Scope attrs not found");
184+
let log_attrs = input
185+
.get(ArrowPayloadType::LogAttrs)
186+
.expect("Log attrs not found");
187+
188+
let time_col = logs_batch
189+
.column_by_name("time_unix_nano")
190+
.expect("time_unix_nano column not found")
191+
.as_any()
192+
.downcast_ref::<arrow::array::TimestampNanosecondArray>()
193+
.expect("Failed to downcast time_unix_nano");
194+
let resource_col = logs_batch
195+
.column_by_name("resource")
196+
.expect("resource column not found")
197+
.as_any()
198+
.downcast_ref::<arrow::array::StructArray>()
199+
.expect("Failed to downcast resource");
200+
let resource_id_col = resource_col
201+
.column_by_name("id")
202+
.expect("resource.id column not found")
203+
.as_any()
204+
.downcast_ref::<arrow::array::UInt16Array>()
205+
.expect("Failed to downcast resource.id");
206+
let scope_col = logs_batch
207+
.column_by_name("scope")
208+
.expect("scope column not found")
209+
.as_any()
210+
.downcast_ref::<arrow::array::StructArray>()
211+
.expect("Failed to downcast scope");
212+
let scope_id_col = scope_col
213+
.column_by_name("id")
214+
.expect("scope.id column not found")
215+
.as_any()
216+
.downcast_ref::<arrow::array::UInt16Array>()
217+
.expect("Failed to downcast scope.id");
218+
219+
// Attribute columns (Dictionary encoded)
220+
let res_attr_key_col = resource_attrs
221+
.column_by_name("key")
222+
.expect("resource attrs key column not found")
223+
.as_any()
224+
.downcast_ref::<DictionaryArray<UInt8Type>>()
225+
.expect("Failed to downcast resource attrs key");
226+
let scope_attr_key_col = scope_attrs
227+
.column_by_name("key")
228+
.expect("scope attrs key column not found")
229+
.as_any()
230+
.downcast_ref::<DictionaryArray<UInt8Type>>()
231+
.expect("Failed to downcast scope attrs key");
232+
let log_attr_key_col = log_attrs
233+
.column_by_name("key")
234+
.expect("log attrs key column not found")
235+
.as_any()
236+
.downcast_ref::<DictionaryArray<UInt8Type>>()
237+
.expect("Failed to downcast log attrs key");
238+
239+
// Get underlying string values for length calculation
240+
let res_keys = res_attr_key_col
241+
.values()
242+
.as_any()
243+
.downcast_ref::<arrow::array::StringArray>()
244+
.expect("Failed to downcast resource keys to StringArray");
245+
let scope_keys = scope_attr_key_col
246+
.values()
247+
.as_any()
248+
.downcast_ref::<arrow::array::StringArray>()
249+
.expect("Failed to downcast scope keys to StringArray");
250+
let log_keys = log_attr_key_col
251+
.values()
252+
.as_any()
253+
.downcast_ref::<arrow::array::StringArray>()
254+
.expect("Failed to downcast log keys to StringArray");
255+
256+
b.iter(|| {
257+
let mut sum = 0;
258+
// Iterate logs
259+
for i in 0..logs_batch.num_rows() {
260+
let time = time_col.value(i);
261+
let res_id = resource_id_col.value(i);
262+
let scope_id = scope_id_col.value(i);
263+
sum += time as u64 + res_id as u64 + scope_id as u64;
264+
}
265+
266+
// Iterate attributes (simulating flat access)
267+
// For dictionary array, we should look up the value using the key index
268+
// But for simplicity/speed in this "flat" benchmark, we can just iterate the keys array
269+
// or just sum the lengths of the referenced strings.
270+
// Actually, `value(i)` on DictionaryArray returns the value? No, it returns the key index?
271+
// DictionaryArray doesn't have `value(i)` returning the string directly in a simple way without casting?
272+
// `res_attr_key_col.keys().value(i)` gives the index.
273+
// `res_keys.value(index)` gives the string.
274+
275+
let res_indices = res_attr_key_col.keys();
276+
for i in 0..resource_attrs.num_rows() {
277+
if res_indices.is_valid(i) {
278+
let idx = res_indices.value(i) as usize;
279+
sum += res_keys.value(idx).len() as u64;
280+
}
281+
}
282+
283+
let scope_indices = scope_attr_key_col.keys();
284+
for i in 0..scope_attrs.num_rows() {
285+
if scope_indices.is_valid(i) {
286+
let idx = scope_indices.value(i) as usize;
287+
sum += scope_keys.value(idx).len() as u64;
288+
}
289+
}
290+
291+
let log_indices = log_attr_key_col.keys();
292+
for i in 0..log_attrs.num_rows() {
293+
if log_indices.is_valid(i) {
294+
let idx = log_indices.value(i) as usize;
295+
sum += log_keys.value(idx).len() as u64;
296+
}
297+
}
298+
299+
let _ = std::hint::black_box(sum);
300+
})
301+
},
302+
);
303+
}
304+
305+
group.finish();
306+
}
307+
308+
#[allow(missing_docs)]
309+
mod bench_entry {
310+
use super::*;
311+
312+
criterion_group!(
313+
name = benches;
314+
config = Criterion::default();
315+
targets = bench_otap_logs_view
316+
);
317+
}
318+
319+
criterion_main!(bench_entry::benches);

0 commit comments

Comments
 (0)