diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d50404c8fc1e8..a43ecd49243ab 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -421,6 +421,7 @@ enum BatchPartitionerState { exprs: Vec>, num_partitions: usize, hash_buffer: Vec, + indices: Vec>, }, RoundRobin { num_partitions: usize, @@ -453,6 +454,7 @@ impl BatchPartitioner { exprs, num_partitions, hash_buffer: vec![], + indices: vec![vec![]; num_partitions], }, timer, } @@ -562,6 +564,7 @@ impl BatchPartitioner { exprs, num_partitions: partitions, hash_buffer, + indices, } => { // Tracking time required for distributing indexes across output partitions let timer = self.timer.timer(); @@ -578,9 +581,7 @@ impl BatchPartitioner { hash_buffer, )?; - let mut indices: Vec<_> = (0..*partitions) - .map(|_| Vec::with_capacity(batch.num_rows())) - .collect(); + indices.iter_mut().for_each(|v| v.clear()); for (index, hash) in hash_buffer.iter().enumerate() { indices[(*hash % *partitions as u64) as usize].push(index as u32); @@ -591,22 +592,23 @@ impl BatchPartitioner { // Borrowing partitioner timer to prevent moving `self` to closure let partitioner_timer = &self.timer; - let it = indices - .into_iter() - .enumerate() - .filter_map(|(partition, indices)| { - let indices: PrimitiveArray = indices.into(); - (!indices.is_empty()).then_some((partition, indices)) - }) - .map(move |(partition, indices)| { + + let mut partitioned_batches = vec![]; + for (partition, p_indices) in indices.iter_mut().enumerate() { + if !p_indices.is_empty() { + let taken_indices = std::mem::take(p_indices); + let indices_array: PrimitiveArray = + taken_indices.into(); + // Tracking time required for repartitioned batches construction let _timer = partitioner_timer.timer(); // Produce batches based on indices - let columns = take_arrays(batch.columns(), &indices, None)?; + let columns = + take_arrays(batch.columns(), &indices_array, None)?; let mut options = RecordBatchOptions::new(); - options = options.with_row_count(Some(indices.len())); + options = options.with_row_count(Some(indices_array.len())); let batch = RecordBatch::try_new_with_options( batch.schema(), columns, @@ -614,10 +616,22 @@ impl BatchPartitioner { ) .unwrap(); - Ok((partition, batch)) - }); + partitioned_batches.push(Ok((partition, batch))); + + // Return the taken vec + let (_, buffer, _) = indices_array.into_parts(); + let mut vec = + buffer.into_inner().into_vec::().map_err(|e| { + DataFusionError::Internal(format!( + "Could not convert buffer to vec: {e:?}" + )) + })?; + vec.clear(); + *p_indices = vec; + } + } - Box::new(it) + Box::new(partitioned_batches.into_iter()) } };