Skip to content

Commit 7deee7c

Browse files
committed
Use update_batch_preordered and merge_batch_preordered
1 parent cb5aee4 commit 7deee7c

File tree

3 files changed

+123
-64
lines changed

3 files changed

+123
-64
lines changed

datafusion/src/physical_plan/groups_accumulator.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,21 @@ pub trait GroupsAccumulator: Send {
155155
total_num_groups: usize,
156156
) -> Result<()>;
157157

158+
/// update_batch but where group_indices is already ordered into adjacent groups. `offsets` has
159+
/// the group boundaries, and note that `offsets[0] == 0` (and the last offset is
160+
/// `group_indices.len()`).
161+
fn update_batch_preordered(
162+
&mut self,
163+
values: &[ArrayRef],
164+
group_indices: &[usize],
165+
_offsets: &[usize],
166+
opt_filter: Option<&BooleanArray>,
167+
total_num_groups: usize,
168+
) -> Result<()> {
169+
// Extremely wasteful:
170+
self.update_batch(values, group_indices, opt_filter, total_num_groups)
171+
}
172+
158173
/// Returns the final aggregate value for each group as a single
159174
/// `RecordBatch`, resetting the internal state.
160175
///
@@ -224,6 +239,21 @@ pub trait GroupsAccumulator: Send {
224239
total_num_groups: usize,
225240
) -> Result<()>;
226241

242+
/// merge_batch but where group_indices is already ordered into adjacent groups. `offsets` has
243+
/// the group boundaries, and note that `offsets[0] == 0` (and the last offset is
244+
/// `group_indices.len()`).
245+
fn merge_batch_preordered(
246+
&mut self,
247+
values: &[ArrayRef],
248+
group_indices: &[usize],
249+
_offsets: &[usize],
250+
opt_filter: Option<&BooleanArray>,
251+
total_num_groups: usize,
252+
) -> Result<()> {
253+
// Extremely wasteful:
254+
self.merge_batch(values, group_indices, opt_filter, total_num_groups)
255+
}
256+
227257
/// Converts an input batch directly to the intermediate aggregate state.
228258
///
229259
/// This is the equivalent of treating each input row as its own group. It

datafusion/src/physical_plan/groups_accumulator_flat_adapter.rs

Lines changed: 87 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use arrow::{
3232
compute,
3333
datatypes::UInt32Type,
3434
};
35+
use smallvec::smallvec;
3536
use smallvec::SmallVec;
3637

3738
/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
@@ -110,14 +111,14 @@ struct AccumulatorState<AccumulatorType> {
110111
/// scratch space: indexes in the input array that will be fed to
111112
/// this accumulator. Stores indexes as `u32` to match the arrow
112113
/// `take` kernel input.
113-
indices: Vec<u32>,
114+
indices: SmallVec<[u32; 4]>,
114115
}
115116

116117
impl<AccumulatorType> AccumulatorState<AccumulatorType> {
117118
fn new(accumulator: AccumulatorType) -> Self {
118119
Self {
119120
accumulator,
120-
indices: vec![],
121+
indices: smallvec![],
121122
}
122123
}
123124

@@ -163,31 +164,47 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
163164
Ok(())
164165
}
165166

166-
/// invokes f(accumulator, values) for each group that has values
167-
/// in group_indices.
168-
///
169-
/// This function first reorders the input and filter so that
170-
/// values for each group_index are contiguous and then invokes f
171-
/// on the contiguous ranges, to minimize per-row overhead
172-
///
173-
/// ```text
174-
/// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐
175-
/// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐
176-
/// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │
177-
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
178-
/// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │
179-
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
180-
/// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │
181-
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
182-
/// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │
183-
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
184-
/// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │
185-
/// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘
186-
/// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘
187-
///
188-
/// logical group values opt_filter logical group values opt_filter
189-
///
190-
/// ```
167+
/// invokes f(accumulator, values) for each group that has values in group_indices, but unlike
168+
/// invoke_per_accumulator, the group_indices are already clumped together in intervals
169+
/// [offsets[i], offsets[i + 1]).
170+
fn invoke_per_accumulator_preordered<F>(
171+
&mut self,
172+
values: &[ArrayRef],
173+
group_indices: &[usize],
174+
offsets_param: &[usize],
175+
opt_filter: Option<&BooleanArray>,
176+
total_num_groups: usize,
177+
f: F,
178+
) -> Result<()>
179+
where
180+
F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
181+
{
182+
self.make_accumulators_if_needed(total_num_groups)?;
183+
184+
assert_eq!(
185+
values[0].len(),
186+
group_indices.len(),
187+
"asserting values[0].len() == group_indices.len()"
188+
);
189+
190+
let mut sizes_pre = 0;
191+
let mut sizes_post = 0;
192+
for offsets in offsets_param.windows(2) {
193+
let group_idx = group_indices[offsets[0]];
194+
let state = &mut self.states[group_idx];
195+
// sizes_pre += state.size(); // TODO: Add Accumulator::size?
196+
197+
let values_to_accumulate =
198+
slice_and_maybe_filter(values, opt_filter, offsets)?;
199+
f(&mut state.accumulator, &values_to_accumulate)?;
200+
201+
// sizes_post += state.size(); // TODO: Add Accumulator::size?
202+
}
203+
204+
// self.adjust_allocation(sizes_pre, sizes_post); // TODO: Add Accumulator::size?
205+
Ok(())
206+
}
207+
191208
fn invoke_per_accumulator<F>(
192209
&mut self,
193210
values: &[ArrayRef],
@@ -326,6 +343,27 @@ impl<AccumulatorType: Accumulator> GroupsAccumulator
326343
Ok(())
327344
}
328345

346+
fn update_batch_preordered(
347+
&mut self,
348+
values: &[ArrayRef],
349+
group_indices: &[usize],
350+
offsets: &[usize],
351+
opt_filter: Option<&BooleanArray>,
352+
total_num_groups: usize,
353+
) -> Result<()> {
354+
self.invoke_per_accumulator_preordered(
355+
values,
356+
group_indices,
357+
offsets,
358+
opt_filter,
359+
total_num_groups,
360+
|accumulator, values_to_accumulate| {
361+
accumulator.update_batch(values_to_accumulate)
362+
},
363+
)?;
364+
Ok(())
365+
}
366+
329367
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
330368
let vec_size_pre = self.states.allocated_size();
331369

@@ -410,6 +448,28 @@ impl<AccumulatorType: Accumulator> GroupsAccumulator
410448
Ok(())
411449
}
412450

451+
fn merge_batch_preordered(
452+
&mut self,
453+
values: &[ArrayRef],
454+
group_indices: &[usize],
455+
offsets: &[usize],
456+
opt_filter: Option<&BooleanArray>,
457+
total_num_groups: usize,
458+
) -> Result<()> {
459+
self.invoke_per_accumulator_preordered(
460+
values,
461+
group_indices,
462+
offsets,
463+
opt_filter,
464+
total_num_groups,
465+
|accumulator, values_to_accumulate| {
466+
accumulator.merge_batch(values_to_accumulate)?;
467+
Ok(())
468+
},
469+
)?;
470+
Ok(())
471+
}
472+
413473
fn size(&self) -> usize {
414474
self.allocation_bytes
415475
}

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -558,8 +558,7 @@ pub(crate) fn group_aggregate_batch(
558558
.collect::<Vec<ArrayRef>>(),
559559
)
560560
})
561-
.enumerate() // TODO: Remove as we don't use accumulator_index.
562-
.try_for_each(|(accumulator_index, (accumulator, values))| {
561+
.try_for_each(|(accumulator, values)| {
563562
if let Some(accumulator) = accumulator {
564563
match mode {
565564
AggregateMode::Partial | AggregateMode::Full => {
@@ -571,39 +570,7 @@ pub(crate) fn group_aggregate_batch(
571570
}
572571
}
573572
} else {
574-
// TODO: Remove this block.
575-
/*
576-
if false {
577-
group_indices.clear();
578-
group_indices
579-
.extend(std::iter::repeat(*group_index).take(values[0].len()));
580-
match mode {
581-
AggregateMode::Partial | AggregateMode::Full => {
582-
accumulation_state.groups_accumulators[accumulator_index]
583-
.as_mut()
584-
.unwrap()
585-
.update_batch(
586-
&values,
587-
&group_indices,
588-
None,
589-
accumulation_state.next_group_index,
590-
)
591-
}
592-
AggregateMode::FinalPartitioned | AggregateMode::Final => {
593-
// note: the aggregation here is over states, not values, thus the merge
594-
accumulation_state.groups_accumulators[accumulator_index]
595-
.as_mut()
596-
.unwrap()
597-
.merge_batch(
598-
&values,
599-
&group_indices,
600-
None,
601-
accumulation_state.next_group_index,
602-
)
603-
}
604-
}
605-
}
606-
*/
573+
// We do groups accumulator separately.
607574
Ok(())
608575
}
609576
})
@@ -622,17 +589,19 @@ pub(crate) fn group_aggregate_batch(
622589
if let Some(accumulator) = accumulator {
623590
match mode {
624591
AggregateMode::Partial | AggregateMode::Full => accumulator
625-
.update_batch(
592+
.update_batch_preordered(
626593
&values[accumulator_index],
627594
&all_group_indices,
595+
&offsets,
628596
None,
629597
accumulation_state.next_group_index,
630598
)?,
631599
AggregateMode::FinalPartitioned | AggregateMode::Final => {
632600
// note: the aggregation here is over states, not values, thus the merge
633-
accumulator.merge_batch(
601+
accumulator.merge_batch_preordered(
634602
&values[accumulator_index],
635603
&all_group_indices,
604+
&offsets,
636605
None,
637606
accumulation_state.next_group_index,
638607
)?

0 commit comments

Comments
 (0)