
Commit 639ed15

gboucher90 authored and alamb committed
Recreate group_values after spill merge to fix duplicate group keys (apache#20724)
When switching to streaming merge after spill, group_ordering is set to Full but group_values is not recreated. The existing GroupValuesColumn<false> uses vectorized_intern, which can produce non-monotonic group indices. This violates GroupOrderingFull's assumption that group indices follow input row order and causes duplicate groups in the output.

Fix: recreate group_values with the correct streaming mode after updating group_ordering in update_merged_stream().
1 parent b7ccb53 commit 639ed15

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 12 additions & 0 deletions
@@ -1267,6 +1267,18 @@ impl GroupedHashAggregateStream {
         // on the grouping columns.
         self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
 
+        // Recreate group_values to use streaming mode (GroupValuesColumn<true>
+        // with scalarized_intern) which preserves input row order, as required
+        // by GroupOrderingFull. This is only needed for multi-column group by,
+        // since single-column uses GroupValuesPrimitive which is always safe.
+        let group_schema = self
+            .spill_state
+            .merging_group_by
+            .group_schema(&self.spill_state.spill_schema)?;
+        if group_schema.fields().len() > 1 {
+            self.group_values = new_group_values(group_schema, &self.group_ordering)?;
+        }
+
         // Use `OutOfMemoryMode::ReportError` from this point on
         // to ensure we don't spill the spilled data to disk again.
         self.oom_mode = OutOfMemoryMode::ReportError;
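
To illustrate the failure mode described in the commit message, here is a minimal toy model. It is not DataFusion code: `Groups`, `InternOrder`, `run_streaming` and the sample data are all hypothetical names invented for this sketch. It assumes a streaming aggregator that, after each sorted batch, emits every group except the one with the highest index, on the assumption that only that group can receive more rows. `InternOrder::Reversed` is only a stand-in for any interner whose indices do not follow row order; it does not reproduce how vectorized_intern actually assigns indices.

```rust
/// Two-column group key, standing in for a multi-column GROUP BY.
type Key = (u32, u32);

/// How a batch's rows are visited when new group indices are assigned.
#[derive(Clone, Copy)]
enum InternOrder {
    /// Indices follow input row order (what the streaming emit relies on).
    RowOrder,
    /// Hypothetical stand-in for an interner whose indices do not follow
    /// row order: rows are simply visited in reverse.
    Reversed,
}

/// Toy group table: the position in `keys` is the group index.
#[derive(Default)]
struct Groups {
    keys: Vec<Key>,
    counts: Vec<u64>,
}

impl Groups {
    /// Return the index of `key`, appending a new group if it is unseen.
    fn intern(&mut self, key: Key) -> usize {
        match self.keys.iter().position(|k| *k == key) {
            Some(idx) => idx,
            None => {
                self.keys.push(key);
                self.counts.push(0);
                self.keys.len() - 1
            }
        }
    }

    /// Emit and remove the first `n` groups, shifting the rest down.
    fn emit_first(&mut self, n: usize, out: &mut Vec<(Key, u64)>) {
        let keys = self.keys.drain(..n);
        let counts = self.counts.drain(..n);
        out.extend(keys.zip(counts));
    }
}

/// Count rows per key over sorted batches. After every batch, all groups
/// except the one with the highest index are emitted as finished.
fn run_streaming(batches: &[Vec<Key>], order: InternOrder) -> Vec<(Key, u64)> {
    let mut groups = Groups::default();
    let mut out = Vec::new();
    for batch in batches {
        let rows: Vec<Key> = match order {
            InternOrder::RowOrder => batch.clone(),
            InternOrder::Reversed => batch.iter().rev().copied().collect(),
        };
        for key in rows {
            let idx = groups.intern(key);
            groups.counts[idx] += 1;
        }
        // Full-ordering assumption: only the highest index can see more rows.
        let open = groups.keys.len().saturating_sub(1);
        groups.emit_first(open, &mut out);
    }
    // End of input: flush whatever is still open.
    let rest = groups.keys.len();
    groups.emit_first(rest, &mut out);
    out
}

/// True if any key appears in more than one output row.
fn has_duplicate_keys(out: &[(Key, u64)]) -> bool {
    let mut seen = std::collections::HashSet::new();
    out.iter().any(|(k, _)| !seen.insert(*k))
}

/// Sorted input split across two batches; key (2, 1) spans the boundary.
fn sample_batches() -> Vec<Vec<Key>> {
    vec![vec![(1, 1), (1, 2), (2, 1)], vec![(2, 1), (2, 2)]]
}
```

With `sample_batches()`, the `RowOrder` run yields four output rows, one per key. The `Reversed` run emits key (2, 1) after the first batch because it no longer holds the highest index, then creates it again in the second batch, so the output has five rows with (2, 1) appearing twice. In this toy model the duplicate disappears once indices follow row order, which mirrors why the commit rebuilds group_values when group_ordering changes.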