From 959b9ef0c0af330b6849085da0fa5faccf7dabe5 Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Sun, 23 Mar 2025 12:31:35 -0700 Subject: [PATCH 1/3] docs: Modify example to use logs, baggage --- examples/tracing-http-propagator/Cargo.toml | 5 +- .../tracing-http-propagator/src/client.rs | 48 +++++++-- .../tracing-http-propagator/src/server.rs | 101 ++++++++++++++++-- 3 files changed, 132 insertions(+), 22 deletions(-) diff --git a/examples/tracing-http-propagator/Cargo.toml b/examples/tracing-http-propagator/Cargo.toml index 9aa47859b7..d41e055ab4 100644 --- a/examples/tracing-http-propagator/Cargo.toml +++ b/examples/tracing-http-propagator/Cargo.toml @@ -23,5 +23,8 @@ tokio = { workspace = true, features = ["full"] } opentelemetry = { path = "../../opentelemetry" } opentelemetry_sdk = { path = "../../opentelemetry-sdk" } opentelemetry-http = { path = "../../opentelemetry-http" } -opentelemetry-stdout = { workspace = true, features = ["trace"] } +opentelemetry-stdout = { workspace = true, features = ["trace", "logs"] } opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" } +opentelemetry-appender-tracing = { workspace = true } +tracing = { workspace = true, features = ["std"]} +tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std", "fmt"] } diff --git a/examples/tracing-http-propagator/src/client.rs b/examples/tracing-http-propagator/src/client.rs index 35d0b34be3..dfc6cf50c9 100644 --- a/examples/tracing-http-propagator/src/client.rs +++ b/examples/tracing-http-propagator/src/client.rs @@ -3,13 +3,18 @@ use hyper_util::{client::legacy::Client, rt::TokioExecutor}; use opentelemetry::{ global, trace::{SpanKind, TraceContextExt, Tracer}, - Context, KeyValue, + Context, }; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_http::{Bytes, HeaderInjector}; -use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider}; -use opentelemetry_stdout::SpanExporter; +use opentelemetry_sdk::{ + logs::SdkLoggerProvider, propagation::TraceContextPropagator, trace::SdkTracerProvider, +}; +use opentelemetry_stdout::{LogExporter, SpanExporter}; +use tracing::info; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -fn init_tracer() { +fn init_tracer() -> SdkTracerProvider { global::set_text_map_propagator(TraceContextPropagator::new()); // Install stdout exporter pipeline to be able to retrieve the collected spans. // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. @@ -17,7 +22,23 @@ fn init_tracer() { .with_simple_exporter(SpanExporter::default()) .build(); - global::set_tracer_provider(provider); + global::set_tracer_provider(provider.clone()); + provider +} + +fn init_logs() -> SdkLoggerProvider { + // Setup tracerprovider with stdout exporter + // that prints the spans to stdout. + let logger_provider = SdkLoggerProvider::builder() + .with_simple_exporter(LogExporter::default()) + .build(); + let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider); + tracing_subscriber::registry() + .with(otel_layer) + .with(tracing_subscriber::filter::LevelFilter::INFO) + .init(); + + logger_provider } async fn send_request( @@ -37,21 +58,22 @@ async fn send_request( global::get_text_map_propagator(|propagator| { propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap())) }); + req.headers_mut() + .unwrap() + .insert("baggage", "is_synthetic=true".parse().unwrap()); let res = client .request(req.body(Full::new(Bytes::from(body_content.to_string())))?) .await?; - cx.span().add_event( - "Got response!", - vec![KeyValue::new("status", res.status().to_string())], - ); + info!(name: "ResponseReceived", status = res.status().to_string(), message = "Response received"); Ok(()) } #[tokio::main] async fn main() -> std::result::Result<(), Box> { - init_tracer(); + let tracer_provider = init_tracer(); + let logger_provider = init_logs(); send_request( "http://127.0.0.1:3000/health", @@ -66,5 +88,11 @@ async fn main() -> std::result::Result<(), Box &'static BoxedTracer { static TRACER: OnceLock = OnceLock::new(); @@ -30,11 +41,11 @@ async fn handle_health_check( _req: Request, ) -> Result>, Infallible> { let tracer = get_tracer(); - let mut span = tracer + let _span = tracer .span_builder("health_check") .with_kind(SpanKind::Internal) .start(tracer); - span.add_event("Health check accessed", vec![]); + info!(name: "health_check", message = "Health check endpoint hit"); let res = Response::new( Full::new(Bytes::from_static(b"Server is up and running!")) @@ -50,11 +61,11 @@ async fn handle_echo( req: Request, ) -> Result>, Infallible> { let tracer = get_tracer(); - let mut span = tracer + let _span = tracer .span_builder("echo") .with_kind(SpanKind::Internal) .start(tracer); - span.add_event("Echoing back the request", vec![]); + info!(name = "echo", message = "Echo endpoint hit"); let res = Response::new(req.into_body().boxed()); @@ -66,17 +77,20 @@ async fn router( ) -> Result>, Infallible> { // Extract the context from the incoming request headers let parent_cx = extract_context_from_request(&req); + for bag in parent_cx.baggage() { + println!("Baggage: {:?} = {:?}", bag.0, bag.1); + } let response = { // Create a span parenting the remote client span. let tracer = get_tracer(); - let mut span = tracer + let span = tracer .span_builder("router") .with_kind(SpanKind::Server) .start_with_context(tracer, &parent_cx); - span.add_event("dispatching request", vec![]); + info!(name = "router", message = "Dispatching request"); - let cx = Context::default().with_span(span); + let cx = parent_cx.with_span(span); match (req.method(), req.uri().path()) { (&hyper::Method::GET, "/health") => handle_health_check(req).with_context(cx).await, (&hyper::Method::GET, "/echo") => handle_echo(req).with_context(cx).await, @@ -93,12 +107,60 @@ async fn router( response } +#[derive(Debug)] +struct EnrichWithBaggageLogProcessor; +impl LogProcessor for EnrichWithBaggageLogProcessor { + fn emit(&self, data: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) { + Context::map_current(|cx| { + for (kk, vv) in cx.baggage().iter() { + data.add_attribute(kk.clone(), vv.0.clone()); + } + }); + } + + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + Ok(()) + } +} + +#[derive(Debug)] +struct EnrichWithBaggageSpanProcessor; +impl SpanProcessor for EnrichWithBaggageSpanProcessor { + fn force_flush(&self) -> OTelSdkResult { + Ok(()) + } + + fn shutdown(&self) -> OTelSdkResult { + Ok(()) + } + + fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, cx: &Context) { + for (kk, vv) in cx.baggage().iter() { + span.set_attribute(KeyValue::new(kk.clone(), vv.0.clone())); + } + } + + fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {} +} + fn init_tracer() -> SdkTracerProvider { - global::set_text_map_propagator(TraceContextPropagator::new()); + let baggage_propagator = BaggagePropagator::new(); + let trace_context_propagator = TraceContextPropagator::new(); + let composite_propagator = TextMapCompositePropagator::new(vec![ + Box::new(baggage_propagator), + Box::new(trace_context_propagator), + ]); + + global::set_text_map_propagator(composite_propagator); // Setup tracerprovider with stdout exporter // that prints the spans to stdout. let provider = SdkTracerProvider::builder() + .with_span_processor(EnrichWithBaggageSpanProcessor) .with_simple_exporter(SpanExporter::default()) .build(); @@ -106,11 +168,25 @@ fn init_tracer() -> SdkTracerProvider { provider } +fn init_logs() -> SdkLoggerProvider { + // Setup tracerprovider with stdout exporter + // that prints the spans to stdout. + let logger_provider = SdkLoggerProvider::builder() + .with_log_processor(EnrichWithBaggageLogProcessor) + .with_simple_exporter(LogExporter::default()) + .build(); + let otel_layer = OpenTelemetryTracingBridge::new(&logger_provider); + tracing_subscriber::registry().with(otel_layer).init(); + + logger_provider +} + #[tokio::main] async fn main() { use hyper_util::server::conn::auto::Builder; let provider = init_tracer(); + let logger_provider = init_logs(); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = TcpListener::bind(addr).await.unwrap(); @@ -124,4 +200,7 @@ async fn main() { } provider.shutdown().expect("Shutdown provider failed"); + logger_provider + .shutdown() + .expect("Shutdown provider failed"); } From 973c4dc7da87314d97108557d5848d5eb85b3bce Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Mon, 24 Mar 2025 08:10:23 -0700 Subject: [PATCH 2/3] review comment --- examples/tracing-http-propagator/src/server.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index 767e1e9886..bf499f9c6a 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -41,7 +41,7 @@ async fn handle_health_check( _req: Request, ) -> Result>, Infallible> { let tracer = get_tracer(); - let _span = tracer + let _ = tracer .span_builder("health_check") .with_kind(SpanKind::Internal) .start(tracer); @@ -61,7 +61,7 @@ async fn handle_echo( req: Request, ) -> Result>, Infallible> { let tracer = get_tracer(); - let _span = tracer + let _ = tracer .span_builder("echo") .with_kind(SpanKind::Internal) .start(tracer); @@ -77,9 +77,6 @@ async fn router( ) -> Result>, Infallible> { // Extract the context from the incoming request headers let parent_cx = extract_context_from_request(&req); - for bag in parent_cx.baggage() { - println!("Baggage: {:?} = {:?}", bag.0, bag.1); - } let response = { // Create a span parenting the remote client span. let tracer = get_tracer(); @@ -107,6 +104,8 @@ async fn router( response } +/// A custom log processor that enriches LogRecords with baggage attributes. +/// Baggage information is not added automatically without this processor. #[derive(Debug)] struct EnrichWithBaggageLogProcessor; impl LogProcessor for EnrichWithBaggageLogProcessor { @@ -127,6 +126,8 @@ impl LogProcessor for EnrichWithBaggageLogProcessor { } } +/// A custom span processor that enriches spans with baggage attributes. Baggage +/// information is not added automatically without this processor. #[derive(Debug)] struct EnrichWithBaggageSpanProcessor; impl SpanProcessor for EnrichWithBaggageSpanProcessor { From 1d9cc2bc327f45743fd6de0a649cce1ee936278d Mon Sep 17 00:00:00 2001 From: Cijo Thomas Date: Tue, 25 Mar 2025 12:11:25 -0700 Subject: [PATCH 3/3] fix undesired early drop --- examples/tracing-http-propagator/src/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/tracing-http-propagator/src/server.rs b/examples/tracing-http-propagator/src/server.rs index bf499f9c6a..a7aa09aac9 100644 --- a/examples/tracing-http-propagator/src/server.rs +++ b/examples/tracing-http-propagator/src/server.rs @@ -41,7 +41,7 @@ async fn handle_health_check( _req: Request, ) -> Result>, Infallible> { let tracer = get_tracer(); - let _ = tracer + let _span = tracer .span_builder("health_check") .with_kind(SpanKind::Internal) .start(tracer); @@ -61,7 +61,7 @@ async fn handle_echo( req: Request, ) -> Result>, Infallible> { let tracer = get_tracer(); - let _ = tracer + let _span = tracer .span_builder("echo") .with_kind(SpanKind::Internal) .start(tracer);