Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ enum BatchPartitionerState {
exprs: Vec<Arc<dyn PhysicalExpr>>,
num_partitions: usize,
hash_buffer: Vec<u64>,
indices: Vec<Vec<u32>>,
},
RoundRobin {
num_partitions: usize,
Expand Down Expand Up @@ -453,6 +454,7 @@ impl BatchPartitioner {
exprs,
num_partitions,
hash_buffer: vec![],
indices: vec![vec![]; num_partitions],
},
timer,
}
Expand Down Expand Up @@ -562,6 +564,7 @@ impl BatchPartitioner {
exprs,
num_partitions: partitions,
hash_buffer,
indices,
} => {
// Tracking time required for distributing indexes across output partitions
let timer = self.timer.timer();
Expand All @@ -578,9 +581,7 @@ impl BatchPartitioner {
hash_buffer,
)?;

let mut indices: Vec<_> = (0..*partitions)
.map(|_| Vec::with_capacity(batch.num_rows()))
.collect();
indices.iter_mut().for_each(|v| v.clear());

for (index, hash) in hash_buffer.iter().enumerate() {
indices[(*hash % *partitions as u64) as usize].push(index as u32);
Expand All @@ -591,33 +592,46 @@ impl BatchPartitioner {

// Borrowing partitioner timer to prevent moving `self` to closure
let partitioner_timer = &self.timer;
let it = indices
.into_iter()
.enumerate()
.filter_map(|(partition, indices)| {
let indices: PrimitiveArray<UInt32Type> = indices.into();
(!indices.is_empty()).then_some((partition, indices))
})
.map(move |(partition, indices)| {

let mut partitioned_batches = vec![];
for (partition, p_indices) in indices.iter_mut().enumerate() {
if !p_indices.is_empty() {
let taken_indices = std::mem::take(p_indices);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wondering if we can use drain https://doc.rust-lang.org/std/vec/struct.Vec.html#method.drain so you can get rid of clearing them?

let indices_array: PrimitiveArray<UInt32Type> =
taken_indices.into();

// Tracking time required for repartitioned batches construction
let _timer = partitioner_timer.timer();

// Produce batches based on indices
let columns = take_arrays(batch.columns(), &indices, None)?;
let columns =
take_arrays(batch.columns(), &indices_array, None)?;

let mut options = RecordBatchOptions::new();
options = options.with_row_count(Some(indices.len()));
options = options.with_row_count(Some(indices_array.len()));
let batch = RecordBatch::try_new_with_options(
batch.schema(),
columns,
&options,
)
.unwrap();

Ok((partition, batch))
});
partitioned_batches.push(Ok((partition, batch)));

// Return the taken vec
let (_, buffer, _) = indices_array.into_parts();
let mut vec =
buffer.into_inner().into_vec::<u32>().map_err(|e| {
DataFusionError::Internal(format!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DataFusionError::Internal(format!(
internal_datafusion_err!(

"Could not convert buffer to vec: {e:?}"
))
})?;
vec.clear();
*p_indices = vec;
}
}

Box::new(it)
Box::new(partitioned_batches.into_iter())
}
};

Expand Down