Redesign BatchLogProcessor #2494
Changes from all commits
```diff
@@ -199,8 +199,8 @@
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 enum BatchMessage {
-    /// Export logs, called when the log is emitted.
-    ExportLog(Box<(LogRecord, InstrumentationScope)>),
+    /// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`.
+    ExportLog(Arc<AtomicBool>),
     /// ForceFlush flushes the current buffer to the exporter.
     ForceFlush(mpsc::SyncSender<ExportResult>),
     /// Shut down the worker thread, push all logs in buffer to the exporter.
```
```diff
@@ -209,6 +209,8 @@
     SetResource(Arc<Resource>),
 }

+type LogsData = Box<(LogRecord, InstrumentationScope)>;
+
 /// The `BatchLogProcessor` collects finished logs in a buffer and exports them
 /// in batches to the configured `LogExporter`. This processor is ideal for
 /// high-throughput environments, as it minimizes the overhead of exporting logs
```
```diff
@@ -246,11 +248,15 @@
 /// .build();
 ///
 pub struct BatchLogProcessor {
-    message_sender: SyncSender<BatchMessage>,
+    logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
+    message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
     handle: Mutex<Option<thread::JoinHandle<()>>>,
     forceflush_timeout: Duration,
     shutdown_timeout: Duration,
     is_shutdown: AtomicBool,
+    export_log_message_sent: Arc<AtomicBool>,
+    current_batch_size: Arc<AtomicUsize>,
+    max_export_batch_size: usize,

     // Track dropped logs - we'll log this at shutdown
     dropped_logs_count: AtomicUsize,
```
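The comments on the two sender fields describe the heart of the redesign: `emit` pushes records over a dedicated data channel, while the bounded control channel carries only rare worker commands such as flush and shutdown. A minimal, self-contained sketch of that two-channel split (illustrative types and names, not the crate's API):

```rust
use std::sync::mpsc;
use std::thread;

enum Control {
    Flush,
    Shutdown,
}

fn main() {
    // Data channel: bounded, sized to the maximum number of pending records.
    let (data_tx, data_rx) = mpsc::sync_channel::<String>(1024);
    // Control channel: small bound, carries only rare worker commands.
    let (ctrl_tx, ctrl_rx) = mpsc::sync_channel::<Control>(8);

    let worker = thread::spawn(move || loop {
        match ctrl_rx.recv() {
            Ok(Control::Flush) => {
                // Drain whatever is currently buffered in the data channel.
                while let Ok(record) = data_rx.try_recv() {
                    println!("exporting: {record}");
                }
            }
            Ok(Control::Shutdown) | Err(_) => break,
        }
    });

    data_tx.try_send("log-1".to_string()).ok();
    ctrl_tx.send(Control::Flush).ok();
    ctrl_tx.send(Control::Shutdown).ok();
    worker.join().unwrap();
}
```

Keeping the hot path on its own channel means an `emit` call never contends with flush or shutdown traffic.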
```diff
@@ -279,11 +285,8 @@
         }

         let result = self
-            .message_sender
-            .try_send(BatchMessage::ExportLog(Box::new((
-                record.clone(),
-                instrumentation.clone(),
-            ))));
+            .logs_sender
+            .try_send(Box::new((record.clone(), instrumentation.clone())));

         if result.is_err() {
             // Increment dropped logs count. The first time we have to drop a log,
```
```diff
@@ -292,6 +295,37 @@
                 otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
                     message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
             }
             return;
         }

+        // At this point, sending the log record to the data channel was successful.
+        // Increment the current batch size and check if it has reached the max export batch size.
+        if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
+        {
+            // Check if a control message for exporting logs has already been sent to the worker thread.
+            // If not, send a control message to export logs.
+            // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message.
+            if !self.export_log_message_sent.load(Ordering::Relaxed) {
+                // This is a cost-efficient check as atomic load operations do not require exclusive access to the cache line.
+                // Perform an atomic swap on `export_log_message_sent` ONLY when the atomic load above returns false.
+                // Atomic swap/compare_exchange operations require exclusive access to the cache line on most processor architectures.
+                // We could have used compare_exchange here as well, but it is more verbose than swap.
+                if !self.export_log_message_sent.swap(true, Ordering::Relaxed) {
+                    match self.message_sender.try_send(BatchMessage::ExportLog(
+                        self.export_log_message_sent.clone(),
+                    )) {
+                        Ok(_) => {
+                            // Control message sent successfully.
+                        }
+                        Err(_err) => {
+                            // TODO: Log error
+                            // If the control message could not be sent, reset the `export_log_message_sent` flag.
+                            self.export_log_message_sent.store(false, Ordering::Relaxed);
+                        }
+                    }
+                }
+            }
+        }
+    }
```
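The comment block above describes a double-checked atomic gate: a cheap relaxed `load` filters out the common case before attempting the `swap`, which needs exclusive ownership of the cache line. A standalone sketch of the same gate, using hypothetical names rather than the processor's fields:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Returns true only for the single caller that wins the gate;
/// everyone else sees the flag already set and backs off cheaply.
fn try_acquire_gate(flag: &Arc<AtomicBool>) -> bool {
    // Cheap read first: a relaxed load does not need exclusive
    // access to the cache line, so contended callers stay cheap.
    if !flag.load(Ordering::Relaxed) {
        // Only the winner of this swap observes `false`.
        return !flag.swap(true, Ordering::Relaxed);
    }
    false
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    assert!(try_acquire_gate(&flag));     // first caller wins
    assert!(!try_acquire_gate(&flag));    // subsequent callers back off
    flag.store(false, Ordering::Relaxed); // worker resets after processing
    assert!(try_acquire_gate(&flag));     // gate can be won again
}
```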
```diff
@@ -388,8 +422,12 @@
     where
         E: LogExporter + Send + Sync + 'static,
     {
-        let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size);
+        let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size);
+        let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
+        let max_queue_size = config.max_queue_size;
+        let max_export_batch_size = config.max_export_batch_size;
+        let current_batch_size = Arc::new(AtomicUsize::new(0));
+        let current_batch_size_for_thread = current_batch_size.clone();

         let handle = thread::Builder::new()
             .name("OpenTelemetry.Logs.BatchProcessor".to_string())
```
```diff
@@ -402,6 +440,42 @@
             );
             let mut last_export_time = Instant::now();
             let mut logs = Vec::with_capacity(config.max_export_batch_size);
+            let current_batch_size = current_batch_size_for_thread;

+            // This method gets up to `max_export_batch_size` logs from the channel and exports them.
+            // It returns the result of the export operation.
+            // It expects the logs vec to be empty when it's called.
```
> **Member:** We can get rid of the `logs` vec here and use an array instead.

> **Contributor (Author):** For using an array, we need to know the size of the array beforehand, which we don't, because we don't know how many items are present in the channel. It might be less than `max_export_batch_size`.

> **Member:** We can allocate an array of `max_export_batch_size` and slice it at the required length. Anyway, this is not a blocker.

> **Contributor (Author):** Always stack-allocating a big array even to export very few logs seems wasteful. What's the benefit of using an array instead of a `Vec`?
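For comparison, the reviewer's fixed-array suggestion would look roughly like the sketch below (hypothetical, not code from this PR): the buffer is allocated once at `max_export_batch_size`, and only the filled prefix is sliced out for export.

```rust
const MAX_EXPORT_BATCH_SIZE: usize = 4;

fn main() {
    let (tx, rx) = std::sync::mpsc::sync_channel::<u64>(16);
    tx.send(1).unwrap();
    tx.send(2).unwrap();

    // Fixed-size buffer, sliced to the number of records actually received.
    let mut buf: [u64; MAX_EXPORT_BATCH_SIZE] = [0; MAX_EXPORT_BATCH_SIZE];
    let mut n = 0;
    while n < MAX_EXPORT_BATCH_SIZE {
        match rx.try_recv() {
            Ok(v) => {
                buf[n] = v;
                n += 1;
            }
            Err(_) => break, // channel empty
        }
    }
    let batch = &buf[..n]; // only the filled prefix is exported
    println!("exporting {} records: {:?}", batch.len(), batch);
}
```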
```diff
+            #[inline]
+            fn get_logs_and_export<E>(
+                logs_receiver: &mpsc::Receiver<LogsData>,
+                exporter: &E,
+                logs: &mut Vec<LogsData>,
+                last_export_time: &mut Instant,
+                current_batch_size: &AtomicUsize,
+                config: &BatchConfig,
+            ) -> ExportResult
+            where
+                E: LogExporter + Send + Sync + 'static,
+            {
+                // Get up to `max_export_batch_size` log records from the channel and push them to the logs vec.
+                while let Ok(log) = logs_receiver.try_recv() {
+                    logs.push(log);
+                    if logs.len() == config.max_export_batch_size {
+                        break;
+                    }
+                }
+
+                let count_of_logs = logs.len(); // Count of logs that will be exported
+                let result = export_with_timeout_sync(
+                    config.max_export_timeout,
+                    exporter,
+                    logs,
+                    last_export_time,
+                ); // This method clears the logs vec after exporting
+
+                current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
+                result
+            }

             loop {
                 let remaining_time = config
```
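The `fetch_sub` at the end pairs with the `fetch_add` in `emit`, so `current_batch_size` tracks roughly how many records are sitting in the data channel. A toy illustration of that counter lifecycle (names are illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn main() {
    let current_batch_size = AtomicUsize::new(0);
    let max_export_batch_size = 3;

    // Producer side (emit): count each record after a successful send.
    for _ in 0..3 {
        let new_size = current_batch_size.fetch_add(1, Ordering::Relaxed) + 1;
        if new_size >= max_export_batch_size {
            println!("threshold reached, would send an ExportLog control message");
        }
    }

    // Worker side (get_logs_and_export): subtract exactly what was drained.
    let exported = 3;
    current_batch_size.fetch_sub(exported, Ordering::Relaxed);
    assert_eq!(current_batch_size.load(Ordering::Relaxed), 0);
}
```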
```diff
@@ -410,37 +484,44 @@
         .unwrap_or(config.scheduled_delay);

     match message_receiver.recv_timeout(remaining_time) {
-        Ok(BatchMessage::ExportLog(log)) => {
-            logs.push(log);
-            if logs.len() == config.max_export_batch_size {
-                otel_debug!(
-                    name: "BatchLogProcessor.ExportingDueToBatchSize",
-                );
-                let _ = export_with_timeout_sync(
-                    config.max_export_timeout,
-                    &mut exporter,
-                    &mut logs,
-                    &mut last_export_time,
-                );
-            }
+        Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
+            otel_debug!(
+                name: "BatchLogProcessor.ExportingDueToBatchSize",
+            );
+
+            let _ = get_logs_and_export(
+                &logs_receiver,
+                &exporter,
+                &mut logs,
+                &mut last_export_time,
+                &current_batch_size,
+                &config,
+            );
+
+            // Reset the export log message sent flag now that it has been processed.
+            export_log_message_sent.store(false, Ordering::Relaxed);
         }
         Ok(BatchMessage::ForceFlush(sender)) => {
             otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");
-            let result = export_with_timeout_sync(
-                config.max_export_timeout,
-                &mut exporter,
+            let result = get_logs_and_export(
+                &logs_receiver,
+                &exporter,
                 &mut logs,
                 &mut last_export_time,
+                &current_batch_size,
+                &config,
             );
             let _ = sender.send(result);
         }
```
```diff
         Ok(BatchMessage::Shutdown(sender)) => {
             otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown");
-            let result = export_with_timeout_sync(
-                config.max_export_timeout,
-                &mut exporter,
+            let result = get_logs_and_export(
```

> **Member:** Same as flush - we need to ensure logs continue to be exported until all logs received up to the point shutdown was invoked have been exported.

```diff
+                &logs_receiver,
+                &exporter,
                 &mut logs,
                 &mut last_export_time,
+                &current_batch_size,
+                &config,
             );
             let _ = sender.send(result);
```
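The concern raised above is that a single `get_logs_and_export` call drains at most one batch, so a flush or shutdown that arrives with more than `max_export_batch_size` records queued would leave the remainder behind. One way to address it, sketched here with hypothetical names rather than the PR's code, is to keep exporting batches until the channel is empty:

```rust
use std::sync::mpsc;

const MAX_EXPORT_BATCH_SIZE: usize = 2;

// Toy drain loop: export in batches until the channel is empty,
// so a shutdown never strands queued records.
fn drain_all(rx: &mpsc::Receiver<u64>) {
    loop {
        let batch: Vec<u64> = rx.try_iter().take(MAX_EXPORT_BATCH_SIZE).collect();
        if batch.is_empty() {
            break; // channel fully drained
        }
        println!("exporting batch: {batch:?}");
    }
}

fn main() {
    let (tx, rx) = mpsc::sync_channel(16);
    for i in 0..5 {
        tx.try_send(i).unwrap();
    }
    drain_all(&rx); // exports [0, 1], [2, 3], [4]
}
```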
```diff
@@ -460,11 +541,14 @@
         otel_debug!(
             name: "BatchLogProcessor.ExportingDueToTimer",
         );
-        let _ = export_with_timeout_sync(
-            config.max_export_timeout,
-            &mut exporter,
+
+        let _ = get_logs_and_export(
+            &logs_receiver,
+            &exporter,
             &mut logs,
             &mut last_export_time,
+            &current_batch_size,
+            &config,
         );
     }
     Err(RecvTimeoutError::Disconnected) => {
```
```diff
@@ -486,13 +570,17 @@

     // Return batch processor with link to worker
     BatchLogProcessor {
+        logs_sender,
         message_sender,
         handle: Mutex::new(Some(handle)),
         forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
         shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
         is_shutdown: AtomicBool::new(false),
         dropped_logs_count: AtomicUsize::new(0),
         max_queue_size,
+        export_log_message_sent: Arc::new(AtomicBool::new(false)),
+        current_batch_size,
+        max_export_batch_size,
     }
 }
```
```diff
@@ -511,7 +599,7 @@
 #[allow(clippy::vec_box)]
 fn export_with_timeout_sync<E>(
     _: Duration, // TODO, enforcing timeout in exporter.
-    exporter: &mut E,
+    exporter: &E,
     batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>,
     last_export_time: &mut Instant,
 ) -> ExportResult
```
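Relaxing `&mut E` to `&E` is what allows the nested `get_logs_and_export` helper to borrow the exporter with a shared reference; it presumes the exporter can export through `&self`, keeping any mutable state behind interior mutability. A minimal illustration of that pattern (a hypothetical trait, not the crate's `LogExporter`):

```rust
use std::sync::Mutex;

// Hypothetical exporter trait whose export method takes &self,
// so callers only ever need a shared reference.
trait Exporter {
    fn export(&self, batch: &[String]);
}

struct CountingExporter {
    // Interior mutability keeps &self exports possible.
    exported: Mutex<usize>,
}

impl Exporter for CountingExporter {
    fn export(&self, batch: &[String]) {
        *self.exported.lock().unwrap() += batch.len();
    }
}

fn main() {
    let exporter = CountingExporter { exported: Mutex::new(0) };
    exporter.export(&["a".into(), "b".into()]);
    println!("exported: {}", exporter.exported.lock().unwrap());
}
```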