Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
82d4bff
feat: Add internal telemetry sdk exporter sending telemetry to an int…
andborja Dec 19, 2025
21c71f9
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 19, 2025
4271d29
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 19, 2025
dc1390d
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 19, 2025
4274a09
Implement clippy suggestions
andborja Dec 19, 2025
760e2ba
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 19, 2025
e99485f
Fix clippy findings
andborja Dec 19, 2025
f4b7648
Use OtapPdata as transport object for internal telemetry
andborja Dec 20, 2025
e278090
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 20, 2025
d9b9d04
Rename components from generic telemetry to logs specific
andborja Dec 20, 2025
5ad7814
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 22, 2025
0fb22b8
Add configuration option to internal logs exporter.
andborja Dec 22, 2025
4cf4e25
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 22, 2025
ecefcc8
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 22, 2025
44385fc
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 26, 2025
3afa613
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 29, 2025
c1eb9eb
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 30, 2025
1a56042
Merge branch 'main' into andborja/sdkToReceiver
andborja Dec 31, 2025
aabe4ed
Fix merge issues
andborja Dec 31, 2025
a64b964
Use flume instead of crossbeam for channels
andborja Dec 31, 2025
5a39615
Merge branch 'main' into andborja/sdkToReceiver
andborja Jan 3, 2026
dc8c801
Fix merge issues
andborja Jan 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tikv-jemallocator = { workspace = true, optional = true }
[workspace.dependencies]
otap-df-pdata-otlp-macros = { path = "./crates/pdata/src/otlp/macros"}
otap-df-pdata-otlp-model = { path = "./crates/pdata/src/otlp/model"}
otap-df-pdata = { path = "./crates/pdata" }
otap-df-config = { path = "crates/config" }
otap-df-controller = { path = "crates/controller" }
otap-df-otap = { path = "crates/otap" }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
settings:
default_pipeline_ctrl_msg_channel_size: 100
default_node_ctrl_msg_channel_size: 100
default_pdata_channel_size: 100

nodes:
receiver_internal_telemetry:
kind: receiver
plugin_urn: "urn:otel:otap:internal_logs:receiver"
out_ports:
out_port:
destinations:
- noop
dispatch_strategy: round_robin
config: {}
receiver:
kind: receiver
plugin_urn: "urn:otel:otap:fake_data_generator:receiver"
out_ports:
out_port:
destinations:
- debug
dispatch_strategy: round_robin
config:
traffic_config:
max_signal_count: 1000
max_batch_size: 1000
signals_per_second: 1000
log_weight: 100
registry_path: https://github.com/open-telemetry/semantic-conventions.git[model]
debug:
kind: processor
plugin_urn: "urn:otel:debug:processor"
out_ports:
out_port:
destinations:
- noop
dispatch_strategy: round_robin
config:
verbosity: basic
noop:
kind: exporter
plugin_urn: "urn:otel:noop:exporter"
config:

service:
telemetry:
logs:
level: "info"
processors:
- batch:
exporter:
internal:
resource:
service.name: "fake-debug-noop-service"
7 changes: 7 additions & 0 deletions rust/otap-dataflow/crates/config/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ pub enum Error {
/// The id of the pipeline that was duplicated.
pipeline_id: PipelineId,
},

/// An internal error occurred.
#[error("An internal error occurred: {details}")]
InternalError {
/// A description of the internal error that occurred.
details: String,
},
}

/// Information that all errors provide to help identify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ mod tests {
match config {
LogProcessorConfig::Batch(batch_config) => match batch_config.exporter {
LogBatchProcessorExporterConfig::Console => {}
LogBatchProcessorExporterConfig::Otlp(_) => {
panic!("Expected Console exporter, got OTLP.");
_ => {
panic!("Expected Console exporter.");
}
},
}
Expand All @@ -96,10 +96,28 @@ mod tests {
let config: LogProcessorConfig = serde_yaml::from_str(yaml_str).unwrap();
match config {
LogProcessorConfig::Batch(batch_config) => match batch_config.exporter {
LogBatchProcessorExporterConfig::Console => {
panic!("Expected OTLP exporter, got Console.");
}
LogBatchProcessorExporterConfig::Otlp(_) => {}
_ => {
panic!("Expected OTLP exporter.");
}
},
}
}

#[test]
fn test_log_processor_config_internal_deserialize() {
let yaml_str = r#"
batch:
exporter:
internal:
"#;
let config: LogProcessorConfig = serde_yaml::from_str(yaml_str).unwrap();
match config {
LogProcessorConfig::Batch(batch_config) => match batch_config.exporter {
LogBatchProcessorExporterConfig::Internal => {}
_ => {
panic!("Expected Internal exporter.");
}
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub enum LogBatchProcessorExporterConfig {

/// OTLP log exporter
Otlp(OtlpExporterConfig),

/// Internal log exporter
Internal,
}

impl<'de> Deserialize<'de> for LogBatchProcessorExporterConfig {
Expand Down Expand Up @@ -54,7 +57,15 @@ impl<'de> Deserialize<'de> for LogBatchProcessorExporterConfig {
let otlp_config: OtlpExporterConfig = map.next_value()?;
Ok(LogBatchProcessorExporterConfig::Otlp(otlp_config))
}
_ => Err(de::Error::unknown_variant(&key, &["console", "otlp"])),
"internal" => {
// Internal exporter has no configuration, just consume the value (empty or null)
let _: de::IgnoredAny = map.next_value()?;
Ok(LogBatchProcessorExporterConfig::Internal)
}
_ => Err(de::Error::unknown_variant(
&key,
&["console", "otlp", "internal"],
)),
}
}
}
Expand Down Expand Up @@ -93,4 +104,17 @@ mod tests {
};
Ok(())
}

#[test]
fn test_log_batch_processor_internal_exporter_config_deserialize()
-> Result<(), serde_yaml::Error> {
let yaml_str = r#"
internal:
"#;
let config: LogBatchProcessorExporterConfig = serde_yaml::from_str(yaml_str)?;
let LogBatchProcessorExporterConfig::Internal = config else {
panic!("Expected Internal exporter config");
};
Ok(())
}
}
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ miette = { workspace = true }
core_affinity = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
flume = { workspace = true }
21 changes: 14 additions & 7 deletions rust/otap-dataflow/crates/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,12 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {
node_ctrl_msg_channel_size = settings.default_node_ctrl_msg_channel_size,
pipeline_ctrl_msg_channel_size = settings.default_pipeline_ctrl_msg_channel_size
);
let opentelemetry_client = OpentelemetryClient::new(telemetry_config)?;

let (internal_logs_sender, internal_logs_receiver) =
flume::bounded(settings.default_pipeline_ctrl_msg_channel_size);

let opentelemetry_client =
OpentelemetryClient::new(telemetry_config, internal_logs_sender)?;
let metrics_system = MetricsSystem::new(telemetry_config);
let metrics_dispatcher = metrics_system.dispatcher();
let metrics_reporter = metrics_system.reporter();
Expand Down Expand Up @@ -141,12 +146,14 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {

let pipeline_config = pipeline.clone();
let pipeline_factory = self.pipeline_factory;
let pipeline_handle = controller_ctx.pipeline_context_with(
pipeline_group_id.clone(),
pipeline_id.clone(),
core_id.id,
thread_id,
);
let pipeline_handle = controller_ctx
.pipeline_context_with(
pipeline_group_id.clone(),
pipeline_id.clone(),
core_id.id,
thread_id,
)
.with_internal_logs_receiver(internal_logs_receiver.clone());
let metrics_reporter = metrics_reporter.clone();

let thread_name = format!("pipeline-core-{}", core_id.id);
Expand Down
51 changes: 51 additions & 0 deletions rust/otap-dataflow/crates/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::attributes::{
};
use otap_df_config::node::NodeKind;
use otap_df_config::{NodeId, NodeUrn, PipelineGroupId, PipelineId};
use otap_df_pdata::OtapPayload;
use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler};
use otap_df_telemetry::registry::MetricsRegistryHandle;
use std::fmt::Debug;
Expand Down Expand Up @@ -80,6 +81,8 @@ static HOST_ID: LazyLock<Cow<'static, str>> =
static CONTAINER_ID: LazyLock<Cow<'static, str>> =
LazyLock::new(|| detect_container_id().map_or(Cow::Borrowed(""), Cow::Owned));

type InternalLogReceiver = flume::Receiver<OtapPayload>;

/// A lightweight/cloneable controller context.
#[derive(Clone, Debug)]
pub struct ControllerContext {
Expand All @@ -101,6 +104,7 @@ pub struct PipelineContext {
node_id: NodeId,
node_urn: NodeUrn,
node_kind: NodeKind,
internal_logs_receiver: Option<InternalLogReceiver>,
}

impl ControllerContext {
Expand Down Expand Up @@ -153,6 +157,7 @@ impl PipelineContext {
node_id: Default::default(),
node_urn: Default::default(),
node_kind: Default::default(),
internal_logs_receiver: None,
}
}

Expand Down Expand Up @@ -251,6 +256,52 @@ impl PipelineContext {
node_id,
node_urn,
node_kind,
internal_logs_receiver: self.internal_logs_receiver.clone(),
}
}

/// Returns a new pipeline context with the given internal telemetry notifier handle.
#[must_use]
pub fn with_internal_logs_receiver(&mut self, logs_receiver: InternalLogReceiver) -> Self {
Self {
controller_context: self.controller_context.clone(),
core_id: self.core_id,
thread_id: self.thread_id,
pipeline_group_id: self.pipeline_group_id.clone(),
pipeline_id: self.pipeline_id.clone(),
node_id: self.node_id.clone(),
node_urn: self.node_urn.clone(),
node_kind: self.node_kind,
internal_logs_receiver: Some(logs_receiver),
}
}

/// Returns the internal logs receiver, if any.
#[must_use]
pub fn internal_logs_receiver(&self) -> Option<InternalLogReceiver> {
self.internal_logs_receiver.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_with_internal_logs_receiver() {
let controller_ctx = ControllerContext::new(MetricsRegistryHandle::default());

let (_internal_logs_sender, internal_logs_receiver) = flume::unbounded();

let pipeline_ctx = controller_ctx
.pipeline_context_with(
"test_pipeline_group_id".into(),
"test_pipeline_id".into(),
0,
0,
)
.with_internal_logs_receiver(internal_logs_receiver);

assert!(pipeline_ctx.internal_logs_receiver().is_some());
}
}
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/otap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ arrow.workspace = true
arrow-ipc.workspace = true
async-trait.workspace = true
ciborium.workspace = true
flume.workspace = true
futures.workspace = true
futures-timer.workspace = true
http.workspace = true
Expand Down Expand Up @@ -107,7 +108,6 @@ experimental-processors = []
condense-attributes-processor = ["experimental-processors"]

[dev-dependencies]
flume.workspace = true
portpicker.workspace = true
pretty_assertions.workspace = true
rcgen.workspace = true
Expand Down
Loading
Loading