Skip to content

Commit 285c032

Browse files
committed
Add groups_accumulator.rs, with compilation fixed
1 parent b3acc9f commit 285c032

File tree

2 files changed

+261
-0
lines changed

2 files changed

+261
-0
lines changed
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Vectorized [`GroupsAccumulator`]
19+
20+
use crate::error::{DataFusionError, Result};
21+
use arrow::array::{ArrayRef, BooleanArray};
22+
23+
/// Local stand-in for the upstream `datafusion_common::not_impl_err` macro,
/// which is not imported here.
///
/// Expands to an `Err` holding `DataFusionError::NotImplemented`; the message
/// is obtained by calling `.to_owned()` on the supplied expression.
macro_rules! not_impl_err {
    ($message:expr) => {{
        let text = $message.to_owned();
        Err(DataFusionError::NotImplemented(text))
    }};
}
29+
30+
/// Controls how many groups are emitted in one step of grouping.
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
    /// Emit every group.
    All,
    /// Emit only the first `n` groups; the indexes of all remaining
    /// groups are shifted down by `n`.
    ///
    /// For example, with `n = 10` the groups at indexes `0, 1, ... 9`
    /// are emitted, and the groups previously at `10, 11, 12, ...`
    /// become `0, 1, 2, ...`.
    First(usize),
}

impl EmitTo {
    /// Removes from `v` the elements selected by this `EmitTo` and returns
    /// them as a new `Vec`; the elements that were not selected stay in `v`.
    ///
    /// For [`EmitTo::All`] the whole vector is moved out without copying
    /// and `v` is left empty.
    ///
    /// # Panics
    ///
    /// Panics for `EmitTo::First(n)` when `n` is greater than `v.len()`.
    pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
        let n = match *self {
            // Hand over the entire vector, leaving an empty one behind.
            Self::All => return std::mem::take(v),
            Self::First(n) => n,
        };

        // Detach the elements at positions `n..`; `v` keeps the first `n`.
        let remaining = v.split_off(n);
        // Store the tail back in `v` and return the first `n` elements.
        std::mem::replace(v, remaining)
    }
}
65+
66+
/// `GroupsAccumulator` implements a single aggregate (e.g. AVG) and
67+
/// stores the state for *all* groups internally.
68+
///
69+
/// Logically, a [`GroupsAccumulator`] stores a mapping from each group index to
70+
/// the state of the aggregate for that group. For example an implementation for
71+
/// `min` might look like
72+
///
73+
/// ```text
74+
/// ┌─────┐
75+
/// │ 0 │───────────▶ 100
76+
/// ├─────┤
77+
/// │ 1 │───────────▶ 200
78+
/// └─────┘
79+
/// ... ...
80+
/// ┌─────┐
81+
/// │ N-2 │───────────▶ 50
82+
/// ├─────┤
83+
/// │ N-1 │───────────▶ 200
84+
/// └─────┘
85+
///
86+
///
87+
/// Logical group Current Min
88+
/// number value for that
89+
/// group
90+
/// ```
91+
///
92+
/// # Notes on Implementing `GroupAccumulator`
93+
///
94+
/// All aggregates must first implement the simpler [`Accumulator`] trait, which
95+
/// handles state for a single group. Implementing `GroupsAccumulator` is
96+
/// optional and is harder to implement than `Accumulator`, but can be much
97+
/// faster for queries with many group values. See the [Aggregating Millions of
98+
/// Groups Fast blog] for more background.
99+
///
100+
/// [`NullState`] can help keep the state for groups that have not seen any
101+
/// values and produce the correct output for those groups.
102+
///
103+
/// [`NullState`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.NullState.html
104+
///
105+
/// # Details
106+
/// Each group is assigned a `group_index` by the hash table and each
107+
/// accumulator manages the specific state, one per `group_index`.
108+
///
109+
/// `group_index`es are contiguous (there aren't gaps), and thus it is
110+
/// expected that each `GroupAccumulator` will use something like `Vec<..>`
111+
/// to store the group states.
112+
///
113+
/// [`Accumulator`]: crate::accumulator::Accumulator
114+
/// [Aggregating Millions of Groups Fast blog]: https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/
115+
pub trait GroupsAccumulator: Send {
116+
/// Updates the accumulator's state from its arguments, encoded as
117+
/// a vector of [`ArrayRef`]s.
118+
///
119+
/// * `values`: the input arguments to the accumulator
120+
///
121+
/// * `group_indices`: The group indices to which each row in `values` belongs.
122+
///
123+
/// * `opt_filter`: if present, only update aggregate state using
124+
/// `values[i]` if `opt_filter[i]` is true
125+
///
126+
/// * `total_num_groups`: the number of groups (the largest
127+
/// group_index is thus `total_num_groups - 1`).
128+
///
129+
/// Note that subsequent calls to update_batch may have larger
130+
/// total_num_groups as new groups are seen.
131+
///
132+
/// See [`NullState`] to help keep the state for groups that have not seen any
133+
/// values and produce the correct output for those groups.
134+
///
135+
/// [`NullState`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.NullState.html
136+
fn update_batch(
137+
&mut self,
138+
values: &[ArrayRef],
139+
group_indices: &[usize],
140+
opt_filter: Option<&BooleanArray>,
141+
total_num_groups: usize,
142+
) -> Result<()>;
143+
144+
/// Returns the final aggregate value for each group as a single
145+
/// `RecordBatch`, resetting the internal state.
146+
///
147+
/// The rows returned *must* be in group_index order: The value
148+
/// for group_index 0, followed by 1, etc. Any group_index that
149+
/// did not have values, should be null.
150+
///
151+
/// For example, a `SUM` accumulator maintains a running sum for
152+
/// each group, and `evaluate` will produce that running sum as
153+
/// its output for all groups, in group_index order
154+
///
155+
/// If `emit_to` is [`EmitTo::All`], the accumulator should
156+
/// return all groups and release / reset its internal state
157+
/// equivalent to when it was first created.
158+
///
159+
/// If `emit_to` is [`EmitTo::First`], only the first `n` groups
160+
/// should be emitted and the state for those first groups
161+
/// removed. State for the remaining groups must be retained for
162+
/// future use. The group_indices on subsequent calls to
163+
/// `update_batch` or `merge_batch` will be shifted down by
164+
/// `n`. See [`EmitTo::First`] for more details.
165+
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef>;
166+
167+
/// Returns the intermediate aggregate state for this accumulator,
168+
/// used for multi-phase grouping, resetting its internal state.
169+
///
170+
/// See [`Accumulator::state`] for more information on multi-phase
171+
/// aggregation.
172+
///
173+
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
174+
/// but the `MIN` aggregate would just return a single array.
175+
///
176+
/// Note more sophisticated internal state can be passed as
177+
/// single `StructArray` rather than multiple arrays.
178+
///
179+
/// See [`Self::evaluate`] for details on the required output
180+
/// order and `emit_to`.
181+
///
182+
/// [`Accumulator::state`]: crate::accumulator::Accumulator::state
183+
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;
184+
185+
/// Merges intermediate state (the output from [`Self::state`])
186+
/// into this accumulator's current state.
187+
///
188+
/// For some aggregates (such as `SUM`), `merge_batch` is the same
189+
/// as `update_batch`, but for some aggregates (such as `COUNT`,
190+
/// where the partial counts must be summed) the operations
191+
/// differ. See [`Self::state`] for more details on how state is
192+
/// used and merged.
193+
///
194+
/// * `values`: arrays produced from previously calling `state` on other accumulators.
195+
///
196+
/// Other arguments are the same as for [`Self::update_batch`].
197+
fn merge_batch(
198+
&mut self,
199+
values: &[ArrayRef],
200+
group_indices: &[usize],
201+
opt_filter: Option<&BooleanArray>,
202+
total_num_groups: usize,
203+
) -> Result<()>;
204+
205+
/// Converts an input batch directly to the intermediate aggregate state.
206+
///
207+
/// This is the equivalent of treating each input row as its own group. It
208+
/// is invoked when the Partial phase of a multi-phase aggregation is not
209+
/// reducing the cardinality enough to warrant spending more effort on
210+
/// pre-aggregation (see `Background` section below), and switches to
211+
/// passing intermediate state directly on to the next aggregation phase.
212+
///
213+
/// Examples:
214+
/// * `COUNT`: an array of 1s for each row in the input batch.
215+
/// * `SUM/MIN/MAX`: the input values themselves.
216+
///
217+
/// # Arguments
218+
/// * `values`: the input arguments to the accumulator
219+
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
220+
///
221+
/// # Background
222+
///
223+
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
224+
/// Partial phase reduces the cardinality of the input data as soon as
225+
/// possible in the plan.
226+
///
227+
/// This strategy is very effective for queries with a small number of
228+
/// groups, as most of the data is aggregated immediately and only a small
229+
/// amount of data must be repartitioned (see [`Accumulator::state`] for
230+
/// background)
231+
///
232+
/// However, for queries with a large number of groups, the Partial phase
233+
/// often does not reduce the cardinality enough to warrant the memory and
234+
/// CPU cost of actually performing the aggregation. For such cases, the
235+
/// HashAggregate operator will dynamically switch to passing intermediate
236+
/// state directly to the next aggregation phase with minimal processing
237+
/// using this method.
238+
///
239+
/// [`Accumulator::state`]: crate::accumulator::Accumulator::state
240+
fn convert_to_state(
241+
&self,
242+
_values: &[ArrayRef],
243+
_opt_filter: Option<&BooleanArray>,
244+
) -> Result<Vec<ArrayRef>> {
245+
not_impl_err!("Input batch conversion to state not implemented")
246+
}
247+
248+
/// Returns `true` if [`Self::convert_to_state`] is implemented to support
249+
/// intermediate aggregate state conversion.
250+
fn supports_convert_to_state(&self) -> bool {
251+
false
252+
}
253+
254+
/// Amount of memory used to store the state of this accumulator,
255+
/// in bytes.
256+
///
257+
/// This function is called once per batch, so it should be `O(n)` to
258+
/// compute, not `O(num_groups)`
259+
fn size(&self) -> usize;
260+
}

datafusion/src/physical_plan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ pub mod expressions;
638638
pub mod filter;
639639
pub mod functions;
640640
pub mod group_scalar;
641+
pub mod groups_accumulator;
641642
pub mod hash_aggregate;
642643
pub mod hash_join;
643644
pub mod hash_utils;

0 commit comments

Comments
 (0)