@@ -2,16 +2,27 @@ use http_body_util::{combinators::BoxBody, BodyExt, Full};
22use hyper:: { body:: Incoming , service:: service_fn, Request , Response , StatusCode } ;
33use hyper_util:: rt:: { TokioExecutor , TokioIo } ;
44use opentelemetry:: {
5+ baggage:: BaggageExt ,
56 global:: { self , BoxedTracer } ,
7+ logs:: LogRecord ,
8+ propagation:: TextMapCompositePropagator ,
69 trace:: { FutureExt , Span , SpanKind , TraceContextExt , Tracer } ,
7- Context , KeyValue ,
10+ Context , InstrumentationScope , KeyValue ,
811} ;
12+ use opentelemetry_appender_tracing:: layer:: OpenTelemetryTracingBridge ;
913use opentelemetry_http:: { Bytes , HeaderExtractor } ;
10- use opentelemetry_sdk:: { propagation:: TraceContextPropagator , trace:: SdkTracerProvider } ;
14+ use opentelemetry_sdk:: {
15+ error:: OTelSdkResult ,
16+ logs:: { LogProcessor , SdkLogRecord , SdkLoggerProvider } ,
17+ propagation:: { BaggagePropagator , TraceContextPropagator } ,
18+ trace:: { SdkTracerProvider , SpanProcessor } ,
19+ } ;
1120use opentelemetry_semantic_conventions:: trace;
12- use opentelemetry_stdout:: SpanExporter ;
21+ use opentelemetry_stdout:: { LogExporter , SpanExporter } ;
1322use std:: { convert:: Infallible , net:: SocketAddr , sync:: OnceLock } ;
1423use tokio:: net:: TcpListener ;
24+ use tracing:: info;
25+ use tracing_subscriber:: { layer:: SubscriberExt , util:: SubscriberInitExt } ;
1526
1627fn get_tracer ( ) -> & ' static BoxedTracer {
1728 static TRACER : OnceLock < BoxedTracer > = OnceLock :: new ( ) ;
@@ -30,11 +41,11 @@ async fn handle_health_check(
3041 _req : Request < Incoming > ,
3142) -> Result < Response < BoxBody < Bytes , hyper:: Error > > , Infallible > {
3243 let tracer = get_tracer ( ) ;
33- let mut span = tracer
44+ let _span = tracer
3445 . span_builder ( "health_check" )
3546 . with_kind ( SpanKind :: Internal )
3647 . start ( tracer) ;
37- span . add_event ( " Health check accessed" , vec ! [ ] ) ;
48+ info ! ( name : "health_check" , message = " Health check endpoint hit" ) ;
3849
3950 let res = Response :: new (
4051 Full :: new ( Bytes :: from_static ( b"Server is up and running!" ) )
@@ -50,11 +61,11 @@ async fn handle_echo(
5061 req : Request < Incoming > ,
5162) -> Result < Response < BoxBody < Bytes , hyper:: Error > > , Infallible > {
5263 let tracer = get_tracer ( ) ;
53- let mut span = tracer
64+ let _span = tracer
5465 . span_builder ( "echo" )
5566 . with_kind ( SpanKind :: Internal )
5667 . start ( tracer) ;
57- span . add_event ( "Echoing back the request" , vec ! [ ] ) ;
68+ info ! ( name = "echo" , message = "Echo endpoint hit" ) ;
5869
5970 let res = Response :: new ( req. into_body ( ) . boxed ( ) ) ;
6071
@@ -66,17 +77,20 @@ async fn router(
6677) -> Result < Response < BoxBody < Bytes , hyper:: Error > > , Infallible > {
6778 // Extract the context from the incoming request headers
6879 let parent_cx = extract_context_from_request ( & req) ;
80+ for bag in parent_cx. baggage ( ) {
81+ println ! ( "Baggage: {:?} = {:?}" , bag. 0 , bag. 1 ) ;
82+ }
6983 let response = {
7084 // Create a span parenting the remote client span.
7185 let tracer = get_tracer ( ) ;
72- let mut span = tracer
86+ let span = tracer
7387 . span_builder ( "router" )
7488 . with_kind ( SpanKind :: Server )
7589 . start_with_context ( tracer, & parent_cx) ;
7690
77- span . add_event ( "dispatching request" , vec ! [ ] ) ;
91+ info ! ( name = "router" , message = "Dispatching request" ) ;
7892
79- let cx = Context :: default ( ) . with_span ( span) ;
93+ let cx = parent_cx . with_span ( span) ;
8094 match ( req. method ( ) , req. uri ( ) . path ( ) ) {
8195 ( & hyper:: Method :: GET , "/health" ) => handle_health_check ( req) . with_context ( cx) . await ,
8296 ( & hyper:: Method :: GET , "/echo" ) => handle_echo ( req) . with_context ( cx) . await ,
@@ -93,24 +107,86 @@ async fn router(
93107 response
94108}
95109
110+ #[ derive( Debug ) ]
111+ struct EnrichWithBaggageLogProcessor ;
112+ impl LogProcessor for EnrichWithBaggageLogProcessor {
113+ fn emit ( & self , data : & mut SdkLogRecord , _instrumentation : & InstrumentationScope ) {
114+ Context :: map_current ( |cx| {
115+ for ( kk, vv) in cx. baggage ( ) . iter ( ) {
116+ data. add_attribute ( kk. clone ( ) , vv. 0 . clone ( ) ) ;
117+ }
118+ } ) ;
119+ }
120+
121+ fn force_flush ( & self ) -> OTelSdkResult {
122+ Ok ( ( ) )
123+ }
124+
125+ fn shutdown ( & self ) -> OTelSdkResult {
126+ Ok ( ( ) )
127+ }
128+ }
129+
130+ #[ derive( Debug ) ]
131+ struct EnrichWithBaggageSpanProcessor ;
132+ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
133+ fn force_flush ( & self ) -> OTelSdkResult {
134+ Ok ( ( ) )
135+ }
136+
137+ fn shutdown ( & self ) -> OTelSdkResult {
138+ Ok ( ( ) )
139+ }
140+
141+ fn on_start ( & self , span : & mut opentelemetry_sdk:: trace:: Span , cx : & Context ) {
142+ for ( kk, vv) in cx. baggage ( ) . iter ( ) {
143+ span. set_attribute ( KeyValue :: new ( kk. clone ( ) , vv. 0 . clone ( ) ) ) ;
144+ }
145+ }
146+
147+ fn on_end ( & self , _span : opentelemetry_sdk:: trace:: SpanData ) { }
148+ }
149+
96150fn init_tracer ( ) -> SdkTracerProvider {
97- global:: set_text_map_propagator ( TraceContextPropagator :: new ( ) ) ;
151+ let baggage_propagator = BaggagePropagator :: new ( ) ;
152+ let trace_context_propagator = TraceContextPropagator :: new ( ) ;
153+ let composite_propagator = TextMapCompositePropagator :: new ( vec ! [
154+ Box :: new( baggage_propagator) ,
155+ Box :: new( trace_context_propagator) ,
156+ ] ) ;
157+
158+ global:: set_text_map_propagator ( composite_propagator) ;
98159
99160 // Setup tracerprovider with stdout exporter
100161 // that prints the spans to stdout.
101162 let provider = SdkTracerProvider :: builder ( )
163+ . with_span_processor ( EnrichWithBaggageSpanProcessor )
102164 . with_simple_exporter ( SpanExporter :: default ( ) )
103165 . build ( ) ;
104166
105167 global:: set_tracer_provider ( provider. clone ( ) ) ;
106168 provider
107169}
108170
171+ fn init_logs ( ) -> SdkLoggerProvider {
172+ // Setup tracerprovider with stdout exporter
173+ // that prints the spans to stdout.
174+ let logger_provider = SdkLoggerProvider :: builder ( )
175+ . with_log_processor ( EnrichWithBaggageLogProcessor )
176+ . with_simple_exporter ( LogExporter :: default ( ) )
177+ . build ( ) ;
178+ let otel_layer = OpenTelemetryTracingBridge :: new ( & logger_provider) ;
179+ tracing_subscriber:: registry ( ) . with ( otel_layer) . init ( ) ;
180+
181+ logger_provider
182+ }
183+
109184#[ tokio:: main]
110185async fn main ( ) {
111186 use hyper_util:: server:: conn:: auto:: Builder ;
112187
113188 let provider = init_tracer ( ) ;
189+ let logger_provider = init_logs ( ) ;
114190 let addr = SocketAddr :: from ( ( [ 127 , 0 , 0 , 1 ] , 3000 ) ) ;
115191 let listener = TcpListener :: bind ( addr) . await . unwrap ( ) ;
116192
@@ -124,4 +200,7 @@ async fn main() {
124200 }
125201
126202 provider. shutdown ( ) . expect ( "Shutdown provider failed" ) ;
203+ logger_provider
204+ . shutdown ( )
205+ . expect ( "Shutdown provider failed" ) ;
127206}
0 commit comments