Skip to content

Commit 4842c8b

Browse files
committed
add forceflush_timeout configurability to BatchSpanProcessor and BatchLogProcessor
1 parent cff5728 commit 4842c8b

File tree

2 files changed

+60
-2
lines changed

2 files changed

+60
-2
lines changed

opentelemetry-sdk/src/logs/batch_log_processor.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
5454
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
5555
/// Default maximum batch size.
5656
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
57+
/// Force flush timeout
58+
pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BLRP_FORCE_FLUSH_TIMEOUT";
59+
/// Default force flush timeout
60+
pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000);
5761

5862
/// Messages sent between application thread and batch log processor's work thread.
5963
#[allow(clippy::large_enum_variant)]
@@ -339,6 +343,7 @@ impl BatchLogProcessor {
339343
let max_export_batch_size = config.max_export_batch_size;
340344
let current_batch_size = Arc::new(AtomicUsize::new(0));
341345
let current_batch_size_for_thread = current_batch_size.clone();
346+
let forceflush_timeout = config.forceflush_timeout;
342347

343348
let handle = thread::Builder::new()
344349
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -489,7 +494,7 @@ impl BatchLogProcessor {
489494
logs_sender,
490495
message_sender,
491496
handle: Mutex::new(Some(handle)),
492-
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
497+
forceflush_timeout,
493498
dropped_logs_count: AtomicUsize::new(0),
494499
max_queue_size,
495500
export_log_message_sent: Arc::new(AtomicBool::new(false)),
@@ -586,6 +591,9 @@ pub struct BatchConfig {
586591
/// is 512.
587592
pub(crate) max_export_batch_size: usize,
588593

594+
/// The maximum duration to wait when force flushing.
595+
pub(crate) forceflush_timeout: Duration,
596+
589597
/// The maximum duration to export a batch of data.
590598
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
591599
pub(crate) max_export_timeout: Duration,
@@ -603,6 +611,7 @@ pub struct BatchConfigBuilder {
603611
max_queue_size: usize,
604612
scheduled_delay: Duration,
605613
max_export_batch_size: usize,
614+
forceflush_timeout: Duration,
606615
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
607616
max_export_timeout: Duration,
608617
}
@@ -622,6 +631,7 @@ impl Default for BatchConfigBuilder {
622631
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
623632
scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
624633
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
634+
forceflush_timeout: OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT,
625635
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
626636
max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
627637
}
@@ -682,6 +692,17 @@ impl BatchConfigBuilder {
682692
self
683693
}
684694

695+
/// Set forceflush_timeout for [`BatchConfigBuilder`].
696+
/// The default value is 5000 milliseconds.
697+
///
698+
/// Corresponding environment variable: `OTEL_BLRP_FORCE_FLUSH_TIMEOUT`.
699+
///
700+
/// Note: Programmatically setting this will override any value set via the environment variable.
701+
pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self {
702+
self.forceflush_timeout = forceflush_timeout;
703+
self
704+
}
705+
685706
/// Builds a `BatchConfig` enforcing the following invariants:
686707
/// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
687708
pub fn build(self) -> BatchConfig {
@@ -692,6 +713,7 @@ impl BatchConfigBuilder {
692713
BatchConfig {
693714
max_queue_size: self.max_queue_size,
694715
scheduled_delay: self.scheduled_delay,
716+
forceflush_timeout: self.forceflush_timeout,
695717
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
696718
max_export_timeout: self.max_export_timeout,
697719
max_export_batch_size,
@@ -720,6 +742,13 @@ impl BatchConfigBuilder {
720742
self.scheduled_delay = Duration::from_millis(scheduled_delay);
721743
}
722744

745+
if let Some(forceflush_timeout) = env::var(OTEL_BLRP_FORCE_FLUSH_TIMEOUT)
746+
.ok()
747+
.and_then(|s| u64::from_str(&s).ok())
748+
{
749+
self.forceflush_timeout = Duration::from_millis(forceflush_timeout);
750+
}
751+
723752
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
724753
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
725754
.ok()

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_mill
7070
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
7171
/// Default max concurrent exports for BSP
7272
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
73+
/// Force flush timeout
74+
pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BSP_FORCE_FLUSH_TIMEOUT";
75+
/// Default force flush timeout
76+
pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000);
7377

7478
/// `SpanProcessor` is an interface which allows hooks for span start and end
7579
/// method invocations. The span processors are invoked only when is_recording
@@ -312,6 +316,7 @@ impl BatchSpanProcessor {
312316
let max_export_batch_size = config.max_export_batch_size;
313317
let current_batch_size = Arc::new(AtomicUsize::new(0));
314318
let current_batch_size_for_thread = current_batch_size.clone();
319+
let forceflush_timeout = config.forceflush_timeout;
315320

316321
let handle = thread::Builder::new()
317322
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
@@ -424,7 +429,7 @@ impl BatchSpanProcessor {
424429
span_sender,
425430
message_sender,
426431
handle: Mutex::new(Some(handle)),
427-
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
432+
forceflush_timeout,
428433
dropped_spans_count: AtomicUsize::new(0),
429434
max_queue_size,
430435
export_span_message_sent: Arc::new(AtomicBool::new(false)),
@@ -757,6 +762,9 @@ pub struct BatchConfig {
757762
/// by an exporter. A value of 1 will cause exports to be performed
758763
/// synchronously on the BatchSpanProcessor task.
759764
pub(crate) max_concurrent_exports: usize,
765+
766+
/// The maximum duration to wait when force flushing.
767+
pub(crate) forceflush_timeout: Duration,
760768
}
761769

762770
impl Default for BatchConfig {
@@ -773,6 +781,7 @@ pub struct BatchConfigBuilder {
773781
max_export_batch_size: usize,
774782
max_export_timeout: Duration,
775783
max_concurrent_exports: usize,
784+
forceflush_timeout: Duration,
776785
}
777786

778787
impl Default for BatchConfigBuilder {
@@ -793,6 +802,7 @@ impl Default for BatchConfigBuilder {
793802
max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
794803
max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
795804
max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
805+
forceflush_timeout: OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT,
796806
}
797807
.init_from_env_vars()
798808
}
@@ -868,6 +878,17 @@ impl BatchConfigBuilder {
868878
self
869879
}
870880

881+
/// Set forceflush_timeout for [`BatchConfigBuilder`].
882+
/// The default value is 5000 milliseconds.
883+
///
884+
/// Corresponding environment variable: `OTEL_BSP_FORCE_FLUSH_TIMEOUT`.
885+
///
886+
/// Note: Programmatically setting this will override any value set via the environment variable.
887+
pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self {
888+
self.forceflush_timeout = forceflush_timeout;
889+
self
890+
}
891+
871892
/// Builds a `BatchConfig` enforcing the following invariants:
872893
/// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
873894
pub fn build(self) -> BatchConfig {
@@ -880,6 +901,7 @@ impl BatchConfigBuilder {
880901
scheduled_delay: self.scheduled_delay,
881902
max_export_timeout: self.max_export_timeout,
882903
max_concurrent_exports: self.max_concurrent_exports,
904+
forceflush_timeout: self.forceflush_timeout,
883905
max_export_batch_size,
884906
}
885907
}
@@ -926,6 +948,13 @@ impl BatchConfigBuilder {
926948
self.max_export_timeout = Duration::from_millis(max_export_timeout);
927949
}
928950

951+
if let Some(forceflush_timeout) = env::var(OTEL_BSP_FORCE_FLUSH_TIMEOUT)
952+
.ok()
953+
.and_then(|s| u64::from_str(&s).ok())
954+
{
955+
self.forceflush_timeout = Duration::from_millis(forceflush_timeout);
956+
}
957+
929958
self
930959
}
931960
}

0 commit comments

Comments
 (0)