Skip to content

Commit bf85744

Browse files
gboucher90alamb
authored andcommitted
Recreate group_values after spill merge to fix duplicate group keys (apache#20724)
When switching to streaming merge after spill, group_ordering is set to Full but group_values is not recreated. The existing GroupValuesColumn<false> uses vectorized_intern which can produce non-monotonic group indices, violating GroupOrderingFull's assumption and causing duplicate groups in the output. Fix: recreate group_values with the correct streaming mode after updating group_ordering in update_merged_stream().
1 parent 2947378 commit bf85744

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,18 @@ impl GroupedHashAggregateStream {
12331233
// on the grouping columns.
12341234
self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
12351235

1236+
// Recreate group_values to use streaming mode (GroupValuesColumn<true>
1237+
// with scalarized_intern) which preserves input row order, as required
1238+
// by GroupOrderingFull. This is only needed for multi-column group by,
1239+
// since single-column uses GroupValuesPrimitive which is always safe.
1240+
let group_schema = self
1241+
.spill_state
1242+
.merging_group_by
1243+
.group_schema(&self.spill_state.spill_schema)?;
1244+
if group_schema.fields().len() > 1 {
1245+
self.group_values = new_group_values(group_schema, &self.group_ordering)?;
1246+
}
1247+
12361248
// Use `OutOfMemoryMode::ReportError` from this point on
12371249
// to ensure we don't spill the spilled data to disk again.
12381250
self.oom_mode = OutOfMemoryMode::ReportError;

0 commit comments

Comments
 (0)