Skip to content

Commit fcb193b

Browse files
authored
Update spilled_bytes to report spill file size (#16535)
1 parent 1b36400 commit fcb193b

File tree

5 files changed

+19
-7
lines changed

5 files changed

+19
-7
lines changed

datafusion/execution/src/disk_manager.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,10 @@ impl RefCountedTempFile {
366366

367367
Ok(())
368368
}
369+
370+
pub fn current_disk_usage(&self) -> u64 {
371+
self.current_file_disk_usage
372+
}
369373
}
370374

371375
/// When the temporary file is dropped, subtract its disk usage from the disk manager's total

datafusion/physical-plan/src/metrics/baseline.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ pub struct SpillMetrics {
151151
/// count of spills during the execution of the operator
152152
pub spill_file_count: Count,
153153

154-
/// total spilled bytes during the execution of the operator
154+
/// total bytes actually written to disk during the execution of the operator
155155
pub spilled_bytes: Count,
156156

157157
/// total spilled rows during the execution of the operator

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1514,7 +1514,7 @@ mod tests {
15141514
// bytes. We leave a little wiggle room for the actual numbers.
15151515
assert!((3..=10).contains(&spill_count));
15161516
assert!((9000..=10000).contains(&spilled_rows));
1517-
assert!((38000..=42000).contains(&spilled_bytes));
1517+
assert!((38000..=44000).contains(&spilled_bytes));
15181518

15191519
let columns = result[0].columns();
15201520

datafusion/physical-plan/src/spill/in_progress_spill_file.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,14 @@ impl InProgressSpillFile {
7575
}
7676
}
7777
if let Some(writer) = &mut self.writer {
78-
let (spilled_rows, spilled_bytes) = writer.write(batch)?;
78+
let (spilled_rows, _) = writer.write(batch)?;
7979
if let Some(in_progress_file) = &mut self.in_progress_file {
8080
in_progress_file.update_disk_usage()?;
8181
} else {
8282
unreachable!() // Already checked inside current function
8383
}
8484

8585
// Update metrics
86-
self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);
8786
self.spill_writer.metrics.spilled_rows.add(spilled_rows);
8887
}
8988
Ok(())
@@ -98,6 +97,14 @@ impl InProgressSpillFile {
9897
return Ok(None);
9998
}
10099

100+
// Since spill files are append-only, add the file size to spilled_bytes
101+
if let Some(in_progress_file) = &mut self.in_progress_file {
102+
// Since writer.finish() writes continuation marker and message length at the end
103+
in_progress_file.update_disk_usage()?;
104+
let size = in_progress_file.current_disk_usage();
105+
self.spill_writer.metrics.spilled_bytes.add(size as usize);
106+
}
107+
101108
Ok(self.in_progress_file.take())
102109
}
103110
}

datafusion/physical-plan/src/spill/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -809,12 +809,13 @@ mod tests {
809809
Arc::new(StringArray::from(vec!["d", "e", "f"])),
810810
],
811811
)?;
812-
812+
// After appending each batch, spilled_rows should increase, while spill_file_count and
813+
// spilled_bytes remain the same (spilled_bytes is updated only after finish() is called)
813814
in_progress_file.append_batch(&batch1)?;
814-
verify_metrics(&in_progress_file, 1, 356, 3)?;
815+
verify_metrics(&in_progress_file, 1, 0, 3)?;
815816

816817
in_progress_file.append_batch(&batch2)?;
817-
verify_metrics(&in_progress_file, 1, 712, 6)?;
818+
verify_metrics(&in_progress_file, 1, 0, 6)?;
818819

819820
let completed_file = in_progress_file.finish()?;
820821
assert!(completed_file.is_some());

0 commit comments

Comments
 (0)