diff --git a/opentelemetry-sdk/src/logs/batch_log_processor.rs b/opentelemetry-sdk/src/logs/batch_log_processor.rs index eb48fbfd5b..420142ea5d 100644 --- a/opentelemetry-sdk/src/logs/batch_log_processor.rs +++ b/opentelemetry-sdk/src/logs/batch_log_processor.rs @@ -54,6 +54,10 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048; pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size. pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +/// Maximum force flush timeout. +pub(crate) const OTEL_BLRP_FORCEFLUSH_TIMEOUT: &str = "OTEL_BLRP_FORCEFLUSH_TIMEOUT"; +/// Default maximum force flush timeout. +pub(crate) const OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000); /// Messages sent between application thread and batch log processor's work thread. #[allow(clippy::large_enum_variant)] @@ -333,6 +337,7 @@ impl BatchLogProcessor { let (message_sender, message_receiver) = mpsc::sync_channel::(64); // Is this a reasonable bound? let max_queue_size = config.max_queue_size; let max_export_batch_size = config.max_export_batch_size; + let forceflush_timeout = config.forceflush_timeout; let current_batch_size = Arc::new(AtomicUsize::new(0)); let current_batch_size_for_thread = current_batch_size.clone(); @@ -485,7 +490,7 @@ impl BatchLogProcessor { logs_sender, message_sender, handle: Mutex::new(Some(handle)), - forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable + forceflush_timeout, dropped_logs_count: AtomicUsize::new(0), max_queue_size, export_log_message_sent: Arc::new(AtomicBool::new(false)), @@ -582,6 +587,9 @@ pub struct BatchConfig { /// is 512. pub(crate) max_export_batch_size: usize, + /// The maximum duration to wait for a force flush to complete. The default value is 5 seconds. + pub(crate) forceflush_timeout: Duration, + /// The maximum duration to export a batch of data. #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] pub(crate) max_export_timeout: Duration, @@ -599,6 +607,7 @@ pub struct BatchConfigBuilder { max_queue_size: usize, scheduled_delay: Duration, max_export_batch_size: usize, + forceflush_timeout: Duration, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: Duration, } @@ -618,6 +627,7 @@ impl Default for BatchConfigBuilder { max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, + forceflush_timeout: OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT, #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT, } @@ -678,6 +688,18 @@ impl BatchConfigBuilder { self } + /// Set forceflush_timeout for [`BatchConfigBuilder`]. + /// It's the maximum duration to wait for a force flush to complete. + /// The default value is 5 seconds. + /// + /// Corresponding environment variable: `OTEL_BLRP_FORCEFLUSH_TIMEOUT`. + /// + /// Note: Programmatically setting this will override any value set via the environment variable. + pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self { + self.forceflush_timeout = forceflush_timeout; + self + } + /// Builds a `BatchConfig` enforcing the following invariants: /// * `max_export_batch_size` must be less than or equal to `max_queue_size`. pub fn build(self) -> BatchConfig { @@ -691,6 +713,7 @@ impl BatchConfigBuilder { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] max_export_timeout: self.max_export_timeout, max_export_batch_size, + forceflush_timeout: self.forceflush_timeout, } } @@ -716,6 +739,13 @@ impl BatchConfigBuilder { self.scheduled_delay = Duration::from_millis(scheduled_delay); } + if let Some(forceflush_timeout) = env::var(OTEL_BLRP_FORCEFLUSH_TIMEOUT) + .ok() + .and_then(|timeout| u64::from_str(&timeout).ok()) + { + self.forceflush_timeout = Duration::from_millis(forceflush_timeout); + } + #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT) .ok() @@ -731,7 +761,8 @@ impl BatchConfigBuilder { #[cfg(all(test, feature = "testing", feature = "logs"))] mod tests { use super::{ - BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_FORCEFLUSH_TIMEOUT, + OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY, OTEL_BLRP_SCHEDULE_DELAY_DEFAULT, @@ -764,6 +795,8 @@ mod tests { "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE" ); assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512); + assert_eq!(OTEL_BLRP_FORCEFLUSH_TIMEOUT, "OTEL_BLRP_FORCEFLUSH_TIMEOUT"); + assert_eq!(OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT.as_millis(), 5_000); } #[test] @@ -775,6 +808,7 @@ mod tests { OTEL_BLRP_EXPORT_TIMEOUT, OTEL_BLRP_MAX_QUEUE_SIZE, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, + OTEL_BLRP_FORCEFLUSH_TIMEOUT, ]; let config = temp_env::with_vars_unset(env_vars, BatchConfig::default); @@ -795,6 +829,7 @@ mod tests { (OTEL_BLRP_SCHEDULE_DELAY, Some("2000")), (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")), (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")), + (OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("10000")), ]; temp_env::with_vars(env_vars, || { @@ -802,11 +837,13 @@ mod tests { .with_max_queue_size(2048) .with_scheduled_delay(Duration::from_millis(1000)) .with_max_export_batch_size(512) + .with_forceflush_timeout(Duration::from_millis(20000)) .build(); assert_eq!(config.scheduled_delay, Duration::from_millis(1000)); assert_eq!(config.max_queue_size, 2048); assert_eq!(config.max_export_batch_size, 512); + assert_eq!(config.forceflush_timeout, Duration::from_millis(20000)); }); } @@ -818,6 +855,7 @@ mod tests { (OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")), (OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")), (OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")), + (OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("10000")), ]; let config = temp_env::with_vars(env_vars, BatchConfig::default); @@ -827,6 +865,7 @@ mod tests { assert_eq!(config.max_export_timeout, Duration::from_millis(60000)); assert_eq!(config.max_queue_size, 4096); assert_eq!(config.max_export_batch_size, 1024); + assert_eq!(config.forceflush_timeout, Duration::from_millis(10000)); } #[test] @@ -850,7 +889,8 @@ mod tests { let batch_builder = BatchConfigBuilder::default() .with_max_export_batch_size(1) .with_scheduled_delay(Duration::from_millis(2)) - .with_max_queue_size(4); + .with_max_queue_size(4) + .with_forceflush_timeout(Duration::from_millis(15)); #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3)); @@ -861,6 +901,7 @@ mod tests { #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] assert_eq!(batch.max_export_timeout, Duration::from_millis(3)); assert_eq!(batch.max_queue_size, 4); + assert_eq!(batch.forceflush_timeout, Duration::from_millis(15)); } #[test] @@ -870,6 +911,7 @@ mod tests { (OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")), #[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")] (OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")), + (OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("5000")), ]; temp_env::with_vars(env_vars.clone(), || { let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()); @@ -889,6 +931,10 @@ mod tests { builder.config.max_export_timeout, Duration::from_millis(2046) ); + assert_eq!( + builder.config.forceflush_timeout, + OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT + ); }); env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120"))); @@ -897,6 +943,10 @@ mod tests { let builder = BatchLogProcessor::builder(InMemoryLogExporter::default()); assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); + assert_eq!( + builder.config.forceflush_timeout, + OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT + ); }); } @@ -906,6 +956,7 @@ mod tests { .with_max_export_batch_size(1) .with_scheduled_delay(Duration::from_millis(2)) .with_max_queue_size(4) + .with_forceflush_timeout(Duration::from_millis(15)) .build(); let builder = @@ -915,6 +966,7 @@ mod tests { assert_eq!(actual.max_export_batch_size, 1); assert_eq!(actual.scheduled_delay, Duration::from_millis(2)); assert_eq!(actual.max_queue_size, 4); + assert_eq!(actual.forceflush_timeout, Duration::from_millis(15)); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)]