From 62faec380e38a8dd7c7e4c471400e218bf787032 Mon Sep 17 00:00:00 2001 From: Mohit rao Date: Wed, 7 Jan 2026 15:12:11 +0530 Subject: [PATCH] feat: split BatchPartitioner::try_new into hash and round-robin constructors (#19668) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Which issue does this PR close? Closes #19664 --- ### Rationale for this change After #18880, `BatchPartitioner::try_new` gained additional parameters that are only relevant for round-robin repartitioning. This made the constructor API confusing, as hash repartitioning received parameters it does not use. Splitting the constructor improves clarity and avoids passing round-robin–specific parameters to hash partitioning. --- ### What changes are included in this PR? - Introduce `BatchPartitioner::new_hash_partitioner` - Introduce `BatchPartitioner::new_round_robin_partitioner` - Refactor callers to use the specialized constructors - Retain `BatchPartitioner::try_new` as a delegator for backward compatibility This is a pure refactor; behavior is unchanged. --- ### Are these changes tested? Yes. Existing tests cover this code path. All builds pass locally. --- ### Are there any user-facing changes? No behavior changes. The signature of `BatchPartitioner::try_new` is unchanged; the only API addition is the two new public constructors listed above. 
--------- Co-authored-by: Your Name --- .../physical-plan/src/repartition/mod.rs | 117 ++++++++++++++---- 1 file changed, 94 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 1efdaaabc7d6a..d50404c8fc1e8 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -434,33 +434,90 @@ pub const REPARTITION_RANDOM_STATE: SeededRandomState = SeededRandomState::with_seeds(0, 0, 0, 0); impl BatchPartitioner { - /// Create a new [`BatchPartitioner`] with the provided [`Partitioning`] + /// Create a new [`BatchPartitioner`] for hash-based repartitioning. /// - /// The time spent repartitioning will be recorded to `timer` + /// # Parameters + /// - `exprs`: Expressions used to compute the hash for each input row. + /// - `num_partitions`: Total number of output partitions. + /// - `timer`: Metric used to record time spent during repartitioning. + /// + /// # Notes + /// This constructor cannot fail and performs no validation. + pub fn new_hash_partitioner( + exprs: Vec>, + num_partitions: usize, + timer: metrics::Time, + ) -> Self { + Self { + state: BatchPartitionerState::Hash { + exprs, + num_partitions, + hash_buffer: vec![], + }, + timer, + } + } + + /// Create a new [`BatchPartitioner`] for round-robin repartitioning. + /// + /// # Parameters + /// - `num_partitions`: Total number of output partitions. + /// - `timer`: Metric used to record time spent during repartitioning. + /// - `input_partition`: Index of the current input partition. + /// - `num_input_partitions`: Total number of input partitions. + /// + /// # Notes + /// The starting output partition is derived from the input partition + /// to avoid skew when multiple input partitions are used. 
+ pub fn new_round_robin_partitioner( + num_partitions: usize, + timer: metrics::Time, + input_partition: usize, + num_input_partitions: usize, + ) -> Self { + Self { + state: BatchPartitionerState::RoundRobin { + num_partitions, + next_idx: (input_partition * num_partitions) / num_input_partitions, + }, + timer, + } + } + /// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme. + /// + /// This is a convenience constructor that delegates to the specialized + /// hash or round-robin constructors depending on the partitioning variant. + /// + /// # Parameters + /// - `partitioning`: Partitioning scheme to apply (hash or round-robin). + /// - `timer`: Metric used to record time spent during repartitioning. + /// - `input_partition`: Index of the current input partition. + /// - `num_input_partitions`: Total number of input partitions. + /// + /// # Errors + /// Returns an error if the provided partitioning scheme is not supported. pub fn try_new( partitioning: Partitioning, timer: metrics::Time, input_partition: usize, num_input_partitions: usize, ) -> Result { - let state = match partitioning { + match partitioning { + Partitioning::Hash(exprs, num_partitions) => { + Ok(Self::new_hash_partitioner(exprs, num_partitions, timer)) + } Partitioning::RoundRobinBatch(num_partitions) => { - BatchPartitionerState::RoundRobin { + Ok(Self::new_round_robin_partitioner( num_partitions, - // Distribute starting index evenly based on input partition, number of input partitions and number of partitions - // to avoid they all start at partition 0 and heavily skew on the lower partitions - next_idx: ((input_partition * num_partitions) / num_input_partitions), - } + timer, + input_partition, + num_input_partitions, + )) } - Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash { - exprs, - num_partitions, - hash_buffer: vec![], - }, - other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"), - }; - - Ok(Self { state, 
timer }) + other => { + not_impl_err!("Unsupported repartitioning scheme {other:?}") + } + } } /// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`] @@ -1245,12 +1302,26 @@ impl RepartitionExec { input_partition: usize, num_input_partitions: usize, ) -> Result<()> { - let mut partitioner = BatchPartitioner::try_new( - partitioning, - metrics.repartition_time.clone(), - input_partition, - num_input_partitions, - )?; + let mut partitioner = match &partitioning { + Partitioning::Hash(exprs, num_partitions) => { + BatchPartitioner::new_hash_partitioner( + exprs.clone(), + *num_partitions, + metrics.repartition_time.clone(), + ) + } + Partitioning::RoundRobinBatch(num_partitions) => { + BatchPartitioner::new_round_robin_partitioner( + *num_partitions, + metrics.repartition_time.clone(), + input_partition, + num_input_partitions, + ) + } + other => { + return not_impl_err!("Unsupported repartitioning scheme {other:?}"); + } + }; // While there are still outputs to send to, keep pulling inputs let mut batches_until_yield = partitioner.num_partitions();