Skip to content

Commit f35de84

Browse files
authored
chore: Migrate topics, internal_telemetry_receiver, and perf_exporter… (open-telemetry#2323)
… to core-nodes crate # Change Summary Next part of open-telemetry#1847 and open-telemetry#2086 Moves: * perf_exporter * topic_exporter * internal_telemetry_receiver * topic_receiver ## How are these changes tested? * Unit tests / CI * Compiled and ran `df_engine` and confirmed all nodes are still available ## Are there any user-facing changes? No
1 parent 2a69a7d commit f35de84

File tree

15 files changed

+82
-107
lines changed

15 files changed

+82
-107
lines changed

rust/otap-dataflow/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,14 @@ ongoing refactor.
256256

257257
Current status:
258258

259-
- Exporters: `console_exporter`, `error_exporter`, `noop_exporter`
259+
- Exporters: `console_exporter`, `error_exporter`, `noop_exporter`,
260+
`perf_exporter`, `topic_exporter`
260261
- Processors: `attributes_processor`, `batch_processor`,
261262
`content_router`, `debug_processor`, `delay_processor`,
262263
`durable_buffer_processor`, `fanout_processor`, `filter_processor`,
263264
`retry_processor`, `signal_type_router`, `transform_processor`
264-
- Receivers: `fake_data_generator`
265+
- Receivers: `fake_data_generator`, `internal_telemetry_receiver`,
266+
`topic_receiver`
265267

266268
### Contrib Nodes
267269

rust/otap-dataflow/benchmarks/benches/exporter/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
99
use fluke_hpack::Encoder;
1010
use otap_df_channel::mpsc;
11+
use otap_df_core_nodes::exporters::perf_exporter::{
12+
OTAP_PERF_EXPORTER_URN, PerfExporter, config::Config,
13+
};
1114
use otap_df_engine::{
1215
Interests,
1316
config::ExporterConfig,
@@ -19,7 +22,6 @@ use otap_df_engine::{
1922
use otap_df_otap::{
2023
otap_exporter::OTAPExporter,
2124
pdata::{Context, OtapPdata},
22-
perf_exporter::{config::Config, exporter::PerfExporter},
2325
};
2426
use otap_df_pdata::{
2527
Consumer,
@@ -63,7 +65,6 @@ use otap_df_engine::context::ControllerContext;
6365
use otap_df_engine::control::{Controllable, NodeControlMsg, pipeline_ctrl_msg_channel};
6466
use otap_df_otap::otap_exporter::OTAP_EXPORTER_URN;
6567
use otap_df_otap::otlp_grpc::OTLPData;
66-
use otap_df_otap::perf_exporter::exporter::OTAP_PERF_EXPORTER_URN;
6768
use otap_df_telemetry::InternalTelemetrySystem;
6869
use serde_json::json;
6970
use std::pin::Pin;

rust/otap-dataflow/crates/core-nodes/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ bytes.workspace = true
3333
data_engine_expressions.workspace = true
3434
data_engine_kql_parser.workspace = true
3535
humantime-serde.workspace = true
36+
futures.workspace = true
3637
linkme.workspace = true
3738
prost.workspace = true
3839
rand.workspace = true

rust/otap-dataflow/crates/core-nodes/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Each component lives in its own subfolder within a category:
1414
console_exporter/
1515
error_exporter/
1616
noop_exporter/
17+
perf_exporter/
18+
topic_exporter/
1719
processors/
1820
mod.rs (category exports)
1921
attributes_processor/
@@ -30,6 +32,8 @@ Each component lives in its own subfolder within a category:
3032
receivers/
3133
mod.rs (category exports)
3234
fake_data_generator/
35+
internal_telemetry_receiver/
36+
topic_receiver/
3337
lib.rs
3438

3539
## Components
@@ -41,6 +45,8 @@ Each component lives in its own subfolder within a category:
4145
| console_exporter | `urn:otel:exporter:console` | `src/exporters/console_exporter/` |
4246
| error_exporter | `urn:otel:exporter:error` | `src/exporters/error_exporter/` |
4347
| noop_exporter | `urn:otel:exporter:noop` | `src/exporters/noop_exporter/` |
48+
| perf_exporter | `urn:otel:exporter:perf` | `src/exporters/perf_exporter/` |
49+
| topic_exporter | `urn:otel:exporter:topic` | `src/exporters/topic_exporter/` |
4450

4551
#### console_exporter
4652

@@ -57,6 +63,16 @@ Each component lives in its own subfolder within a category:
5763
- Placeholder exporter that ACKs all messages without processing
5864
- Lightweight for performance testing and pipeline validation
5965

66+
#### perf_exporter
67+
68+
- Measures item throughput by signal type for benchmarking scenarios
69+
- Emits pdata-oriented telemetry metrics during pipeline execution
70+
71+
#### topic_exporter
72+
73+
- Publishes pdata into configured runtime topics
74+
- Supports tracked end-to-end ack/nack propagation through topic boundaries
75+
6076
### Processors
6177

6278
| Node | URN | Module |
@@ -140,9 +156,21 @@ Each component lives in its own subfolder within a category:
140156
| Node | URN | Module |
141157
| ---- | --- | ------ |
142158
| fake_data_generator | `urn:otel:receiver:traffic_generator` | `src/receivers/fake_data_generator/` |
159+
| internal_telemetry_receiver | `urn:otel:receiver:internal_telemetry` | `src/receivers/internal_telemetry_receiver/` |
160+
| topic_receiver | `urn:otel:receiver:topic` | `src/receivers/topic_receiver/` |
143161

144162
#### fake_data_generator
145163

146164
- Generates synthetic OTAP/OTLP signals for testing and benchmarking
147165
- Configurable signal generation strategies and volume constraints
148166
- Includes support for pregenerated, dynamic, and rate-based signal generation
167+
168+
#### internal_telemetry_receiver
169+
170+
- Receives internal engine telemetry events from the internal log channel
171+
- Emits them as OTLP log pdata into the configured pipeline
172+
173+
#### topic_receiver
174+
175+
- Subscribes to runtime topics and forwards messages into the pipeline
176+
- Supports broadcast/balanced subscription modes and topic ack/nack bridging

rust/otap-dataflow/crates/core-nodes/src/exporters/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,9 @@ pub mod error_exporter;
99

1010
/// Console exporter.
1111
pub mod console_exporter;
12+
13+
/// Topic exporter.
14+
pub mod topic_exporter;
15+
16+
/// Perf exporter.
17+
pub mod perf_exporter;

rust/otap-dataflow/crates/otap/src/perf_exporter/config.rs renamed to rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/config.rs

File renamed without changes.

rust/otap-dataflow/crates/otap/src/perf_exporter/metrics.rs renamed to rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/metrics.rs

File renamed without changes.

rust/otap-dataflow/crates/otap/src/perf_exporter/exporter.rs renamed to rust/otap-dataflow/crates/core-nodes/src/exporters/perf_exporter/mod.rs

Lines changed: 22 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,32 @@
33

44
//! Exporter used to measure the performance of the OTAP data pipeline.
55
//!
6-
//! ToDo Future developments / improvements:
6+
//! ToDo - Future developments / improvements:
77
//! - Replace this exporter with a processor that could be combined with a Noop exporter to achieve
88
//! the same functionality. The advantage would be to allow performance measurements anywhere in
99
//! the pipeline.
1010
//! - Measure the number of memory allocations for the current thread. This would allow measuring
1111
//! the memory used by the pipeline. This is possible using `mimalloc-sys`.
12-
//! - Measure per-thread CPU usage. This would allow measuring the pipelines CPU load. This is
13-
//! possible using the "libc" crate function `getrusage(RUSAGE\_THREAD)`.
12+
//! - Measure per-thread CPU usage. This would allow measuring the pipeline's CPU load. This is
13+
//! possible using the "libc" crate function `getrusage(RUSAGE_THREAD)`.
1414
//! - Measure network usage either via a cgroup or via eBPF.
15-
//! - Measure per-thread perf counters (see crates perfcnt, perfcnt2, or direct perf\_event\_open
15+
//! - Measure per-thread perf counters (see crates perfcnt, perfcnt2, or direct perf_event_open
1616
//! via nix/libc). We could measure task-clock, context switches, page faults, ...
1717
//! - Measure the latency of signals traversing the pipeline. This would require adding a timestamp
1818
//! in the headers of pdata messages.
1919
//! - Support live reconfiguration via control message.
2020
21-
use crate::OTAP_EXPORTER_FACTORIES;
22-
use crate::metrics::ExporterPDataMetrics;
23-
use crate::pdata::OtapPdata;
24-
use crate::perf_exporter::config::Config;
25-
use crate::perf_exporter::metrics::PerfExporterPdataMetrics;
21+
pub mod config;
22+
pub mod metrics;
23+
24+
use crate::exporters::perf_exporter::config::Config;
25+
use crate::exporters::perf_exporter::metrics::PerfExporterPdataMetrics;
2626
use async_trait::async_trait;
27+
use linkme::distributed_slice;
2728
use otap_df_config::SignalType;
2829
use otap_df_config::node::NodeUserConfig;
2930
use otap_df_engine::ConsumerEffectHandlerExtension;
31+
use otap_df_engine::ExporterFactory;
3032
use otap_df_engine::config::ExporterConfig;
3133
use otap_df_engine::context::PipelineContext;
3234
use otap_df_engine::control::{AckMsg, NodeControlMsg};
@@ -36,7 +38,9 @@ use otap_df_engine::local::exporter as local;
3638
use otap_df_engine::message::{Message, MessageChannel};
3739
use otap_df_engine::node::NodeId;
3840
use otap_df_engine::terminal_state::TerminalState;
39-
use otap_df_engine::{ExporterFactory, distributed_slice};
41+
use otap_df_otap::OTAP_EXPORTER_FACTORIES;
42+
use otap_df_otap::metrics::ExporterPDataMetrics;
43+
use otap_df_otap::pdata::OtapPdata;
4044
use otap_df_pdata::otap::OtapArrowRecords;
4145
use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler};
4246
use otap_df_telemetry::otel_info;
@@ -267,22 +271,17 @@ impl local::Exporter<OtapPdata> for PerfExporter {
267271

268272
#[cfg(test)]
269273
mod tests {
270-
use crate::fixtures::{
271-
SimpleDataGenOptions, create_simple_logs_arrow_record_batches,
272-
create_simple_metrics_arrow_record_batches, create_simple_trace_arrow_record_batches,
273-
};
274-
use crate::pdata::OtapPdata;
275-
use crate::perf_exporter::config::Config;
276-
use crate::perf_exporter::exporter::{OTAP_PERF_EXPORTER_URN, PerfExporter};
274+
use super::{OTAP_PERF_EXPORTER_URN, PerfExporter};
275+
use crate::exporters::perf_exporter::config::Config;
277276
use otap_df_config::node::NodeUserConfig;
278277
use otap_df_engine::context::ControllerContext;
279278
use otap_df_engine::error::Error;
280279
use otap_df_engine::exporter::ExporterWrapper;
281280
use otap_df_engine::testing::exporter::TestContext;
282281
use otap_df_engine::testing::exporter::TestRuntime;
283282
use otap_df_engine::testing::test_node;
284-
use otap_df_pdata::Consumer;
285-
use otap_df_pdata::otap::{OtapArrowRecords, from_record_messages};
283+
use otap_df_otap::pdata::OtapPdata;
284+
use otap_df_otap::testing::create_test_pdata;
286285
use otap_df_telemetry::registry::TelemetryRegistryHandle;
287286
use std::future::Future;
288287
use std::ops::Add;
@@ -297,59 +296,10 @@ mod tests {
297296
-> impl FnOnce(TestContext<OtapPdata>) -> std::pin::Pin<Box<dyn Future<Output = ()>>> {
298297
|ctx| {
299298
Box::pin(async move {
300-
// send some messages to the exporter to calculate pipeline statistics
301-
for i in 0..3 {
302-
let mut traces_batch_data =
303-
create_simple_trace_arrow_record_batches(SimpleDataGenOptions {
304-
id_offset: 3 * i,
305-
num_rows: 5,
306-
..Default::default()
307-
});
308-
let mut logs_batch_data =
309-
create_simple_logs_arrow_record_batches(SimpleDataGenOptions {
310-
id_offset: 3 * i + 1,
311-
num_rows: 5,
312-
..Default::default()
313-
});
314-
let mut metrics_batch_data =
315-
create_simple_metrics_arrow_record_batches(SimpleDataGenOptions {
316-
id_offset: 3 * i + 2,
317-
num_rows: 5,
318-
..Default::default()
319-
});
320-
321-
let trace_batch_data = from_record_messages(
322-
Consumer::default()
323-
.consume_bar(&mut traces_batch_data)
324-
.unwrap(),
325-
);
326-
let logs_batch_data = from_record_messages(
327-
Consumer::default()
328-
.consume_bar(&mut logs_batch_data)
329-
.unwrap(),
330-
);
331-
let metrics_batch_data = from_record_messages(
332-
Consumer::default()
333-
.consume_bar(&mut metrics_batch_data)
334-
.unwrap(),
335-
);
336-
337-
// Send a data message
338-
ctx.send_pdata(OtapPdata::new_default(
339-
OtapArrowRecords::Traces(trace_batch_data).into(),
340-
))
341-
.await
342-
.expect("Failed to send data message");
343-
ctx.send_pdata(OtapPdata::new_default(
344-
OtapArrowRecords::Logs(logs_batch_data).into(),
345-
))
346-
.await
347-
.expect("Failed to send data message");
348-
ctx.send_pdata(OtapPdata::new_default(
349-
OtapArrowRecords::Metrics(metrics_batch_data).into(),
350-
))
351-
.await
352-
.expect("Failed to send data message");
299+
for _ in 0..3 {
300+
ctx.send_pdata(create_test_pdata())
301+
.await
302+
.expect("Failed to send data message");
353303
}
354304

355305
// TODO ADD DELAY BETWEEN HERE

rust/otap-dataflow/crates/otap/src/topic_exporter.rs renamed to rust/otap-dataflow/crates/core-nodes/src/exporters/topic_exporter/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
//! Topic exporter.
55
6-
use crate::OTAP_EXPORTER_FACTORIES;
7-
use crate::pdata::OtapPdata;
86
use async_trait::async_trait;
97
use futures::stream::{FuturesUnordered, StreamExt};
108
use linkme::distributed_slice;
@@ -25,6 +23,8 @@ use otap_df_engine::topic::{
2523
PublishOutcome, TopicHandle, TrackedPublishOutcome, TrackedTryPublishOutcome,
2624
};
2725
use otap_df_engine::{ConsumerEffectHandlerExtension, ExporterFactory};
26+
use otap_df_otap::OTAP_EXPORTER_FACTORIES;
27+
use otap_df_otap::pdata::OtapPdata;
2828
use otap_df_telemetry::instrument::{Counter, Gauge};
2929
use otap_df_telemetry::metrics::MetricSet;
3030
use otap_df_telemetry::{otel_info, otel_warn};
@@ -363,8 +363,6 @@ impl Exporter<OtapPdata> for TopicExporter {
363363
#[cfg(test)]
364364
mod tests {
365365
use super::{TOPIC_EXPORTER, TOPIC_EXPORTER_URN, TopicExporter};
366-
use crate::pdata::OtapPdata;
367-
use crate::testing::{TestCallData, create_test_pdata, next_ack};
368366
use otap_df_config::node::NodeUserConfig;
369367
use otap_df_config::topic::{TopicAckPropagationMode, TopicQueueOnFullPolicy};
370368
use otap_df_engine::Interests;
@@ -381,6 +379,8 @@ mod tests {
381379
PipelineTopicBinding, SubscriberOptions, SubscriptionMode, TopicBroadcastOnLagPolicy,
382380
TopicBroker, TopicOptions, TopicSet,
383381
};
382+
use otap_df_otap::pdata::OtapPdata;
383+
use otap_df_otap::testing::{TestCallData, create_test_pdata, next_ack};
384384
use otap_df_telemetry::reporter::MetricsReporter;
385385
use serde_json::json;
386386
use std::sync::Arc;

rust/otap-dataflow/crates/otap/src/internal_telemetry_receiver.rs renamed to rust/otap-dataflow/crates/core-nodes/src/receivers/internal_telemetry_receiver/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
//! This receiver consumes internal logs from the logging channel and emits
77
//! the logs as OTLP ExportLogsRequest messages into the pipeline.
88
9-
use crate::OTAP_RECEIVER_FACTORIES;
10-
use crate::pdata::{Context, OtapPdata};
119
use async_trait::async_trait;
1210
use bytes::Bytes;
1311
use linkme::distributed_slice;
@@ -21,6 +19,8 @@ use otap_df_engine::local::receiver as local;
2119
use otap_df_engine::node::NodeId;
2220
use otap_df_engine::receiver::ReceiverWrapper;
2321
use otap_df_engine::terminal_state::TerminalState;
22+
use otap_df_otap::OTAP_RECEIVER_FACTORIES;
23+
use otap_df_otap::pdata::{Context, OtapPdata};
2424
use otap_df_pdata::OtlpProtoBytes;
2525
use otap_df_pdata::otlp::ProtoBuffer;
2626
use otap_df_telemetry::event::{LogEvent, ObservedEvent};

0 commit comments

Comments
 (0)