Skip to content

Commit ee2e9ea

Browse files
committed
feat: allow OpenTelemetry context access from SpanRef
1 parent 4ebae2c commit ee2e9ea

File tree

5 files changed

+563
-13
lines changed

5 files changed

+563
-13
lines changed

examples/span_ref_ext.rs

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
//! This example demonstrates how to use `OpenTelemetrySpanRefExt` from a separate layer
2+
//! to access OpenTelemetry context data from tracing spans.
3+
4+
use opentelemetry::trace::{TraceContextExt, TracerProvider as _};
5+
use opentelemetry_sdk::trace::SdkTracerProvider;
6+
use opentelemetry_stdout as stdout;
7+
use std::sync::{Arc, Mutex, RwLock};
8+
use tracing::{debug, info, span, warn, Subscriber};
9+
use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch};
10+
use tracing_opentelemetry::{layer, OpenTelemetrySpanRefExt};
11+
use tracing_subscriber::layer::Context;
12+
use tracing_subscriber::prelude::*;
13+
use tracing_subscriber::registry::LookupSpan;
14+
use tracing_subscriber::Layer;
15+
16+
/// A custom layer that demonstrates how to use OpenTelemetrySpanRefExt
17+
/// to extract OpenTelemetry contexts from span references.
18+
#[derive(Clone)]
19+
struct SpanAnalysisLayer {
20+
/// Store span analysis results for demonstration
21+
analysis_results: Arc<Mutex<Vec<SpanAnalysis>>>,
22+
/// Weak reference to the dispatcher for context extraction
23+
dispatch: Arc<RwLock<Option<WeakDispatch>>>,
24+
}
25+
26+
#[derive(Debug, Clone)]
27+
struct SpanAnalysis {
28+
span_name: String,
29+
trace_id: String,
30+
span_id: String,
31+
is_root_span: bool,
32+
}
33+
34+
impl SpanAnalysisLayer {
35+
fn new() -> Self {
36+
Self {
37+
analysis_results: Arc::new(Mutex::new(Vec::new())),
38+
dispatch: Arc::new(RwLock::new(None)),
39+
}
40+
}
41+
42+
fn get_analysis_results(&self) -> Vec<SpanAnalysis> {
43+
self.analysis_results.lock().unwrap().clone()
44+
}
45+
46+
fn analyze_span_context(&self, span_name: &str, otel_context: &opentelemetry::Context) {
47+
let span = otel_context.span();
48+
let span_context = span.span_context();
49+
50+
if span_context.is_valid() {
51+
let analysis = SpanAnalysis {
52+
span_name: span_name.to_string(),
53+
trace_id: format!("{:032x}", span_context.trace_id()),
54+
span_id: format!("{:016x}", span_context.span_id()),
55+
is_root_span: span_context.trace_flags().is_sampled(),
56+
};
57+
58+
println!(
59+
"🔍 Analyzing span '{}': trace_id={}, span_id={}, sampled={}",
60+
analysis.span_name,
61+
analysis.trace_id,
62+
analysis.span_id,
63+
span_context.trace_flags().is_sampled()
64+
);
65+
66+
if let Ok(mut results) = self.analysis_results.lock() {
67+
results.push(analysis);
68+
}
69+
}
70+
}
71+
}
72+
73+
impl<S> Layer<S> for SpanAnalysisLayer
74+
where
75+
S: Subscriber + for<'span> LookupSpan<'span>,
76+
{
77+
fn on_new_span(
78+
&self,
79+
attrs: &tracing::span::Attributes<'_>,
80+
id: &tracing::span::Id,
81+
ctx: Context<'_, S>,
82+
) {
83+
// Get the weak dispatch reference
84+
let weak_dispatch = {
85+
let read_guard = self.dispatch.read().unwrap();
86+
match read_guard.as_ref() {
87+
Some(weak_dispatch) => weak_dispatch.clone(),
88+
// Note: This workaround is needed until https://github.com/tokio-rs/tracing/pull/3379
89+
// is merged and released. It should really be handled in on_register_dispatch
90+
None => {
91+
drop(read_guard);
92+
let mut dispatch = self.dispatch.write().unwrap();
93+
let weak_dispatch = Dispatch::default().downgrade();
94+
*dispatch = Some(weak_dispatch.clone());
95+
weak_dispatch
96+
}
97+
}
98+
};
99+
100+
// Get the span reference and extract OpenTelemetry context
101+
if let Some(span_ref) = ctx.span(id) {
102+
// This is the key functionality: using OpenTelemetrySpanRefExt
103+
// to extract the OpenTelemetry context from a span reference
104+
if let Some(otel_context) = span_ref.context(&weak_dispatch.upgrade()) {
105+
self.analyze_span_context(attrs.metadata().name(), &otel_context);
106+
} else {
107+
println!(
108+
"⚠️ Could not extract OpenTelemetry context for span '{}'",
109+
attrs.metadata().name()
110+
);
111+
}
112+
}
113+
}
114+
115+
fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) {
116+
if let Some(weak_dispatch) = self.dispatch.read().unwrap().as_ref().map(|wd| wd.clone()) {
117+
if let Some(span_ref) = ctx.span(id) {
118+
if let Some(otel_context) = span_ref.context(&weak_dispatch.upgrade()) {
119+
let span = otel_context.span();
120+
let span_context = span.span_context();
121+
if span_context.is_valid() {
122+
println!(
123+
"📍 Entering span with trace_id: {:032x}, span_id: {:016x}",
124+
span_context.trace_id(),
125+
span_context.span_id()
126+
);
127+
}
128+
}
129+
}
130+
}
131+
}
132+
133+
fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) {
134+
// Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379
135+
// is merged and released, since `on_register_dispatch` is never called.
136+
let mut dispatch = self.dispatch.write().unwrap();
137+
*dispatch = Some(subscriber.clone().downgrade());
138+
}
139+
}
140+
141+
fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) {
142+
// Create OpenTelemetry tracer that outputs to stdout
143+
let provider = SdkTracerProvider::builder()
144+
.with_simple_exporter(stdout::SpanExporter::default())
145+
.build();
146+
let tracer = provider.tracer("span_ref_ext_example");
147+
148+
// Create our custom analysis layer
149+
let analysis_layer = SpanAnalysisLayer::new();
150+
151+
// Build the subscriber with multiple layers:
152+
// 1. OpenTelemetry layer for trace export
153+
// 2. Our custom analysis layer that uses SpanRefExt
154+
// 3. Formatting layer for console output
155+
let subscriber = tracing_subscriber::registry()
156+
.with(layer().with_tracer(tracer).with_filter(LevelFilter::DEBUG))
157+
.with(analysis_layer.clone())
158+
.with(
159+
tracing_subscriber::fmt::layer()
160+
.with_target(false)
161+
.with_filter(LevelFilter::INFO),
162+
);
163+
164+
(subscriber, provider, analysis_layer)
165+
}
166+
167+
fn simulate_application_work() {
168+
// Create a root span for the main application work
169+
let root_span = span!(tracing::Level::INFO, "application_main", version = "1.0.0");
170+
let _root_guard = root_span.enter();
171+
172+
info!("Starting application");
173+
174+
// Simulate some business logic with nested spans
175+
{
176+
let auth_span = span!(tracing::Level::DEBUG, "authenticate_user", user_id = 12345);
177+
let _auth_guard = auth_span.enter();
178+
179+
debug!("Validating user credentials");
180+
181+
// Simulate authentication work
182+
std::thread::sleep(std::time::Duration::from_millis(10));
183+
184+
info!("User authenticated successfully");
185+
}
186+
187+
// Simulate database operations
188+
{
189+
let db_span = span!(
190+
tracing::Level::DEBUG,
191+
"database_query",
192+
query = "SELECT * FROM users",
193+
table = "users"
194+
);
195+
let _db_guard = db_span.enter();
196+
197+
debug!("Executing database query");
198+
199+
// Nested span for connection management
200+
{
201+
let conn_span = span!(tracing::Level::DEBUG, "acquire_connection", pool_size = 10);
202+
let _conn_guard = conn_span.enter();
203+
204+
debug!("Acquiring database connection from pool");
205+
std::thread::sleep(std::time::Duration::from_millis(5));
206+
}
207+
208+
std::thread::sleep(std::time::Duration::from_millis(20));
209+
info!("Database query completed");
210+
}
211+
212+
// Simulate some processing work
213+
{
214+
let process_span = span!(
215+
tracing::Level::DEBUG,
216+
"process_data",
217+
records_count = 150,
218+
batch_size = 50
219+
);
220+
let _process_guard = process_span.enter();
221+
222+
debug!("Processing user data");
223+
224+
for batch in 1..=3 {
225+
let batch_span = span!(
226+
tracing::Level::DEBUG,
227+
"process_batch",
228+
batch_number = batch,
229+
batch_size = 50
230+
);
231+
let _batch_guard = batch_span.enter();
232+
233+
debug!("Processing batch {}", batch);
234+
std::thread::sleep(std::time::Duration::from_millis(8));
235+
}
236+
237+
info!("Data processing completed");
238+
}
239+
240+
warn!("Application work completed");
241+
}
242+
243+
fn main() {
244+
println!("🚀 SpanRefExt Example: Extracting OpenTelemetry Contexts from Separate Layer");
245+
println!("{}", "=".repeat(80));
246+
247+
// Setup tracing with our custom layer
248+
let (subscriber, provider, analysis_layer) = setup_tracing();
249+
250+
tracing::subscriber::with_default(subscriber, || {
251+
// Simulate application work that generates spans
252+
simulate_application_work();
253+
});
254+
255+
// Ensure all spans are flushed
256+
drop(provider);
257+
258+
// Display the analysis results
259+
println!("\n📊 Span Analysis Results:");
260+
println!("{}", "-".repeat(80));
261+
262+
let results = analysis_layer.get_analysis_results();
263+
for (i, analysis) in results.iter().enumerate() {
264+
println!(
265+
"{}. Span: '{}'\n Trace ID: {}\n Span ID: {}\n Root Span: {}\n",
266+
i + 1,
267+
analysis.span_name,
268+
analysis.trace_id,
269+
analysis.span_id,
270+
analysis.is_root_span
271+
);
272+
}
273+
274+
println!(
275+
"✅ Example completed! Total spans analyzed: {}",
276+
results.len()
277+
);
278+
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ mod metrics;
117117
mod layer;
118118
/// Span extension which enables OpenTelemetry context management.
119119
mod span_ext;
120+
/// SpanRef extension which enables OpenTelemetry context extraction.
121+
mod span_ref_ext;
120122

121123
mod stack;
122124

@@ -127,6 +129,7 @@ pub use layer::{layer, OpenTelemetryLayer};
127129
#[cfg(feature = "metrics")]
128130
pub use metrics::MetricsLayer;
129131
pub use span_ext::OpenTelemetrySpanExt;
132+
pub use span_ref_ext::OpenTelemetrySpanRefExt;
130133

131134
/// Per-span OpenTelemetry data tracked by this crate.
132135
#[derive(Debug)]

src/span_ext.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use opentelemetry::{
77
};
88
use std::{borrow::Cow, time::SystemTime};
99
use thiserror::Error;
10+
use tracing::span::Id;
11+
use tracing::Dispatch;
1012

1113
/// Utility functions to allow tracing [`Span`]s to accept and return
1214
/// [OpenTelemetry] [`Context`]s.
@@ -313,20 +315,10 @@ impl OpenTelemetrySpanExt for tracing::Span {
313315
}
314316

315317
fn context(&self) -> Context {
316-
let mut cx = None;
317318
self.with_subscriber(|(id, subscriber)| {
318-
let Some(get_context) = subscriber.downcast_ref::<WithContext>() else {
319-
return;
320-
};
321-
// If our span hasn't been built, we should build it and get the context in one call
322-
get_context.with_activated_context(subscriber, id, |data: &mut OtelData| {
323-
if let OtelDataState::Context { current_cx } = &data.state {
324-
cx = Some(current_cx.clone());
325-
}
326-
});
327-
});
328-
329-
cx.unwrap_or_default()
319+
get_otel_context(subscriber, id).unwrap_or_default()
320+
})
321+
.unwrap_or_default()
330322
}
331323

332324
fn set_attribute(&self, key: impl Into<Key>, value: impl Into<Value>) {
@@ -415,3 +407,16 @@ impl OpenTelemetrySpanExt for tracing::Span {
415407
});
416408
}
417409
}
410+
411+
pub(crate) fn get_otel_context(dispatch: &Dispatch, id: &Id) -> Option<opentelemetry::Context> {
412+
let mut cx = None;
413+
if let Some(get_context) = dispatch.downcast_ref::<WithContext>() {
414+
// If our span hasn't been built, we should build it and get the context in one call
415+
get_context.with_activated_context(dispatch, id, |data: &mut OtelData| {
416+
if let OtelDataState::Context { current_cx } = &data.state {
417+
cx = Some(current_cx.clone());
418+
}
419+
});
420+
}
421+
cx
422+
}

0 commit comments

Comments
 (0)