@@ -6,18 +6,20 @@ use crate::trace::Span;
66use crate :: trace:: SpanProcessor ;
77use crate :: trace:: { SpanData , SpanExporter } ;
88use futures_channel:: oneshot;
9- use futures_util:: pin_mut;
109use futures_util:: {
1110 future:: { self , BoxFuture , Either } ,
12- select,
11+ pin_mut , select,
1312 stream:: { self , FusedStream , FuturesUnordered } ,
14- StreamExt as _,
13+ FutureExt as _ , StreamExt as _ , TryFutureExt as _,
1514} ;
1615use opentelemetry:: Context ;
1716use opentelemetry:: { otel_debug, otel_error, otel_warn} ;
17+ use std:: collections:: VecDeque ;
1818use std:: fmt;
19+ use std:: future:: Future ;
1920use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2021use std:: sync:: Arc ;
22+ use tokio:: sync:: RwLock ;
2123
2224/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2325/// them at a preconfigured interval.
@@ -185,17 +187,20 @@ enum BatchMessage {
185187}
186188
187189struct BatchSpanProcessorInternal < E , R > {
188- spans : Vec < SpanData > ,
190+ spans : VecDeque < SpanData > ,
189191 export_tasks : FuturesUnordered < BoxFuture < ' static , OTelSdkResult > > ,
190192 runtime : R ,
191- exporter : E ,
193+ exporter : Arc < RwLock < E > > ,
192194 config : BatchConfig ,
193195}
194196
195- impl < E : SpanExporter , R : RuntimeChannel > BatchSpanProcessorInternal < E , R > {
197+ impl < E , R > BatchSpanProcessorInternal < E , R >
198+ where
199+ E : SpanExporter + ' static ,
200+ R : RuntimeChannel ,
201+ {
196202 async fn flush ( & mut self , res_channel : Option < oneshot:: Sender < OTelSdkResult > > ) {
197- let export_result = self . export ( ) . await ;
198- let task = Box :: pin ( async move {
203+ let task = self . export ( ) . map ( |export_result| {
199204 if let Some ( channel) = res_channel {
200205 // If a response channel is provided, attempt to send the export result through it.
201206 if let Err ( result) = channel. send ( export_result) {
@@ -221,7 +226,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
221226 if self . config . max_concurrent_exports == 1 {
222227 let _ = task. await ;
223228 } else {
224- self . export_tasks . push ( task) ;
229+ self . export_tasks . push ( task. boxed ( ) ) ;
225230 while self . export_tasks . next ( ) . await . is_some ( ) { }
226231 }
227232 }
@@ -233,27 +238,32 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
233238 match message {
234239 // Span has finished, add to buffer of pending spans.
235240 BatchMessage :: ExportSpan ( span) => {
236- self . spans . push ( span) ;
241+ if self . spans . len ( ) == self . config . max_export_batch_size {
242+ // Replace the oldest span with the new span to avoid suspending messages
243+ // processing.
244+ self . spans . pop_front ( ) ;
245+ }
246+ self . spans . push_back ( span) ;
237247
238248 if self . spans . len ( ) == self . config . max_export_batch_size {
239249 // If concurrent exports are saturated, wait for one to complete.
240250 if !self . export_tasks . is_empty ( )
241251 && self . export_tasks . len ( ) == self . config . max_concurrent_exports
242252 {
253+ // TODO: Refactor to avoid stopping message processing to not delay
254+ // shutdown/resource set because of export saturation.
243255 self . export_tasks . next ( ) . await ;
244256 }
245257
246- let export_result = self . export ( ) . await ;
247- let task = async move {
248- if let Err ( err) = export_result {
249- otel_error ! (
250- name: "BatchSpanProcessor.Export.Error" ,
251- reason = format!( "{}" , err)
252- ) ;
253- }
258+ let task = self . export ( ) . or_else ( |err| async move {
259+ otel_error ! (
260+ name: "BatchSpanProcessor.Export.Error" ,
261+ reason = format!( "{err}" ) ,
262+ ) ;
254263
255264 Ok ( ( ) )
256- } ;
265+ } ) ;
266+
257267 // Special case when not using concurrent exports
258268 if self . config . max_concurrent_exports == 1 {
259269 let _ = task. await ;
@@ -288,34 +298,42 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
288298 // Stream has terminated or processor is shutdown, return to finish execution.
289299 BatchMessage :: Shutdown ( ch) => {
290300 self . flush ( Some ( ch) ) . await ;
291- let _ = self . exporter . shutdown ( ) ;
301+ let _ = self . exporter . write ( ) . await . shutdown ( ) ;
292302 return false ;
293303 }
294304 // propagate the resource
295305 BatchMessage :: SetResource ( resource) => {
296- self . exporter . set_resource ( & resource) ;
306+ self . exporter . write ( ) . await . set_resource ( & resource) ;
297307 }
298308 }
299309 true
300310 }
301311
302- async fn export ( & mut self ) -> OTelSdkResult {
303- // Batch size check for flush / shutdown. Those methods may be called
304- // when there's no work to do.
305- if self . spans . is_empty ( ) {
306- return Ok ( ( ) ) ;
307- }
308-
309- let export = self . exporter . export ( self . spans . split_off ( 0 ) ) ;
310- let timeout = self . runtime . delay ( self . config . max_export_timeout ) ;
312+ #[ allow( impl_trait_overcaptures) ] // MSRV compatibility.
313+ fn export ( & mut self ) -> impl Future < Output = OTelSdkResult > {
314+ let spans = self . spans . drain ( ..) . collect :: < Vec < _ > > ( ) ;
315+ let exporter = self . exporter . clone ( ) ;
316+ let runtime = self . runtime . clone ( ) ;
311317 let time_out = self . config . max_export_timeout ;
312318
313- pin_mut ! ( export) ;
314- pin_mut ! ( timeout) ;
319+ async move {
320+ // Batch size check for flush / shutdown. Those methods may be called
321+ // when there's no work to do.
322+ if spans. is_empty ( ) {
323+ return Ok ( ( ) ) ;
324+ }
325+
326+ let exporter = exporter. read ( ) . await ;
327+ let export = exporter. export ( spans) ;
328+ let timeout = runtime. delay ( time_out) ;
329+
330+ pin_mut ! ( export) ;
331+ pin_mut ! ( timeout) ;
315332
316- match future:: select ( export, timeout) . await {
317- Either :: Left ( ( export_res, _) ) => export_res,
318- Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out) ) ,
333+ match future:: select ( export, timeout) . await {
334+ Either :: Left ( ( export_res, _) ) => export_res,
335+ Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out) ) ,
336+ }
319337 }
320338 }
321339
@@ -328,14 +346,14 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
328346 // An export task completed; do we need to do anything with it?
329347 } ,
330348 message = messages. next( ) => {
331- match message {
332- Some ( message) => {
333- if !self . process_message( message) . await {
334- break ;
335- }
336- } ,
337- None => break ,
349+ if let Some ( m) = message {
350+ if self . process_message( m) . await {
351+ continue ;
352+ }
338353 }
354+
355+ // Shutdown if there's no message, or the message indicates shutdown.
356+ break ;
339357 } ,
340358 }
341359 }
@@ -364,11 +382,11 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
364382
365383 let messages = Box :: pin ( stream:: select ( message_receiver, ticker) ) ;
366384 let processor = BatchSpanProcessorInternal {
367- spans : Vec :: new ( ) ,
385+ spans : VecDeque :: new ( ) ,
368386 export_tasks : FuturesUnordered :: new ( ) ,
369387 runtime : timeout_runtime,
370388 config,
371- exporter,
389+ exporter : Arc :: new ( RwLock :: new ( exporter ) ) ,
372390 } ;
373391
374392 processor. run ( messages) . await
0 commit comments