@@ -45,12 +45,15 @@ use futures_util::{
4545 stream:: { self , FusedStream , FuturesUnordered } ,
4646 StreamExt as _,
4747} ;
48- use opentelemetry:: { otel_debug, otel_error} ;
48+ use opentelemetry:: global:: { self } ;
49+ use opentelemetry:: metrics:: Counter ;
50+ use opentelemetry:: { otel_debug, otel_error, otel_warn} ;
4951use opentelemetry:: {
5052 trace:: { TraceError , TraceResult } ,
5153 Context ,
5254} ;
5355use std:: cmp:: min;
56+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
5457use std:: sync:: { Arc , Mutex } ;
5558use std:: { env, fmt, str:: FromStr , time:: Duration } ;
5659
@@ -227,6 +230,11 @@ impl SpanProcessor for SimpleSpanProcessor {
227230/// [`async-std`]: https://async.rs
228231pub struct BatchSpanProcessor < R : RuntimeChannel > {
229232 message_sender : R :: Sender < BatchMessage > ,
233+
234+ // Track dropped spans. We'll log this at shutdown and also emit
235+ // as a metric.
236+ dropped_spans_count : AtomicUsize ,
237+ dropped_spans_metric : Counter < u64 > ,
230238}
231239
232240impl < R : RuntimeChannel > fmt:: Debug for BatchSpanProcessor < R > {
@@ -249,11 +257,11 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
249257
250258 let result = self . message_sender . try_send ( BatchMessage :: ExportSpan ( span) ) ;
251259
252- if let Err ( err ) = result {
253- otel_debug ! (
254- name : "BatchSpanProcessor.OnEnd.ExportQueueingFailed" ,
255- reason = format! ( "{:?}" , TraceError :: Other ( err . into ( ) ) )
256- ) ;
260+ // If the queue is full, and we can't buffer a span
261+ if result . is_err ( ) {
262+ // Increment the number of dropped spans and the corresponding metric
263+ self . dropped_spans_metric . add ( 1 , & [ ] ) ;
264+ self . dropped_spans_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
257265 }
258266 }
259267
@@ -269,6 +277,15 @@ impl<R: RuntimeChannel> SpanProcessor for BatchSpanProcessor<R> {
269277 }
270278
271279 fn shutdown ( & self ) -> TraceResult < ( ) > {
280+ let dropped_spans = self . dropped_spans_count . load ( Ordering :: Relaxed ) ;
281+ if dropped_spans > 0 {
282+ otel_warn ! (
283+ name: "BatchSpanProcessor.Shutdown" ,
284+ dropped_spans = dropped_spans,
285+ message = "Spans were dropped due to a full buffer."
286+ ) ;
287+ }
288+
272289 let ( res_sender, res_receiver) = oneshot:: channel ( ) ;
273290 self . message_sender
274291 . try_send ( BatchMessage :: Shutdown ( res_sender) )
@@ -492,8 +509,17 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
492509 processor. run ( messages) . await
493510 } ) ) ;
494511
512+ let dropped_spans_metric = global:: meter ( "opentelemetry" )
513+ . u64_counter ( "dropped_spans" )
514+ . with_description ( "Number of spans dropped due to full buffer" )
515+ . build ( ) ;
516+
495517 // Return batch processor with link to worker
496- BatchSpanProcessor { message_sender }
518+ BatchSpanProcessor {
519+ message_sender,
520+ dropped_spans_count : AtomicUsize :: new ( 0 ) ,
521+ dropped_spans_metric,
522+ }
497523 }
498524
499525 /// Create a new batch processor builder
0 commit comments