Skip to content

Commit 680ddcc

Browse files
mohit7705 and Your Name authored
feat: split BatchPartitioner::try_new into hash and round-robin constructors (#19668)
### Which issue does this PR close?

Closes #19664

---

### Rationale for this change

After #18880, `BatchPartitioner::try_new` gained additional parameters that are only relevant for round-robin repartitioning. This made the constructor API confusing, because hash repartitioning received parameters it does not use. Splitting the constructor improves clarity and avoids passing round-robin-specific parameters to hash partitioning.

---

### What changes are included in this PR?

- Introduce `BatchPartitioner::new_hash_partitioner`
- Introduce `BatchPartitioner::new_round_robin_partitioner`
- Refactor callers to use the specialized constructors
- Retain `BatchPartitioner::try_new` as a delegator for backward compatibility

This is a pure refactor; behavior is unchanged.

---

### Are these changes tested?

Yes. Existing tests cover this code path. All builds pass locally.

---

### Are there any user-facing changes?

No. This change is internal only and does not affect user-facing behavior or APIs.

---

Co-authored-by: Your Name <[email protected]>
1 parent ce08307 commit 680ddcc

File tree

1 file changed

+94
-23
lines changed
  • datafusion/physical-plan/src/repartition

1 file changed

+94
-23
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 94 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -434,33 +434,90 @@ pub const REPARTITION_RANDOM_STATE: SeededRandomState =
434434
SeededRandomState::with_seeds(0, 0, 0, 0);
435435

436436
impl BatchPartitioner {
437-
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
437+
/// Create a new [`BatchPartitioner`] for hash-based repartitioning.
438438
///
439-
/// The time spent repartitioning will be recorded to `timer`
439+
/// # Parameters
440+
/// - `exprs`: Expressions used to compute the hash for each input row.
441+
/// - `num_partitions`: Total number of output partitions.
442+
/// - `timer`: Metric used to record time spent during repartitioning.
443+
///
444+
/// # Notes
445+
/// This constructor cannot fail and performs no validation.
446+
pub fn new_hash_partitioner(
447+
exprs: Vec<Arc<dyn PhysicalExpr>>,
448+
num_partitions: usize,
449+
timer: metrics::Time,
450+
) -> Self {
451+
Self {
452+
state: BatchPartitionerState::Hash {
453+
exprs,
454+
num_partitions,
455+
hash_buffer: vec![],
456+
},
457+
timer,
458+
}
459+
}
460+
461+
/// Create a new [`BatchPartitioner`] for round-robin repartitioning.
462+
///
463+
/// # Parameters
464+
/// - `num_partitions`: Total number of output partitions.
465+
/// - `timer`: Metric used to record time spent during repartitioning.
466+
/// - `input_partition`: Index of the current input partition.
467+
/// - `num_input_partitions`: Total number of input partitions.
468+
///
469+
/// # Notes
470+
/// The starting output partition is derived from the input partition
471+
/// to avoid skew when multiple input partitions are used.
472+
pub fn new_round_robin_partitioner(
473+
num_partitions: usize,
474+
timer: metrics::Time,
475+
input_partition: usize,
476+
num_input_partitions: usize,
477+
) -> Self {
478+
Self {
479+
state: BatchPartitionerState::RoundRobin {
480+
num_partitions,
481+
next_idx: (input_partition * num_partitions) / num_input_partitions,
482+
},
483+
timer,
484+
}
485+
}
486+
/// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme.
487+
///
488+
/// This is a convenience constructor that delegates to the specialized
489+
/// hash or round-robin constructors depending on the partitioning variant.
490+
///
491+
/// # Parameters
492+
/// - `partitioning`: Partitioning scheme to apply (hash or round-robin).
493+
/// - `timer`: Metric used to record time spent during repartitioning.
494+
/// - `input_partition`: Index of the current input partition.
495+
/// - `num_input_partitions`: Total number of input partitions.
496+
///
497+
/// # Errors
498+
/// Returns an error if the provided partitioning scheme is not supported.
440499
pub fn try_new(
441500
partitioning: Partitioning,
442501
timer: metrics::Time,
443502
input_partition: usize,
444503
num_input_partitions: usize,
445504
) -> Result<Self> {
446-
let state = match partitioning {
505+
match partitioning {
506+
Partitioning::Hash(exprs, num_partitions) => {
507+
Ok(Self::new_hash_partitioner(exprs, num_partitions, timer))
508+
}
447509
Partitioning::RoundRobinBatch(num_partitions) => {
448-
BatchPartitionerState::RoundRobin {
510+
Ok(Self::new_round_robin_partitioner(
449511
num_partitions,
450-
// Distribute starting index evenly based on input partition, number of input partitions and number of partitions
451-
// to avoid they all start at partition 0 and heavily skew on the lower partitions
452-
next_idx: ((input_partition * num_partitions) / num_input_partitions),
453-
}
512+
timer,
513+
input_partition,
514+
num_input_partitions,
515+
))
454516
}
455-
Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
456-
exprs,
457-
num_partitions,
458-
hash_buffer: vec![],
459-
},
460-
other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
461-
};
462-
463-
Ok(Self { state, timer })
517+
other => {
518+
not_impl_err!("Unsupported repartitioning scheme {other:?}")
519+
}
520+
}
464521
}
465522

466523
/// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
@@ -1245,12 +1302,26 @@ impl RepartitionExec {
12451302
input_partition: usize,
12461303
num_input_partitions: usize,
12471304
) -> Result<()> {
1248-
let mut partitioner = BatchPartitioner::try_new(
1249-
partitioning,
1250-
metrics.repartition_time.clone(),
1251-
input_partition,
1252-
num_input_partitions,
1253-
)?;
1305+
let mut partitioner = match &partitioning {
1306+
Partitioning::Hash(exprs, num_partitions) => {
1307+
BatchPartitioner::new_hash_partitioner(
1308+
exprs.clone(),
1309+
*num_partitions,
1310+
metrics.repartition_time.clone(),
1311+
)
1312+
}
1313+
Partitioning::RoundRobinBatch(num_partitions) => {
1314+
BatchPartitioner::new_round_robin_partitioner(
1315+
*num_partitions,
1316+
metrics.repartition_time.clone(),
1317+
input_partition,
1318+
num_input_partitions,
1319+
)
1320+
}
1321+
other => {
1322+
return not_impl_err!("Unsupported repartitioning scheme {other:?}");
1323+
}
1324+
};
12541325

12551326
// While there are still outputs to send to, keep pulling inputs
12561327
let mut batches_until_yield = partitioner.num_partitions();

0 commit comments

Comments
 (0)