datafusion/physical-plan/src/sorts/sort.rs (85 changes: 76 additions & 9 deletions)

@@ -260,6 +260,9 @@ struct ExternalSorter {
}

impl ExternalSorter {
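/// Returns true if `batch` holds more rows than the configured `batch_size`.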
fn is_large_batch(&self, batch: &RecordBatch) -> bool {
batch.num_rows() > self.batch_size
}
// TODO: make a builder or some other nicer API to avoid the
// clippy warning
#[expect(clippy::too_many_arguments)]
@@ -308,9 +311,47 @@ impl ExternalSorter {
})
}

/// Sorts a single oversized `RecordBatch` and spills it incrementally in
/// `batch_size`-sized chunks.
///
/// This is used as a fallback under severe memory pressure when a single
/// input batch cannot be safely sorted in memory and there are no buffered
/// batches available to spill first.
async fn sort_and_spill_large_batch(&mut self, batch: RecordBatch) -> Result<()> {
debug!("Sorting and spilling large batch chunk-by-chunk");

// Lazily create spill file
if self.in_progress_spill_file.is_none() {
self.in_progress_spill_file =
Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
}

// Sort the batch into batch_size-sized chunks
let sorted_chunks = sort_batch_chunked(&batch, &self.expr, self.batch_size)?;

// Drop the original large batch early to free memory
drop(batch);

let (spill_file, max_batch_size) = self
.in_progress_spill_file
.as_mut()
.expect("spill file must exist");

// Append each sorted chunk to the spill file
for chunk in sorted_chunks {
let chunk_size = chunk.get_sliced_size()?;
spill_file.append_batch(&chunk)?;
*max_batch_size = (*max_batch_size).max(chunk_size);
// chunk dropped here
}

Ok(())
}
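
Note: `sort_batch_chunked` is introduced elsewhere in this PR and does not
appear in this hunk. For orientation, a minimal sketch of the assumed
behavior: compute the sort permutation once, then `take` it in
`batch_size`-row slices so the sorted rows come back as independently
droppable chunks. The signature is taken from the call site above; the body
is an assumption, using `lexsort_to_indices` and `take_record_batch` from a
recent arrow.

use arrow::compute::{lexsort_to_indices, take_record_batch};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_expr::LexOrdering;

fn sort_batch_chunked(
    batch: &RecordBatch,
    expressions: &LexOrdering,
    chunk_size: usize,
) -> Result<Vec<RecordBatch>> {
    // Evaluate the sort keys and compute the full permutation once
    let sort_columns = expressions
        .iter()
        .map(|expr| expr.evaluate_to_sort_column(batch))
        .collect::<Result<Vec<_>>>()?;
    let indices = lexsort_to_indices(&sort_columns, None)?;

    // Materialize the sorted rows as independent chunk_size-row batches
    // that the caller can spill and drop one at a time
    let mut chunks = Vec::with_capacity(indices.len().div_ceil(chunk_size));
    for start in (0..indices.len()).step_by(chunk_size) {
        let len = chunk_size.min(indices.len() - start);
        chunks.push(take_record_batch(batch, &indices.slice(start, len))?);
    }
    Ok(chunks)
}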

/// Buffers an unsorted input [`RecordBatch`] in `in_mem_batches`.
///
/// Updates memory usage metrics, and spills to disk if memory is insufficient.
async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
if input.num_rows() == 0 {
return Ok(());
@@ -320,6 +361,7 @@ impl ExternalSorter {
self.reserve_memory_for_batch_and_maybe_spill(&input)
.await?;

// Safe to buffer after successful memory reservation or spill handling
self.in_mem_batches.push(input);
Ok(())
}
@@ -795,20 +837,45 @@ impl ExternalSorter {
async fn reserve_memory_for_batch_and_maybe_spill(
&mut self,
input: &RecordBatch,
) -> Result<()> {
let full_sort_size = get_reserved_bytes_for_record_batch(input)?;

match self.reservation.try_grow(full_sort_size) {
Ok(_) => Ok(()),

Err(e) => {
// CASE 1: there are buffered batches, so spill them and retry the reservation
if !self.in_mem_batches.is_empty() {
self.sort_and_spill_in_mem_batches().await?;
self.reservation
.try_grow(full_sort_size)
.map_err(Self::err_with_oom_context)?;
return Ok(());
}

// CASE 2: single oversized batch under memory pressure
//
// If we cannot reserve enough memory and there are no buffered batches
// to spill first, fall back to chunked sorting and spilling.
//
// This avoids creating a single large sorted batch in memory while
// preserving correct ordering and output batch sizing.
if self.is_large_batch(input) {
debug!("Chunked spilling oversized batch");

// Reserve only the batch's own memory footprint rather than the full sort size
let batch_mem = get_record_batch_memory_size(input);
self.reservation
.try_grow(batch_mem)
.map_err(Self::err_with_oom_context)?;

// Spill immediately in sorted chunks to avoid materializing a single
// large sorted batch in memory
self.sort_and_spill_large_batch(input.clone()).await?;

return Ok(());
}

// CASE 3: nothing to spill and no fallback applies; surface the OOM error
Err(Self::err_with_oom_context(e))
}
}
}
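
For context, a sketch of the scenario this fallback targets: sorting a single
batch far larger than the memory pool, which previously failed outright
because there were no buffered batches to spill. The API names follow recent
DataFusion releases (`RuntimeEnvBuilder::with_memory_limit`,
`SessionContext::read_batch`); the setup and numbers are illustrative, not a
test from this PR.

use std::sync::Arc;
use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // An 8 MB pool, far smaller than the single ~32 MB batch sorted below.
    // The default runtime env includes a disk manager, so spilling works.
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_limit(8 * 1024 * 1024, 1.0)
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);

    // A single oversized batch in reverse order (8M rows of i32)
    let values = Int32Array::from_iter_values((0..8_000_000).rev());
    let batch = RecordBatch::try_from_iter([("a", Arc::new(values) as _)])?;

    // Sorting this previously surfaced a hard OOM error (CASE 3); with this
    // change it takes CASE 2 and spills the batch in sorted batch_size chunks
    let df = ctx.read_batch(batch)?.sort(vec![col("a").sort(true, false)])?;
    let sorted = df.collect().await?;
    assert!(!sorted.is_empty());
    Ok(())
}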