Skip to content

Commit fe27598

Browse files
chore: making forceflush_timeout configurable
Signed-off-by: Nicolas Takashi <[email protected]>
1 parent e9ca158 commit fe27598

File tree

1 file changed

+55
-3
lines changed

1 file changed

+55
-3
lines changed

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
5454
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
5555
/// Default maximum batch size.
5656
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
57+
/// Maximum force flush timeout.
58+
pub(crate) const OTEL_BLRP_FORCEFLUSH_TIMEOUT: &str = "OTEL_BLRP_FORCEFLUSH_TIMEOUT";
59+
/// Default maximum force flush timeout.
60+
pub(crate) const OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000);
5761

5862
/// Messages sent between application thread and batch log processor's work thread.
5963
#[allow(clippy::large_enum_variant)]
@@ -333,6 +337,7 @@ impl BatchLogProcessor {
333337
let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
334338
let max_queue_size = config.max_queue_size;
335339
let max_export_batch_size = config.max_export_batch_size;
340+
let forceflush_timeout = config.forceflush_timeout;
336341
let current_batch_size = Arc::new(AtomicUsize::new(0));
337342
let current_batch_size_for_thread = current_batch_size.clone();
338343

@@ -485,7 +490,7 @@ impl BatchLogProcessor {
485490
logs_sender,
486491
message_sender,
487492
handle: Mutex::new(Some(handle)),
488-
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
493+
forceflush_timeout,
489494
dropped_logs_count: AtomicUsize::new(0),
490495
max_queue_size,
491496
export_log_message_sent: Arc::new(AtomicBool::new(false)),
@@ -582,6 +587,9 @@ pub struct BatchConfig {
582587
/// is 512.
583588
pub(crate) max_export_batch_size: usize,
584589

590+
/// The maximum duration to wait for a force flush to complete. The default value is 5 seconds.
591+
pub(crate) forceflush_timeout: Duration,
592+
585593
/// The maximum duration to export a batch of data.
586594
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
587595
pub(crate) max_export_timeout: Duration,
@@ -599,6 +607,7 @@ pub struct BatchConfigBuilder {
599607
max_queue_size: usize,
600608
scheduled_delay: Duration,
601609
max_export_batch_size: usize,
610+
forceflush_timeout: Duration,
602611
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
603612
max_export_timeout: Duration,
604613
}
@@ -618,6 +627,7 @@ impl Default for BatchConfigBuilder {
618627
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
619628
scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
620629
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
630+
forceflush_timeout: OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT,
621631
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
622632
max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
623633
}
@@ -678,6 +688,18 @@ impl BatchConfigBuilder {
678688
self
679689
}
680690

691+
/// Set forceflush_timeout for [`BatchConfigBuilder`].
692+
/// It's the maximum duration to wait for a force flush to complete.
693+
/// The default value is 5 seconds.
694+
///
695+
/// Corresponding environment variable: `OTEL_BLRP_FORCEFLUSH_TIMEOUT`.
696+
///
697+
/// Note: Programmatically setting this will override any value set via the environment variable.
698+
pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self {
699+
self.forceflush_timeout = forceflush_timeout;
700+
self
701+
}
702+
681703
/// Builds a `BatchConfig` enforcing the following invariants:
682704
/// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
683705
pub fn build(self) -> BatchConfig {
@@ -691,6 +713,7 @@ impl BatchConfigBuilder {
691713
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
692714
max_export_timeout: self.max_export_timeout,
693715
max_export_batch_size,
716+
forceflush_timeout: self.forceflush_timeout,
694717
}
695718
}
696719

@@ -716,6 +739,13 @@ impl BatchConfigBuilder {
716739
self.scheduled_delay = Duration::from_millis(scheduled_delay);
717740
}
718741

742+
if let Some(forceflush_timeout) = env::var(OTEL_BLRP_FORCEFLUSH_TIMEOUT)
743+
.ok()
744+
.and_then(|timeout| u64::from_str(&timeout).ok())
745+
{
746+
self.forceflush_timeout = Duration::from_millis(forceflush_timeout);
747+
}
748+
719749
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
720750
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
721751
.ok()
@@ -731,7 +761,8 @@ impl BatchConfigBuilder {
731761
#[cfg(all(test, feature = "testing", feature = "logs"))]
732762
mod tests {
733763
use super::{
734-
BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
764+
BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_FORCEFLUSH_TIMEOUT,
765+
OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
735766
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE,
736767
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY,
737768
OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
@@ -764,6 +795,8 @@ mod tests {
764795
"OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
765796
);
766797
assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
798+
assert_eq!(OTEL_BLRP_FORCEFLUSH_TIMEOUT, "OTEL_BLRP_FORCEFLUSH_TIMEOUT");
799+
assert_eq!(OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT.as_millis(), 5_000);
767800
}
768801

769802
#[test]
@@ -775,6 +808,7 @@ mod tests {
775808
OTEL_BLRP_EXPORT_TIMEOUT,
776809
OTEL_BLRP_MAX_QUEUE_SIZE,
777810
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
811+
OTEL_BLRP_FORCEFLUSH_TIMEOUT,
778812
];
779813

780814
let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
@@ -795,18 +829,21 @@ mod tests {
795829
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
796830
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
797831
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
832+
(OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("10000")),
798833
];
799834

800835
temp_env::with_vars(env_vars, || {
801836
let config = BatchConfigBuilder::default()
802837
.with_max_queue_size(2048)
803838
.with_scheduled_delay(Duration::from_millis(1000))
804839
.with_max_export_batch_size(512)
840+
.with_forceflush_timeout(Duration::from_millis(20000))
805841
.build();
806842

807843
assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
808844
assert_eq!(config.max_queue_size, 2048);
809845
assert_eq!(config.max_export_batch_size, 512);
846+
assert_eq!(config.forceflush_timeout, Duration::from_millis(20000));
810847
});
811848
}
812849

@@ -818,6 +855,7 @@ mod tests {
818855
(OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
819856
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
820857
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
858+
(OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("10000")),
821859
];
822860

823861
let config = temp_env::with_vars(env_vars, BatchConfig::default);
@@ -827,6 +865,7 @@ mod tests {
827865
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
828866
assert_eq!(config.max_queue_size, 4096);
829867
assert_eq!(config.max_export_batch_size, 1024);
868+
assert_eq!(config.forceflush_timeout, Duration::from_millis(10000));
830869
}
831870

832871
#[test]
@@ -850,7 +889,8 @@ mod tests {
850889
let batch_builder = BatchConfigBuilder::default()
851890
.with_max_export_batch_size(1)
852891
.with_scheduled_delay(Duration::from_millis(2))
853-
.with_max_queue_size(4);
892+
.with_max_queue_size(4)
893+
.with_forceflush_timeout(Duration::from_millis(15));
854894

855895
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
856896
let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
@@ -861,6 +901,7 @@ mod tests {
861901
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
862902
assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
863903
assert_eq!(batch.max_queue_size, 4);
904+
assert_eq!(batch.forceflush_timeout, Duration::from_millis(15));
864905
}
865906

866907
#[test]
@@ -870,6 +911,7 @@ mod tests {
870911
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
871912
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
872913
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
914+
(OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("10000")),
873915
];
874916
temp_env::with_vars(env_vars.clone(), || {
875917
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
@@ -889,6 +931,10 @@ mod tests {
889931
builder.config.max_export_timeout,
890932
Duration::from_millis(2046)
891933
);
934+
assert_eq!(
935+
builder.config.forceflush_timeout,
936+
OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT
937+
);
892938
});
893939

894940
env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
@@ -897,6 +943,10 @@ mod tests {
897943
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
898944
assert_eq!(builder.config.max_export_batch_size, 120);
899945
assert_eq!(builder.config.max_queue_size, 120);
946+
assert_eq!(
947+
builder.config.forceflush_timeout,
948+
OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT
949+
);
900950
});
901951
}
902952

@@ -906,6 +956,7 @@ mod tests {
906956
.with_max_export_batch_size(1)
907957
.with_scheduled_delay(Duration::from_millis(2))
908958
.with_max_queue_size(4)
959+
.with_forceflush_timeout(Duration::from_millis(15))
909960
.build();
910961

911962
let builder =
@@ -915,6 +966,7 @@ mod tests {
915966
assert_eq!(actual.max_export_batch_size, 1);
916967
assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
917968
assert_eq!(actual.max_queue_size, 4);
969+
assert_eq!(actual.forceflush_timeout, Duration::from_millis(15));
918970
}
919971

920972
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]

0 commit comments

Comments
 (0)