@@ -6,18 +6,20 @@ use crate::trace::Span;
66use crate :: trace:: SpanProcessor ;
77use crate :: trace:: { SpanData , SpanExporter } ;
88use futures_channel:: oneshot;
9- use futures_util:: pin_mut;
109use futures_util:: {
1110 future:: { self , BoxFuture , Either } ,
12- select,
11+ pin_mut , select,
1312 stream:: { self , FusedStream , FuturesUnordered } ,
14- StreamExt as _,
13+ FutureExt as _ , StreamExt as _ , TryFutureExt as _,
1514} ;
1615use opentelemetry:: Context ;
1716use opentelemetry:: { otel_debug, otel_error, otel_warn} ;
17+ use std:: collections:: VecDeque ;
1818use std:: fmt;
19+ use std:: future:: Future ;
1920use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2021use std:: sync:: Arc ;
22+ use tokio:: sync:: RwLock ;
2123
2224/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2325/// them at a preconfigured interval.
@@ -185,17 +187,20 @@ enum BatchMessage {
185187}
186188
187189struct BatchSpanProcessorInternal < E , R > {
188- spans : Vec < SpanData > ,
190+ spans : VecDeque < SpanData > ,
189191 export_tasks : FuturesUnordered < BoxFuture < ' static , OTelSdkResult > > ,
190192 runtime : R ,
191- exporter : E ,
193+ exporter : Arc < RwLock < E > > ,
192194 config : BatchConfig ,
193195}
194196
195- impl < E : SpanExporter , R : RuntimeChannel > BatchSpanProcessorInternal < E , R > {
197+ impl < E , R > BatchSpanProcessorInternal < E , R >
198+ where
199+ E : SpanExporter + ' static ,
200+ R : RuntimeChannel ,
201+ {
196202 async fn flush ( & mut self , res_channel : Option < oneshot:: Sender < OTelSdkResult > > ) {
197- let export_result = self . export ( ) . await ;
198- let task = Box :: pin ( async move {
203+ let task = self . export ( ) . map ( |export_result| {
199204 if let Some ( channel) = res_channel {
200205 // If a response channel is provided, attempt to send the export result through it.
201206 if let Err ( result) = channel. send ( export_result) {
@@ -221,7 +226,7 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
221226 if self . config . max_concurrent_exports == 1 {
222227 let _ = task. await ;
223228 } else {
224- self . export_tasks . push ( task) ;
229+ self . export_tasks . push ( task. boxed ( ) ) ;
225230 while self . export_tasks . next ( ) . await . is_some ( ) { }
226231 }
227232 }
@@ -233,7 +238,12 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
233238 match message {
234239 // Span has finished, add to buffer of pending spans.
235240 BatchMessage :: ExportSpan ( span) => {
236- self . spans . push ( span) ;
241+ if self . spans . len ( ) == self . config . max_export_batch_size {
242+ // Replace the oldest span with the new span to avoid suspending messages
243+ // processing.
244+ self . spans . pop_front ( ) ;
245+ }
246+ self . spans . push_back ( span) ;
237247
238248 if self . spans . len ( ) == self . config . max_export_batch_size {
239249 // If concurrent exports are saturated, wait for one to complete.
@@ -243,17 +253,15 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
243253 self . export_tasks . next ( ) . await ;
244254 }
245255
246- let export_result = self . export ( ) . await ;
247- let task = async move {
248- if let Err ( err) = export_result {
249- otel_error ! (
250- name: "BatchSpanProcessor.Export.Error" ,
251- reason = format!( "{}" , err)
252- ) ;
253- }
256+ let task = self . export ( ) . or_else ( |err| async move {
257+ otel_error ! (
258+ name: "BatchSpanProcessor.Export.Error" ,
259+ reason = format!( "{err}" ) ,
260+ ) ;
254261
255262 Ok ( ( ) )
256- } ;
263+ } ) ;
264+
257265 // Special case when not using concurrent exports
258266 if self . config . max_concurrent_exports == 1 {
259267 let _ = task. await ;
@@ -288,34 +296,41 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
288296 // Stream has terminated or processor is shutdown, return to finish execution.
289297 BatchMessage :: Shutdown ( ch) => {
290298 self . flush ( Some ( ch) ) . await ;
291- let _ = self . exporter . shutdown ( ) ;
299+ let _ = self . exporter . write ( ) . await . shutdown ( ) ;
292300 return false ;
293301 }
294302 // propagate the resource
295303 BatchMessage :: SetResource ( resource) => {
296- self . exporter . set_resource ( & resource) ;
304+ self . exporter . write ( ) . await . set_resource ( & resource) ;
297305 }
298306 }
299307 true
300308 }
301309
302- async fn export ( & mut self ) -> OTelSdkResult {
303- // Batch size check for flush / shutdown. Those methods may be called
304- // when there's no work to do.
305- if self . spans . is_empty ( ) {
306- return Ok ( ( ) ) ;
307- }
308-
309- let export = self . exporter . export ( self . spans . split_off ( 0 ) ) ;
310- let timeout = self . runtime . delay ( self . config . max_export_timeout ) ;
310+ fn export ( & mut self ) -> impl Future < Output = OTelSdkResult > + use < E , R > {
311+ let spans = self . spans . drain ( ..) . collect :: < Vec < _ > > ( ) ;
312+ let exporter = self . exporter . clone ( ) ;
313+ let runtime = self . runtime . clone ( ) ;
311314 let time_out = self . config . max_export_timeout ;
312315
313- pin_mut ! ( export) ;
314- pin_mut ! ( timeout) ;
316+ async move {
317+ // Batch size check for flush / shutdown. Those methods may be called
318+ // when there's no work to do.
319+ if spans. is_empty ( ) {
320+ return Ok ( ( ) ) ;
321+ }
322+
323+ let exporter = exporter. read ( ) . await ;
324+ let export = exporter. export ( spans) ;
325+ let timeout = runtime. delay ( time_out) ;
326+
327+ pin_mut ! ( export) ;
328+ pin_mut ! ( timeout) ;
315329
316- match future:: select ( export, timeout) . await {
317- Either :: Left ( ( export_res, _) ) => export_res,
318- Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out) ) ,
330+ match future:: select ( export, timeout) . await {
331+ Either :: Left ( ( export_res, _) ) => export_res,
332+ Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out) ) ,
333+ }
319334 }
320335 }
321336
@@ -328,14 +343,14 @@ impl<E: SpanExporter, R: RuntimeChannel> BatchSpanProcessorInternal<E, R> {
328343 // An export task completed; do we need to do anything with it?
329344 } ,
330345 message = messages. next( ) => {
331- match message {
332- Some ( message) => {
333- if !self . process_message( message) . await {
334- break ;
335- }
336- } ,
337- None => break ,
346+ if let Some ( m) = message {
347+ if self . process_message( m) . await {
348+ continue ;
349+ }
338350 }
351+
352+ // Shutdown if there's no message, or the message indicates shutdown.
353+ break ;
339354 } ,
340355 }
341356 }
@@ -364,11 +379,11 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
364379
365380 let messages = Box :: pin ( stream:: select ( message_receiver, ticker) ) ;
366381 let processor = BatchSpanProcessorInternal {
367- spans : Vec :: new ( ) ,
382+ spans : VecDeque :: new ( ) ,
368383 export_tasks : FuturesUnordered :: new ( ) ,
369384 runtime : timeout_runtime,
370385 config,
371- exporter,
386+ exporter : Arc :: new ( RwLock :: new ( exporter ) ) ,
372387 } ;
373388
374389 processor. run ( messages) . await
0 commit comments