
Commit eb30c19

Implement disk spilling for all grouping ordering modes in GroupedHashAggregateStream (#19287)
## Which issue does this PR close?

- Closes #19286.
- Related to #13123

## Rationale for this change

GroupedHashAggregateStream currently always reports to the memory tracking subsystem that it can spill, even though this depends on the aggregation mode and the grouping order. The optimistic logic in `group_aggregate_batch` does not correctly take the spilling preconditions into account, which can lead to excessive memory use. To resolve this, this PR implements disk spilling for all grouping modes.

## What changes are included in this PR?

- Correctly set `MemoryConsumer::can_spill` to reflect actual spilling behaviour (see the sketch below).
- Ensure the optimistic out-of-memory tolerance in `group_aggregate_batch` is aligned with the disk spilling or early emission logic.
- Implement disk spilling that respects the output order for partially and fully sorted inputs.

## Are these changes tested?

Added a test case to demonstrate the problem, and a test case to check that output order is respected after spilling.

## Are there any user-facing changes?

Yes, memory exhaustion may now be reported much earlier in the query pipeline than is currently the case. In local tests with a per-consumer memory limit of 32 MiB, grouped aggregation would consume 480 MiB in practice; the exhaustion was then reported by ExternalSortExec, which choked on trying to reserve that much memory.
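For context, here is a minimal sketch (not the code from this PR) of how an operator advertises whether it can spill when registering with DataFusion's memory pool; the consumer name, pool type, and the 32 MiB limit are illustrative:

```rust
use std::sync::Arc;

use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> datafusion_common::Result<()> {
    // Illustrative pool with the 32 MiB per-consumer limit mentioned above.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(32 * 1024 * 1024));

    // `can_spill` should reflect whether the operator can actually release
    // memory by spilling in its current mode; aligning this flag with the real
    // behaviour of GroupedHashAggregateStream is one of the changes in this PR.
    let can_spill = true;
    let mut reservation = MemoryConsumer::new("GroupedHashAggregateStream[0]")
        .with_can_spill(can_spill)
        .register(&pool);

    // A spillable consumer that fails `try_grow` is expected to spill and retry;
    // a non-spillable one must surface the error (or emit early) instead.
    reservation.try_grow(8 * 1024 * 1024)?;
    Ok(())
}
```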
1 parent 6fa9c1a commit eb30c19

File tree

11 files changed: +423 additions, -197 deletions

datafusion/physical-expr-common/src/sort_expr.rs

Lines changed: 11 additions & 0 deletions
@@ -457,6 +457,17 @@ impl LexOrdering {
             req.expr.eq(&cur.expr) && is_reversed_sort_options(&req.options, &cur.options)
         })
     }
+
+    /// Returns the sort options for the given expression if one is defined in this `LexOrdering`.
+    pub fn get_sort_options(&self, expr: &dyn PhysicalExpr) -> Option<SortOptions> {
+        for e in self {
+            if e.expr.as_ref().dyn_eq(expr) {
+                return Some(e.options);
+            }
+        }
+
+        None
+    }
 }
 
 /// Check if two SortOptions represent reversed orderings.
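A hedged sketch of how a caller might use the new helper; the `sort_options_or_default` function and its fallback to arrow's default `SortOptions` are illustrative, not part of the PR:

```rust
use arrow::compute::SortOptions;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// Illustrative caller: ask an existing ordering how `expr` is sorted; if the
/// ordering says nothing about that expression, fall back to the default.
fn sort_options_or_default(ordering: &LexOrdering, expr: &dyn PhysicalExpr) -> SortOptions {
    ordering.get_sort_options(expr).unwrap_or_default()
}
```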

datafusion/physical-plan/src/aggregates/group_values/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ use arrow::array::types::{
     Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
     TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
 };
-use arrow::array::{ArrayRef, RecordBatch, downcast_primitive};
+use arrow::array::{ArrayRef, downcast_primitive};
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use datafusion_common::Result;
 
@@ -112,7 +112,7 @@ pub trait GroupValues: Send {
     fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
 
     /// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
-    fn clear_shrink(&mut self, batch: &RecordBatch);
+    fn clear_shrink(&mut self, num_rows: usize);
 }
 
 /// Return a specialized implementation of [`GroupValues`] for the given schema.
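The signature change matters because after spilling the stream may only know how many rows were emitted, not hold a `RecordBatch`. A hedged sketch of the call-site shape, with `GroupValuesLike` standing in for the crate-internal `GroupValues` trait:

```rust
use arrow::array::RecordBatch;

/// Illustrative stand-in for the crate-internal `GroupValues` trait.
trait GroupValuesLike {
    fn clear_shrink(&mut self, num_rows: usize);
}

/// Previously the caller had to pass a borrowed batch just so the
/// implementation could read `num_rows()`; now any caller that only has a row
/// count (for example after spilling emitted rows to disk) can shrink the state.
fn reset_after_emit(group_values: &mut dyn GroupValuesLike, emitted: &RecordBatch) {
    group_values.clear_shrink(emitted.num_rows());
}
```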

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs

Lines changed: 4 additions & 5 deletions
@@ -30,7 +30,7 @@ use crate::aggregates::group_values::multi_group_by::{
     bytes_view::ByteViewGroupValueBuilder, primitive::PrimitiveGroupValueBuilder,
 };
 use ahash::RandomState;
-use arrow::array::{Array, ArrayRef, RecordBatch};
+use arrow::array::{Array, ArrayRef};
 use arrow::compute::cast;
 use arrow::datatypes::{
     BinaryViewType, DataType, Date32Type, Date64Type, Decimal128Type, Float32Type,
@@ -1181,14 +1181,13 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
         Ok(output)
     }
 
-    fn clear_shrink(&mut self, batch: &RecordBatch) {
-        let count = batch.num_rows();
+    fn clear_shrink(&mut self, num_rows: usize) {
         self.group_values.clear();
         self.map.clear();
-        self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
+        self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
         self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
         self.hashes_buffer.clear();
-        self.hashes_buffer.shrink_to(count);
+        self.hashes_buffer.shrink_to(num_rows);
 
         // Such structures are only used in `non-streaming` case
         if !STREAMING {

datafusion/physical-plan/src/aggregates/group_values/row.rs

Lines changed: 4 additions & 5 deletions
@@ -17,7 +17,7 @@
 
 use crate::aggregates::group_values::GroupValues;
 use ahash::RandomState;
-use arrow::array::{Array, ArrayRef, ListArray, RecordBatch, StructArray};
+use arrow::array::{Array, ArrayRef, ListArray, StructArray};
 use arrow::compute::cast;
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::row::{RowConverter, Rows, SortField};
@@ -243,17 +243,16 @@ impl GroupValues for GroupValuesRows {
         Ok(output)
     }
 
-    fn clear_shrink(&mut self, batch: &RecordBatch) {
-        let count = batch.num_rows();
+    fn clear_shrink(&mut self, num_rows: usize) {
         self.group_values = self.group_values.take().map(|mut rows| {
             rows.clear();
             rows
         });
         self.map.clear();
-        self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
+        self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
         self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
         self.hashes_buffer.clear();
-        self.hashes_buffer.shrink_to(count);
+        self.hashes_buffer.shrink_to(num_rows);
     }
 }

datafusion/physical-plan/src/aggregates/group_values/single_group_by/boolean.rs

Lines changed: 1 addition & 2 deletions
@@ -19,7 +19,6 @@ use crate::aggregates::group_values::GroupValues;
 
 use arrow::array::{
     ArrayRef, AsArray as _, BooleanArray, BooleanBufferBuilder, NullBufferBuilder,
-    RecordBatch,
 };
 use datafusion_common::Result;
 use datafusion_expr::EmitTo;
@@ -146,7 +145,7 @@ impl GroupValues for GroupValuesBoolean {
         Ok(vec![Arc::new(BooleanArray::new(values, nulls)) as _])
     }
 
-    fn clear_shrink(&mut self, _batch: &RecordBatch) {
+    fn clear_shrink(&mut self, _num_rows: usize) {
         self.false_group = None;
         self.true_group = None;
         self.null_group = None;

datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@ use std::mem::size_of;
 
 use crate::aggregates::group_values::GroupValues;
 
-use arrow::array::{Array, ArrayRef, OffsetSizeTrait, RecordBatch};
+use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
 use datafusion_common::Result;
 use datafusion_expr::EmitTo;
 use datafusion_physical_expr_common::binary_map::{ArrowBytesMap, OutputType};
@@ -120,7 +120,7 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
         Ok(vec![group_values])
     }
 
-    fn clear_shrink(&mut self, _batch: &RecordBatch) {
+    fn clear_shrink(&mut self, _num_rows: usize) {
         // in theory we could potentially avoid this reallocation and clear the
         // contents of the maps, but for now we just reset the map from the beginning
         self.map.take();

datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs

Lines changed: 2 additions & 2 deletions
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::aggregates::group_values::GroupValues;
-use arrow::array::{Array, ArrayRef, RecordBatch};
+use arrow::array::{Array, ArrayRef};
 use datafusion_expr::EmitTo;
 use datafusion_physical_expr::binary_map::OutputType;
 use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap;
@@ -122,7 +122,7 @@ impl GroupValues for GroupValuesBytesView {
         Ok(vec![group_values])
     }
 
-    fn clear_shrink(&mut self, _batch: &RecordBatch) {
+    fn clear_shrink(&mut self, _num_rows: usize) {
         // in theory we could potentially avoid this reallocation and clear the
         // contents of the maps, but for now we just reset the map from the beginning
         self.map.take();

datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs

Lines changed: 3 additions & 5 deletions
@@ -23,7 +23,6 @@ use arrow::array::{
     cast::AsArray,
 };
 use arrow::datatypes::{DataType, i256};
-use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_expr::EmitTo;
@@ -213,11 +212,10 @@ where
         Ok(vec![Arc::new(array.with_data_type(self.data_type.clone()))])
     }
 
-    fn clear_shrink(&mut self, batch: &RecordBatch) {
-        let count = batch.num_rows();
+    fn clear_shrink(&mut self, num_rows: usize) {
         self.values.clear();
-        self.values.shrink_to(count);
+        self.values.shrink_to(num_rows);
         self.map.clear();
-        self.map.shrink_to(count, |_| 0); // hasher does not matter since the map is cleared
+        self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since the map is cleared
     }
 }
