@@ -2,16 +2,27 @@ use http_body_util::{combinators::BoxBody, BodyExt, Full};
22use hyper:: { body:: Incoming , service:: service_fn, Request , Response , StatusCode } ;
33use hyper_util:: rt:: { TokioExecutor , TokioIo } ;
44use opentelemetry:: {
5+ baggage:: BaggageExt ,
56 global:: { self , BoxedTracer } ,
7+ logs:: LogRecord ,
8+ propagation:: TextMapCompositePropagator ,
69 trace:: { FutureExt , Span , SpanKind , TraceContextExt , Tracer } ,
7- Context , KeyValue ,
10+ Context , InstrumentationScope , KeyValue ,
811} ;
12+ use opentelemetry_appender_tracing:: layer:: OpenTelemetryTracingBridge ;
913use opentelemetry_http:: { Bytes , HeaderExtractor } ;
10- use opentelemetry_sdk:: { propagation:: TraceContextPropagator , trace:: SdkTracerProvider } ;
14+ use opentelemetry_sdk:: {
15+ error:: OTelSdkResult ,
16+ logs:: { LogProcessor , SdkLogRecord , SdkLoggerProvider } ,
17+ propagation:: { BaggagePropagator , TraceContextPropagator } ,
18+ trace:: { SdkTracerProvider , SpanProcessor } ,
19+ } ;
1120use opentelemetry_semantic_conventions:: trace;
12- use opentelemetry_stdout:: SpanExporter ;
21+ use opentelemetry_stdout:: { LogExporter , SpanExporter } ;
1322use std:: { convert:: Infallible , net:: SocketAddr , sync:: OnceLock } ;
1423use tokio:: net:: TcpListener ;
24+ use tracing:: info;
25+ use tracing_subscriber:: { layer:: SubscriberExt , util:: SubscriberInitExt } ;
1526
1627fn get_tracer ( ) -> & ' static BoxedTracer {
1728 static TRACER : OnceLock < BoxedTracer > = OnceLock :: new ( ) ;
@@ -30,11 +41,11 @@ async fn handle_health_check(
3041 _req : Request < Incoming > ,
3142) -> Result < Response < BoxBody < Bytes , hyper:: Error > > , Infallible > {
3243 let tracer = get_tracer ( ) ;
33- let mut span = tracer
44+ let _span = tracer
3445 . span_builder ( "health_check" )
3546 . with_kind ( SpanKind :: Internal )
3647 . start ( tracer) ;
37- span . add_event ( " Health check accessed" , vec ! [ ] ) ;
48+ info ! ( name : "health_check" , message = " Health check endpoint hit" ) ;
3849
3950 let res = Response :: new (
4051 Full :: new ( Bytes :: from_static ( b"Server is up and running!" ) )
@@ -50,11 +61,11 @@ async fn handle_echo(
5061 req : Request < Incoming > ,
5162) -> Result < Response < BoxBody < Bytes , hyper:: Error > > , Infallible > {
5263 let tracer = get_tracer ( ) ;
53- let mut span = tracer
64+ let _span = tracer
5465 . span_builder ( "echo" )
5566 . with_kind ( SpanKind :: Internal )
5667 . start ( tracer) ;
57- span . add_event ( "Echoing back the request" , vec ! [ ] ) ;
68+ info ! ( name = "echo" , message = "Echo endpoint hit" ) ;
5869
5970 let res = Response :: new ( req. into_body ( ) . boxed ( ) ) ;
6071
@@ -69,14 +80,14 @@ async fn router(
6980 let response = {
7081 // Create a span parenting the remote client span.
7182 let tracer = get_tracer ( ) ;
72- let mut span = tracer
83+ let span = tracer
7384 . span_builder ( "router" )
7485 . with_kind ( SpanKind :: Server )
7586 . start_with_context ( tracer, & parent_cx) ;
7687
77- span . add_event ( "dispatching request" , vec ! [ ] ) ;
88+ info ! ( name = "router" , message = "Dispatching request" ) ;
7889
79- let cx = Context :: default ( ) . with_span ( span) ;
90+ let cx = parent_cx . with_span ( span) ;
8091 match ( req. method ( ) , req. uri ( ) . path ( ) ) {
8192 ( & hyper:: Method :: GET , "/health" ) => handle_health_check ( req) . with_context ( cx) . await ,
8293 ( & hyper:: Method :: GET , "/echo" ) => handle_echo ( req) . with_context ( cx) . await ,
@@ -93,24 +104,90 @@ async fn router(
93104 response
94105}
95106
107+ /// A custom log processor that enriches LogRecords with baggage attributes.
108+ /// Baggage information is not added automatically without this processor.
109+ #[ derive( Debug ) ]
110+ struct EnrichWithBaggageLogProcessor ;
111+ impl LogProcessor for EnrichWithBaggageLogProcessor {
112+ fn emit ( & self , data : & mut SdkLogRecord , _instrumentation : & InstrumentationScope ) {
113+ Context :: map_current ( |cx| {
114+ for ( kk, vv) in cx. baggage ( ) . iter ( ) {
115+ data. add_attribute ( kk. clone ( ) , vv. 0 . clone ( ) ) ;
116+ }
117+ } ) ;
118+ }
119+
120+ fn force_flush ( & self ) -> OTelSdkResult {
121+ Ok ( ( ) )
122+ }
123+
124+ fn shutdown ( & self ) -> OTelSdkResult {
125+ Ok ( ( ) )
126+ }
127+ }
128+
129+ /// A custom span processor that enriches spans with baggage attributes. Baggage
130+ /// information is not added automatically without this processor.
131+ #[ derive( Debug ) ]
132+ struct EnrichWithBaggageSpanProcessor ;
133+ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
134+ fn force_flush ( & self ) -> OTelSdkResult {
135+ Ok ( ( ) )
136+ }
137+
138+ fn shutdown ( & self ) -> OTelSdkResult {
139+ Ok ( ( ) )
140+ }
141+
142+ fn on_start ( & self , span : & mut opentelemetry_sdk:: trace:: Span , cx : & Context ) {
143+ for ( kk, vv) in cx. baggage ( ) . iter ( ) {
144+ span. set_attribute ( KeyValue :: new ( kk. clone ( ) , vv. 0 . clone ( ) ) ) ;
145+ }
146+ }
147+
148+ fn on_end ( & self , _span : opentelemetry_sdk:: trace:: SpanData ) { }
149+ }
150+
96151fn init_tracer ( ) -> SdkTracerProvider {
97- global:: set_text_map_propagator ( TraceContextPropagator :: new ( ) ) ;
152+ let baggage_propagator = BaggagePropagator :: new ( ) ;
153+ let trace_context_propagator = TraceContextPropagator :: new ( ) ;
154+ let composite_propagator = TextMapCompositePropagator :: new ( vec ! [
155+ Box :: new( baggage_propagator) ,
156+ Box :: new( trace_context_propagator) ,
157+ ] ) ;
158+
159+ global:: set_text_map_propagator ( composite_propagator) ;
98160
99161 // Setup tracerprovider with stdout exporter
100162 // that prints the spans to stdout.
101163 let provider = SdkTracerProvider :: builder ( )
164+ . with_span_processor ( EnrichWithBaggageSpanProcessor )
102165 . with_simple_exporter ( SpanExporter :: default ( ) )
103166 . build ( ) ;
104167
105168 global:: set_tracer_provider ( provider. clone ( ) ) ;
106169 provider
107170}
108171
172+ fn init_logs ( ) -> SdkLoggerProvider {
173+ // Setup tracerprovider with stdout exporter
174+ // that prints the spans to stdout.
175+ let logger_provider = SdkLoggerProvider :: builder ( )
176+ . with_log_processor ( EnrichWithBaggageLogProcessor )
177+ . with_simple_exporter ( LogExporter :: default ( ) )
178+ . build ( ) ;
179+ let otel_layer = OpenTelemetryTracingBridge :: new ( & logger_provider) ;
180+ tracing_subscriber:: registry ( ) . with ( otel_layer) . init ( ) ;
181+
182+ logger_provider
183+ }
184+
109185#[ tokio:: main]
110186async fn main ( ) {
111187 use hyper_util:: server:: conn:: auto:: Builder ;
112188
113189 let provider = init_tracer ( ) ;
190+ let logger_provider = init_logs ( ) ;
114191 let addr = SocketAddr :: from ( ( [ 127 , 0 , 0 , 1 ] , 3000 ) ) ;
115192 let listener = TcpListener :: bind ( addr) . await . unwrap ( ) ;
116193
@@ -124,4 +201,7 @@ async fn main() {
124201 }
125202
126203 provider. shutdown ( ) . expect ( "Shutdown provider failed" ) ;
204+ logger_provider
205+ . shutdown ( )
206+ . expect ( "Shutdown provider failed" ) ;
127207}
0 commit comments