diff --git a/examples/otel_context.rs b/examples/otel_context.rs new file mode 100644 index 0000000..0ab4b38 --- /dev/null +++ b/examples/otel_context.rs @@ -0,0 +1,300 @@ +//! This example demonstrates how to use `OpenTelemetryContext` from a separate layer +//! to access OpenTelemetry context data from tracing spans. + +use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; +use opentelemetry_sdk::trace::SdkTracerProvider; +use opentelemetry_stdout as stdout; +use std::sync::{Arc, Mutex, RwLock}; +use tracing::{debug, info, span, warn, Subscriber}; +use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; +use tracing_opentelemetry::{layer, OpenTelemetryContext}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::Layer; + +/// A custom layer that demonstrates how to use OpenTelemetryContext +/// to extract OpenTelemetry contexts from span extensions. +#[derive(Clone)] +struct SpanAnalysisLayer { + /// Store span analysis results for demonstration + analysis_results: Arc>>, + /// Weak reference to the dispatcher for context extraction + dispatch: Arc>>, +} + +#[derive(Debug, Clone)] +struct SpanAnalysis { + span_name: String, + trace_id: String, + span_id: String, + is_root_span: bool, +} + +impl SpanAnalysisLayer { + fn new() -> Self { + Self { + analysis_results: Arc::new(Mutex::new(Vec::new())), + dispatch: Arc::new(RwLock::new(None)), + } + } + + fn get_analysis_results(&self) -> Vec { + self.analysis_results.lock().unwrap().clone() + } + + fn analyze_span_context(&self, span_name: &str, otel_context: &opentelemetry::Context) { + let span = otel_context.span(); + let span_context = span.span_context(); + + if span_context.is_valid() { + let analysis = SpanAnalysis { + span_name: span_name.to_string(), + trace_id: format!("{:032x}", span_context.trace_id()), + span_id: format!("{:016x}", span_context.span_id()), + is_root_span: span_context.trace_flags().is_sampled(), + }; + + println!( + "šŸ” Analyzing span '{}': trace_id={}, span_id={}, sampled={}", + analysis.span_name, + analysis.trace_id, + analysis.span_id, + span_context.trace_flags().is_sampled() + ); + + if let Ok(mut results) = self.analysis_results.lock() { + results.push(analysis); + } + } + } + + fn get_weak_dispatch(&self, get_default: bool) -> Option { + let read_guard = self.dispatch.read().unwrap(); + match read_guard.as_ref() { + Some(weak_dispatch) => Some(weak_dispatch.clone()), + // Note: This workaround is needed until https://github.com/tokio-rs/tracing/pull/3379 + // is merged and released. It should really be handled in on_register_dispatch + None => { + if !get_default { + None + } else { + drop(read_guard); + let mut dispatch = self.dispatch.write().unwrap(); + let weak_dispatch = Dispatch::default().downgrade(); + *dispatch = Some(weak_dispatch.clone()); + Some(weak_dispatch) + } + } + } + } +} + +impl Layer for SpanAnalysisLayer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::span::Id, + ctx: Context<'_, S>, + ) { + // Get the weak dispatch reference. + // + // Note: We can't use the Dispatch::default() workaround described above here since this + // method is called from inside a dispatcher::get_default block, and such calls can't be + // nested so we would get the global dispatcher instead, which can't downcast to the right + // types when extracting the OpenTelemetry context. This also means that we will miss + // analyzing the first span that is created šŸ¤·šŸ¼ā€ā™‚ļø + let Some(weak_dispatch) = self.get_weak_dispatch(false) else { + return; + }; + + // Get the span reference and extract OpenTelemetry context + if let Some(span_ref) = ctx.span(id) { + // This is the key functionality: using OpenTelemetryContext + // to extract the OpenTelemetry context from span extensions + let mut extensions = span_ref.extensions_mut(); + if let Some(otel_context) = + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) + { + self.analyze_span_context(attrs.metadata().name(), &otel_context); + } else { + println!( + "āš ļø Could not extract OpenTelemetry context for span '{}'", + attrs.metadata().name() + ); + } + } + } + + fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { + if let Some(weak_dispatch) = self.get_weak_dispatch(true) { + if let Some(span_ref) = ctx.span(id) { + let mut extensions = span_ref.extensions_mut(); + if let Some(otel_context) = + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) + { + let span = otel_context.span(); + let span_context = span.span_context(); + if span_context.is_valid() { + println!( + "šŸ“ Entering span with trace_id: {:032x}, span_id: {:016x}", + span_context.trace_id(), + span_context.span_id() + ); + } + } + } + } + } + + fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { + // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 + // is merged and released, since `on_register_dispatch` is never called. + let mut dispatch = self.dispatch.write().unwrap(); + *dispatch = Some(subscriber.clone().downgrade()); + } +} + +fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) { + // Create OpenTelemetry tracer that outputs to stdout + let provider = SdkTracerProvider::builder() + .with_simple_exporter(stdout::SpanExporter::default()) + .build(); + let tracer = provider.tracer("span_ref_ext_example"); + + // Create our custom analysis layer + let analysis_layer = SpanAnalysisLayer::new(); + + // Build the subscriber with multiple layers: + // 1. OpenTelemetry layer for trace export + // 2. Our custom analysis layer that uses OpenTelemetryContext + // 3. Formatting layer for console output + let subscriber = tracing_subscriber::registry() + .with(layer().with_tracer(tracer).with_filter(LevelFilter::DEBUG)) + .with(analysis_layer.clone()) + .with( + tracing_subscriber::fmt::layer() + .with_target(false) + .with_filter(LevelFilter::INFO), + ); + + (subscriber, provider, analysis_layer) +} + +fn simulate_application_work() { + // Create a root span for the main application work + let root_span = span!(tracing::Level::INFO, "application_main", version = "1.0.0"); + let _root_guard = root_span.enter(); + + info!("Starting application"); + + // Simulate some business logic with nested spans + { + let auth_span = span!(tracing::Level::DEBUG, "authenticate_user", user_id = 12345); + let _auth_guard = auth_span.enter(); + + debug!("Validating user credentials"); + + // Simulate authentication work + std::thread::sleep(std::time::Duration::from_millis(10)); + + info!("User authenticated successfully"); + } + + // Simulate database operations + { + let db_span = span!( + tracing::Level::DEBUG, + "database_query", + query = "SELECT * FROM users", + table = "users" + ); + let _db_guard = db_span.enter(); + + debug!("Executing database query"); + + // Nested span for connection management + { + let conn_span = span!(tracing::Level::DEBUG, "acquire_connection", pool_size = 10); + let _conn_guard = conn_span.enter(); + + debug!("Acquiring database connection from pool"); + std::thread::sleep(std::time::Duration::from_millis(5)); + } + + std::thread::sleep(std::time::Duration::from_millis(20)); + info!("Database query completed"); + } + + // Simulate some processing work + { + let process_span = span!( + tracing::Level::DEBUG, + "process_data", + records_count = 150, + batch_size = 50 + ); + let _process_guard = process_span.enter(); + + debug!("Processing user data"); + + for batch in 1..=3 { + let batch_span = span!( + tracing::Level::DEBUG, + "process_batch", + batch_number = batch, + batch_size = 50 + ); + let _batch_guard = batch_span.enter(); + + debug!("Processing batch {}", batch); + std::thread::sleep(std::time::Duration::from_millis(8)); + } + + info!("Data processing completed"); + } + + warn!("Application work completed"); +} + +fn main() { + println!( + "šŸš€ OpenTelemetryContext Example: Extracting OpenTelemetry Contexts from Separate Layer" + ); + println!("{}", "=".repeat(80)); + + // Setup tracing with our custom layer + let (subscriber, provider, analysis_layer) = setup_tracing(); + + tracing::subscriber::with_default(subscriber, || { + // Simulate application work that generates spans + simulate_application_work(); + }); + + // Ensure all spans are flushed + drop(provider); + + // Display the analysis results + println!("\nšŸ“Š Span Analysis Results:"); + println!("{}", "-".repeat(80)); + + let results = analysis_layer.get_analysis_results(); + for (i, analysis) in results.iter().enumerate() { + println!( + "{}. Span: '{}'\n Trace ID: {}\n Span ID: {}\n Root Span: {}\n", + i + 1, + analysis.span_name, + analysis.trace_id, + analysis.span_id, + analysis.is_root_span + ); + } + + println!( + "āœ… Example completed! Total spans analyzed: {}", + results.len() + ); +} diff --git a/src/layer.rs b/src/layer.rs index a1f0d7a..b65e14b 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -17,7 +17,7 @@ use tracing_core::{field, Event, Subscriber}; #[cfg(feature = "tracing-log")] use tracing_log::NormalizeEvent; use tracing_subscriber::layer::Context; -use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::registry::{ExtensionsMut, LookupSpan}; use tracing_subscriber::Layer; #[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))] use web_time::Instant; @@ -104,6 +104,14 @@ pub(crate) struct WithContext { #[allow(clippy::type_complexity)] pub(crate) with_activated_context: fn(&tracing::Dispatch, &span::Id, f: &mut dyn FnMut(&mut OtelData)), + + /// + /// Ensures the given SpanId has been activated - that is, created in the OTel side of things, + /// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it. + /// + #[allow(clippy::type_complexity)] + pub(crate) with_activated_context_extensions: + fn(&tracing::Dispatch, &mut ExtensionsMut<'_>, f: &mut dyn FnMut(&mut OtelData)), } impl WithContext { @@ -133,6 +141,20 @@ impl WithContext { ) { (self.with_activated_context)(dispatch, id, &mut f) } + + /// + /// Ensures the given SpanId has been activated - that is, created in the OTel side of things, + /// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it. + /// + #[allow(clippy::type_complexity)] + pub(crate) fn with_activated_context_extensions( + &self, + dispatch: &tracing::Dispatch, + extensions: &mut ExtensionsMut<'_>, + mut f: impl FnMut(&mut OtelData), + ) { + (self.with_activated_context_extensions)(dispatch, extensions, &mut f) + } } fn str_to_span_kind(s: &str) -> Option { @@ -625,6 +647,7 @@ where with_context: WithContext { with_context: Self::get_context, with_activated_context: Self::get_activated_context, + with_activated_context_extensions: Self::get_activated_context_extensions, }, _registry: marker::PhantomData, } @@ -680,6 +703,8 @@ where with_context: WithContext { with_context: OpenTelemetryLayer::::get_context, with_activated_context: OpenTelemetryLayer::::get_activated_context, + with_activated_context_extensions: + OpenTelemetryLayer::::get_activated_context_extensions, }, _registry: self._registry, // cannot use ``..self` here due to different generics @@ -937,11 +962,20 @@ where .span(id) .expect("registry should have a span for the current ID"); + let mut extensions = span.extensions_mut(); + + Self::get_activated_context_extensions(dispatch, &mut extensions, f) + } + + fn get_activated_context_extensions( + dispatch: &tracing::Dispatch, + extensions: &mut ExtensionsMut<'_>, + f: &mut dyn FnMut(&mut OtelData), + ) { let layer = dispatch .downcast_ref::>() .expect("layer should downcast to expected type; this is a bug!"); - let mut extensions = span.extensions_mut(); if let Some(otel_data) = extensions.get_mut::() { // Activate the context layer.start_cx(otel_data); diff --git a/src/lib.rs b/src/lib.rs index 18de273..599b7f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,6 +115,8 @@ mod metrics; /// Implementation of the trace::Layer as a source of OpenTelemetry data. mod layer; +/// OpenTelemetryContext which enables OpenTelemetry context extraction from span extensions. +mod otel_context; /// Span extension which enables OpenTelemetry context management. mod span_ext; @@ -127,6 +129,7 @@ pub use layer::{layer, OpenTelemetryLayer}; #[cfg(feature = "metrics")] pub use metrics::MetricsLayer; use opentelemetry::trace::TraceContextExt as _; +pub use otel_context::OpenTelemetryContext; pub use span_ext::OpenTelemetrySpanExt; /// Per-span OpenTelemetry data tracked by this crate. diff --git a/src/otel_context.rs b/src/otel_context.rs new file mode 100644 index 0000000..6569305 --- /dev/null +++ b/src/otel_context.rs @@ -0,0 +1,75 @@ +use crate::{layer::WithContext, OtelData, OtelDataState}; +use tracing::Dispatch; +use tracing_subscriber::registry::ExtensionsMut; + +/// Utility functions to allow tracing [`ExtensionsMut`]s to return +/// [OpenTelemetry] [`Context`]s. +/// +/// [`ExtensionsMut`]: tracing_subscriber::registry::ExtensionsMut +/// [OpenTelemetry]: https://opentelemetry.io +/// [`Context`]: opentelemetry::Context +pub struct OpenTelemetryContext {} +impl OpenTelemetryContext { + /// Extracts the OpenTelemetry [`Context`] associated with this span extensions. + /// + /// This method retrieves the OpenTelemetry context data that has been stored + /// for the span by the OpenTelemetry layer. The context includes the span's + /// OpenTelemetry span context, which contains trace ID, span ID, and other + /// trace-related metadata. + /// + /// [`Context`]: opentelemetry::Context + /// + /// # Examples + /// + /// ```rust + /// use tracing_opentelemetry::OpenTelemetryContext; + /// use tracing::dispatcher::WeakDispatch; + /// use tracing_subscriber::registry::LookupSpan; + /// use opentelemetry::trace::TraceContextExt; + /// + /// fn do_things_with_otel_context<'a, D>( + /// span_ref: &tracing_subscriber::registry::SpanRef<'a, D>, + /// weak_dispatch: &WeakDispatch + /// ) where + /// D: LookupSpan<'a>, + /// { + /// if let Some(otel_context) = OpenTelemetryContext::context(&mut span_ref.extensions_mut(), &weak_dispatch.upgrade()) { + /// // Process the extracted context + /// let span = otel_context.span(); + /// let span_context = span.span_context(); + /// if span_context.is_valid() { + /// // Handle the valid context... + /// } + /// } + /// } + /// ``` + /// + /// # Use Cases + /// + /// - When working with multiple subscriber configurations + /// - When implementing advanced tracing middleware that manages multiple dispatches + pub fn context( + extensions: &mut ExtensionsMut<'_>, + dispatch: &Option, + ) -> Option { + dispatch.as_ref().and_then(|dispatch| { + let mut cx = None; + if let Some(get_context) = dispatch.downcast_ref::() { + println!("WOHO"); + // If our span hasn't been built, we should build it and get the context in one call + get_context.with_activated_context_extensions( + dispatch, + extensions, + |data: &mut OtelData| { + if let OtelDataState::Context { current_cx } = &data.state { + cx = Some(current_cx.clone()); + } + }, + ); + } else { + println!("BOHO"); + } + cx + }) + } +} diff --git a/tests/otel_context.rs b/tests/otel_context.rs new file mode 100644 index 0000000..156f5de --- /dev/null +++ b/tests/otel_context.rs @@ -0,0 +1,203 @@ +use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; +use opentelemetry_sdk::{ + error::OTelSdkResult, + trace::{SdkTracerProvider, SpanData, SpanExporter, Tracer}, +}; +use std::sync::{Arc, Mutex, RwLock}; +use tracing::Subscriber; +use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; +use tracing_opentelemetry::{layer, OpenTelemetryContext}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::Layer; + +#[derive(Clone, Default, Debug)] +struct TestExporter(Arc>>); + +impl SpanExporter for TestExporter { + async fn export(&self, mut batch: Vec) -> OTelSdkResult { + let spans = self.0.clone(); + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + } +} + +/// A custom tracing layer that uses OpenTelemetryContext to access OpenTelemetry contexts +/// from span extensions. This simulates a separate layer that needs to interact with +/// OpenTelemetry data managed by the OpenTelemetryLayer. +#[derive(Clone)] +struct CustomLayer { + /// Store extracted contexts for verification + extracted_contexts: Arc>>, + dispatch: Arc>>, +} + +impl CustomLayer { + fn new() -> Self { + Self { + extracted_contexts: Arc::new(Mutex::new(Vec::new())), + dispatch: Arc::new(RwLock::new(None)), + } + } + + fn get_extracted_contexts(&self) -> Vec { + self.extracted_contexts.lock().unwrap().clone() + } +} + +impl Layer for CustomLayer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { + let weak_dispatch = { + let read_guard = self.dispatch.read().unwrap(); + match read_guard.as_ref() { + Some(weak_dispatch) => weak_dispatch.clone(), + None => { + drop(read_guard); + let mut dispatch = self.dispatch.write().unwrap(); + let weak_dispatch = Dispatch::default().downgrade(); + *dispatch = Some(weak_dispatch.clone()); + weak_dispatch + } + } + }; + + // Get the span reference from the registry when the span is entered + if let Some(span_ref) = ctx.span(id) { + // Use OpenTelemetryContext to extract the OpenTelemetry context + let mut extensions = span_ref.extensions_mut(); + let otel_context = + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()); + if let Some(otel_context) = otel_context { + // Store the extracted context for verification + if let Ok(mut contexts) = self.extracted_contexts.lock() { + contexts.push(otel_context); + } + } + } + } + + fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { + // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 + // is merged and released, since `on_register_dispatch` is never called. + let mut dispatch = self.dispatch.write().unwrap(); + *dispatch = Some(subscriber.clone().downgrade()); + } +} + +fn test_tracer_with_custom_layer() -> ( + Tracer, + SdkTracerProvider, + TestExporter, + CustomLayer, + impl Subscriber, +) { + let exporter = TestExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("test"); + + let custom_layer = CustomLayer::new(); + + let subscriber = tracing_subscriber::registry() + .with( + layer() + .with_tracer(tracer.clone()) + .with_filter(LevelFilter::DEBUG), + ) + .with(custom_layer.clone()) + .with(tracing_subscriber::fmt::layer().with_filter(LevelFilter::TRACE)); + + (tracer, provider, exporter, custom_layer, subscriber) +} + +#[test] +fn test_span_ref_ext_from_separate_layer() { + let (_tracer, provider, exporter, custom_layer, subscriber) = test_tracer_with_custom_layer(); + + tracing::subscriber::with_default(subscriber, || { + // Create a span that will be processed by both the OpenTelemetry layer + // and our custom layer + let _span = tracing::debug_span!("test_span", test_field = "test_value").entered(); + + // Create a child span to test hierarchical context extraction + let _child_span = tracing::debug_span!("child_span", child_field = "child_value").entered(); + }); + + drop(provider); // flush all spans + + // Verify that spans were exported by the OpenTelemetry layer + let spans = exporter.0.lock().unwrap(); + assert_eq!(spans.len(), 2, "Expected 2 spans to be exported"); + + // Verify that our custom layer extracted OpenTelemetry contexts + let extracted_contexts = custom_layer.get_extracted_contexts(); + assert_eq!( + extracted_contexts.len(), + 2, + "Expected 2 contexts to be extracted by custom layer" + ); + + // Verify that the extracted contexts contain valid span contexts + for (i, context) in extracted_contexts.iter().enumerate() { + let span = context.span(); + let span_context = span.span_context(); + assert!( + span_context.is_valid(), + "Context {} should have a valid span context", + i + ); + assert_ne!( + span_context.trace_id(), + opentelemetry::trace::TraceId::INVALID, + "Context {} should have a valid trace ID", + i + ); + assert_ne!( + span_context.span_id(), + opentelemetry::trace::SpanId::INVALID, + "Context {} should have a valid span ID", + i + ); + } + + // Verify that the contexts correspond to the exported spans + let parent_span = spans.iter().find(|s| s.name == "test_span").unwrap(); + let child_span = spans.iter().find(|s| s.name == "child_span").unwrap(); + + // The first extracted context should correspond to the parent span + let parent_context = &extracted_contexts[0]; + assert_eq!( + parent_context.span().span_context().span_id(), + parent_span.span_context.span_id(), + "Parent context should match parent span" + ); + + // The second extracted context should correspond to the child span + let child_context = &extracted_contexts[1]; + assert_eq!( + child_context.span().span_context().span_id(), + child_span.span_context.span_id(), + "Child context should match child span" + ); + + // Verify that both spans share the same trace ID (hierarchical relationship) + assert_eq!( + parent_span.span_context.trace_id(), + child_span.span_context.trace_id(), + "Parent and child spans should share the same trace ID" + ); + + // Verify that the child span has the parent span as its parent + assert_eq!( + child_span.parent_span_id, + parent_span.span_context.span_id(), + "Child span should have parent span as its parent" + ); +}