Skip to content

Commit d02642e

Browse files
adriangb2010YOUY01Copilot
authored
Add a SpillingPool to manage collections of spill files (#18207)
Addresses #18014 (comment), potentially paves the path to solve #18011 for other operators as well --------- Co-authored-by: Yongting You <[email protected]> Co-authored-by: Copilot <[email protected]>
1 parent a899ca0 commit d02642e

File tree

9 files changed

+2352
-255
lines changed

9 files changed

+2352
-255
lines changed

datafusion/common/src/config.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,23 @@ config_namespace! {
517517
/// batches and merged.
518518
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
519519

520+
/// Maximum size in bytes for individual spill files before rotating to a new file.
521+
///
522+
/// When operators spill data to disk (e.g., RepartitionExec), they write
523+
/// multiple batches to the same file until this size limit is reached, then rotate
524+
/// to a new file. This reduces syscall overhead compared to one-file-per-batch
525+
/// while preventing files from growing too large.
526+
///
527+
/// A larger value reduces file creation overhead but may hold on to disk space for longer.
528+
/// A smaller value creates more files but allows finer-grained space reclamation
529+
/// as files can be deleted once fully consumed.
530+
///
531+
/// Currently, only `RepartitionExec` supports this spill file rotation feature; other spilling operators
532+
/// may create spill files larger than the limit.
533+
///
534+
/// Default: 128 MB
535+
pub max_spill_file_size_bytes: usize, default = 128 * 1024 * 1024
536+
520537
/// Number of files to read in parallel when inferring schema and statistics
521538
pub meta_fetch_concurrency: usize, default = 32
522539

datafusion/execution/src/disk_manager.rs

Lines changed: 237 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -283,11 +283,13 @@ impl DiskManager {
283283

284284
let dir_index = rng().random_range(0..local_dirs.len());
285285
Ok(RefCountedTempFile {
286-
_parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
287-
tempfile: Builder::new()
288-
.tempfile_in(local_dirs[dir_index].as_ref())
289-
.map_err(DataFusionError::IoError)?,
290-
current_file_disk_usage: 0,
286+
parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
287+
tempfile: Arc::new(
288+
Builder::new()
289+
.tempfile_in(local_dirs[dir_index].as_ref())
290+
.map_err(DataFusionError::IoError)?,
291+
),
292+
current_file_disk_usage: Arc::new(AtomicU64::new(0)),
291293
disk_manager: Arc::clone(self),
292294
})
293295
}
@@ -301,26 +303,50 @@ impl DiskManager {
301303
/// must invoke [`Self::update_disk_usage`] to update the global disk usage counter.
302304
/// This ensures the disk manager can properly enforce usage limits configured by
303305
/// [`DiskManager::with_max_temp_directory_size`].
306+
///
307+
/// This type is Clone-able, allowing multiple references to the same underlying file.
308+
/// The file is deleted only when the last reference is dropped.
309+
///
310+
/// The parent temporary directory is also kept alive as long as any reference to
311+
/// this file exists, preventing premature cleanup of the directory.
312+
///
313+
/// Once all references to this file are dropped, the file is deleted, and the
314+
/// disk usage is subtracted from the disk manager's total.
304315
#[derive(Debug)]
305316
pub struct RefCountedTempFile {
306317
/// The reference to the directory in which temporary files are created to ensure
307318
/// it is not cleaned up prior to the NamedTempFile
308-
_parent_temp_dir: Arc<TempDir>,
309-
tempfile: NamedTempFile,
319+
parent_temp_dir: Arc<TempDir>,
320+
/// The underlying temporary file, wrapped in Arc to allow cloning
321+
tempfile: Arc<NamedTempFile>,
310322
/// Tracks the current disk usage of this temporary file. See
311323
/// [`Self::update_disk_usage`] for more details.
312-
current_file_disk_usage: u64,
324+
///
325+
/// This is wrapped in `Arc<AtomicU64>` so that all clones share the same
326+
/// disk usage tracking, preventing incorrect accounting when clones are dropped.
327+
current_file_disk_usage: Arc<AtomicU64>,
313328
/// The disk manager that created and manages this temporary file
314329
disk_manager: Arc<DiskManager>,
315330
}
316331

332+
impl Clone for RefCountedTempFile {
333+
fn clone(&self) -> Self {
334+
Self {
335+
parent_temp_dir: Arc::clone(&self.parent_temp_dir),
336+
tempfile: Arc::clone(&self.tempfile),
337+
current_file_disk_usage: Arc::clone(&self.current_file_disk_usage),
338+
disk_manager: Arc::clone(&self.disk_manager),
339+
}
340+
}
341+
}
342+
317343
impl RefCountedTempFile {
318344
pub fn path(&self) -> &Path {
319345
self.tempfile.path()
320346
}
321347

322348
pub fn inner(&self) -> &NamedTempFile {
323-
&self.tempfile
349+
self.tempfile.as_ref()
324350
}
325351

326352
/// Updates the global disk usage counter after modifications to the underlying file.
@@ -332,11 +358,14 @@ impl RefCountedTempFile {
332358
let metadata = self.tempfile.as_file().metadata()?;
333359
let new_disk_usage = metadata.len();
334360

361+
// Get the old disk usage
362+
let old_disk_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
363+
335364
// Update the global disk usage by:
336365
// 1. Subtracting the old file size from the global counter
337366
self.disk_manager
338367
.used_disk_space
339-
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
368+
.fetch_sub(old_disk_usage, Ordering::Relaxed);
340369
// 2. Adding the new file size to the global counter
341370
self.disk_manager
342371
.used_disk_space
@@ -352,23 +381,29 @@ impl RefCountedTempFile {
352381
}
353382

354383
// 4. Update the local file size tracking
355-
self.current_file_disk_usage = new_disk_usage;
384+
self.current_file_disk_usage
385+
.store(new_disk_usage, Ordering::Relaxed);
356386

357387
Ok(())
358388
}
359389

360390
pub fn current_disk_usage(&self) -> u64 {
361-
self.current_file_disk_usage
391+
self.current_file_disk_usage.load(Ordering::Relaxed)
362392
}
363393
}
364394

365395
/// When the last reference to the temporary file is dropped, subtract its disk usage from the disk manager's total
366396
impl Drop for RefCountedTempFile {
367397
fn drop(&mut self) {
368-
// Subtract the current file's disk usage from the global counter
369-
self.disk_manager
370-
.used_disk_space
371-
.fetch_sub(self.current_file_disk_usage, Ordering::Relaxed);
398+
// Only subtract disk usage when this is the last reference to the file
399+
// Check if we're the last one by seeing if there's only one strong reference
400+
// left to the underlying tempfile (the one we're holding)
401+
if Arc::strong_count(&self.tempfile) == 1 {
402+
let current_usage = self.current_file_disk_usage.load(Ordering::Relaxed);
403+
self.disk_manager
404+
.used_disk_space
405+
.fetch_sub(current_usage, Ordering::Relaxed);
406+
}
372407
}
373408
}
374409

@@ -523,4 +558,190 @@ mod tests {
523558

524559
Ok(())
525560
}
561+
562+
#[test]
563+
fn test_disk_usage_basic() -> Result<()> {
564+
use std::io::Write;
565+
566+
let dm = Arc::new(DiskManagerBuilder::default().build()?);
567+
let mut temp_file = dm.create_tmp_file("Testing")?;
568+
569+
// Initially, disk usage should be 0
570+
assert_eq!(dm.used_disk_space(), 0);
571+
assert_eq!(temp_file.current_disk_usage(), 0);
572+
573+
// Write some data to the file
574+
temp_file.inner().as_file().write_all(b"hello world")?;
575+
temp_file.update_disk_usage()?;
576+
577+
// Disk usage should now reflect the written data
578+
let expected_usage = temp_file.current_disk_usage();
579+
assert!(expected_usage > 0);
580+
assert_eq!(dm.used_disk_space(), expected_usage);
581+
582+
// Write more data
583+
temp_file.inner().as_file().write_all(b" more data")?;
584+
temp_file.update_disk_usage()?;
585+
586+
// Disk usage should increase
587+
let new_usage = temp_file.current_disk_usage();
588+
assert!(new_usage > expected_usage);
589+
assert_eq!(dm.used_disk_space(), new_usage);
590+
591+
// Drop the file
592+
drop(temp_file);
593+
594+
// Disk usage should return to 0
595+
assert_eq!(dm.used_disk_space(), 0);
596+
597+
Ok(())
598+
}
599+
600+
#[test]
601+
fn test_disk_usage_with_clones() -> Result<()> {
602+
use std::io::Write;
603+
604+
let dm = Arc::new(DiskManagerBuilder::default().build()?);
605+
let mut temp_file = dm.create_tmp_file("Testing")?;
606+
607+
// Write some data
608+
temp_file.inner().as_file().write_all(b"test data")?;
609+
temp_file.update_disk_usage()?;
610+
611+
let usage_after_write = temp_file.current_disk_usage();
612+
assert!(usage_after_write > 0);
613+
assert_eq!(dm.used_disk_space(), usage_after_write);
614+
615+
// Clone the file
616+
let clone1 = temp_file.clone();
617+
let clone2 = temp_file.clone();
618+
619+
// All clones should see the same disk usage
620+
assert_eq!(clone1.current_disk_usage(), usage_after_write);
621+
assert_eq!(clone2.current_disk_usage(), usage_after_write);
622+
623+
// Global disk usage should still be the same (not multiplied by number of clones)
624+
assert_eq!(dm.used_disk_space(), usage_after_write);
625+
626+
// Write more data through one clone
627+
clone1.inner().as_file().write_all(b" more data")?;
628+
let mut mutable_clone1 = clone1;
629+
mutable_clone1.update_disk_usage()?;
630+
631+
let new_usage = mutable_clone1.current_disk_usage();
632+
assert!(new_usage > usage_after_write);
633+
634+
// All clones should see the updated disk usage
635+
assert_eq!(temp_file.current_disk_usage(), new_usage);
636+
assert_eq!(clone2.current_disk_usage(), new_usage);
637+
assert_eq!(mutable_clone1.current_disk_usage(), new_usage);
638+
639+
// Global disk usage should reflect the new size (not multiplied)
640+
assert_eq!(dm.used_disk_space(), new_usage);
641+
642+
// Drop one clone
643+
drop(mutable_clone1);
644+
645+
// Disk usage should NOT change (other clones still exist)
646+
assert_eq!(dm.used_disk_space(), new_usage);
647+
assert_eq!(temp_file.current_disk_usage(), new_usage);
648+
assert_eq!(clone2.current_disk_usage(), new_usage);
649+
650+
// Drop another clone
651+
drop(clone2);
652+
653+
// Disk usage should still NOT change (original still exists)
654+
assert_eq!(dm.used_disk_space(), new_usage);
655+
assert_eq!(temp_file.current_disk_usage(), new_usage);
656+
657+
// Drop the original
658+
drop(temp_file);
659+
660+
// Now disk usage should return to 0 (last reference dropped)
661+
assert_eq!(dm.used_disk_space(), 0);
662+
663+
Ok(())
664+
}
665+
666+
#[test]
667+
fn test_disk_usage_clones_dropped_out_of_order() -> Result<()> {
668+
use std::io::Write;
669+
670+
let dm = Arc::new(DiskManagerBuilder::default().build()?);
671+
let mut temp_file = dm.create_tmp_file("Testing")?;
672+
673+
// Write data
674+
temp_file.inner().as_file().write_all(b"test")?;
675+
temp_file.update_disk_usage()?;
676+
677+
let usage = temp_file.current_disk_usage();
678+
assert_eq!(dm.used_disk_space(), usage);
679+
680+
// Create multiple clones
681+
let clone1 = temp_file.clone();
682+
let clone2 = temp_file.clone();
683+
let clone3 = temp_file.clone();
684+
685+
// Drop the original first (out of order)
686+
drop(temp_file);
687+
688+
// Disk usage should still be tracked (clones exist)
689+
assert_eq!(dm.used_disk_space(), usage);
690+
assert_eq!(clone1.current_disk_usage(), usage);
691+
692+
// Drop clones in different order
693+
drop(clone2);
694+
assert_eq!(dm.used_disk_space(), usage);
695+
696+
drop(clone1);
697+
assert_eq!(dm.used_disk_space(), usage);
698+
699+
// Drop the last clone
700+
drop(clone3);
701+
702+
// Now disk usage should be 0
703+
assert_eq!(dm.used_disk_space(), 0);
704+
705+
Ok(())
706+
}
707+
708+
#[test]
709+
fn test_disk_usage_multiple_files() -> Result<()> {
710+
use std::io::Write;
711+
712+
let dm = Arc::new(DiskManagerBuilder::default().build()?);
713+
714+
// Create multiple temp files
715+
let mut file1 = dm.create_tmp_file("Testing1")?;
716+
let mut file2 = dm.create_tmp_file("Testing2")?;
717+
718+
// Write to first file
719+
file1.inner().as_file().write_all(b"file1")?;
720+
file1.update_disk_usage()?;
721+
let usage1 = file1.current_disk_usage();
722+
723+
assert_eq!(dm.used_disk_space(), usage1);
724+
725+
// Write to second file
726+
file2.inner().as_file().write_all(b"file2 data")?;
727+
file2.update_disk_usage()?;
728+
let usage2 = file2.current_disk_usage();
729+
730+
// Global usage should be sum of both files
731+
assert_eq!(dm.used_disk_space(), usage1 + usage2);
732+
733+
// Drop first file
734+
drop(file1);
735+
736+
// Usage should only reflect second file
737+
assert_eq!(dm.used_disk_space(), usage2);
738+
739+
// Drop second file
740+
drop(file2);
741+
742+
// Usage should be 0
743+
assert_eq!(dm.used_disk_space(), 0);
744+
745+
Ok(())
746+
}
526747
}

0 commit comments

Comments
 (0)