Skip to content

Commit 6ec97da

Browse files
committed
Make GroupsAccumulatorFlatAdapter accumulators not intermingle with state
1 parent 7deee7c commit 6ec97da

File tree

1 file changed

+57
-54
lines changed

1 file changed

+57
-54
lines changed

datafusion/src/physical_plan/groups_accumulator_flat_adapter.rs

Lines changed: 57 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,14 @@ use smallvec::SmallVec;
9191
pub struct GroupsAccumulatorFlatAdapter<AccumulatorType: Accumulator> {
9292
factory: Box<dyn Fn() -> Result<AccumulatorType> + Send>,
9393

94-
/// state for each group, stored in group_index order
95-
states: Vec<AccumulatorState<AccumulatorType>>,
94+
/// [`Accumulators`] that store the per-group state
95+
accumulators: Vec<AccumulatorType>,
96+
97+
/// scratch space: indexes in the input array that will be fed to this accumulator (this is a
98+
/// parallel array to accumulators). Stores indexes as `u32` to match the arrow `take` kernel
99+
/// input. Note that we have indices.len() <= accumulators.len(); it only gets extended when
100+
/// used.
101+
indices: Vec<SmallVec<[u32; 4]>>,
96102

97103
// TODO: Code maintaining this is commented.
98104
/// Current memory usage, in bytes.
@@ -104,31 +110,15 @@ pub struct GroupsAccumulatorFlatAdapter<AccumulatorType: Accumulator> {
104110
allocation_bytes: usize,
105111
}
106112

107-
struct AccumulatorState<AccumulatorType> {
108-
/// [`Accumulator`] that stores the per-group state
109-
accumulator: AccumulatorType,
110-
111-
/// scratch space: indexes in the input array that will be fed to
112-
/// this accumulator. Stores indexes as `u32` to match the arrow
113-
/// `take` kernel input.
114-
indices: SmallVec<[u32; 4]>,
115-
}
116-
117-
impl<AccumulatorType> AccumulatorState<AccumulatorType> {
118-
fn new(accumulator: AccumulatorType) -> Self {
119-
Self {
120-
accumulator,
121-
indices: smallvec![],
122-
}
123-
}
124-
125-
/* TODO: Add Accumulator::size?
126-
/// Returns the amount of memory taken by this structure and its accumulator
127-
fn size(&self) -> usize {
128-
self.accumulator.size() + size_of_val(self) + self.indices.allocated_size()
129-
}
130-
*/
113+
// TODO: Remove this or remember to account for accumulators and indices in a size() calculation.
114+
// impl<AccumulatorType> AccumulatorState<AccumulatorType> {
115+
/* TODO: Add Accumulator::size?
116+
/// Returns the amount of memory taken by this structure and its accumulator
117+
fn size(&self) -> usize {
118+
self.accumulator.size() + size_of_val(self) + self.indices.allocated_size()
131119
}
120+
*/
121+
// }
132122

133123
impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType> {
134124
/// Create a new adapter that will create a new [`Accumulator`]
@@ -139,28 +129,40 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
139129
{
140130
Self {
141131
factory: Box::new(factory),
142-
states: vec![],
132+
accumulators: vec![],
133+
indices: vec![],
143134
allocation_bytes: 0,
144135
}
145136
}
146137

147138
/// Ensure that self.accumulators has total_num_groups
148139
fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
149140
// can't shrink
150-
assert!(total_num_groups >= self.states.len());
151-
let vec_size_pre = self.states.allocated_size();
141+
assert!(total_num_groups >= self.accumulators.len());
142+
let vec_size_pre = self.accumulators.allocated_size();
152143

153144
// instantiate new accumulators
154-
let new_accumulators = total_num_groups - self.states.len();
145+
let new_accumulators = total_num_groups - self.accumulators.len();
155146
for _ in 0..new_accumulators {
156147
let accumulator = (self.factory)()?;
157-
let state = AccumulatorState::new(accumulator);
158148
// TODO: Add Accumulator::size()?
159149
// self.add_allocation(state.size());
160-
self.states.push(state);
150+
self.accumulators.push(accumulator);
161151
}
162152

163-
self.adjust_allocation(vec_size_pre, self.states.allocated_size());
153+
self.adjust_allocation(vec_size_pre, self.accumulators.allocated_size());
154+
Ok(())
155+
}
156+
157+
fn make_indices_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
158+
// can't shrink
159+
assert!(total_num_groups >= self.indices.len());
160+
let vec_size_pre = self.indices.allocated_size();
161+
162+
// instantiate new indices
163+
self.indices.resize(total_num_groups, smallvec![]);
164+
165+
self.adjust_allocation(vec_size_pre, self.indices.allocated_size());
164166
Ok(())
165167
}
166168

@@ -191,12 +193,12 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
191193
let mut sizes_post = 0;
192194
for offsets in offsets_param.windows(2) {
193195
let group_idx = group_indices[offsets[0]];
194-
let state = &mut self.states[group_idx];
196+
let accumulator: &mut AccumulatorType = &mut self.accumulators[group_idx];
195197
// sizes_pre += state.size(); // TODO: Add Accumulator::size?
196198

197199
let values_to_accumulate =
198200
slice_and_maybe_filter(values, opt_filter, offsets)?;
199-
f(&mut state.accumulator, &values_to_accumulate)?;
201+
f(accumulator, &values_to_accumulate)?;
200202

201203
// sizes_post += state.size(); // TODO: Add Accumulator::size?
202204
}
@@ -217,6 +219,7 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
217219
F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
218220
{
219221
self.make_accumulators_if_needed(total_num_groups)?;
222+
self.make_indices_if_needed(total_num_groups)?;
220223

221224
assert_eq!(
222225
values[0].len(),
@@ -228,7 +231,7 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
228231
// Note that self.state.indices starts empty for all groups
229232
// (it is cleared out below)
230233
for (idx, group_index) in group_indices.iter().enumerate() {
231-
self.states[*group_index].indices.push(idx as u32);
234+
self.indices[*group_index].push(idx as u32);
232235
}
233236

234237
// groups_with_rows holds a list of group indexes that have
@@ -245,8 +248,7 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
245248
let mut offsets = vec![0];
246249

247250
let mut offset_so_far = 0;
248-
for (group_index, state) in self.states.iter_mut().enumerate() {
249-
let indices = &state.indices;
251+
for (group_index, indices) in self.indices.iter_mut().enumerate() {
250252
if indices.is_empty() {
251253
continue;
252254
}
@@ -272,19 +274,18 @@ impl<AccumulatorType: Accumulator> GroupsAccumulatorFlatAdapter<AccumulatorType>
272274
let mut sizes_pre = 0;
273275
let mut sizes_post = 0;
274276
for (&group_idx, offsets) in iter {
275-
let state = &mut self.states[group_idx];
276277
// sizes_pre += state.size(); // TODO: Add Accumulator::size?
277278

278279
let values_to_accumulate = slice_and_maybe_filter(
279280
&values,
280281
opt_filter.as_ref().map(|f| as_boolean_array(f)),
281282
offsets,
282283
)?;
283-
f(&mut state.accumulator, &values_to_accumulate)?;
284+
f(&mut self.accumulators[group_idx], &values_to_accumulate)?;
284285

285286
// clear out the state so they are empty for next
286287
// iteration
287-
state.indices.clear();
288+
self.indices[group_idx].clear();
288289
// sizes_post += state.size(); // TODO: Add Accumulator::size?
289290
}
290291

@@ -365,41 +366,43 @@ impl<AccumulatorType: Accumulator> GroupsAccumulator
365366
}
366367

367368
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
368-
let vec_size_pre = self.states.allocated_size();
369+
let vec_size_pre = self.accumulators.allocated_size();
369370

370-
let states = emit_to.take_needed(&mut self.states);
371+
let accumulators = emit_to.take_needed(&mut self.accumulators);
372+
self.indices.truncate(self.accumulators.len());
371373

372-
let results: Vec<ScalarValue> = states
374+
let results: Vec<ScalarValue> = accumulators
373375
.into_iter()
374-
.map(|mut state| {
376+
.map(|mut accumulator| {
375377
// self.free_allocation(state.size()); // TODO: Add Accumulator::size?
376-
state.accumulator.evaluate()
378+
accumulator.evaluate()
377379
})
378380
.collect::<Result<_>>()?;
379381

380382
let result = ScalarValue::iter_to_array(results);
381383

382-
self.adjust_allocation(vec_size_pre, self.states.allocated_size());
384+
self.adjust_allocation(vec_size_pre, self.accumulators.allocated_size());
383385

384386
result
385387
}
386388

387389
fn peek_evaluate(&self, group_index: usize) -> Result<ScalarValue> {
388-
self.states[group_index].accumulator.evaluate()
390+
self.accumulators[group_index].evaluate()
389391
}
390392

391393
// filtered_null_mask(opt_filter, &values);
392394
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
393-
let vec_size_pre = self.states.allocated_size();
394-
let states = emit_to.take_needed(&mut self.states);
395+
let vec_size_pre = self.accumulators.allocated_size();
396+
let accumulators = emit_to.take_needed(&mut self.accumulators);
397+
self.indices.truncate(self.accumulators.len());
395398

396399
// each accumulator produces a potential vector of values
397400
// which we need to form into columns
398401
let mut results: Vec<Vec<ScalarValue>> = vec![];
399402

400-
for mut state in states {
403+
for mut accumulator in accumulators {
401404
// self.free_allocation(state.size()); // TODO: Add Accumulator::size?
402-
let accumulator_state = state.accumulator.state()?;
405+
let accumulator_state = accumulator.state()?;
403406
results.resize_with(accumulator_state.len(), Vec::new);
404407
for (idx, state_val) in accumulator_state.into_iter().enumerate() {
405408
results[idx].push(state_val);
@@ -419,13 +422,13 @@ impl<AccumulatorType: Accumulator> GroupsAccumulator
419422
assert_eq!(arr.len(), first_col.len())
420423
}
421424
}
422-
self.adjust_allocation(vec_size_pre, self.states.allocated_size());
425+
self.adjust_allocation(vec_size_pre, self.accumulators.allocated_size());
423426

424427
Ok(arrays)
425428
}
426429

427430
fn peek_state(&self, group_index: usize) -> Result<SmallVec<[ScalarValue; 2]>> {
428-
self.states[group_index].accumulator.state()
431+
self.accumulators[group_index].state()
429432
}
430433

431434
fn merge_batch(

0 commit comments

Comments
 (0)