
Commit 6553faf

[Minor] Remove redundant member from RepartitionExec (#12638)
* Initial commit
* Use util get_array_ref
* Resolve linter errors
1 parent 84e9ce8 commit 6553faf

File tree

1 file changed: +19 -27 lines
  • datafusion/physical-plan/src/repartition/mod.rs

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 19 additions & 27 deletions
@@ -38,12 +38,11 @@ use crate::sorts::streaming_merge;
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
 
-use arrow::array::ArrayRef;
-use arrow::datatypes::{SchemaRef, UInt64Type};
+use arrow::datatypes::{SchemaRef, UInt32Type};
 use arrow::record_batch::RecordBatch;
 use arrow_array::{PrimitiveArray, RecordBatchOptions};
-use datafusion_common::utils::transpose;
-use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
+use datafusion_common::utils::{get_arrayref_at_indices, transpose};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
@@ -280,7 +279,7 @@ impl BatchPartitioner {
                     .collect();
 
                 for (index, hash) in hash_buffer.iter().enumerate() {
-                    indices[(*hash % *partitions as u64) as usize].push(index as u64);
+                    indices[(*hash % *partitions as u64) as usize].push(index as u32);
                 }
 
                 // Finished building index-arrays for output partitions
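Aside: the hunk above narrows the per-row index type from u64 to u32 while keeping the same hash-modulo bucketing. A minimal, standalone sketch of that bucketing step (illustrative only, not DataFusion code; `bucket_rows` is a made-up helper):

```rust
// Illustrative sketch: bucket row indices by hash % partition_count,
// storing the row indices as u32, matching the change above.
fn bucket_rows(hashes: &[u64], partitions: usize) -> Vec<Vec<u32>> {
    let mut indices: Vec<Vec<u32>> = vec![Vec::new(); partitions];
    for (row, hash) in hashes.iter().enumerate() {
        indices[(*hash % partitions as u64) as usize].push(row as u32);
    }
    indices
}

fn main() {
    // 7 % 3 = 1, 8 % 3 = 2, 9 % 3 = 0, 10 % 3 = 1
    let buckets = bucket_rows(&[7, 8, 9, 10], 3);
    assert_eq!(buckets, vec![vec![2], vec![0, 3], vec![1]]);
}
```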
@@ -292,22 +291,16 @@ impl BatchPartitioner {
                     .into_iter()
                     .enumerate()
                     .filter_map(|(partition, indices)| {
-                        let indices: PrimitiveArray<UInt64Type> = indices.into();
+                        let indices: PrimitiveArray<UInt32Type> = indices.into();
                         (!indices.is_empty()).then_some((partition, indices))
                     })
                     .map(move |(partition, indices)| {
                         // Tracking time required for repartitioned batches construction
                         let _timer = partitioner_timer.timer();
 
                         // Produce batches based on indices
-                        let columns = batch
-                            .columns()
-                            .iter()
-                            .map(|c| {
-                                arrow::compute::take(c.as_ref(), &indices, None)
-                                    .map_err(|e| arrow_datafusion_err!(e))
-                            })
-                            .collect::<Result<Vec<ArrayRef>>>()?;
+                        let columns =
+                            get_arrayref_at_indices(batch.columns(), &indices)?;
 
                         let mut options = RecordBatchOptions::new();
                         options = options.with_row_count(Some(indices.len()));
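For context, the `get_arrayref_at_indices` utility replaces the hand-written per-column `take` loop removed above. A minimal sketch of that per-column pattern using arrow's `take` kernel directly (illustrative only; `take_columns` is a made-up helper, not the DataFusion utility):

```rust
// Illustrative sketch: take the selected rows from every column of a batch
// with arrow's `take` kernel and a UInt32 index array. This is roughly the
// pattern the removed code implemented by hand.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, UInt32Array};
use arrow::compute::take;
use arrow::error::ArrowError;

fn take_columns(
    columns: &[ArrayRef],
    indices: &UInt32Array,
) -> Result<Vec<ArrayRef>, ArrowError> {
    columns
        .iter()
        .map(|c| take(c.as_ref(), indices, None))
        .collect()
}

fn main() -> Result<(), ArrowError> {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![10, 20, 30, 40]));
    let indices = UInt32Array::from(vec![3, 0]);
    let taken = take_columns(&[col], &indices)?;
    assert_eq!(taken[0].len(), 2);
    Ok(())
}
```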
@@ -403,8 +396,6 @@ impl BatchPartitioner {
 pub struct RepartitionExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Partitioning scheme to use
-    partitioning: Partitioning,
     /// Inner state that is initialized when the first output stream is created.
     state: LazyState,
     /// Execution metrics
@@ -469,7 +460,7 @@ impl RepartitionExec {
 
     /// Partitioning scheme to use
     pub fn partitioning(&self) -> &Partitioning {
-        &self.partitioning
+        &self.cache.partitioning
     }
 
     /// Get preserve_order flag of the RepartitionExecutor
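These two hunks are the core of the commit: `RepartitionExec` previously stored `partitioning` both as its own field and inside the cached plan properties; the redundant field is dropped and all callers go through the `partitioning()` accessor instead. A hypothetical, heavily simplified illustration of the pattern (all names below are made up, not DataFusion's API):

```rust
// Hypothetical sketch of removing a duplicated field and exposing the value
// only through the cached properties struct.
#[derive(Debug, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    // The real Partitioning::Hash also carries the hash expressions.
    Hash(usize),
}

struct PlanProperties {
    partitioning: Partitioning,
}

struct RepartitionLike {
    // Single source of truth: partitioning lives only in the cache.
    cache: PlanProperties,
}

impl RepartitionLike {
    fn partitioning(&self) -> &Partitioning {
        &self.cache.partitioning
    }
}

fn main() {
    let node = RepartitionLike {
        cache: PlanProperties {
            partitioning: Partitioning::RoundRobinBatch(8),
        },
    };
    assert_eq!(*node.partitioning(), Partitioning::RoundRobinBatch(8));
}
```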
@@ -496,7 +487,7 @@ impl DisplayAs for RepartitionExec {
                     f,
                     "{}: partitioning={}, input_partitions={}",
                     self.name(),
-                    self.partitioning,
+                    self.partitioning(),
                     self.input.output_partitioning().partition_count()
                 )?;
 
@@ -539,16 +530,18 @@ impl ExecutionPlan for RepartitionExec {
         self: Arc<Self>,
         mut children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut repartition =
-            RepartitionExec::try_new(children.swap_remove(0), self.partitioning.clone())?;
+        let mut repartition = RepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
         if self.preserve_order {
             repartition = repartition.with_preserve_order();
         }
         Ok(Arc::new(repartition))
     }
 
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-        vec![matches!(self.partitioning, Partitioning::Hash(_, _))]
+        vec![matches!(self.partitioning(), Partitioning::Hash(_, _))]
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
@@ -568,7 +561,7 @@ impl ExecutionPlan for RepartitionExec {
 
         let lazy_state = Arc::clone(&self.state);
         let input = Arc::clone(&self.input);
-        let partitioning = self.partitioning.clone();
+        let partitioning = self.partitioning().clone();
         let metrics = self.metrics.clone();
         let preserve_order = self.preserve_order;
         let name = self.name().to_owned();
@@ -687,7 +680,6 @@ impl RepartitionExec {
             Self::compute_properties(&input, partitioning.clone(), preserve_order);
         Ok(RepartitionExec {
             input,
-            partitioning,
             state: Default::default(),
             metrics: ExecutionPlanMetricsSet::new(),
             preserve_order,
@@ -1027,10 +1019,10 @@ mod tests {
         {collect, expressions::col, memory::MemoryExec},
     };
 
-    use arrow::array::{StringArray, UInt32Array};
+    use arrow::array::{ArrayRef, StringArray, UInt32Array};
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::cast::as_string_array;
-    use datafusion_common::{assert_batches_sorted_eq, exec_err};
+    use datafusion_common::{arrow_datafusion_err, assert_batches_sorted_eq, exec_err};
     use datafusion_execution::runtime_env::RuntimeEnvBuilder;
 
     use tokio::task::JoinSet;
@@ -1134,7 +1126,7 @@ mod tests {
 
         // execute and collect results
         let mut output_partitions = vec![];
-        for i in 0..exec.partitioning.partition_count() {
+        for i in 0..exec.partitioning().partition_count() {
             // execute this *output* partition and collect all batches
             let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
             let mut batches = vec![];
@@ -1524,7 +1516,7 @@ mod tests {
         let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;
 
         // pull partitions
-        for i in 0..exec.partitioning.partition_count() {
+        for i in 0..exec.partitioning().partition_count() {
             let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
             let err =
                 arrow_datafusion_err!(stream.next().await.unwrap().unwrap_err().into());
