58 changes: 55 additions & 3 deletions opentelemetry-sdk/src/logs/batch_log_processor.rs
@@ -54,6 +54,10 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Maximum force flush timeout.
pub(crate) const OTEL_BLRP_FORCEFLUSH_TIMEOUT: &str = "OTEL_BLRP_FORCEFLUSH_TIMEOUT";
/// Default maximum force flush timeout.
pub(crate) const OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000);
Contributor: nit: why not use 5s here?
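For reference, the two constructors produce equal `Duration` values, so the choice between them is purely stylistic; a minimal check:

```rust
use std::time::Duration;

fn main() {
    // Both express the same 5-second span.
    assert_eq!(Duration::from_secs(5), Duration::from_millis(5_000));
}
```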
/// Messages sent between application thread and batch log processor's work thread.
#[allow(clippy::large_enum_variant)]
@@ -333,6 +337,7 @@ impl BatchLogProcessor {
let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
let max_queue_size = config.max_queue_size;
let max_export_batch_size = config.max_export_batch_size;
let forceflush_timeout = config.forceflush_timeout;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();

@@ -485,7 +490,7 @@ impl BatchLogProcessor {
logs_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
forceflush_timeout,
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
export_log_message_sent: Arc::new(AtomicBool::new(false)),
@@ -582,6 +587,9 @@ pub struct BatchConfig {
/// is 512.
pub(crate) max_export_batch_size: usize,

/// The maximum duration to wait for a force flush to complete. The default value is 5 seconds.
pub(crate) forceflush_timeout: Duration,

/// The maximum duration to export a batch of data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) max_export_timeout: Duration,
@@ -599,6 +607,7 @@ pub struct BatchConfigBuilder {
max_queue_size: usize,
scheduled_delay: Duration,
max_export_batch_size: usize,
forceflush_timeout: Duration,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: Duration,
}
@@ -618,6 +627,7 @@ impl Default for BatchConfigBuilder {
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
forceflush_timeout: OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
}
@@ -678,6 +688,18 @@ impl BatchConfigBuilder {
self
}

/// Set forceflush_timeout for [`BatchConfigBuilder`].
/// It's the maximum duration to wait for a force flush to complete.
/// The default value is 5 seconds.
///
/// Corresponding environment variable: `OTEL_BLRP_FORCEFLUSH_TIMEOUT`.
///
/// Note: Programmatically setting this will override any value set via the environment variable.
pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self {
self.forceflush_timeout = forceflush_timeout;
self
}
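
With this builder method, the force-flush timeout can now be set in code instead of relying on the previously hard-coded 5 s. A minimal sketch, assuming `BatchConfigBuilder` is used via its `opentelemetry_sdk::logs` re-export:

```rust
use std::time::Duration;

use opentelemetry_sdk::logs::BatchConfigBuilder;

fn main() {
    // Wait up to 10 s for force_flush to complete; all other fields
    // (queue size, batch size, scheduled delay) keep their defaults.
    let _config = BatchConfigBuilder::default()
        .with_forceflush_timeout(Duration::from_secs(10))
        .build();
}
```

The same value can also be supplied through `OTEL_BLRP_FORCEFLUSH_TIMEOUT` (interpreted as milliseconds); as the doc comment above notes, the programmatic setter overrides the environment variable.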

/// Builds a `BatchConfig` enforcing the following invariants:
/// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
pub fn build(self) -> BatchConfig {
@@ -691,6 +713,7 @@ impl BatchConfigBuilder {
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: self.max_export_timeout,
max_export_batch_size,
forceflush_timeout: self.forceflush_timeout,
}
}

@@ -716,6 +739,13 @@ impl BatchConfigBuilder {
self.scheduled_delay = Duration::from_millis(scheduled_delay);
}

if let Some(forceflush_timeout) = env::var(OTEL_BLRP_FORCEFLUSH_TIMEOUT)
.ok()
.and_then(|timeout| u64::from_str(&timeout).ok())
{
self.forceflush_timeout = Duration::from_millis(forceflush_timeout);
}

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
.ok()
@@ -731,7 +761,8 @@ impl BatchConfigBuilder {
#[cfg(all(test, feature = "testing", feature = "logs"))]
mod tests {
use super::{
BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
BatchConfig, BatchConfigBuilder, BatchLogProcessor, OTEL_BLRP_FORCEFLUSH_TIMEOUT,
OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT, OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BLRP_SCHEDULE_DELAY,
OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
@@ -764,6 +795,8 @@ mod tests {
"OTEL_BLRP_MAX_EXPORT_BATCH_SIZE"
);
assert_eq!(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT, 512);
assert_eq!(OTEL_BLRP_FORCEFLUSH_TIMEOUT, "OTEL_BLRP_FORCEFLUSH_TIMEOUT");
assert_eq!(OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT.as_millis(), 5_000);
}

#[test]
@@ -775,6 +808,7 @@ mod tests {
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_FORCEFLUSH_TIMEOUT,
];

let config = temp_env::with_vars_unset(env_vars, BatchConfig::default);
@@ -795,18 +829,21 @@ mod tests {
(OTEL_BLRP_SCHEDULE_DELAY, Some("2000")),
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
(OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("10000")),
];

temp_env::with_vars(env_vars, || {
let config = BatchConfigBuilder::default()
.with_max_queue_size(2048)
.with_scheduled_delay(Duration::from_millis(1000))
.with_max_export_batch_size(512)
.with_forceflush_timeout(Duration::from_millis(20000))
.build();

assert_eq!(config.scheduled_delay, Duration::from_millis(1000));
assert_eq!(config.max_queue_size, 2048);
assert_eq!(config.max_export_batch_size, 512);
assert_eq!(config.forceflush_timeout, Duration::from_millis(20000));
});
}

@@ -818,6 +855,7 @@ mod tests {
(OTEL_BLRP_EXPORT_TIMEOUT, Some("60000")),
(OTEL_BLRP_MAX_QUEUE_SIZE, Some("4096")),
(OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, Some("1024")),
(OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("10000")),
];

let config = temp_env::with_vars(env_vars, BatchConfig::default);
@@ -827,6 +865,7 @@ mod tests {
assert_eq!(config.max_export_timeout, Duration::from_millis(60000));
assert_eq!(config.max_queue_size, 4096);
assert_eq!(config.max_export_batch_size, 1024);
assert_eq!(config.forceflush_timeout, Duration::from_millis(10000));
}

#[test]
@@ -850,7 +889,8 @@ mod tests {
let batch_builder = BatchConfigBuilder::default()
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_queue_size(4);
.with_max_queue_size(4)
.with_forceflush_timeout(Duration::from_millis(15));

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
let batch_builder = batch_builder.with_max_export_timeout(Duration::from_millis(3));
@@ -861,6 +901,7 @@ mod tests {
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
assert_eq!(batch.max_export_timeout, Duration::from_millis(3));
assert_eq!(batch.max_queue_size, 4);
assert_eq!(batch.forceflush_timeout, Duration::from_millis(15));
}

#[test]
@@ -870,6 +911,7 @@ mod tests {
(OTEL_BLRP_SCHEDULE_DELAY, Some("I am not number")),
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
(OTEL_BLRP_EXPORT_TIMEOUT, Some("2046")),
(OTEL_BLRP_FORCEFLUSH_TIMEOUT, Some("5000")),
];
temp_env::with_vars(env_vars.clone(), || {
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
@@ -889,6 +931,10 @@ mod tests {
builder.config.max_export_timeout,
Duration::from_millis(2046)
);
assert_eq!(
builder.config.forceflush_timeout,
OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT
);
});

env_vars.push((OTEL_BLRP_MAX_QUEUE_SIZE, Some("120")));
@@ -897,6 +943,10 @@ mod tests {
let builder = BatchLogProcessor::builder(InMemoryLogExporter::default());
assert_eq!(builder.config.max_export_batch_size, 120);
assert_eq!(builder.config.max_queue_size, 120);
assert_eq!(
builder.config.forceflush_timeout,
OTEL_BLRP_FORCEFLUSH_TIMEOUT_DEFAULT
);
});
}

@@ -906,6 +956,7 @@ mod tests {
.with_max_export_batch_size(1)
.with_scheduled_delay(Duration::from_millis(2))
.with_max_queue_size(4)
.with_forceflush_timeout(Duration::from_millis(15))
.build();

let builder =
@@ -915,6 +966,7 @@ mod tests {
assert_eq!(actual.max_export_batch_size, 1);
assert_eq!(actual.scheduled_delay, Duration::from_millis(2));
assert_eq!(actual.max_queue_size, 4);
assert_eq!(actual.forceflush_timeout, Duration::from_millis(15));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]