Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 300 additions & 0 deletions examples/otel_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
//! This example demonstrates how to use `OpenTelemetryContext` from a separate layer
//! to access OpenTelemetry context data from tracing spans.

use opentelemetry::trace::{TraceContextExt, TracerProvider as _};
use opentelemetry_sdk::trace::SdkTracerProvider;
use opentelemetry_stdout as stdout;
use std::sync::{Arc, Mutex, RwLock};
use tracing::{debug, info, span, warn, Subscriber};
use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch};
use tracing_opentelemetry::{layer, OpenTelemetryContext};
use tracing_subscriber::layer::Context;
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;

/// A custom layer that demonstrates how to use OpenTelemetryContext
/// to extract OpenTelemetry contexts from span extensions.
#[derive(Clone)]
struct SpanAnalysisLayer {
/// Store span analysis results for demonstration
analysis_results: Arc<Mutex<Vec<SpanAnalysis>>>,
/// Weak reference to the dispatcher for context extraction
dispatch: Arc<RwLock<Option<WeakDispatch>>>,
}

#[derive(Debug, Clone)]
struct SpanAnalysis {
span_name: String,
trace_id: String,
span_id: String,
is_root_span: bool,
}

impl SpanAnalysisLayer {
fn new() -> Self {
Self {
analysis_results: Arc::new(Mutex::new(Vec::new())),
dispatch: Arc::new(RwLock::new(None)),
}
}

fn get_analysis_results(&self) -> Vec<SpanAnalysis> {
self.analysis_results.lock().unwrap().clone()
}

fn analyze_span_context(&self, span_name: &str, otel_context: &opentelemetry::Context) {
let span = otel_context.span();
let span_context = span.span_context();

if span_context.is_valid() {
let analysis = SpanAnalysis {
span_name: span_name.to_string(),
trace_id: format!("{:032x}", span_context.trace_id()),
span_id: format!("{:016x}", span_context.span_id()),
is_root_span: span_context.trace_flags().is_sampled(),
};

println!(
"🔍 Analyzing span '{}': trace_id={}, span_id={}, sampled={}",
analysis.span_name,
analysis.trace_id,
analysis.span_id,
span_context.trace_flags().is_sampled()
);

if let Ok(mut results) = self.analysis_results.lock() {
results.push(analysis);
}
}
}

fn get_weak_dispatch(&self, get_default: bool) -> Option<WeakDispatch> {
let read_guard = self.dispatch.read().unwrap();
match read_guard.as_ref() {
Some(weak_dispatch) => Some(weak_dispatch.clone()),
// Note: This workaround is needed until https://github.com/tokio-rs/tracing/pull/3379
// is merged and released. It should really be handled in on_register_dispatch
None => {
if !get_default {
None
} else {
drop(read_guard);
let mut dispatch = self.dispatch.write().unwrap();
let weak_dispatch = Dispatch::default().downgrade();
*dispatch = Some(weak_dispatch.clone());
Some(weak_dispatch)
}
}
}
}
}

impl<S> Layer<S> for SpanAnalysisLayer
where
S: Subscriber + for<'span> LookupSpan<'span>,
{
fn on_new_span(
&self,
attrs: &tracing::span::Attributes<'_>,
id: &tracing::span::Id,
ctx: Context<'_, S>,
) {
// Get the weak dispatch reference.
//
// Note: We can't use the Dispatch::default() workaround described above here since this
// method is called from inside a dispatcher::get_default block, and such calls can't be
// nested so we would get the global dispatcher instead, which can't downcast to the right
// types when extracting the OpenTelemetry context. This also means that we will miss
// analyzing the first span that is created 🤷🏼‍♂️
let Some(weak_dispatch) = self.get_weak_dispatch(false) else {
return;
};

// Get the span reference and extract OpenTelemetry context
if let Some(span_ref) = ctx.span(id) {
// This is the key functionality: using OpenTelemetryContext
// to extract the OpenTelemetry context from span extensions
let mut extensions = span_ref.extensions_mut();
if let Some(otel_context) =
OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade())
{
self.analyze_span_context(attrs.metadata().name(), &otel_context);
} else {
println!(
"⚠️ Could not extract OpenTelemetry context for span '{}'",
attrs.metadata().name()
);
}
}
}

fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) {
if let Some(weak_dispatch) = self.get_weak_dispatch(true) {
if let Some(span_ref) = ctx.span(id) {
let mut extensions = span_ref.extensions_mut();
if let Some(otel_context) =
OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade())
{
let span = otel_context.span();
let span_context = span.span_context();
if span_context.is_valid() {
println!(
"📍 Entering span with trace_id: {:032x}, span_id: {:016x}",
span_context.trace_id(),
span_context.span_id()
);
}
}
}
}
}

fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) {
// Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379
// is merged and released, since `on_register_dispatch` is never called.
let mut dispatch = self.dispatch.write().unwrap();
*dispatch = Some(subscriber.clone().downgrade());
}
}

fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) {
// Create OpenTelemetry tracer that outputs to stdout
let provider = SdkTracerProvider::builder()
.with_simple_exporter(stdout::SpanExporter::default())
.build();
let tracer = provider.tracer("span_ref_ext_example");

// Create our custom analysis layer
let analysis_layer = SpanAnalysisLayer::new();

// Build the subscriber with multiple layers:
// 1. OpenTelemetry layer for trace export
// 2. Our custom analysis layer that uses OpenTelemetryContext
// 3. Formatting layer for console output
let subscriber = tracing_subscriber::registry()
.with(layer().with_tracer(tracer).with_filter(LevelFilter::DEBUG))
.with(analysis_layer.clone())
.with(
tracing_subscriber::fmt::layer()
.with_target(false)
.with_filter(LevelFilter::INFO),
);

(subscriber, provider, analysis_layer)
}

fn simulate_application_work() {
// Create a root span for the main application work
let root_span = span!(tracing::Level::INFO, "application_main", version = "1.0.0");
let _root_guard = root_span.enter();

info!("Starting application");

// Simulate some business logic with nested spans
{
let auth_span = span!(tracing::Level::DEBUG, "authenticate_user", user_id = 12345);
let _auth_guard = auth_span.enter();

debug!("Validating user credentials");

// Simulate authentication work
std::thread::sleep(std::time::Duration::from_millis(10));

info!("User authenticated successfully");
}

// Simulate database operations
{
let db_span = span!(
tracing::Level::DEBUG,
"database_query",
query = "SELECT * FROM users",
table = "users"
);
let _db_guard = db_span.enter();

debug!("Executing database query");

// Nested span for connection management
{
let conn_span = span!(tracing::Level::DEBUG, "acquire_connection", pool_size = 10);
let _conn_guard = conn_span.enter();

debug!("Acquiring database connection from pool");
std::thread::sleep(std::time::Duration::from_millis(5));
}

std::thread::sleep(std::time::Duration::from_millis(20));
info!("Database query completed");
}

// Simulate some processing work
{
let process_span = span!(
tracing::Level::DEBUG,
"process_data",
records_count = 150,
batch_size = 50
);
let _process_guard = process_span.enter();

debug!("Processing user data");

for batch in 1..=3 {
let batch_span = span!(
tracing::Level::DEBUG,
"process_batch",
batch_number = batch,
batch_size = 50
);
let _batch_guard = batch_span.enter();

debug!("Processing batch {}", batch);
std::thread::sleep(std::time::Duration::from_millis(8));
}

info!("Data processing completed");
}

warn!("Application work completed");
}

fn main() {
println!(
"🚀 OpenTelemetryContext Example: Extracting OpenTelemetry Contexts from Separate Layer"
);
println!("{}", "=".repeat(80));

// Setup tracing with our custom layer
let (subscriber, provider, analysis_layer) = setup_tracing();

tracing::subscriber::with_default(subscriber, || {
// Simulate application work that generates spans
simulate_application_work();
});

// Ensure all spans are flushed
drop(provider);

// Display the analysis results
println!("\n📊 Span Analysis Results:");
println!("{}", "-".repeat(80));

let results = analysis_layer.get_analysis_results();
for (i, analysis) in results.iter().enumerate() {
println!(
"{}. Span: '{}'\n Trace ID: {}\n Span ID: {}\n Root Span: {}\n",
i + 1,
analysis.span_name,
analysis.trace_id,
analysis.span_id,
analysis.is_root_span
);
}

println!(
"✅ Example completed! Total spans analyzed: {}",
results.len()
);
}
38 changes: 36 additions & 2 deletions src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tracing_core::{field, Event, Subscriber};
#[cfg(feature = "tracing-log")]
use tracing_log::NormalizeEvent;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::registry::{ExtensionsMut, LookupSpan};
use tracing_subscriber::Layer;
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
use web_time::Instant;
Expand Down Expand Up @@ -104,6 +104,14 @@ pub(crate) struct WithContext {
#[allow(clippy::type_complexity)]
pub(crate) with_activated_context:
fn(&tracing::Dispatch, &span::Id, f: &mut dyn FnMut(&mut OtelData)),

///
/// Ensures the given SpanId has been activated - that is, created in the OTel side of things,
/// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it.
///
#[allow(clippy::type_complexity)]
pub(crate) with_activated_context_extensions:
fn(&tracing::Dispatch, &mut ExtensionsMut<'_>, f: &mut dyn FnMut(&mut OtelData)),
}

impl WithContext {
Expand Down Expand Up @@ -133,6 +141,20 @@ impl WithContext {
) {
(self.with_activated_context)(dispatch, id, &mut f)
}

///
/// Ensures the given SpanId has been activated - that is, created in the OTel side of things,
/// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it.
///
#[allow(clippy::type_complexity)]
pub(crate) fn with_activated_context_extensions(
&self,
dispatch: &tracing::Dispatch,
extensions: &mut ExtensionsMut<'_>,
mut f: impl FnMut(&mut OtelData),
) {
(self.with_activated_context_extensions)(dispatch, extensions, &mut f)
}
}

fn str_to_span_kind(s: &str) -> Option<otel::SpanKind> {
Expand Down Expand Up @@ -625,6 +647,7 @@ where
with_context: WithContext {
with_context: Self::get_context,
with_activated_context: Self::get_activated_context,
with_activated_context_extensions: Self::get_activated_context_extensions,
},
_registry: marker::PhantomData,
}
Expand Down Expand Up @@ -680,6 +703,8 @@ where
with_context: WithContext {
with_context: OpenTelemetryLayer::<S, Tracer>::get_context,
with_activated_context: OpenTelemetryLayer::<S, Tracer>::get_activated_context,
with_activated_context_extensions:
OpenTelemetryLayer::<S, Tracer>::get_activated_context_extensions,
},
_registry: self._registry,
// cannot use ``..self` here due to different generics
Expand Down Expand Up @@ -937,11 +962,20 @@ where
.span(id)
.expect("registry should have a span for the current ID");

let mut extensions = span.extensions_mut();

Self::get_activated_context_extensions(dispatch, &mut extensions, f)
}

fn get_activated_context_extensions(
dispatch: &tracing::Dispatch,
extensions: &mut ExtensionsMut<'_>,
f: &mut dyn FnMut(&mut OtelData),
) {
let layer = dispatch
.downcast_ref::<OpenTelemetryLayer<S, T>>()
.expect("layer should downcast to expected type; this is a bug!");

let mut extensions = span.extensions_mut();
if let Some(otel_data) = extensions.get_mut::<OtelData>() {
// Activate the context
layer.start_cx(otel_data);
Expand Down
Loading
Loading