Skip to content

Commit fb56dfe

Browse files
committed
Avoid repartition skew
1 parent e6d1773 commit fb56dfe

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -878,9 +878,9 @@ mod test {
878878
partition_row_counts.push(total_rows);
879879
}
880880
assert_eq!(partition_row_counts.len(), 3);
881-
assert_eq!(partition_row_counts[0], 2);
881+
assert_eq!(partition_row_counts[0], 1);
882882
assert_eq!(partition_row_counts[1], 2);
883-
assert_eq!(partition_row_counts[2], 0);
883+
assert_eq!(partition_row_counts[2], 1);
884884

885885
Ok(())
886886
}

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,9 @@ impl RepartitionExecState {
382382
txs,
383383
partitioning.clone(),
384384
metrics,
385+
// preserve_order-dependent: a single merged input partition starts at index 0
386+
if preserve_order { 0 } else { i },
387+
num_input_partitions,
385388
));
386389

387390
// In a separate task, wait for each input to be done
@@ -428,12 +431,22 @@ impl BatchPartitioner {
428431
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
429432
///
430433
/// The time spent repartitioning will be recorded to `timer`
431-
pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result<Self> {
434+
pub fn try_new(
435+
partitioning: Partitioning,
436+
timer: metrics::Time,
437+
input_partition: usize,
438+
num_input_partitions: usize,
439+
) -> Result<Self> {
432440
let state = match partitioning {
433441
Partitioning::RoundRobinBatch(num_partitions) => {
434442
BatchPartitionerState::RoundRobin {
435443
num_partitions,
436-
next_idx: 0,
444+
// Distribute starting index evenly based on input partition, number of input partitions and number of partitions
445+
// to avoid they all start at partition 0 and heavily skew on the lower partitions
446+
next_idx: (input_partition as f64
447+
* (num_partitions as f64 / num_input_partitions as f64))
448+
as usize
449+
% num_partitions,
437450
}
438451
}
439452
Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
@@ -1196,9 +1209,15 @@ impl RepartitionExec {
11961209
mut output_channels: HashMap<usize, OutputChannel>,
11971210
partitioning: Partitioning,
11981211
metrics: RepartitionMetrics,
1212+
input_partition: usize,
1213+
num_input_partitions: usize,
11991214
) -> Result<()> {
1200-
let mut partitioner =
1201-
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
1215+
let mut partitioner = BatchPartitioner::try_new(
1216+
partitioning,
1217+
metrics.repartition_time.clone(),
1218+
input_partition,
1219+
num_input_partitions,
1220+
)?;
12021221

12031222
// While there are still outputs to send to, keep pulling inputs
12041223
let mut batches_until_yield = partitioner.num_partitions();
@@ -1862,16 +1881,16 @@ mod tests {
18621881
// output stream 1 should *not* error and have one of the input batches
18631882
let batches = crate::common::collect(output_stream1).await.unwrap();
18641883

1865-
assert_snapshot!(batches_to_sort_string(&batches), @r#"
1866-
+------------------+
1867-
| my_awesome_field |
1868-
+------------------+
1869-
| baz |
1870-
| frob |
1871-
| gaz |
1872-
| grob |
1873-
+------------------+
1874-
"#);
1884+
assert_snapshot!(batches_to_sort_string(&batches), @r"
1885+
+------------------+
1886+
| my_awesome_field |
1887+
+------------------+
1888+
| baz |
1889+
| frob |
1890+
| gar |
1891+
| goo |
1892+
+------------------+
1893+
");
18751894
}
18761895

18771896
#[tokio::test]

0 commit comments

Comments (0)