Skip to content

Commit c952903

Browse files
committed
Revert "Add batch coalescing in BufBatchWriter to reduce IPC schema overhead (apache#3441)"
This reverts commit 8724b76.
1 parent 5637f32 commit c952903

File tree

5 files changed

+9 -191 lines changed

native/core/src/execution/shuffle/partitioners/multi_partition.rs

Lines changed: 3 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -442,19 +442,14 @@ impl MultiPartitionShuffleRepartitioner {
442442
encode_time: &Time,
443443
write_time: &Time,
444444
write_buffer_size: usize,
445-
batch_size: usize,
446445
) -> datafusion::common::Result<()> {
447-
let mut buf_batch_writer = BufBatchWriter::new(
448-
shuffle_block_writer,
449-
output_data,
450-
write_buffer_size,
451-
batch_size,
452-
);
446+
let mut buf_batch_writer =
447+
BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size);
453448
for batch in partition_iter {
454449
let batch = batch?;
455450
buf_batch_writer.write(&batch, encode_time, write_time)?;
456451
}
457-
buf_batch_writer.flush(encode_time, write_time)?;
452+
buf_batch_writer.flush(write_time)?;
458453
Ok(())
459454
}
460455

@@ -513,7 +508,6 @@ impl MultiPartitionShuffleRepartitioner {
513508
&self.runtime,
514509
&self.metrics,
515510
self.write_buffer_size,
516-
self.batch_size,
517511
)?;
518512
}
519513

@@ -598,7 +592,6 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
598592
&self.metrics.encode_time,
599593
&self.metrics.write_time,
600594
self.write_buffer_size,
601-
self.batch_size,
602595
)?;
603596
}
604597

native/core/src/execution/shuffle/partitioners/single_partition.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,8 @@ impl SinglePartitionShufflePartitioner {
5959
.truncate(true)
6060
.open(output_data_path)?;
6161

62-
let output_data_writer = BufBatchWriter::new(
63-
shuffle_block_writer,
64-
output_data_file,
65-
write_buffer_size,
66-
batch_size,
67-
);
62+
let output_data_writer =
63+
BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);
6864

6965
Ok(Self {
7066
output_data_writer,
@@ -166,8 +162,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner {
166162
&self.metrics.write_time,
167163
)?;
168164
}
169-
self.output_data_writer
170-
.flush(&self.metrics.encode_time, &self.metrics.write_time)?;
165+
self.output_data_writer.flush(&self.metrics.write_time)?;
171166

172167
// Write index file. It should only contain 2 entries: 0 and the total number of bytes written
173168
let index_file = OpenOptions::new()

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 0 additions & 108 deletions
Original file line number | Diff line number | Diff line change
@@ -585,112 +585,4 @@ mod test {
585585
let _ = fs::remove_file("/tmp/rr_data_1.out");
586586
let _ = fs::remove_file("/tmp/rr_index_1.out");
587587
}
588-
589-
/// Test that batch coalescing in BufBatchWriter reduces output size by
590-
/// writing fewer, larger IPC blocks instead of many small ones.
591-
#[test]
592-
#[cfg_attr(miri, ignore)]
593-
fn test_batch_coalescing_reduces_size() {
594-
use crate::execution::shuffle::writers::BufBatchWriter;
595-
use arrow::array::Int32Array;
596-
597-
// Create a wide schema to amplify per-block schema overhead
598-
let fields: Vec<Field> = (0..20)
599-
.map(|i| Field::new(format!("col_{i}"), DataType::Int32, false))
600-
.collect();
601-
let schema = Arc::new(Schema::new(fields));
602-
603-
// Create many small batches (50 rows each)
604-
let small_batches: Vec<RecordBatch> = (0..100)
605-
.map(|batch_idx| {
606-
let columns: Vec<Arc<dyn Array>> = (0..20)
607-
.map(|col_idx| {
608-
let values: Vec<i32> = (0..50)
609-
.map(|row| batch_idx * 50 + row + col_idx * 1000)
610-
.collect();
611-
Arc::new(Int32Array::from(values)) as Arc<dyn Array>
612-
})
613-
.collect();
614-
RecordBatch::try_new(Arc::clone(&schema), columns).unwrap()
615-
})
616-
.collect();
617-
618-
let codec = CompressionCodec::Lz4Frame;
619-
let encode_time = Time::default();
620-
let write_time = Time::default();
621-
622-
// Write with coalescing (batch_size=8192)
623-
let mut coalesced_output = Vec::new();
624-
{
625-
let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone()).unwrap();
626-
let mut buf_writer = BufBatchWriter::new(
627-
&mut writer,
628-
Cursor::new(&mut coalesced_output),
629-
1024 * 1024,
630-
8192,
631-
);
632-
for batch in &small_batches {
633-
buf_writer.write(batch, &encode_time, &write_time).unwrap();
634-
}
635-
buf_writer.flush(&encode_time, &write_time).unwrap();
636-
}
637-
638-
// Write without coalescing (batch_size=1)
639-
let mut uncoalesced_output = Vec::new();
640-
{
641-
let mut writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone()).unwrap();
642-
let mut buf_writer = BufBatchWriter::new(
643-
&mut writer,
644-
Cursor::new(&mut uncoalesced_output),
645-
1024 * 1024,
646-
1,
647-
);
648-
for batch in &small_batches {
649-
buf_writer.write(batch, &encode_time, &write_time).unwrap();
650-
}
651-
buf_writer.flush(&encode_time, &write_time).unwrap();
652-
}
653-
654-
// Coalesced output should be smaller due to fewer IPC schema blocks
655-
assert!(
656-
coalesced_output.len() < uncoalesced_output.len(),
657-
"Coalesced output ({} bytes) should be smaller than uncoalesced ({} bytes)",
658-
coalesced_output.len(),
659-
uncoalesced_output.len()
660-
);
661-
662-
// Verify both roundtrip correctly by reading all IPC blocks
663-
let coalesced_rows = read_all_ipc_blocks(&coalesced_output);
664-
let uncoalesced_rows = read_all_ipc_blocks(&uncoalesced_output);
665-
assert_eq!(
666-
coalesced_rows, 5000,
667-
"Coalesced should contain all 5000 rows"
668-
);
669-
assert_eq!(
670-
uncoalesced_rows, 5000,
671-
"Uncoalesced should contain all 5000 rows"
672-
);
673-
}
674-
675-
/// Read all IPC blocks from a byte buffer written by BufBatchWriter/ShuffleBlockWriter,
676-
/// returning the total number of rows.
677-
fn read_all_ipc_blocks(data: &[u8]) -> usize {
678-
let mut offset = 0;
679-
let mut total_rows = 0;
680-
while offset < data.len() {
681-
// First 8 bytes are the IPC length (little-endian u64)
682-
let ipc_length =
683-
u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()) as usize;
684-
// Skip the 8-byte length prefix; the next 8 bytes are field_count + codec header
685-
let block_start = offset + 8;
686-
let block_end = block_start + ipc_length;
687-
// read_ipc_compressed expects data starting after the 16-byte header
688-
// (i.e., after length + field_count), at the codec tag
689-
let ipc_data = &data[block_start + 8..block_end];
690-
let batch = read_ipc_compressed(ipc_data).unwrap();
691-
total_rows += batch.num_rows();
692-
offset = block_end;
693-
}
694-
total_rows
695-
}
696588
}

native/core/src/execution/shuffle/writers/buf_batch_writer.rs

Lines changed: 2 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,27 @@
1717

1818
use crate::execution::shuffle::ShuffleBlockWriter;
1919
use arrow::array::RecordBatch;
20-
use arrow::compute::kernels::coalesce::BatchCoalescer;
2120
use datafusion::physical_plan::metrics::Time;
2221
use std::borrow::Borrow;
2322
use std::io::{Cursor, Seek, SeekFrom, Write};
2423

2524
/// Write batches to writer while using a buffer to avoid frequent system calls.
2625
/// The record batches were first written by ShuffleBlockWriter into an internal buffer.
2726
/// Once the buffer exceeds the max size, the buffer will be flushed to the writer.
28-
///
29-
/// Small batches are coalesced using Arrow's [`BatchCoalescer`] before serialization,
30-
/// producing exactly `batch_size`-row output batches to reduce per-block IPC schema overhead.
31-
/// The coalescer is lazily initialized on the first write.
3227
pub(crate) struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
3328
shuffle_block_writer: S,
3429
writer: W,
3530
buffer: Vec<u8>,
3631
buffer_max_size: usize,
37-
/// Coalesces small batches into target_batch_size before serialization.
38-
/// Lazily initialized on first write to capture the schema.
39-
coalescer: Option<BatchCoalescer>,
40-
/// Target batch size for coalescing
41-
batch_size: usize,
4232
}
4333

4434
impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
45-
pub(crate) fn new(
46-
shuffle_block_writer: S,
47-
writer: W,
48-
buffer_max_size: usize,
49-
batch_size: usize,
50-
) -> Self {
35+
pub(crate) fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
5136
Self {
5237
shuffle_block_writer,
5338
writer,
5439
buffer: vec![],
5540
buffer_max_size,
56-
coalescer: None,
57-
batch_size,
5841
}
5942
}
6043

@@ -63,32 +46,6 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
6346
batch: &RecordBatch,
6447
encode_time: &Time,
6548
write_time: &Time,
66-
) -> datafusion::common::Result<usize> {
67-
let coalescer = self
68-
.coalescer
69-
.get_or_insert_with(|| BatchCoalescer::new(batch.schema(), self.batch_size));
70-
coalescer.push_batch(batch.clone())?;
71-
72-
// Drain completed batches into a local vec so the coalescer borrow ends
73-
// before we call write_batch_to_buffer (which borrows &mut self).
74-
let mut completed = Vec::new();
75-
while let Some(batch) = coalescer.next_completed_batch() {
76-
completed.push(batch);
77-
}
78-
79-
let mut bytes_written = 0;
80-
for batch in &completed {
81-
bytes_written += self.write_batch_to_buffer(batch, encode_time, write_time)?;
82-
}
83-
Ok(bytes_written)
84-
}
85-
86-
/// Serialize a single batch into the byte buffer, flushing to the writer if needed.
87-
fn write_batch_to_buffer(
88-
&mut self,
89-
batch: &RecordBatch,
90-
encode_time: &Time,
91-
write_time: &Time,
9249
) -> datafusion::common::Result<usize> {
9350
let mut cursor = Cursor::new(&mut self.buffer);
9451
cursor.seek(SeekFrom::End(0))?;
@@ -106,24 +63,7 @@ impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
10663
Ok(bytes_written)
10764
}
10865

109-
pub(crate) fn flush(
110-
&mut self,
111-
encode_time: &Time,
112-
write_time: &Time,
113-
) -> datafusion::common::Result<()> {
114-
// Finish any remaining buffered rows in the coalescer
115-
let mut remaining = Vec::new();
116-
if let Some(coalescer) = &mut self.coalescer {
117-
coalescer.finish_buffered_batch()?;
118-
while let Some(batch) = coalescer.next_completed_batch() {
119-
remaining.push(batch);
120-
}
121-
}
122-
for batch in &remaining {
123-
self.write_batch_to_buffer(batch, encode_time, write_time)?;
124-
}
125-
126-
// Flush the byte buffer to the underlying writer
66+
pub(crate) fn flush(&mut self, write_time: &Time) -> datafusion::common::Result<()> {
12767
let mut write_timer = write_time.timer();
12868
if !self.buffer.is_empty() {
12969
self.writer.write_all(&self.buffer)?;

native/core/src/execution/shuffle/writers/partition_writer.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -79,7 +79,6 @@ impl PartitionWriter {
7979
runtime: &RuntimeEnv,
8080
metrics: &ShufflePartitionerMetrics,
8181
write_buffer_size: usize,
82-
batch_size: usize,
8382
) -> datafusion::common::Result<usize> {
8483
if let Some(batch) = iter.next() {
8584
self.ensure_spill_file_created(runtime)?;
@@ -89,7 +88,6 @@ impl PartitionWriter {
8988
&mut self.shuffle_block_writer,
9089
&mut self.spill_file.as_mut().unwrap().file,
9190
write_buffer_size,
92-
batch_size,
9391
);
9492
let mut bytes_written =
9593
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
@@ -101,7 +99,7 @@ impl PartitionWriter {
10199
&metrics.write_time,
102100
)?;
103101
}
104-
buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?;
102+
buf_batch_writer.flush(&metrics.write_time)?;
105103
bytes_written
106104
};
107105

0 commit comments

Comments
 (0)