|
| 1 | +//! This example demonstrates how to use `OpenTelemetryContext` from a separate layer |
| 2 | +//! to access OpenTelemetry context data from tracing spans. |
| 3 | +
|
| 4 | +use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; |
| 5 | +use opentelemetry_sdk::trace::SdkTracerProvider; |
| 6 | +use opentelemetry_stdout as stdout; |
| 7 | +use std::sync::{Arc, Mutex, RwLock}; |
| 8 | +use tracing::{debug, info, span, warn, Subscriber}; |
| 9 | +use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; |
| 10 | +use tracing_opentelemetry::{layer, OpenTelemetryContext}; |
| 11 | +use tracing_subscriber::layer::Context; |
| 12 | +use tracing_subscriber::prelude::*; |
| 13 | +use tracing_subscriber::registry::LookupSpan; |
| 14 | +use tracing_subscriber::Layer; |
| 15 | + |
| 16 | +/// A custom layer that demonstrates how to use OpenTelemetryContext |
| 17 | +/// to extract OpenTelemetry contexts from span extensions. |
| 18 | +#[derive(Clone)] |
| 19 | +struct SpanAnalysisLayer { |
| 20 | + /// Store span analysis results for demonstration |
| 21 | + analysis_results: Arc<Mutex<Vec<SpanAnalysis>>>, |
| 22 | + /// Weak reference to the dispatcher for context extraction |
| 23 | + dispatch: Arc<RwLock<Option<WeakDispatch>>>, |
| 24 | +} |
| 25 | + |
| 26 | +#[derive(Debug, Clone)] |
| 27 | +struct SpanAnalysis { |
| 28 | + span_name: String, |
| 29 | + trace_id: String, |
| 30 | + span_id: String, |
| 31 | + is_root_span: bool, |
| 32 | +} |
| 33 | + |
| 34 | +impl SpanAnalysisLayer { |
| 35 | + fn new() -> Self { |
| 36 | + Self { |
| 37 | + analysis_results: Arc::new(Mutex::new(Vec::new())), |
| 38 | + dispatch: Arc::new(RwLock::new(None)), |
| 39 | + } |
| 40 | + } |
| 41 | + |
| 42 | + fn get_analysis_results(&self) -> Vec<SpanAnalysis> { |
| 43 | + self.analysis_results.lock().unwrap().clone() |
| 44 | + } |
| 45 | + |
| 46 | + fn analyze_span_context(&self, span_name: &str, otel_context: &opentelemetry::Context) { |
| 47 | + let span = otel_context.span(); |
| 48 | + let span_context = span.span_context(); |
| 49 | + |
| 50 | + if span_context.is_valid() { |
| 51 | + let analysis = SpanAnalysis { |
| 52 | + span_name: span_name.to_string(), |
| 53 | + trace_id: format!("{:032x}", span_context.trace_id()), |
| 54 | + span_id: format!("{:016x}", span_context.span_id()), |
| 55 | + is_root_span: span_context.trace_flags().is_sampled(), |
| 56 | + }; |
| 57 | + |
| 58 | + println!( |
| 59 | + "🔍 Analyzing span '{}': trace_id={}, span_id={}, sampled={}", |
| 60 | + analysis.span_name, |
| 61 | + analysis.trace_id, |
| 62 | + analysis.span_id, |
| 63 | + span_context.trace_flags().is_sampled() |
| 64 | + ); |
| 65 | + |
| 66 | + if let Ok(mut results) = self.analysis_results.lock() { |
| 67 | + results.push(analysis); |
| 68 | + } |
| 69 | + } |
| 70 | + } |
| 71 | + |
| 72 | + fn get_weak_dispatch(&self, get_default: bool) -> Option<WeakDispatch> { |
| 73 | + let read_guard = self.dispatch.read().unwrap(); |
| 74 | + match read_guard.as_ref() { |
| 75 | + Some(weak_dispatch) => Some(weak_dispatch.clone()), |
| 76 | + // Note: This workaround is needed until https://github.com/tokio-rs/tracing/pull/3379 |
| 77 | + // is merged and released. It should really be handled in on_register_dispatch |
| 78 | + None => { |
| 79 | + if !get_default { |
| 80 | + None |
| 81 | + } else { |
| 82 | + drop(read_guard); |
| 83 | + let mut dispatch = self.dispatch.write().unwrap(); |
| 84 | + let weak_dispatch = Dispatch::default().downgrade(); |
| 85 | + *dispatch = Some(weak_dispatch.clone()); |
| 86 | + Some(weak_dispatch) |
| 87 | + } |
| 88 | + } |
| 89 | + } |
| 90 | + } |
| 91 | +} |
| 92 | + |
| 93 | +impl<S> Layer<S> for SpanAnalysisLayer |
| 94 | +where |
| 95 | + S: Subscriber + for<'span> LookupSpan<'span>, |
| 96 | +{ |
| 97 | + fn on_new_span( |
| 98 | + &self, |
| 99 | + attrs: &tracing::span::Attributes<'_>, |
| 100 | + id: &tracing::span::Id, |
| 101 | + ctx: Context<'_, S>, |
| 102 | + ) { |
| 103 | + // Get the weak dispatch reference. |
| 104 | + // |
| 105 | + // Note: We can't use the Dispatch::default() workaround described above here since this |
| 106 | + // method is called from inside a dispatcher::get_default block, and such calls can't be |
| 107 | + // nested so we would get the global dispatcher instead, which can't downcast to the right |
| 108 | + // types when extracting the OpenTelemetry context. This also means that we will miss |
| 109 | + // analyzing the first span that is created 🤷🏼♂️ |
| 110 | + let Some(weak_dispatch) = self.get_weak_dispatch(false) else { |
| 111 | + return; |
| 112 | + }; |
| 113 | + |
| 114 | + // Get the span reference and extract OpenTelemetry context |
| 115 | + if let Some(span_ref) = ctx.span(id) { |
| 116 | + // This is the key functionality: using OpenTelemetryContext |
| 117 | + // to extract the OpenTelemetry context from span extensions |
| 118 | + let mut extensions = span_ref.extensions_mut(); |
| 119 | + if let Some(otel_context) = |
| 120 | + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) |
| 121 | + { |
| 122 | + self.analyze_span_context(attrs.metadata().name(), &otel_context); |
| 123 | + } else { |
| 124 | + println!( |
| 125 | + "⚠️ Could not extract OpenTelemetry context for span '{}'", |
| 126 | + attrs.metadata().name() |
| 127 | + ); |
| 128 | + } |
| 129 | + } |
| 130 | + } |
| 131 | + |
| 132 | + fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { |
| 133 | + if let Some(weak_dispatch) = self.get_weak_dispatch(true) { |
| 134 | + if let Some(span_ref) = ctx.span(id) { |
| 135 | + let mut extensions = span_ref.extensions_mut(); |
| 136 | + if let Some(otel_context) = |
| 137 | + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) |
| 138 | + { |
| 139 | + let span = otel_context.span(); |
| 140 | + let span_context = span.span_context(); |
| 141 | + if span_context.is_valid() { |
| 142 | + println!( |
| 143 | + "📍 Entering span with trace_id: {:032x}, span_id: {:016x}", |
| 144 | + span_context.trace_id(), |
| 145 | + span_context.span_id() |
| 146 | + ); |
| 147 | + } |
| 148 | + } |
| 149 | + } |
| 150 | + } |
| 151 | + } |
| 152 | + |
| 153 | + fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { |
| 154 | + // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 |
| 155 | + // is merged and released, since `on_register_dispatch` is never called. |
| 156 | + let mut dispatch = self.dispatch.write().unwrap(); |
| 157 | + *dispatch = Some(subscriber.clone().downgrade()); |
| 158 | + } |
| 159 | +} |
| 160 | + |
| 161 | +fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) { |
| 162 | + // Create OpenTelemetry tracer that outputs to stdout |
| 163 | + let provider = SdkTracerProvider::builder() |
| 164 | + .with_simple_exporter(stdout::SpanExporter::default()) |
| 165 | + .build(); |
| 166 | + let tracer = provider.tracer("span_ref_ext_example"); |
| 167 | + |
| 168 | + // Create our custom analysis layer |
| 169 | + let analysis_layer = SpanAnalysisLayer::new(); |
| 170 | + |
| 171 | + // Build the subscriber with multiple layers: |
| 172 | + // 1. OpenTelemetry layer for trace export |
| 173 | + // 2. Our custom analysis layer that uses OpenTelemetryContext |
| 174 | + // 3. Formatting layer for console output |
| 175 | + let subscriber = tracing_subscriber::registry() |
| 176 | + .with(layer().with_tracer(tracer).with_filter(LevelFilter::DEBUG)) |
| 177 | + .with(analysis_layer.clone()) |
| 178 | + .with( |
| 179 | + tracing_subscriber::fmt::layer() |
| 180 | + .with_target(false) |
| 181 | + .with_filter(LevelFilter::INFO), |
| 182 | + ); |
| 183 | + |
| 184 | + (subscriber, provider, analysis_layer) |
| 185 | +} |
| 186 | + |
| 187 | +fn simulate_application_work() { |
| 188 | + // Create a root span for the main application work |
| 189 | + let root_span = span!(tracing::Level::INFO, "application_main", version = "1.0.0"); |
| 190 | + let _root_guard = root_span.enter(); |
| 191 | + |
| 192 | + info!("Starting application"); |
| 193 | + |
| 194 | + // Simulate some business logic with nested spans |
| 195 | + { |
| 196 | + let auth_span = span!(tracing::Level::DEBUG, "authenticate_user", user_id = 12345); |
| 197 | + let _auth_guard = auth_span.enter(); |
| 198 | + |
| 199 | + debug!("Validating user credentials"); |
| 200 | + |
| 201 | + // Simulate authentication work |
| 202 | + std::thread::sleep(std::time::Duration::from_millis(10)); |
| 203 | + |
| 204 | + info!("User authenticated successfully"); |
| 205 | + } |
| 206 | + |
| 207 | + // Simulate database operations |
| 208 | + { |
| 209 | + let db_span = span!( |
| 210 | + tracing::Level::DEBUG, |
| 211 | + "database_query", |
| 212 | + query = "SELECT * FROM users", |
| 213 | + table = "users" |
| 214 | + ); |
| 215 | + let _db_guard = db_span.enter(); |
| 216 | + |
| 217 | + debug!("Executing database query"); |
| 218 | + |
| 219 | + // Nested span for connection management |
| 220 | + { |
| 221 | + let conn_span = span!(tracing::Level::DEBUG, "acquire_connection", pool_size = 10); |
| 222 | + let _conn_guard = conn_span.enter(); |
| 223 | + |
| 224 | + debug!("Acquiring database connection from pool"); |
| 225 | + std::thread::sleep(std::time::Duration::from_millis(5)); |
| 226 | + } |
| 227 | + |
| 228 | + std::thread::sleep(std::time::Duration::from_millis(20)); |
| 229 | + info!("Database query completed"); |
| 230 | + } |
| 231 | + |
| 232 | + // Simulate some processing work |
| 233 | + { |
| 234 | + let process_span = span!( |
| 235 | + tracing::Level::DEBUG, |
| 236 | + "process_data", |
| 237 | + records_count = 150, |
| 238 | + batch_size = 50 |
| 239 | + ); |
| 240 | + let _process_guard = process_span.enter(); |
| 241 | + |
| 242 | + debug!("Processing user data"); |
| 243 | + |
| 244 | + for batch in 1..=3 { |
| 245 | + let batch_span = span!( |
| 246 | + tracing::Level::DEBUG, |
| 247 | + "process_batch", |
| 248 | + batch_number = batch, |
| 249 | + batch_size = 50 |
| 250 | + ); |
| 251 | + let _batch_guard = batch_span.enter(); |
| 252 | + |
| 253 | + debug!("Processing batch {}", batch); |
| 254 | + std::thread::sleep(std::time::Duration::from_millis(8)); |
| 255 | + } |
| 256 | + |
| 257 | + info!("Data processing completed"); |
| 258 | + } |
| 259 | + |
| 260 | + warn!("Application work completed"); |
| 261 | +} |
| 262 | + |
| 263 | +fn main() { |
| 264 | + println!( |
| 265 | + "🚀 OpenTelemetryContext Example: Extracting OpenTelemetry Contexts from Separate Layer" |
| 266 | + ); |
| 267 | + println!("{}", "=".repeat(80)); |
| 268 | + |
| 269 | + // Setup tracing with our custom layer |
| 270 | + let (subscriber, provider, analysis_layer) = setup_tracing(); |
| 271 | + |
| 272 | + tracing::subscriber::with_default(subscriber, || { |
| 273 | + // Simulate application work that generates spans |
| 274 | + simulate_application_work(); |
| 275 | + }); |
| 276 | + |
| 277 | + // Ensure all spans are flushed |
| 278 | + drop(provider); |
| 279 | + |
| 280 | + // Display the analysis results |
| 281 | + println!("\n📊 Span Analysis Results:"); |
| 282 | + println!("{}", "-".repeat(80)); |
| 283 | + |
| 284 | + let results = analysis_layer.get_analysis_results(); |
| 285 | + for (i, analysis) in results.iter().enumerate() { |
| 286 | + println!( |
| 287 | + "{}. Span: '{}'\n Trace ID: {}\n Span ID: {}\n Root Span: {}\n", |
| 288 | + i + 1, |
| 289 | + analysis.span_name, |
| 290 | + analysis.trace_id, |
| 291 | + analysis.span_id, |
| 292 | + analysis.is_root_span |
| 293 | + ); |
| 294 | + } |
| 295 | + |
| 296 | + println!( |
| 297 | + "✅ Example completed! Total spans analyzed: {}", |
| 298 | + results.len() |
| 299 | + ); |
| 300 | +} |
0 commit comments