@@ -221,12 +221,15 @@ use futures_executor::block_on;
221221use std:: sync:: mpsc:: sync_channel;
222222use std:: sync:: mpsc:: RecvTimeoutError ;
223223use std:: sync:: mpsc:: SyncSender ;
224+ use std:: sync:: mpsc:: Receiver ;
225+ use crate :: export:: trace:: ExportResult ;
224226
225227/// Messages exchanged between the main thread and the background thread.
226228#[ allow( clippy:: large_enum_variant) ]
227229#[ derive( Debug ) ]
228230enum BatchMessage {
229- ExportSpan ( SpanData ) ,
231+ //ExportSpan(SpanData),
232+ ExportSpan ( Arc < AtomicBool > ) ,
230233 ForceFlush ( SyncSender < TraceResult < ( ) > > ) ,
231234 Shutdown ( SyncSender < TraceResult < ( ) > > ) ,
232235 SetResource ( Arc < Resource > ) ,
@@ -235,12 +238,16 @@ enum BatchMessage {
235238/// A batch span processor with a dedicated background thread.
236239#[ derive( Debug ) ]
237240pub struct BatchSpanProcessor {
238- message_sender : SyncSender < BatchMessage > ,
241+ span_sender : SyncSender < SpanData > , // Data channel to store spans
242+ message_sender : SyncSender < BatchMessage > , // Control channel to store control messages.
239243 handle : Mutex < Option < thread:: JoinHandle < ( ) > > > ,
240244 forceflush_timeout : Duration ,
241245 shutdown_timeout : Duration ,
242246 is_shutdown : AtomicBool ,
243247 dropped_span_count : Arc < AtomicUsize > ,
248+ export_span_message_sent : Arc < AtomicBool > ,
249+ current_batch_size : Arc < AtomicUsize > ,
250+ max_export_batch_size : usize ,
244251}
245252
246253impl BatchSpanProcessor {
@@ -255,7 +262,12 @@ impl BatchSpanProcessor {
255262 where
256263 E : SpanExporter + Send + ' static ,
257264 {
258- let ( message_sender, message_receiver) = sync_channel ( config. max_queue_size ) ;
265+ let ( message_sender, message_receiver) = sync_channel :: < SpanData > ( config. max_queue_size ) ;
266+ let ( message_sender, message_receiver) = sync_channel :: < BatchMessage > ( 64 ) ; // Is this a reasonable bound?
267+ let max_queue_size = config. max_queue_size ;
268+ let max_export_batch_size = config. max_export_batch_size ;
269+ let current_batch_size = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
270+ let current_batch_size_for_thread = current_batch_size. clone ( ) ;
259271
260272 let handle = thread:: Builder :: new ( )
261273 . name ( "OpenTelemetry.Traces.BatchProcessor" . to_string ( ) )
@@ -268,7 +280,41 @@ impl BatchSpanProcessor {
268280 ) ;
269281 let mut spans = Vec :: with_capacity ( config. max_export_batch_size ) ;
270282 let mut last_export_time = Instant :: now ( ) ;
283+ let current_batch_size = current_batch_size_for_thread;
284+ // This method gets upto `max_export_batch_size` amount of spans from the channel and exports them.
285+ // It returns the result of the export operation.
286+ // It expects the span vec to be empty when it's called.
287+ #[ inline]
288+ fn get_spans_and_export < E > (
289+ spans_receiver : & Receiver < SpanData > ,
290+ exporter : & E ,
291+ spans : & mut Vec < SpanData > ,
292+ last_export_time : & mut Instant ,
293+ current_batch_size : & AtomicUsize ,
294+ config : & BatchConfig ,
295+ ) -> ExportResult
296+ where
297+ E : SpanExporter + Send + Sync + ' static ,
298+ {
299+ // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
300+ while let Ok ( log) = spans_receiver. try_recv ( ) {
301+ spans. push ( log) ;
302+ if spans. len ( ) == config. max_export_batch_size {
303+ break ;
304+ }
305+ }
306+
307+ let count_of_logs = spans. len ( ) ; // Count of logs that will be exported
308+ let result = export_with_timeout_sync (
309+ config. max_export_timeout ,
310+ exporter,
311+ spans,
312+ last_export_time,
313+ ) ; // This method clears the logs vec after exporting
271314
315+ current_batch_size. fetch_sub ( count_of_logs, Ordering :: Relaxed ) ;
316+ result
317+ }
272318 loop {
273319 let remaining_time_option = config
274320 . scheduled_delay
@@ -280,6 +326,9 @@ impl BatchSpanProcessor {
280326 match message_receiver. recv_timeout ( remaining_time) {
281327 Ok ( message) => match message {
282328 BatchMessage :: ExportSpan ( span) => {
329+ otel_debug ! (
330+ name: "BatchSpanProcessor.ExportingDueToBatchSize" ,
331+ ) ;
283332 spans. push ( span) ;
284333 if spans. len ( ) >= config. max_queue_size
285334 || last_export_time. elapsed ( ) >= config. scheduled_delay
0 commit comments