Skip to content

Commit 9797095

Browse files
authored
[branch-52] perf: sort replace free()->try_grow() pattern with try_resize() to reduce memory pool interactions (apache#20732)
Backport apache#20729 to `branch-52`.
1 parent afc1c72 commit 9797095

File tree

1 file changed

+20
-30
lines changed
  • datafusion/physical-plan/src/sorts

1 file changed

+20
-30
lines changed

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -728,37 +728,27 @@ impl ExternalSorter {
728728
let sorted_batches = sort_batch_chunked(&batch, &expressions, batch_size)?;
729729
drop(batch);
730730

731-
// Free the old reservation and grow it to match the actual sorted output size
732-
reservation.free();
731+
// Resize the reservation to match the actual sorted output size.
732+
// Using try_resize avoids a release-then-reacquire cycle, which
733+
// matters for MemoryPool implementations where grow/shrink have
734+
// non-trivial cost (e.g. JNI calls in Comet).
735+
let total_sorted_size: usize = sorted_batches
736+
.iter()
737+
.map(get_record_batch_memory_size)
738+
.sum();
739+
reservation
740+
.try_resize(total_sorted_size)
741+
.map_err(Self::err_with_oom_context)?;
733742

734-
Result::<_, DataFusionError>::Ok((schema, sorted_batches, reservation))
735-
})
736-
.then({
737-
move |batches| async move {
738-
match batches {
739-
Ok((schema, sorted_batches, mut reservation)) => {
740-
// Calculate the total size of sorted batches
741-
let total_sorted_size: usize = sorted_batches
742-
.iter()
743-
.map(get_record_batch_memory_size)
744-
.sum();
745-
reservation
746-
.try_grow(total_sorted_size)
747-
.map_err(Self::err_with_oom_context)?;
748-
749-
// Wrap in ReservationStream to hold the reservation
750-
Ok(Box::pin(ReservationStream::new(
751-
Arc::clone(&schema),
752-
Box::pin(RecordBatchStreamAdapter::new(
753-
schema,
754-
futures::stream::iter(sorted_batches.into_iter().map(Ok)),
755-
)),
756-
reservation,
757-
)) as SendableRecordBatchStream)
758-
}
759-
Err(e) => Err(e),
760-
}
761-
}
743+
// Wrap in ReservationStream to hold the reservation
744+
Result::<_, DataFusionError>::Ok(Box::pin(ReservationStream::new(
745+
Arc::clone(&schema),
746+
Box::pin(RecordBatchStreamAdapter::new(
747+
schema,
748+
futures::stream::iter(sorted_batches.into_iter().map(Ok)),
749+
)),
750+
reservation,
751+
)) as SendableRecordBatchStream)
762752
})
763753
.try_flatten()
764754
.map(move |batch| match batch {

0 commit comments

Comments
 (0)