@@ -6,20 +6,19 @@ use crate::trace::Span;
66use crate :: trace:: SpanProcessor ;
77use crate :: trace:: { SpanData , SpanExporter } ;
88use futures_channel:: oneshot;
9+ use futures_util:: pin_mut;
910use futures_util:: {
1011 future:: { self , BoxFuture , Either } ,
11- pin_mut , select,
12+ select,
1213 stream:: { self , FusedStream , FuturesUnordered } ,
13- FutureExt as _ , StreamExt as _ , TryFutureExt as _,
14+ StreamExt as _,
1415} ;
1516use opentelemetry:: Context ;
1617use opentelemetry:: { otel_debug, otel_error, otel_warn} ;
1718use std:: collections:: VecDeque ;
1819use std:: fmt;
19- use std:: future:: Future ;
2020use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2121use std:: sync:: Arc ;
22- use tokio:: sync:: RwLock ;
2322
2423/// A [`SpanProcessor`] that asynchronously buffers finished spans and reports
2524/// them at a preconfigured interval.
@@ -190,17 +189,14 @@ struct BatchSpanProcessorInternal<E, R> {
190189 spans : VecDeque < SpanData > ,
191190 export_tasks : FuturesUnordered < BoxFuture < ' static , OTelSdkResult > > ,
192191 runtime : R ,
193- exporter : Arc < RwLock < E > > ,
192+ exporter : E ,
194193 config : BatchConfig ,
195194}
196195
197- impl < E , R > BatchSpanProcessorInternal < E , R >
198- where
199- E : SpanExporter + ' static ,
200- R : RuntimeChannel ,
201- {
196+ impl < E : SpanExporter , R : RuntimeChannel > BatchSpanProcessorInternal < E , R > {
202197 async fn flush ( & mut self , res_channel : Option < oneshot:: Sender < OTelSdkResult > > ) {
203- let task = self . export ( ) . map ( |export_result| {
198+ let export_result = self . export ( ) . await ; // TODO: Move execution to `export_tasks`.
199+ let task = Box :: pin ( async move {
204200 if let Some ( channel) = res_channel {
205201 // If a response channel is provided, attempt to send the export result through it.
206202 if let Err ( result) = channel. send ( export_result) {
@@ -226,7 +222,7 @@ where
226222 if self . config . max_concurrent_exports == 1 {
227223 let _ = task. await ;
228224 } else {
229- self . export_tasks . push ( task. boxed ( ) ) ;
225+ self . export_tasks . push ( task) ;
230226 while self . export_tasks . next ( ) . await . is_some ( ) { }
231227 }
232228 }
@@ -255,15 +251,17 @@ where
255251 self . export_tasks . next ( ) . await ;
256252 }
257253
258- let task = self . export ( ) . or_else ( |err| async move {
259- otel_error ! (
260- name: "BatchSpanProcessor.Export.Error" ,
261- reason = format!( "{err}" ) ,
262- ) ;
254+ let export_result = self . export ( ) . await ; // TODO: Move execution to `export_tasks`.
255+ let task = async move {
256+ if let Err ( err) = export_result {
257+ otel_error ! (
258+ name: "BatchSpanProcessor.Export.Error" ,
259+ reason = format!( "{}" , err)
260+ ) ;
261+ }
263262
264263 Ok ( ( ) )
265- } ) ;
266-
264+ } ;
267265 // Special case when not using concurrent exports
268266 if self . config . max_concurrent_exports == 1 {
269267 let _ = task. await ;
@@ -298,42 +296,34 @@ where
298296 // Stream has terminated or processor is shutdown, return to finish execution.
299297 BatchMessage :: Shutdown ( ch) => {
300298 self . flush ( Some ( ch) ) . await ;
301- let _ = self . exporter . write ( ) . await . shutdown ( ) ;
299+ let _ = self . exporter . shutdown ( ) ;
302300 return false ;
303301 }
304302 // propagate the resource
305303 BatchMessage :: SetResource ( resource) => {
306- self . exporter . write ( ) . await . set_resource ( & resource) ;
304+ self . exporter . set_resource ( & resource) ;
307305 }
308306 }
309307 true
310308 }
311309
312- #[ allow( impl_trait_overcaptures) ] // MSRV compatibility.
313- fn export ( & mut self ) -> impl Future < Output = OTelSdkResult > {
314- let spans = self . spans . drain ( ..) . collect :: < Vec < _ > > ( ) ;
315- let exporter = self . exporter . clone ( ) ;
316- let runtime = self . runtime . clone ( ) ;
317- let time_out = self . config . max_export_timeout ;
318-
319- async move {
320- // Batch size check for flush / shutdown. Those methods may be called
321- // when there's no work to do.
322- if spans. is_empty ( ) {
323- return Ok ( ( ) ) ;
324- }
310+ async fn export ( & mut self ) -> OTelSdkResult {
311+ // Batch size check for flush / shutdown. Those methods may be called
312+ // when there's no work to do.
313+ if self . spans . is_empty ( ) {
314+ return Ok ( ( ) ) ;
315+ }
325316
326- let exporter = exporter. read ( ) . await ;
327- let export = exporter . export ( spans ) ;
328- let timeout = runtime . delay ( time_out ) ;
317+ let export = self . exporter . export ( self . spans . drain ( .. ) . collect ( ) ) ;
318+ let timeout = self . runtime . delay ( self . config . max_export_timeout ) ;
319+ let time_out = self . config . max_export_timeout ;
329320
330- pin_mut ! ( export) ;
331- pin_mut ! ( timeout) ;
321+ pin_mut ! ( export) ;
322+ pin_mut ! ( timeout) ;
332323
333- match future:: select ( export, timeout) . await {
334- Either :: Left ( ( export_res, _) ) => export_res,
335- Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out) ) ,
336- }
324+ match future:: select ( export, timeout) . await {
325+ Either :: Left ( ( export_res, _) ) => export_res,
326+ Either :: Right ( ( _, _) ) => Err ( OTelSdkError :: Timeout ( time_out) ) ,
337327 }
338328 }
339329
@@ -346,14 +336,14 @@ where
346336 // An export task completed; do we need to do anything with it?
347337 } ,
348338 message = messages. next( ) => {
349- if let Some ( m) = message {
350- if self . process_message( m) . await {
351- continue ;
352- }
339+ match message {
340+ Some ( message) => {
341+ if !self . process_message( message) . await {
342+ break ;
343+ }
344+ } ,
345+ None => break ,
353346 }
354-
355- // Shutdown if there's no message, or the message indicates shutdown.
356- break ;
357347 } ,
358348 }
359349 }
@@ -386,7 +376,7 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
386376 export_tasks : FuturesUnordered :: new ( ) ,
387377 runtime : timeout_runtime,
388378 config,
389- exporter : Arc :: new ( RwLock :: new ( exporter ) ) ,
379+ exporter,
390380 } ;
391381
392382 processor. run ( messages) . await
0 commit comments