Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ballista/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::fmt::Debug;
use std::fs;
use std::fs::File;
use std::future::Future;
use std::io::BufWriter;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -105,7 +106,7 @@ impl std::fmt::Display for ShuffleWriterExec {
pub struct WriteTracker {
pub num_batches: usize,
pub num_rows: usize,
pub writer: StreamWriter<File>,
pub writer: StreamWriter<BufWriter<File>>,
pub path: PathBuf,
}

Expand Down Expand Up @@ -295,7 +296,8 @@ impl ShuffleWriterExec {
CompressionType::LZ4_FRAME,
))?;

let file = File::create(path.clone())?;
let file =
BufWriter::new(File::create(path.clone())?);
let mut writer =
StreamWriter::try_new_with_options(
file,
Expand Down Expand Up @@ -323,8 +325,8 @@ impl ShuffleWriterExec {

for (i, w) in writers.iter_mut().enumerate() {
if let Some(w) = w {
let num_bytes = fs::metadata(&w.path)?.len();
w.writer.finish()?;
let num_bytes = fs::metadata(&w.path)?.len();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took me a moment to understand whats changed

debug!(
"Finished writing shuffle partition {} at {:?}. Batches: {}. Rows: {}. Bytes: {}.",
i, w.path, w.num_batches, w.num_rows, num_bytes
Expand Down
5 changes: 3 additions & 2 deletions ballista/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream, metrics};
use futures::StreamExt;
use log::error;
use std::io::BufWriter;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{fs::File, pin::Pin};
Expand Down Expand Up @@ -148,10 +149,10 @@ pub async fn write_stream_to_disk(
path: &str,
disk_write_metric: &metrics::Time,
) -> Result<PartitionStats> {
let file = File::create(path).map_err(|e| {
let file = BufWriter::new(File::create(path).map_err(|e| {
error!("Failed to create partition file at {path}: {e:?}");
BallistaError::IoError(e)
})?;
})?);

let mut num_rows = 0;
let mut num_batches = 0;
Expand Down