Skip to content

Commit 53c71b6

Browse files
andygroveclaude
andcommitted
perf: optimize shuffle writer with buffered I/O and fix file size bug
1. Add BufWriter for buffered file I/O in shuffle writer Wrapping File with BufWriter reduces syscalls when writing multiple small batches to shuffle files. This affects both the hash-partitioned shuffle path and the non-partitioned write_stream_to_disk utility. 2. Fix file size read before writer finish Previously, fs::metadata() was called before writer.finish(), which could report incorrect file sizes since data may not have been fully flushed to disk. This is especially important now that BufWriter is used, as buffered data would not be reflected in the file size until after finish() flushes it. Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 08544f4 commit 53c71b6

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::fmt::Debug;
2929
use std::fs;
3030
use std::fs::File;
3131
use std::future::Future;
32+
use std::io::BufWriter;
3233
use std::iter::Iterator;
3334
use std::path::PathBuf;
3435
use std::sync::Arc;
@@ -105,7 +106,7 @@ impl std::fmt::Display for ShuffleWriterExec {
105106
pub struct WriteTracker {
106107
pub num_batches: usize,
107108
pub num_rows: usize,
108-
pub writer: StreamWriter<File>,
109+
pub writer: StreamWriter<BufWriter<File>>,
109110
pub path: PathBuf,
110111
}
111112

@@ -295,7 +296,8 @@ impl ShuffleWriterExec {
295296
CompressionType::LZ4_FRAME,
296297
))?;
297298

298-
let file = File::create(path.clone())?;
299+
let file =
300+
BufWriter::new(File::create(path.clone())?);
299301
let mut writer =
300302
StreamWriter::try_new_with_options(
301303
file,
@@ -323,8 +325,8 @@ impl ShuffleWriterExec {
323325

324326
for (i, w) in writers.iter_mut().enumerate() {
325327
if let Some(w) = w {
326-
let num_bytes = fs::metadata(&w.path)?.len();
327328
w.writer.finish()?;
329+
let num_bytes = fs::metadata(&w.path)?.len();
328330
debug!(
329331
"Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.",
330332
i, w.path, w.num_batches, w.num_rows, num_bytes

ballista/core/src/utils.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use datafusion::physical_plan::metrics::MetricsSet;
3131
use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream, metrics};
3232
use futures::StreamExt;
3333
use log::error;
34+
use std::io::BufWriter;
3435
use std::sync::Arc;
3536
use std::time::{Duration, SystemTime, UNIX_EPOCH};
3637
use std::{fs::File, pin::Pin};
@@ -148,10 +149,10 @@ pub async fn write_stream_to_disk(
148149
path: &str,
149150
disk_write_metric: &metrics::Time,
150151
) -> Result<PartitionStats> {
151-
let file = File::create(path).map_err(|e| {
152+
let file = BufWriter::new(File::create(path).map_err(|e| {
152153
error!("Failed to create partition file at {path}: {e:?}");
153154
BallistaError::IoError(e)
154-
})?;
155+
})?);
155156

156157
let mut num_rows = 0;
157158
let mut num_batches = 0;

0 commit comments

Comments
 (0)