Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 94 additions & 23 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,33 +434,90 @@ pub const REPARTITION_RANDOM_STATE: SeededRandomState =
SeededRandomState::with_seeds(0, 0, 0, 0);

impl BatchPartitioner {
/// Create a new [`BatchPartitioner`] with the provided [`Partitioning`]
/// Create a new [`BatchPartitioner`] for hash-based repartitioning.
///
/// The time spent repartitioning will be recorded to `timer`
/// # Parameters
/// - `exprs`: Expressions used to compute the hash for each input row.
/// - `num_partitions`: Total number of output partitions.
/// - `timer`: Metric used to record time spent during repartitioning.
///
/// # Notes
/// This constructor cannot fail and performs no validation.
pub fn new_hash_partitioner(
exprs: Vec<Arc<dyn PhysicalExpr>>,
num_partitions: usize,
timer: metrics::Time,
) -> Self {
Self {
state: BatchPartitionerState::Hash {
exprs,
num_partitions,
hash_buffer: vec![],
},
timer,
}
}

/// Create a new [`BatchPartitioner`] for round-robin repartitioning.
///
/// # Parameters
/// - `num_partitions`: Total number of output partitions.
/// - `timer`: Metric used to record time spent during repartitioning.
/// - `input_partition`: Index of the current input partition.
/// - `num_input_partitions`: Total number of input partitions.
///
/// # Notes
/// The starting output partition is derived from the input partition
/// to avoid skew when multiple input partitions are used.
pub fn new_round_robin_partitioner(
num_partitions: usize,
timer: metrics::Time,
input_partition: usize,
num_input_partitions: usize,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_input_partitions == 0 would lead to a panic below (division by 0). It would be good to prevent this or at least document it as a non-supported value.

) -> Self {
Self {
state: BatchPartitionerState::RoundRobin {
num_partitions,
next_idx: (input_partition * num_partitions) / num_input_partitions,
},
timer,
}
}
/// Create a new [`BatchPartitioner`] based on the provided [`Partitioning`] scheme.
///
/// This is a convenience constructor that delegates to the specialized
/// hash or round-robin constructors depending on the partitioning variant.
///
/// # Parameters
/// - `partitioning`: Partitioning scheme to apply (hash or round-robin).
/// - `timer`: Metric used to record time spent during repartitioning.
/// - `input_partition`: Index of the current input partition.
/// - `num_input_partitions`: Total number of input partitions.
///
/// # Errors
/// Returns an error if the provided partitioning scheme is not supported.
pub fn try_new(
partitioning: Partitioning,
timer: metrics::Time,
input_partition: usize,
num_input_partitions: usize,
) -> Result<Self> {
let state = match partitioning {
match partitioning {
Partitioning::Hash(exprs, num_partitions) => {
Ok(Self::new_hash_partitioner(exprs, num_partitions, timer))
}
Partitioning::RoundRobinBatch(num_partitions) => {
BatchPartitionerState::RoundRobin {
Ok(Self::new_round_robin_partitioner(
num_partitions,
// Distribute starting index evenly based on input partition, number of input partitions and number of partitions
// to avoid they all start at partition 0 and heavily skew on the lower partitions
next_idx: ((input_partition * num_partitions) / num_input_partitions),
}
timer,
input_partition,
num_input_partitions,
))
}
Partitioning::Hash(exprs, num_partitions) => BatchPartitionerState::Hash {
exprs,
num_partitions,
hash_buffer: vec![],
},
other => return not_impl_err!("Unsupported repartitioning scheme {other:?}"),
};

Ok(Self { state, timer })
other => {
not_impl_err!("Unsupported repartitioning scheme {other:?}")
}
}
}

/// Partition the provided [`RecordBatch`] into one or more partitioned [`RecordBatch`]
Expand Down Expand Up @@ -1245,12 +1302,26 @@ impl RepartitionExec {
input_partition: usize,
num_input_partitions: usize,
) -> Result<()> {
let mut partitioner = BatchPartitioner::try_new(
partitioning,
metrics.repartition_time.clone(),
input_partition,
num_input_partitions,
)?;
let mut partitioner = match &partitioning {
Partitioning::Hash(exprs, num_partitions) => {
BatchPartitioner::new_hash_partitioner(
exprs.clone(),
*num_partitions,
metrics.repartition_time.clone(),
)
}
Partitioning::RoundRobinBatch(num_partitions) => {
BatchPartitioner::new_round_robin_partitioner(
*num_partitions,
metrics.repartition_time.clone(),
input_partition,
num_input_partitions,
)
}
other => {
return not_impl_err!("Unsupported repartitioning scheme {other:?}");
}
};

// While there are still outputs to send to, keep pulling inputs
let mut batches_until_yield = partitioner.num_partitions();
Expand Down