@@ -15,11 +15,16 @@ use opentelemetry_sdk::{
1515 error:: OTelSdkResult ,
1616 logs:: { LogProcessor , SdkLogRecord , SdkLoggerProvider } ,
1717 propagation:: { BaggagePropagator , TraceContextPropagator } ,
18- trace:: { SdkTracerProvider , SpanProcessor } ,
18+ trace:: { FinishedSpan , ReadableSpan , SdkTracerProvider , SpanProcessor } ,
1919} ;
2020use opentelemetry_semantic_conventions:: trace;
2121use opentelemetry_stdout:: { LogExporter , SpanExporter } ;
22- use std:: { convert:: Infallible , net:: SocketAddr , sync:: OnceLock } ;
22+ use std:: {
23+ collections:: HashMap ,
24+ convert:: Infallible ,
25+ net:: SocketAddr ,
26+ sync:: { Mutex , OnceLock } ,
27+ } ;
2328use tokio:: net:: TcpListener ;
2429use tracing:: info;
2530use tracing_subscriber:: { layer:: SubscriberExt , util:: SubscriberInitExt } ;
@@ -83,6 +88,7 @@ async fn router(
8388 let span = tracer
8489 . span_builder ( "router" )
8590 . with_kind ( SpanKind :: Server )
91+ . with_attributes ( [ KeyValue :: new ( "http.route" , req. uri ( ) . path ( ) . to_string ( ) ) ] )
8692 . start_with_context ( tracer, & parent_cx) ;
8793
8894 info ! ( name = "router" , message = "Dispatching request" ) ;
@@ -104,6 +110,62 @@ async fn router(
104110 response
105111}
106112
113+ #[ derive( Debug , Default ) ]
114+ /// A custom span processor that counts concurrent requests for each route (indetified by the http.route
115+ /// attribute) and adds that information to the span attributes.
116+ struct RouteConcurrencyCounterSpanProcessor ( Mutex < HashMap < opentelemetry:: Key , usize > > ) ;
117+
118+ impl SpanProcessor for RouteConcurrencyCounterSpanProcessor {
119+ fn force_flush ( & self ) -> OTelSdkResult {
120+ Ok ( ( ) )
121+ }
122+
123+ fn shutdown ( & self ) -> OTelSdkResult {
124+ Ok ( ( ) )
125+ }
126+
127+ fn on_start ( & self , span : & mut opentelemetry_sdk:: trace:: Span , _cx : & Context ) {
128+ if !matches ! ( span. span_kind( ) , SpanKind :: Server ) {
129+ return ;
130+ }
131+ let Some ( route) = span
132+ . attributes ( )
133+ . iter ( )
134+ . find ( |kv| kv. key . as_str ( ) == "http.route" )
135+ else {
136+ return ;
137+ } ;
138+ let mut counts = self . 0 . lock ( ) . unwrap ( ) ;
139+ let count = counts. entry ( route. key . clone ( ) ) . or_default ( ) ;
140+ * count += 1 ;
141+ span. set_attribute ( KeyValue :: new (
142+ "http.route.concurrent_requests" ,
143+ count. to_string ( ) ,
144+ ) ) ;
145+ }
146+
147+ fn on_end ( & self , span : & mut FinishedSpan ) {
148+ if !matches ! ( span. span_kind( ) , SpanKind :: Server ) {
149+ return ;
150+ }
151+ let Some ( route) = span
152+ . attributes ( )
153+ . iter ( )
154+ . find ( |kv| kv. key . as_str ( ) == "http.route" )
155+ else {
156+ return ;
157+ } ;
158+ let mut counts = self . 0 . lock ( ) . unwrap ( ) ;
159+ let Some ( count) = counts. get_mut ( & route. key ) else {
160+ return ;
161+ } ;
162+ * count -= 1 ;
163+ if * count == 0 {
164+ counts. remove ( & route. key ) ;
165+ }
166+ }
167+ }
168+
107169/// A custom log processor that enriches LogRecords with baggage attributes.
108170/// Baggage information is not added automatically without this processor.
109171#[ derive( Debug ) ]
@@ -145,7 +207,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
145207 }
146208 }
147209
148- fn on_end ( & self , _span : opentelemetry_sdk:: trace:: SpanData ) { }
210+ fn on_end ( & self , _span : & mut opentelemetry_sdk:: trace:: FinishedSpan ) { }
149211}
150212
151213fn init_tracer ( ) -> SdkTracerProvider {
@@ -161,6 +223,7 @@ fn init_tracer() -> SdkTracerProvider {
161223 // Setup tracerprovider with stdout exporter
162224 // that prints the spans to stdout.
163225 let provider = SdkTracerProvider :: builder ( )
226+ . with_span_processor ( RouteConcurrencyCounterSpanProcessor :: default ( ) )
164227 . with_span_processor ( EnrichWithBaggageSpanProcessor )
165228 . with_simple_exporter ( SpanExporter :: default ( ) )
166229 . build ( ) ;
0 commit comments