diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 06a284e10..eddc037e9 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -47,6 +47,7 @@ tikv-jemallocator = { workspace = true, optional = true } [workspace.dependencies] otap-df-pdata-otlp-macros = { path = "./crates/pdata/src/otlp/macros"} otap-df-pdata-otlp-model = { path = "./crates/pdata/src/otlp/model"} +otap-df-pdata = { path = "./crates/pdata" } otap-df-config = { path = "crates/config" } otap-df-controller = { path = "crates/controller" } otap-df-otap = { path = "crates/otap" } diff --git a/rust/otap-dataflow/configs/fake-debug-noop-internal-telemetry.yaml b/rust/otap-dataflow/configs/fake-debug-noop-internal-telemetry.yaml new file mode 100644 index 000000000..964bbc538 --- /dev/null +++ b/rust/otap-dataflow/configs/fake-debug-noop-internal-telemetry.yaml @@ -0,0 +1,55 @@ +settings: + default_pipeline_ctrl_msg_channel_size: 100 + default_node_ctrl_msg_channel_size: 100 + default_pdata_channel_size: 100 + +nodes: + receiver_internal_telemetry: + kind: receiver + plugin_urn: "urn:otel:otap:internal_logs:receiver" + out_ports: + out_port: + destinations: + - noop + dispatch_strategy: round_robin + config: {} + receiver: + kind: receiver + plugin_urn: "urn:otel:otap:fake_data_generator:receiver" + out_ports: + out_port: + destinations: + - debug + dispatch_strategy: round_robin + config: + traffic_config: + max_signal_count: 1000 + max_batch_size: 1000 + signals_per_second: 1000 + log_weight: 100 + registry_path: https://github.com/open-telemetry/semantic-conventions.git[model] + debug: + kind: processor + plugin_urn: "urn:otel:debug:processor" + out_ports: + out_port: + destinations: + - noop + dispatch_strategy: round_robin + config: + verbosity: basic + noop: + kind: exporter + plugin_urn: "urn:otel:noop:exporter" + config: + +service: + telemetry: + logs: + level: "info" + processors: + - batch: + exporter: + internal: + resource: + service.name: "fake-debug-noop-service" diff --git a/rust/otap-dataflow/crates/config/src/error.rs b/rust/otap-dataflow/crates/config/src/error.rs index 4b0f378f2..894e12043 100644 --- a/rust/otap-dataflow/crates/config/src/error.rs +++ b/rust/otap-dataflow/crates/config/src/error.rs @@ -118,6 +118,13 @@ pub enum Error { /// The id of the pipeline that was duplicated. pipeline_id: PipelineId, }, + + /// An internal error occurred. + #[error("An internal error occurred: {details}")] + InternalError { + /// A description of the internal error that occurred. + details: String, + }, } /// Information that all errors provide to help identify diff --git a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors.rs b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors.rs index bf89e0e81..3a5648753 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors.rs @@ -78,8 +78,8 @@ mod tests { match config { LogProcessorConfig::Batch(batch_config) => match batch_config.exporter { LogBatchProcessorExporterConfig::Console => {} - LogBatchProcessorExporterConfig::Otlp(_) => { - panic!("Expected Console exporter, got OTLP."); + _ => { + panic!("Expected Console exporter."); } }, } @@ -96,10 +96,28 @@ mod tests { let config: LogProcessorConfig = serde_yaml::from_str(yaml_str).unwrap(); match config { LogProcessorConfig::Batch(batch_config) => match batch_config.exporter { - LogBatchProcessorExporterConfig::Console => { - panic!("Expected OTLP exporter, got Console."); - } LogBatchProcessorExporterConfig::Otlp(_) => {} + _ => { + panic!("Expected OTLP exporter."); + } + }, + } + } + + #[test] + fn test_log_processor_config_internal_deserialize() { + let yaml_str = r#" + batch: + exporter: + internal: + "#; + let config: LogProcessorConfig = serde_yaml::from_str(yaml_str).unwrap(); + match config { + LogProcessorConfig::Batch(batch_config) => match batch_config.exporter { + LogBatchProcessorExporterConfig::Internal => {} + _ => { + panic!("Expected Internal exporter."); + } }, } } diff --git a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors/batch.rs b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors/batch.rs index d597f0f2b..23e408cca 100644 --- a/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors/batch.rs +++ b/rust/otap-dataflow/crates/config/src/pipeline/service/telemetry/logs/processors/batch.rs @@ -18,6 +18,9 @@ pub enum LogBatchProcessorExporterConfig { /// OTLP log exporter Otlp(OtlpExporterConfig), + + /// Internal log exporter + Internal, } impl<'de> Deserialize<'de> for LogBatchProcessorExporterConfig { @@ -54,7 +57,15 @@ impl<'de> Deserialize<'de> for LogBatchProcessorExporterConfig { let otlp_config: OtlpExporterConfig = map.next_value()?; Ok(LogBatchProcessorExporterConfig::Otlp(otlp_config)) } - _ => Err(de::Error::unknown_variant(&key, &["console", "otlp"])), + "internal" => { + // Internal exporter has no configuration, just consume the value (empty or null) + let _: de::IgnoredAny = map.next_value()?; + Ok(LogBatchProcessorExporterConfig::Internal) + } + _ => Err(de::Error::unknown_variant( + &key, + &["console", "otlp", "internal"], + )), } } } @@ -93,4 +104,17 @@ mod tests { }; Ok(()) } + + #[test] + fn test_log_batch_processor_internal_exporter_config_deserialize() + -> Result<(), serde_yaml::Error> { + let yaml_str = r#" + internal: + "#; + let config: LogBatchProcessorExporterConfig = serde_yaml::from_str(yaml_str)?; + let LogBatchProcessorExporterConfig::Internal = config else { + panic!("Expected Internal exporter config"); + }; + Ok(()) + } } diff --git a/rust/otap-dataflow/crates/controller/Cargo.toml b/rust/otap-dataflow/crates/controller/Cargo.toml index ad214acc0..47ce14f41 100644 --- a/rust/otap-dataflow/crates/controller/Cargo.toml +++ b/rust/otap-dataflow/crates/controller/Cargo.toml @@ -27,3 +27,4 @@ miette = { workspace = true } core_affinity = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } +flume = { workspace = true } diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 7a5294e82..99e220e03 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -83,7 +83,12 @@ impl Controller { node_ctrl_msg_channel_size = settings.default_node_ctrl_msg_channel_size, pipeline_ctrl_msg_channel_size = settings.default_pipeline_ctrl_msg_channel_size ); - let opentelemetry_client = OpentelemetryClient::new(telemetry_config)?; + + let (internal_logs_sender, internal_logs_receiver) = + flume::bounded(settings.default_pipeline_ctrl_msg_channel_size); + + let opentelemetry_client = + OpentelemetryClient::new(telemetry_config, internal_logs_sender)?; let metrics_system = MetricsSystem::new(telemetry_config); let metrics_dispatcher = metrics_system.dispatcher(); let metrics_reporter = metrics_system.reporter(); @@ -141,12 +146,14 @@ impl Controller { let pipeline_config = pipeline.clone(); let pipeline_factory = self.pipeline_factory; - let pipeline_handle = controller_ctx.pipeline_context_with( - pipeline_group_id.clone(), - pipeline_id.clone(), - core_id.id, - thread_id, - ); + let pipeline_handle = controller_ctx + .pipeline_context_with( + pipeline_group_id.clone(), + pipeline_id.clone(), + core_id.id, + thread_id, + ) + .with_internal_logs_receiver(internal_logs_receiver.clone()); let metrics_reporter = metrics_reporter.clone(); let thread_name = format!("pipeline-core-{}", core_id.id); diff --git a/rust/otap-dataflow/crates/engine/src/context.rs b/rust/otap-dataflow/crates/engine/src/context.rs index b3e5814d0..8d275f6b8 100644 --- a/rust/otap-dataflow/crates/engine/src/context.rs +++ b/rust/otap-dataflow/crates/engine/src/context.rs @@ -8,6 +8,7 @@ use crate::attributes::{ }; use otap_df_config::node::NodeKind; use otap_df_config::{NodeId, NodeUrn, PipelineGroupId, PipelineId}; +use otap_df_pdata::OtapPayload; use otap_df_telemetry::metrics::{MetricSet, MetricSetHandler}; use otap_df_telemetry::registry::MetricsRegistryHandle; use std::fmt::Debug; @@ -80,6 +81,8 @@ static HOST_ID: LazyLock> = static CONTAINER_ID: LazyLock> = LazyLock::new(|| detect_container_id().map_or(Cow::Borrowed(""), Cow::Owned)); +type InternalLogReceiver = flume::Receiver; + /// A lightweight/cloneable controller context. #[derive(Clone, Debug)] pub struct ControllerContext { @@ -101,6 +104,7 @@ pub struct PipelineContext { node_id: NodeId, node_urn: NodeUrn, node_kind: NodeKind, + internal_logs_receiver: Option, } impl ControllerContext { @@ -153,6 +157,7 @@ impl PipelineContext { node_id: Default::default(), node_urn: Default::default(), node_kind: Default::default(), + internal_logs_receiver: None, } } @@ -251,6 +256,52 @@ impl PipelineContext { node_id, node_urn, node_kind, + internal_logs_receiver: self.internal_logs_receiver.clone(), } } + + /// Returns a new pipeline context with the given internal telemetry notifier handle. + #[must_use] + pub fn with_internal_logs_receiver(&mut self, logs_receiver: InternalLogReceiver) -> Self { + Self { + controller_context: self.controller_context.clone(), + core_id: self.core_id, + thread_id: self.thread_id, + pipeline_group_id: self.pipeline_group_id.clone(), + pipeline_id: self.pipeline_id.clone(), + node_id: self.node_id.clone(), + node_urn: self.node_urn.clone(), + node_kind: self.node_kind, + internal_logs_receiver: Some(logs_receiver), + } + } + + /// Returns the internal logs receiver, if any. + #[must_use] + pub fn internal_logs_receiver(&self) -> Option { + self.internal_logs_receiver.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_with_internal_logs_receiver() { + let controller_ctx = ControllerContext::new(MetricsRegistryHandle::default()); + + let (_internal_logs_sender, internal_logs_receiver) = flume::unbounded(); + + let pipeline_ctx = controller_ctx + .pipeline_context_with( + "test_pipeline_group_id".into(), + "test_pipeline_id".into(), + 0, + 0, + ) + .with_internal_logs_receiver(internal_logs_receiver); + + assert!(pipeline_ctx.internal_logs_receiver().is_some()); + } } diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index ed8ebbd26..c7eb4782a 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -17,6 +17,7 @@ arrow.workspace = true arrow-ipc.workspace = true async-trait.workspace = true ciborium.workspace = true +flume.workspace = true futures.workspace = true futures-timer.workspace = true http.workspace = true @@ -107,7 +108,6 @@ experimental-processors = [] condense-attributes-processor = ["experimental-processors"] [dev-dependencies] -flume.workspace = true portpicker.workspace = true pretty_assertions.workspace = true rcgen.workspace = true diff --git a/rust/otap-dataflow/crates/otap/src/internal_logs_receiver.rs b/rust/otap-dataflow/crates/otap/src/internal_logs_receiver.rs new file mode 100644 index 000000000..00ed98b93 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/internal_logs_receiver.rs @@ -0,0 +1,201 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! An internal logs receiver. +//! This receiver is used to receive internal logs data from the OpenTelemetry SDK +//! and forward it to the pipeline engine for processing. + +use crate::OTAP_RECEIVER_FACTORIES; +use crate::pdata::OtapPdata; +use async_trait::async_trait; +use linkme::distributed_slice; +use otap_df_config::node::NodeUserConfig; +use otap_df_engine::ReceiverFactory; +use otap_df_engine::config::ReceiverConfig; +use otap_df_engine::context::PipelineContext; +use otap_df_engine::error::{Error, ReceiverErrorKind}; +use otap_df_engine::local::receiver as local; +use otap_df_engine::node::NodeId; +use otap_df_engine::receiver::ReceiverWrapper; +use otap_df_engine::terminal_state::TerminalState; +use otap_df_pdata::OtapPayload; +use serde_json::Value; +use std::sync::Arc; + +/// The URN for the internal logs receiver +pub const INTERNAL_LOGS_RECEIVER_URN: &str = "urn:otel:otap:internal_logs:receiver"; + +type InternalLogReceiver = flume::Receiver; + +/// A Receiver that receives internal logs data. +pub struct InternalLogsReceiver { + /// Configuration for the internal logs receiver + #[allow(dead_code)] + config: Config, + + /// The channel to receive internal logs data from the OpenTelemetry SDK. + internal_logs_receiver: InternalLogReceiver, + + /// The node id of this receiver + node_id: NodeId, +} + +/// Declares the internal logs receiver as a local receiver factory +#[allow(unsafe_code)] +#[distributed_slice(OTAP_RECEIVER_FACTORIES)] +pub static INTERNAL_LOGS_RECEIVER: ReceiverFactory = ReceiverFactory { + name: INTERNAL_LOGS_RECEIVER_URN, + create: |pipeline: PipelineContext, + node: NodeId, + node_config: Arc, + receiver_config: &ReceiverConfig| { + Ok(ReceiverWrapper::local( + InternalLogsReceiver::from_config(pipeline, &node_config.config, node.clone())?, + node, + node_config, + receiver_config, + )) + }, +}; + +impl InternalLogsReceiver { + /// creates a new InternalLogsReceiver + /// TODO: Fail if more than one instance is created, as the internal logs channel should + /// have only one receiver instance (it can be multiple replicas). + /// We can do this validation during configuration time. + pub fn new( + pipeline_ctx: PipelineContext, + config: Config, + node_id: NodeId, + ) -> Result { + let internal_logs_receiver = pipeline_ctx + .internal_logs_receiver() + .ok_or_else(|| otap_df_config::error::Error::InternalError { + details: "Internal logs receiver channel not configured in pipeline context" + .to_string(), + })? + .clone(); + Ok(Self { + config, + internal_logs_receiver, + node_id, + }) + } + + /// Creates a new internal logs receiver from a configuration object + pub fn from_config( + pipeline_ctx: PipelineContext, + config: &Value, + node_id: NodeId, + ) -> Result { + let config: Config = serde_json::from_value(config.clone()).map_err(|e| { + otap_df_config::error::Error::InvalidUserConfig { + error: e.to_string(), + } + })?; + InternalLogsReceiver::new(pipeline_ctx, config, node_id) + } +} + +/// Implement the Receiver trait for the InternalLogsReceiver +#[async_trait(?Send)] +impl local::Receiver for InternalLogsReceiver { + #[allow(clippy::print_stdout)] // Keeping prints for demonstration purposes + async fn start( + mut self: Box, + mut _ctrl_msg_recv: local::ControlChannel, + _effect_handler: local::EffectHandler, + ) -> Result { + let receiver = &self.internal_logs_receiver; + loop { + tokio::select! { + result = tokio::task::spawn_blocking({ + let receiver = receiver.clone(); + move || receiver.recv() + }) => { + match result { + Ok(Ok(logs_data)) => { + println!("The InternalLogsReceiver received a logs data batch with {:?} log records. Node name: '{}'", logs_data, self.node_id); + //TODO: Send the received logs data to the next consumers through the effect handler + // Make sure no new internal logs data is produced for this entire pipeline. + //effect_handler.send_data(pdata).await?; + } + Ok(Err(e)) => { + return Err(Error::ReceiverError { + receiver: self.node_id.clone(), + kind: ReceiverErrorKind::Connect, + error: "There was a problem receiving logs data".to_string(), + source_detail: e.to_string(), + }); + } + Err(e) => { + return Err(Error::ReceiverError { + receiver: self.node_id.clone(), + kind: ReceiverErrorKind::Connect, + error: "Spawn blocking task failed".to_string(), + source_detail: e.to_string(), + }); + } + } + } + } + } + } +} + +/// Configuration for the internal logs receiver +#[derive(serde::Deserialize)] +pub struct Config {} + +#[cfg(test)] +mod tests { + use otap_df_config::{PipelineGroupId, PipelineId}; + use otap_df_engine::context::ControllerContext; + use otap_df_telemetry::registry::MetricsRegistryHandle; + + use super::*; + + #[test] + fn test_internal_logs_receiver_from_config() -> Result<(), otap_df_config::error::Error> { + let metrics_handle = MetricsRegistryHandle::new(); + let controller_context = ControllerContext::new(metrics_handle); + let pipeline_group_id: PipelineGroupId = "test_group".into(); + let pipeline_id: PipelineId = "test_pipeline".into(); + + let (_internal_logs_sender, internal_logs_receiver) = flume::unbounded::(); + let pipeline_ctx = controller_context + .pipeline_context_with(pipeline_group_id, pipeline_id, 0, 0) + .with_internal_logs_receiver(internal_logs_receiver); + let node_id = NodeId { + name: "test_node".into(), + index: 0, + }; + let config_value = serde_json::json!({}); + let receiver = + InternalLogsReceiver::from_config(pipeline_ctx, &config_value, node_id.clone())?; + drop(receiver); // Just testing creation + Ok(()) + } + + #[test] + fn test_internal_logs_receiver_new_with_no_channel() { + let metrics_handle = MetricsRegistryHandle::new(); + let controller_context = ControllerContext::new(metrics_handle); + let pipeline_group_id: PipelineGroupId = "test_group".into(); + let pipeline_id: PipelineId = "test_pipeline".into(); + + let pipeline_ctx = + controller_context.pipeline_context_with(pipeline_group_id, pipeline_id, 0, 0); + let node_id = NodeId { + name: "test_node".into(), + index: 0, + }; + let config = Config {}; + let result = InternalLogsReceiver::new(pipeline_ctx, config, node_id); + if let Err(otap_df_config::error::Error::InternalError { details }) = &result { + assert!(details.contains("channel"),); + } else { + panic!("Expected InternalError due to missing internal logs receiver channel"); + } + } +} diff --git a/rust/otap-dataflow/crates/otap/src/lib.rs b/rust/otap-dataflow/crates/otap/src/lib.rs index b08eb4efb..e8b67b7c9 100644 --- a/rust/otap-dataflow/crates/otap/src/lib.rs +++ b/rust/otap-dataflow/crates/otap/src/lib.rs @@ -58,6 +58,7 @@ pub mod error_exporter; feature = "experimental-processors" ))] pub mod experimental; +pub mod internal_logs_receiver; /// testing utilities #[cfg(test)] diff --git a/rust/otap-dataflow/crates/telemetry/Cargo.toml b/rust/otap-dataflow/crates/telemetry/Cargo.toml index dce25e697..6ee6556fe 100644 --- a/rust/otap-dataflow/crates/telemetry/Cargo.toml +++ b/rust/otap-dataflow/crates/telemetry/Cargo.toml @@ -21,6 +21,7 @@ unchecked-arithmetic = [] [dependencies] axum = { workspace = true } otap-df-config = { workspace = true } +otap-df-pdata = { workspace = true } flume = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } @@ -35,10 +36,12 @@ opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } opentelemetry-stdout = { workspace = true } opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "metrics", "logs"] } -opentelemetry-prometheus = { workspace = true } +opentelemetry-proto = { workspace = true } opentelemetry-appender-tracing = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } +prost = { workspace = true } +opentelemetry-prometheus = { workspace = true } [dev-dependencies] tower = { workspace = true } diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client.rs b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client.rs index 272d65cbe..541418b41 100644 --- a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client.rs +++ b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client.rs @@ -11,12 +11,15 @@ use opentelemetry_sdk::{Resource, logs::SdkLoggerProvider, metrics::SdkMeterProv use otap_df_config::pipeline::service::telemetry::{ AttributeValue, AttributeValueArray, TelemetryConfig, }; +use otap_df_pdata::OtapPayload; use crate::{ error::Error, opentelemetry_client::{logger_provider::LoggerProvider, meter_provider::MeterProvider}, }; +type InternalLogSender = flume::Sender; + /// Client for the OpenTelemetry SDK. pub struct OpentelemetryClient { /// The tokio runtime used to run the OpenTelemetry SDK OTLP exporter. @@ -29,7 +32,10 @@ pub struct OpentelemetryClient { impl OpentelemetryClient { /// Create a new OpenTelemetry client from the given configuration. - pub fn new(config: &TelemetryConfig) -> Result { + pub fn new( + config: &TelemetryConfig, + internal_logs_sender: InternalLogSender, + ) -> Result { let sdk_resource = Self::configure_resource(&config.resource); let runtime = None; @@ -40,7 +46,12 @@ impl OpentelemetryClient { // Extract the meter provider and runtime by consuming the MeterProvider let (meter_provider, runtime) = meter_provider.into_parts(); - let logger_provider = LoggerProvider::configure(sdk_resource, &config.logs, runtime)?; + let logger_provider = LoggerProvider::configure( + sdk_resource, + &config.logs, + runtime, + internal_logs_sender.clone(), + )?; let (logger_provider, runtime) = logger_provider.into_parts(); @@ -140,7 +151,8 @@ mod tests { #[test] fn test_configure_minimal_opentelemetry_client() -> Result<(), Error> { let config = TelemetryConfig::default(); - let client = OpentelemetryClient::new(&config)?; + let sender = dummy_sender(); + let client = OpentelemetryClient::new(&config, sender)?; let meter = global::meter("test-meter"); let counter = meter.u64_counter("test-counter").build(); @@ -174,7 +186,8 @@ mod tests { logs: LogsConfig::default(), resource, }; - let client = OpentelemetryClient::new(&config)?; + let sender = dummy_sender(); + let client = OpentelemetryClient::new(&config, sender)?; let meter = global::meter("test-meter"); let counter = meter.u64_counter("test-counter").build(); @@ -217,4 +230,9 @@ mod tests { opentelemetry::Value::Array(opentelemetry::Array::I64(vec![1, 2, 3])) ); } + + /// Returns a dummy internal logs sender for testing. + fn dummy_sender() -> InternalLogSender { + flume::unbounded().0 + } } diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs index d14dc84a3..1a0162cd0 100644 --- a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs +++ b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider.rs @@ -3,6 +3,8 @@ //! Configures the OpenTelemetry logger provider based on the provided configuration. +pub mod internal_exporter; + use opentelemetry_appender_tracing::layer; use opentelemetry_otlp::{Protocol, WithExportConfig}; use opentelemetry_sdk::{Resource, logs::SdkLoggerProvider}; @@ -20,7 +22,7 @@ use tracing::level_filters::LevelFilter; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, layer::SubscriberExt}; -use crate::error::Error; +use crate::{error::Error, opentelemetry_client::InternalLogSender}; /// Provider for configuring OpenTelemetry Logger. pub struct LoggerProvider { @@ -66,6 +68,7 @@ impl LoggerProvider { sdk_resource: Resource, logger_config: &LogsConfig, initial_runtime: Option, + internal_logs_sender: InternalLogSender, ) -> Result { let mut sdk_logger_builder = SdkLoggerProvider::builder().with_resource(sdk_resource); @@ -74,8 +77,12 @@ impl LoggerProvider { let log_processors = &logger_config.processors; for processor in log_processors { - (sdk_logger_builder, runtime) = - Self::configure_log_processor(sdk_logger_builder, processor, runtime)?; + (sdk_logger_builder, runtime) = Self::configure_log_processor( + sdk_logger_builder, + processor, + runtime, + internal_logs_sender.clone(), + )?; } let sdk_logger_provider = sdk_logger_builder.build(); @@ -124,6 +131,7 @@ impl LoggerProvider { sdk_logger_builder: opentelemetry_sdk::logs::LoggerProviderBuilder, processor_config: &otap_df_config::pipeline::service::telemetry::logs::processors::LogProcessorConfig, runtime: Option, + internal_logs_sender: InternalLogSender, ) -> Result< ( opentelemetry_sdk::logs::LoggerProviderBuilder, @@ -139,6 +147,7 @@ impl LoggerProvider { sdk_logger_builder, batch_config, runtime, + internal_logs_sender, ) } } @@ -148,6 +157,7 @@ impl LoggerProvider { mut sdk_logger_builder: opentelemetry_sdk::logs::LoggerProviderBuilder, batch_config: &BatchLogProcessorConfig, mut runtime: Option, + internal_logs_sender: InternalLogSender, ) -> Result< ( opentelemetry_sdk::logs::LoggerProviderBuilder, @@ -165,6 +175,12 @@ impl LoggerProvider { sdk_logger_builder = builder; runtime = rt; } + LogBatchProcessorExporterConfig::Internal => { + sdk_logger_builder = Self::configure_internal_logs_exporter( + sdk_logger_builder, + internal_logs_sender, + )? + } } Ok((sdk_logger_builder, runtime)) } @@ -245,6 +261,15 @@ impl LoggerProvider { .map_err(|e| Error::ConfigurationError(e.to_string()))?; Ok(exporter) } + + fn configure_internal_logs_exporter( + mut sdk_logger_builder: opentelemetry_sdk::logs::LoggerProviderBuilder, + internal_logs_sender: InternalLogSender, + ) -> Result { + let exporter = internal_exporter::InternalLogsExporter::new(internal_logs_sender); + sdk_logger_builder = sdk_logger_builder.with_batch_exporter(exporter); + Ok(sdk_logger_builder) + } } #[cfg(test)] @@ -265,7 +290,8 @@ mod tests { ), ], }; - let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?; + let sender = dummy_sender(); + let logger_provider = LoggerProvider::configure(resource, &logger_config, None, sender)?; let (sdk_logger_provider, _) = logger_provider.into_parts(); emit_log(); @@ -293,7 +319,8 @@ mod tests { ), ], }; - let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?; + let sender = dummy_sender(); + let logger_provider = LoggerProvider::configure(resource, &logger_config, None, sender)?; let (sdk_logger_provider, runtime_option) = logger_provider.into_parts(); assert!(runtime_option.is_some()); @@ -312,7 +339,8 @@ mod tests { level: LogLevel::default(), processors: vec![], }; - let logger_provider = LoggerProvider::configure(resource, &logger_config, None)?; + let sender = dummy_sender(); + let logger_provider = LoggerProvider::configure(resource, &logger_config, None, sender)?; let (sdk_logger_provider, _) = logger_provider.into_parts(); emit_log(); @@ -347,4 +375,9 @@ mod tests { fn emit_log() { error!(name: "my-event-name", target: "my-system", event_id = 20, user_name = "otel", user_email = "otel@opentelemetry.io"); } + + /// Returns a dummy internal logs sender for testing. + fn dummy_sender() -> InternalLogSender { + flume::unbounded().0 + } } diff --git a/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider/internal_exporter.rs b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider/internal_exporter.rs new file mode 100644 index 000000000..2545ddad8 --- /dev/null +++ b/rust/otap-dataflow/crates/telemetry/src/opentelemetry_client/logger_provider/internal_exporter.rs @@ -0,0 +1,327 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Opentelemetry SDK logs exporter of internal telemetry logs. + +use std::time::SystemTime; + +use opentelemetry::logs::Severity as SdkSeverity; +use opentelemetry_proto::tonic::common::v1::KeyValue as OtlpKeyValue; +use opentelemetry_proto::tonic::common::v1::{AnyValue, InstrumentationScope}; +use opentelemetry_proto::tonic::logs::v1::ResourceLogs as OtlpResourceLogs; +use opentelemetry_proto::tonic::logs::v1::ScopeLogs as OtlpScopeLogs; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, LogsData}; +use opentelemetry_proto::tonic::resource::v1::Resource as OtlpResource; +use opentelemetry_sdk::Resource as SdkResource; +use opentelemetry_sdk::logs::LogBatch as SdkLogBatch; +use opentelemetry_sdk::{error::OTelSdkResult, logs::LogExporter}; +use otap_df_pdata::{OtapPayload, OtlpProtoBytes}; +use prost::Message; +use prost::bytes::BytesMut; + +use crate::opentelemetry_client::InternalLogSender; + +/// An OpenTelemetry log exporter that sends internal logs to the pipeline engine. +#[derive(Debug)] +pub struct InternalLogsExporter { + internal_logs_sender: InternalLogSender, + sdk_resource: Option, +} + +impl LogExporter for InternalLogsExporter { + fn export(&self, batch: SdkLogBatch<'_>) -> impl Future + Send { + let otap_data = self.convert_sdk_logs_batch_to_otap_data(batch); + let internal_logs_sender = self.internal_logs_sender.clone(); + + async move { + // Push the logs_data to the internal telemetry receiver though its channel. + // It can be a different object to be sent instead of the proto LogsData. + let _ = internal_logs_sender.try_send(otap_data); + // Ignore if there is an error as there might not be any receiver configured to receive internal telemetry data. + Ok(()) + } + } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.sdk_resource = Some(resource.clone()); + } +} + +impl InternalLogsExporter { + /// Creates a new instance of the InternalLogsExporter. + /// internal_logs_sender: The channel sender to send internal logs to the pipeline engine. + #[must_use] + pub fn new(internal_logs_sender: InternalLogSender) -> Self { + InternalLogsExporter { + internal_logs_sender, + sdk_resource: None, + } + } + + /// Converts an SDK LogBatch into OTLP LogsData format. + fn to_otlp_logs_data(&self, batch: SdkLogBatch<'_>) -> LogsData { + let mut scope_logs = Vec::new(); + + for (log_record, instrumentation_scope) in batch.iter() { + let time_unix_nano: u64 = log_record + .timestamp() + .unwrap_or_else(SystemTime::now) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + let observed_time_unix_nano: u64 = log_record + .observed_timestamp() + .unwrap_or_else(SystemTime::now) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + + let severity_number = log_record.severity_number().unwrap_or(SdkSeverity::Info) as i32; + let severity_text = log_record.severity_text().unwrap_or("INFO").to_string(); + let body: Option = + log_record.body().map(Self::convert_sdk_any_value_to_proto); + + let event_name: String = log_record.event_name().unwrap_or("").to_string(); + + let attributes: Vec = + Self::convert_sdk_attributes_to_proto(log_record.attributes_iter()); + + // Conversion logic from SdkLogRecord to LogRecord: + let scope_logs_instance = OtlpScopeLogs { + scope: Some(InstrumentationScope { + name: instrumentation_scope.name().into(), + version: instrumentation_scope.version().unwrap_or_default().into(), + attributes: attributes.clone(), + dropped_attributes_count: 0, + }), + log_records: vec![LogRecord { + time_unix_nano, + observed_time_unix_nano, + severity_number, + severity_text, + body, + attributes, + dropped_attributes_count: 0, + flags: 0, + trace_id: vec![], + span_id: vec![], + event_name, + }], + schema_url: String::new(), + }; + scope_logs.push(scope_logs_instance) + } + + let otlp_resource: Option = + Self::to_otlp_resource(self.sdk_resource.as_ref()); + + let resource_logs = OtlpResourceLogs { + resource: otlp_resource, + scope_logs, + schema_url: String::new(), + }; + LogsData { + resource_logs: vec![resource_logs], + } + } + + // Helper function to convert OpenTelemetry AnyValue to protobuf AnyValue + fn convert_sdk_any_value_to_proto(value: &opentelemetry::logs::AnyValue) -> AnyValue { + use opentelemetry_proto::tonic::common::v1::any_value::Value; + + let proto_value = match value { + opentelemetry::logs::AnyValue::String(s) => Value::StringValue(s.to_string()), + opentelemetry::logs::AnyValue::Int(i) => Value::IntValue(*i), + opentelemetry::logs::AnyValue::Double(d) => Value::DoubleValue(*d), + opentelemetry::logs::AnyValue::Boolean(b) => Value::BoolValue(*b), + opentelemetry::logs::AnyValue::Bytes(bytes) => Value::BytesValue(*bytes.clone()), + opentelemetry::logs::AnyValue::ListAny(list) => { + Value::ArrayValue(opentelemetry_proto::tonic::common::v1::ArrayValue { + values: list + .iter() + .map(Self::convert_sdk_any_value_to_proto) + .collect(), + }) + } + _ => { + // TODO: Complete. + // Handle any other variants by defaulting to an empty string + Value::StringValue(String::new()) + } + }; + + AnyValue { + value: Some(proto_value), + } + } + + /// Helper function to convert SDK OpenTelemetry Value to protobuf AnyValue + fn convert_value_to_proto(value: &opentelemetry::Value) -> AnyValue { + use opentelemetry_proto::tonic::common::v1::any_value::Value as ProtoValue; + + let proto_value = match value { + opentelemetry::Value::Bool(b) => ProtoValue::BoolValue(*b), + opentelemetry::Value::I64(i) => ProtoValue::IntValue(*i), + opentelemetry::Value::F64(f) => ProtoValue::DoubleValue(*f), + opentelemetry::Value::String(s) => ProtoValue::StringValue(s.to_string()), + opentelemetry::Value::Array(arr) => { + let values = match arr { + opentelemetry::Array::Bool(v) => v + .iter() + .map(|b| AnyValue { + value: Some(ProtoValue::BoolValue(*b)), + }) + .collect(), + opentelemetry::Array::I64(v) => v + .iter() + .map(|i| AnyValue { + value: Some(ProtoValue::IntValue(*i)), + }) + .collect(), + opentelemetry::Array::F64(v) => v + .iter() + .map(|f| AnyValue { + value: Some(ProtoValue::DoubleValue(*f)), + }) + .collect(), + opentelemetry::Array::String(v) => v + .iter() + .map(|s| AnyValue { + value: Some(ProtoValue::StringValue(s.to_string())), + }) + .collect(), + _ => vec![], + }; + ProtoValue::ArrayValue(opentelemetry_proto::tonic::common::v1::ArrayValue { + values, + }) + } + _ => ProtoValue::StringValue(String::new()), + }; + + AnyValue { + value: Some(proto_value), + } + } + + /// Converts an SDK Resource into OTLP Resource format. + fn to_otlp_resource(resource: Option<&SdkResource>) -> Option { + resource.map(|res| { + let attributes = res + .iter() + .map( + |(key, value)| opentelemetry_proto::tonic::common::v1::KeyValue { + key: key.as_str().to_string(), + value: Some(Self::convert_value_to_proto(value)), + }, + ) + .collect(); + + OtlpResource { + attributes, + dropped_attributes_count: 0, + entity_refs: vec![], + } + }) + } + + /// Helper function to convert SDK attributes iterator to protobuf KeyValue vector + fn convert_sdk_attributes_to_proto<'a, I>(attributes_iter: I) -> Vec + where + I: Iterator, + { + attributes_iter + .map(|(key, value)| OtlpKeyValue { + key: key.as_str().to_string(), + value: Some(Self::convert_sdk_any_value_to_proto(value)), + }) + .collect() + } + + /// Converts an SDK LogBatch into OTAP payload format. + fn convert_sdk_logs_batch_to_otap_data(&self, batch: SdkLogBatch<'_>) -> OtapPayload { + let logs_data = self.to_otlp_logs_data(batch); + + let mut bytes = BytesMut::new(); + logs_data + .encode(&mut bytes) + .expect("Failed to encode LogsData"); + + let otlp_bytes = OtlpProtoBytes::ExportLogsRequest(bytes.into()); + OtapPayload::OtlpBytes(otlp_bytes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use opentelemetry::KeyValue as SdkKeyValue; + use opentelemetry::logs::{AnyValue, LogRecord, Logger, LoggerProvider, Severity}; + use opentelemetry_sdk::Resource as SdkResource; + use opentelemetry_sdk::logs::SdkLoggerProvider; + + #[test] + fn test_internal_logs_exporter_export() { + let (internal_logs_sender, internal_logs_receiver) = flume::unbounded(); + let mut exporter = InternalLogsExporter::new(internal_logs_sender); + + let sdk_resource = SdkResource::builder() + .with_attributes(vec![ + SdkKeyValue::new("service.name", "test-service"), + SdkKeyValue::new("service.version", "1.0.0"), + ]) + .build(); + + exporter.set_resource(&sdk_resource); + + // Create a logger provider and logger + let logger_provider = SdkLoggerProvider::builder() + .with_resource(sdk_resource.clone()) + .with_batch_exporter(exporter) + .build(); + + let logger = logger_provider.logger("test-logger"); + + // Emit a log record + let mut log_record = logger.create_log_record(); + log_record.set_body(AnyValue::from("Test log message")); + log_record.set_severity_number(Severity::Info); + log_record.set_severity_text("INFO"); + log_record.set_event_name("test_event"); + logger.emit(log_record); + + // Receive the internal log payload + let received_payload = internal_logs_receiver + .recv() + .expect("Failed to receive internal log payload"); + match received_payload { + OtapPayload::OtlpBytes(OtlpProtoBytes::ExportLogsRequest(bytes)) => { + let logs_data = + LogsData::decode(bytes.as_ref()).expect("Failed to decode LogsData"); + assert_eq!(logs_data.resource_logs.len(), 1); + let resource_logs = &logs_data.resource_logs[0]; + assert_eq!(resource_logs.scope_logs.len(), 1); + let scope_logs = &resource_logs.scope_logs[0]; + assert_eq!(scope_logs.log_records.len(), 1); + let log_record = &scope_logs.log_records[0]; + assert_eq!(log_record.severity_text, "INFO"); + assert_eq!(log_record.event_name, "test_event"); + match log_record.body.as_ref() { + Some(any_value) => match any_value.value.as_ref() { + Some( + opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue( + s, + ), + ) => { + assert_eq!(s, "Test log message"); + } + _ => panic!("Unexpected body value type"), + }, + _ => panic!("Unexpected body value type"), + } + } + _ => panic!("Unexpected OtapPayload variant"), + } + } +}