@@ -300,14 +300,31 @@ impl LogProcessor for BatchLogProcessor {
300300
301301 // At this point, sending the log record to the data channel was successful.
302302 // Increment the current batch size and check if it has reached the max export batch size.
303- if self . current_batch_size . fetch_add ( 1 , Ordering :: Relaxed ) >= self . max_export_batch_size {
303+ if self . current_batch_size . fetch_add ( 1 , Ordering :: Relaxed ) + 1 >= self . max_export_batch_size
304+ {
304305 // Check if the a control message for exporting logs is already sent to the worker thread.
305306 // If not, send a control message to export logs.
306307 // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
307- if !self . export_log_message_sent . swap ( true , Ordering :: Relaxed ) {
308- let _ = self . message_sender . try_send ( BatchMessage :: ExportLog (
309- self . export_log_message_sent . clone ( ) ,
310- ) ) ; // TODO: Handle error
308+
309+ if !self . export_log_message_sent . load ( Ordering :: Relaxed ) {
310+ // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
311+ // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation abbove returns false.
312+ // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
313+ // We could have used compare_exchange as well here, but it's more verbose than swap. Also, swap uses compare_exchange internally anyway.
314+ if !self . export_log_message_sent . swap ( true , Ordering :: Relaxed ) {
315+ match self . message_sender . try_send ( BatchMessage :: ExportLog (
316+ self . export_log_message_sent . clone ( ) ,
317+ ) ) {
318+ Ok ( _) => {
319+ // Control message sent successfully.
320+ }
321+ Err ( _err) => {
322+ // TODO: Log error
323+ // If the control message could not be sent, reset the `export_log_message_sent` flag.
324+ self . export_log_message_sent . store ( false , Ordering :: Relaxed ) ;
325+ }
326+ }
327+ }
311328 }
312329 }
313330 }
0 commit comments