Skip to content

Commit f286c64

Browse files
committed
remove roundrobin repartition
1 parent 910029d commit f286c64

File tree

1 file changed

+22
-26
lines changed

1 file changed

+22
-26
lines changed

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,12 +1056,12 @@ fn replace_order_preserving_variants(
10561056
struct RepartitionRequirementStatus {
10571057
/// The distribution requirement for the node.
10581058
requirement: Distribution,
1059-
/// Designates whether round robin partitioning is theoretically beneficial;
1059+
/// Designates whether repartitioning is theoretically beneficial;
10601060
/// i.e. the operator can actually utilize parallelism.
1061-
roundrobin_beneficial: bool,
1062-
/// Designates whether round robin partitioning is beneficial according to
1061+
repartition_beneficial: bool,
1062+
/// Designates whether repartitioning is beneficial according to
10631063
/// the statistical information we have on the number of rows.
1064-
roundrobin_beneficial_stats: bool,
1064+
repartition_beneficial_stats: bool,
10651065
/// Designates whether hash partitioning is necessary.
10661066
hash_necessary: bool,
10671067
}
@@ -1104,15 +1104,17 @@ fn get_repartition_requirement_status(
11041104
) -> Result<Vec<RepartitionRequirementStatus>> {
11051105
let mut needs_alignment = false;
11061106
let children = plan.children();
1107-
let rr_beneficial = plan.benefits_from_input_partitioning();
1107+
let is_increased_parallelization_beneficial = plan.benefits_from_input_partitioning();
11081108
let requirements = plan.required_input_distribution();
11091109
let mut repartition_status_flags = vec![];
1110-
for (child, requirement, roundrobin_beneficial) in
1111-
izip!(children.into_iter(), requirements, rr_beneficial)
1112-
{
1113-
// Decide whether adding a round robin is beneficial depending on
1110+
for (child, requirement, repartition_beneficial) in izip!(
1111+
children.into_iter(),
1112+
requirements,
1113+
is_increased_parallelization_beneficial
1114+
) {
1115+
// Decide whether adding a repartition is beneficial depending on
11141116
// the statistical information we have on the number of rows:
1115-
let roundrobin_beneficial_stats = match child.statistics()?.num_rows {
1117+
let repartition_beneficial_stats = match child.statistics()?.num_rows {
11161118
Precision::Exact(n_rows) => n_rows > batch_size,
11171119
Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
11181120
Precision::Absent => true,
@@ -1121,14 +1123,14 @@ fn get_repartition_requirement_status(
11211123
// Hash re-partitioning is necessary when the input has more than one
11221124
// partitions:
11231125
let multi_partitions = child.output_partitioning().partition_count() > 1;
1124-
let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
1125-
needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
1126+
let repartition_sensible = repartition_beneficial && repartition_beneficial_stats;
1127+
needs_alignment |= is_hash && (multi_partitions || repartition_sensible);
11261128
repartition_status_flags.push((
11271129
is_hash,
11281130
RepartitionRequirementStatus {
11291131
requirement,
1130-
roundrobin_beneficial,
1131-
roundrobin_beneficial_stats,
1132+
repartition_beneficial,
1133+
repartition_beneficial_stats,
11321134
hash_necessary: is_hash && multi_partitions,
11331135
},
11341136
));
@@ -1230,21 +1232,20 @@ pub fn ensure_distribution(
12301232
maintains,
12311233
RepartitionRequirementStatus {
12321234
requirement,
1233-
roundrobin_beneficial,
1234-
roundrobin_beneficial_stats,
1235+
repartition_beneficial,
1236+
repartition_beneficial_stats,
12351237
hash_necessary,
12361238
},
12371239
)| {
1238-
let add_roundrobin = enable_round_robin
1240+
let add_repartition = repartition_beneficial
12391241
// Operator benefits from partitioning (e.g. filter):
1240-
&& roundrobin_beneficial
1241-
&& roundrobin_beneficial_stats
1242+
&& repartition_beneficial_stats
12421243
// Unless partitioning increases the partition count, it is not beneficial:
12431244
&& child.plan.output_partitioning().partition_count() < target_partitions;
12441245

12451246
// When `repartition_file_scans` is set, attempt to increase
12461247
// parallelism at the source.
1247-
if repartition_file_scans && roundrobin_beneficial_stats {
1248+
if repartition_file_scans && repartition_beneficial_stats {
12481249
if let Some(new_child) =
12491250
child.plan.repartitioned(target_partitions, config)?
12501251
{
@@ -1258,19 +1259,14 @@ pub fn ensure_distribution(
12581259
child = add_spm_on_top(child);
12591260
}
12601261
Distribution::HashPartitioned(exprs) => {
1261-
if add_roundrobin {
1262-
// Add round-robin repartitioning on top of the operator
1263-
// to increase parallelism.
1264-
child = add_roundrobin_on_top(child, target_partitions)?;
1265-
}
12661262
// When inserting hash is necessary to satisfy hash requirement, insert hash repartition.
12671263
if hash_necessary {
12681264
child =
12691265
add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
12701266
}
12711267
}
12721268
Distribution::UnspecifiedDistribution => {
1273-
if add_roundrobin {
1269+
if add_repartition && enable_round_robin {
12741270
// Add round-robin repartitioning on top of the operator
12751271
// to increase parallelism.
12761272
child = add_roundrobin_on_top(child, target_partitions)?;

0 commit comments

Comments
 (0)