Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ impl CommonOpt {
pub fn runtime_env_builder(&self) -> Result<RuntimeEnvBuilder> {
let mut rt_builder = RuntimeEnvBuilder::new();
const NUM_TRACKED_CONSUMERS: usize = 5;
if let Some(memory_limit) = self.memory_limit {
// Use CLI --memory-limit if provided, otherwise fall back to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could use https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html#method.from_env and pick up any environment variables, rather than just the memory limit?

Copy link
Contributor Author

@adriangb adriangb Mar 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That only loads the SessionConfig options, the memory limit is part of the RuntimeEnv which comes into play via SessionContext (SessionConfig + RuntimeEnv).

So we'd have to do some refactoring to create SessionContext::from_env(), or add the memory limit to SessionConfig and add RuntimeEnv::new_from_config(&config) or something.

// DATAFUSION_RUNTIME_MEMORY_LIMIT env var
let memory_limit = self.memory_limit.or_else(|| {
std::env::var("DATAFUSION_RUNTIME_MEMORY_LIMIT")
.ok()
.and_then(|val| parse_memory_limit(&val).ok())
});

if let Some(memory_limit) = memory_limit {
let pool: Arc<dyn MemoryPool> = match self.mem_pool_type.as_str() {
"fair" => Arc::new(TrackConsumersPool::new(
FairSpillPool::new(memory_limit),
Expand Down Expand Up @@ -138,6 +146,39 @@ fn parse_memory_limit(limit: &str) -> Result<usize, String> {
mod tests {
use super::*;

#[test]
fn test_runtime_env_builder_reads_env_var() {
// Set the env var and verify runtime_env_builder picks it up
// when no CLI --memory-limit is provided
let opt = CommonOpt {
iterations: 3,
partitions: None,
batch_size: None,
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
};

// With env var set, builder should succeed and have a memory pool
// SAFETY: This test is single-threaded and the env var is restored after use
unsafe {
std::env::set_var("DATAFUSION_RUNTIME_MEMORY_LIMIT", "2G");
}
let builder = opt.runtime_env_builder().unwrap();
let runtime = builder.build().unwrap();
unsafe {
std::env::remove_var("DATAFUSION_RUNTIME_MEMORY_LIMIT");
}
// A 2G memory pool should be present — verify it reports the correct limit
match runtime.memory_pool.memory_limit() {
datafusion::execution::memory_pool::MemoryLimit::Finite(limit) => {
assert_eq!(limit, 2 * 1024 * 1024 * 1024);
}
_ => panic!("Expected Finite memory limit"),
}
}

#[test]
fn test_parse_memory_limit_all() {
// Test valid inputs
Expand Down