Skip to content

Commit 88525e3

Browse files
authored
fix: Shuffle should maintain insertion order (#1660)
1 parent 739a9f1 commit 88525e3

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -686,16 +686,6 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
686686
for i in 0..num_output_partitions {
687687
offsets[i] = output_data.stream_position()?;
688688

689-
// Write in memory batches to output data file
690-
let mut partition_iter = partitioned_batches.produce(i);
691-
Self::shuffle_write_partition(
692-
&mut partition_iter,
693-
&mut self.shuffle_block_writer,
694-
&mut output_data,
695-
&self.metrics.encode_time,
696-
&self.metrics.write_time,
697-
)?;
698-
699689
// if we wrote a spill file for this partition then copy the
700690
// contents into the shuffle file
701691
if let Some(spill_data) = self.partition_writers[i].spill_file.as_ref() {
@@ -705,6 +695,16 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
705695
std::io::copy(&mut spill_file, &mut output_data).map_err(to_df_err)?;
706696
write_timer.stop();
707697
}
698+
699+
// Write in memory batches to output data file
700+
let mut partition_iter = partitioned_batches.produce(i);
701+
Self::shuffle_write_partition(
702+
&mut partition_iter,
703+
&mut self.shuffle_block_writer,
704+
&mut output_data,
705+
&self.metrics.encode_time,
706+
&self.metrics.write_time,
707+
)?;
708708
}
709709

710710
let mut write_timer = self.metrics.write_time.timer();

0 commit comments

Comments
 (0)