Skip to content

Commit 655d63f

Browse files
jdrouetalamb
andauthored
feat: create helpers to set the max_temp_directory_size (apache#15919)
* feat: create helpers to set the max_temp_directory_size Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * refactor: use helper in cli Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * refactor: update error message Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> * refactor: use setter in tests Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> --------- Signed-off-by: Jérémie Drouet <jeremie.drouet@gmail.com> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 0c92091 commit 655d63f

File tree

3 files changed

+32
-13
lines changed

3 files changed

+32
-13
lines changed

datafusion-cli/src/main.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,14 @@ async fn main_inner() -> Result<()> {
177177

178178
// set disk limit
179179
if let Some(disk_limit) = args.disk_limit {
180-
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
180+
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
181181

182-
let disk_manager = Arc::try_unwrap(disk_manager)
183-
.expect("DiskManager should be a single instance")
184-
.with_max_temp_directory_size(disk_limit.try_into().unwrap())?;
182+
DiskManager::set_arc_max_temp_directory_size(
183+
&mut disk_manager,
184+
disk_limit.try_into().unwrap(),
185+
)?;
185186

186-
let disk_config = DiskManagerConfig::new_existing(Arc::new(disk_manager));
187+
let disk_config = DiskManagerConfig::new_existing(disk_manager);
187188
rt_builder = rt_builder.with_disk_manager(disk_config);
188189
}
189190

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -534,11 +534,9 @@ async fn setup_context(
534534
disk_limit: u64,
535535
memory_pool_limit: usize,
536536
) -> Result<SessionContext> {
537-
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
537+
let mut disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
538538

539-
let disk_manager = Arc::try_unwrap(disk_manager)
540-
.expect("DiskManager should be a single instance")
541-
.with_max_temp_directory_size(disk_limit)?;
539+
DiskManager::set_arc_max_temp_directory_size(&mut disk_manager, disk_limit)?;
542540

543541
let runtime = RuntimeEnvBuilder::new()
544542
.with_memory_pool(Arc::new(FairSpillPool::new(memory_pool_limit)))
@@ -547,7 +545,7 @@ async fn setup_context(
547545

548546
let runtime = Arc::new(RuntimeEnv {
549547
memory_pool: runtime.memory_pool.clone(),
550-
disk_manager: Arc::new(disk_manager),
548+
disk_manager,
551549
cache_manager: runtime.cache_manager.clone(),
552550
object_store_registry: runtime.object_store_registry.clone(),
553551
});

datafusion/execution/src/disk_manager.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,10 @@ impl DiskManager {
120120
}
121121
}
122122

123-
pub fn with_max_temp_directory_size(
124-
mut self,
123+
pub fn set_max_temp_directory_size(
124+
&mut self,
125125
max_temp_directory_size: u64,
126-
) -> Result<Self> {
126+
) -> Result<()> {
127127
// If the disk manager is disabled and `max_temp_directory_size` is not 0,
128128
// this operation is not meaningful, fail early.
129129
if self.local_dirs.lock().is_none() && max_temp_directory_size != 0 {
@@ -133,6 +133,26 @@ impl DiskManager {
133133
}
134134

135135
self.max_temp_directory_size = max_temp_directory_size;
136+
Ok(())
137+
}
138+
139+
pub fn set_arc_max_temp_directory_size(
140+
this: &mut Arc<Self>,
141+
max_temp_directory_size: u64,
142+
) -> Result<()> {
143+
if let Some(inner) = Arc::get_mut(this) {
144+
inner.set_max_temp_directory_size(max_temp_directory_size)?;
145+
Ok(())
146+
} else {
147+
config_err!("DiskManager should be a single instance")
148+
}
149+
}
150+
151+
pub fn with_max_temp_directory_size(
152+
mut self,
153+
max_temp_directory_size: u64,
154+
) -> Result<Self> {
155+
self.set_max_temp_directory_size(max_temp_directory_size)?;
136156
Ok(self)
137157
}
138158

0 commit comments

Comments
 (0)