From 706ca82518232cbb69efc8e9b97c1458ba24a098 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 30 May 2025 12:13:01 +0200 Subject: [PATCH 1/2] feat(span_processor): add on_ending callback --- examples/tracing-http-propagator/Cargo.toml | 2 +- .../tracing-http-propagator/src/server.rs | 46 +++++++++- opentelemetry-sdk/Cargo.toml | 1 + opentelemetry-sdk/src/trace/span.rs | 91 +++++++++++++++++-- opentelemetry-sdk/src/trace/span_processor.rs | 28 +++++- 5 files changed, 152 insertions(+), 16 deletions(-) diff --git a/examples/tracing-http-propagator/Cargo.toml b/examples/tracing-http-propagator/Cargo.toml index a4fa13d371..6d0c9ef0ac 100644 --- a/examples/tracing-http-propagator/Cargo.toml +++ b/examples/tracing-http-propagator/Cargo.toml @@ -25,7 +25,7 @@ hyper = { workspace = true, features = ["full"] } hyper-util = { workspace = true, features = ["full"] } tokio = { workspace = true, features = ["full"] } opentelemetry = { path = "../../opentelemetry" } -opentelemetry_sdk = { path = "../../opentelemetry-sdk" } +opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["experimental_span_processor_on_ending"]} opentelemetry-http = { path = "../../opentelemetry-http" } opentelemetry-stdout = { workspace = true, features = ["trace", "logs"] } opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 49d4a084e7..10bcd5984d 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -15,7 +15,7 @@ use opentelemetry_sdk::{ error::OTelSdkResult, logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider}, propagation::{BaggagePropagator, TraceContextPropagator}, - trace::{SdkTracerProvider, SpanProcessor}, + trace::{SdkTracerProvider, SpanData, SpanProcessor}, }; use opentelemetry_semantic_conventions::trace; use opentelemetry_stdout::{LogExporter, SpanExporter}; @@ -105,6 +105,49 @@ async fn router( response } +fn obfuscate_http_auth_url(s: &str) -> Option { + #[allow(clippy::unnecessary_to_owned)] + let uri = hyper::http::Uri::from_maybe_shared(s.to_string()).ok()?; + let authority = uri.authority()?; + let (_, url) = authority.as_str().split_once('@')?; + let new_auth = format!("REDACTED_USERNAME:REDACTED_PASSWORD@{url}"); + let mut parts = uri.into_parts(); + parts.authority = Some(hyper::http::uri::Authority::from_maybe_shared(new_auth).ok()?); + Some(hyper::Uri::from_parts(parts).ok()?.to_string()) +} + +#[derive(Debug)] +/// A custom span processor that uses on_ending to obfuscate sensitive information in span attributes. +/// +/// Currently this only overrides http auth information in the URI. +struct SpanObfuscationProcessor; + +impl SpanProcessor for SpanObfuscationProcessor { + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::OTelSdkResult { + Ok(()) + } + + fn on_start(&self, _span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) {} + + fn on_ending(&self, span: &mut opentelemetry_sdk::trace::Span) { + let mut obfuscated_attributes = Vec::new(); + let Some(span) = span.exported_data() else { + return; + }; + for KeyValue { key, value, .. } in span.attributes { + if let Some(redacted_uri) = obfuscate_http_auth_url(value.as_str().as_ref()) { + obfuscated_attributes.push((key.clone(), KeyValue::new(key.clone(), redacted_uri))); + } + } + } + + fn on_end(&self, _span: SpanData) {} +} + /// A custom log processor that enriches LogRecords with baggage attributes. /// Baggage information is not added automatically without this processor. #[derive(Debug)] @@ -159,6 +202,7 @@ fn init_tracer() -> SdkTracerProvider { // that prints the spans to stdout. let provider = SdkTracerProvider::builder() .with_span_processor(EnrichWithBaggageSpanProcessor) + .with_span_processor(SpanObfuscationProcessor) .with_simple_exporter(SpanExporter::default()) .build(); diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index aa92787ea7..3169ab9e08 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -57,6 +57,7 @@ experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimenta experimental_logs_concurrent_log_processor = ["logs"] experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"] experimental_metrics_disable_name_validation = ["metrics"] +experimental_span_processor_on_ending = ["trace"] [[bench]] name = "context" diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 9cb8b88045..ceef53652a 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -198,18 +198,15 @@ impl opentelemetry::trace::Span for Span { impl Span { fn ensure_ended_and_exported(&mut self, timestamp: Option) { - // skip if data has already been exported - let mut data = match self.data.take() { - Some(data) => data, - None => return, - }; - - let provider = self.tracer.provider(); // skip if provider has been shut down - if provider.is_shutdown() { + if self.tracer.provider().is_shutdown() { return; } + // skip if data has already been exported + let Some(data) = self.data.as_mut() else { + return; + }; // ensure end time is set via explicit end or implicitly on drop if let Some(timestamp) = timestamp { data.end_time = timestamp; @@ -217,7 +214,17 @@ impl Span { data.end_time = opentelemetry::time::now(); } - match provider.span_processors() { + #[cfg(feature = "experimental_span_processor_on_ending")] + { + let provider = self.tracer.provider().clone(); + for processor in provider.span_processors() { + processor.on_ending(self); + } + } + + let Some(data) = self.data.take() else { return }; + + match self.tracer.provider().span_processors() { [] => {} [processor] => { processor.on_end(build_export_data( @@ -725,4 +732,70 @@ mod tests { // return none if the provider has already been dropped assert!(dropped_span.exported_data().is_none()); } + + #[test] + #[cfg(feature = "experimental_span_processor_on_ending")] + fn test_on_ending_mutate_span() { + use crate::trace::SpanProcessor; + + #[derive(Debug)] + struct FirstProcessor; + impl SpanProcessor for FirstProcessor { + fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} + fn on_end(&self, _span: crate::trace::SpanData) {} + fn on_ending(&self, span: &mut Span) { + span.set_attribute(KeyValue::new("first_processor", "true")); + } + + fn force_flush(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::error::OTelSdkResult { + Ok(()) + } + } + + #[derive(Debug)] + struct SecondProcessor; + impl SpanProcessor for SecondProcessor { + fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {} + fn on_end(&self, _span: crate::trace::SpanData) {} + fn on_ending(&self, span: &mut Span) { + assert!(span + .exported_data() + .unwrap() + .attributes + .contains(&KeyValue::new("first_processor", "true"))); + span.set_attribute(KeyValue::new("second_processor", "true")); + } + + fn force_flush(&self) -> crate::error::OTelSdkResult { + Ok(()) + } + + fn shutdown_with_timeout(&self, _timeout: Duration) -> crate::error::OTelSdkResult { + Ok(()) + } + } + + let exporter = crate::trace::span_processor::tests::MockSpanExporter::new(); + let spans = exporter.exported_spans.clone(); + + let provider = crate::trace::SdkTracerProvider::builder() + .with_span_processor(FirstProcessor) + .with_span_processor(SecondProcessor) + .with_simple_exporter(exporter) + .build(); + provider.tracer("test").start("test_span"); + + spans.lock().unwrap().iter().for_each(|span| { + assert!(span + .attributes + .contains(&KeyValue::new("first_processor", "true"))); + assert!(span + .attributes + .contains(&KeyValue::new("second_processor", "true"))); + }); + } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 1e5a9c1897..636f5aeac6 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -79,6 +79,24 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// synchronously on the thread that started the span, therefore it should /// not block or throw exceptions. fn on_start(&self, span: &mut Span, cx: &Context); + + #[cfg(feature = "experimental_span_processor_on_ending")] + /// `on_ending` is called when a `Span` is ending. The end timestamp has already + /// been computed. + /// This method is called synchronously within the `Span::end` API, therefore it + /// should not block or throw an exception. + /// + /// If multiple span processors are registered, their on_ending methods are invoked + /// in the order the span processors have been registered, and mutations to the span + /// will be visible to the next processor. + /// + /// The tracer will call `on_ending` for all span processors before calling `on_end` + /// for any of them. + fn on_ending(&self, _span: &mut Span) { + // Default implementation is a no-op so existing processor implementations + // don't break if this feature in enabled transitively. + } + /// `on_end` is called after a `Span` is ended (i.e., the end timestamp is /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. @@ -852,7 +870,7 @@ impl BatchConfigBuilder { } #[cfg(all(test, feature = "testing", feature = "trace"))] -mod tests { +pub(crate) mod tests { // cargo test trace::span_processor::tests:: --features=testing use super::{ BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_EXPORT_TIMEOUT, @@ -1069,13 +1087,13 @@ mod tests { // Mock exporter to test functionality #[derive(Debug)] - struct MockSpanExporter { - exported_spans: Arc>>, - exported_resource: Arc>>, + pub(crate) struct MockSpanExporter { + pub exported_spans: Arc>>, + pub exported_resource: Arc>>, } impl MockSpanExporter { - fn new() -> Self { + pub(crate) fn new() -> Self { Self { exported_spans: Arc::new(Mutex::new(Vec::new())), exported_resource: Arc::new(Mutex::new(None)), From 574242d54ec6503b0bacbf38895fd4061563556e Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 30 May 2025 13:58:32 +0200 Subject: [PATCH 2/2] feat: add changelog --- opentelemetry-sdk/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index 1c9e3de159..34133789c2 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -2,6 +2,11 @@ ## vNext +- TODO: Placeholder for Span processor related things + - Add `on_ending` API on the span processor. This allows mutating spans while + they are teriminating. + + ## 0.30.0 Released 2025-May-23