Skip to content

Commit c2ba087

Browse files
authored
Avoid skew in Roundrobin repartition (#18880)
## Which issue does this PR close? - Closes #18883 ## Rationale for this change This makes roundrobin repartition more fairly distributed. The benchmarks probably don't reflect this as much (maybe on very high core counts?), as the partitioning already mostly happens at the source side. ## What changes are included in this PR? Set start partition based on input partition. ## Are these changes tested? Existing tests ## Are there any user-facing changes?
1 parent fc77be9 commit c2ba087

File tree

2 files changed

+32
-16
lines changed

2 files changed

+32
-16
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -878,9 +878,9 @@ mod test {
878878
partition_row_counts.push(total_rows);
879879
}
880880
assert_eq!(partition_row_counts.len(), 3);
881-
assert_eq!(partition_row_counts[0], 2);
881+
assert_eq!(partition_row_counts[0], 1);
882882
assert_eq!(partition_row_counts[1], 2);
883-
assert_eq!(partition_row_counts[2], 0);
883+
assert_eq!(partition_row_counts[2], 1);
884884

885885
Ok(())
886886
}

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,9 @@ impl RepartitionExecState {
382382
txs,
383383
partitioning.clone(),
384384
metrics,
385+
// preserve_order depends on partition index to start from 0
386+
if preserve_order { 0 } else { i },
387+
num_input_partitions,
385388
));
386389

387390
// In a separate task, wait for each input to be done
@@ -428,12 +431,19 @@ impl BatchPartitioner {
428431
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
429432
///
430433
/// The time spent repartitioning will be recorded to `timer`
431-
pub fn try_new(partitioning: Partitioning, timer: metrics::Time) -> Result<Self> {
434+
pub fn try_new(
435+
partitioning: Partitioning,
436+
timer: metrics::Time,
437+
input_partition: usize,
438+
num_input_partitions: usize,
439+
) -> Result<Self> {
432440
let state = match partitioning {
433441
Partitioning::RoundRobinBatch(num_partitions) => {
434442
BatchPartitionerState::RoundRobin {
435443
num_partitions,
436-
next_idx: 0,
444+
// Distribute starting index evenly based on input partition, number of input partitions and number of partitions
445+
// to avoid they all start at partition 0 and heavily skew on the lower partitions
446+
next_idx: ((input_partition * num_partitions) / num_input_partitions),
437447
}
438448
}
439449
Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
@@ -1199,9 +1209,15 @@ impl RepartitionExec {
11991209
mut output_channels: HashMap<usize, OutputChannel>,
12001210
partitioning: Partitioning,
12011211
metrics: RepartitionMetrics,
1212+
input_partition: usize,
1213+
num_input_partitions: usize,
12021214
) -> Result<()> {
1203-
let mut partitioner =
1204-
BatchPartitioner::try_new(partitioning, metrics.repartition_time.clone())?;
1215+
let mut partitioner = BatchPartitioner::try_new(
1216+
partitioning,
1217+
metrics.repartition_time.clone(),
1218+
input_partition,
1219+
num_input_partitions,
1220+
)?;
12051221

12061222
// While there are still outputs to send to, keep pulling inputs
12071223
let mut batches_until_yield = partitioner.num_partitions();
@@ -1865,16 +1881,16 @@ mod tests {
18651881
// output stream 1 should *not* error and have one of the input batches
18661882
let batches = crate::common::collect(output_stream1).await.unwrap();
18671883

1868-
assert_snapshot!(batches_to_sort_string(&batches), @r#"
1869-
+------------------+
1870-
| my_awesome_field |
1871-
+------------------+
1872-
| baz |
1873-
| frob |
1874-
| gaz |
1875-
| grob |
1876-
+------------------+
1877-
"#);
1884+
assert_snapshot!(batches_to_sort_string(&batches), @r"
1885+
+------------------+
1886+
| my_awesome_field |
1887+
+------------------+
1888+
| baz |
1889+
| frob |
1890+
| gar |
1891+
| goo |
1892+
+------------------+
1893+
");
18781894
}
18791895

18801896
#[tokio::test]

0 commit comments

Comments
 (0)