diff --git a/datafusion/src/cube_ext/joinagg.rs b/datafusion/src/cube_ext/joinagg.rs index 39287ead4170..8953aa5cfafe 100644 --- a/datafusion/src/cube_ext/joinagg.rs +++ b/datafusion/src/cube_ext/joinagg.rs @@ -25,7 +25,7 @@ use crate::execution::context::{ExecutionContextState, ExecutionProps}; use crate::logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils::from_plan; -use crate::physical_plan::hash_aggregate::{Accumulators, AggregateMode}; +use crate::physical_plan::hash_aggregate::{create_accumulation_state, AggregateMode}; use crate::physical_plan::planner::{physical_name, ExtensionPlanner}; use crate::physical_plan::{hash_aggregate, PhysicalPlanner}; use crate::physical_plan::{ @@ -245,7 +245,7 @@ impl ExecutionPlan for CrossJoinAggExec { &AggregateMode::Full, self.group_expr.len(), )?; - let mut accumulators = Accumulators::new(); + let mut accumulators = create_accumulation_state(&self.agg_expr)?; for partition in 0..self.join.right.output_partitioning().partition_count() { let mut batches = self.join.right.execute(partition).await?; while let Some(right) = batches.next().await { diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs index 88c8f754fb68..96f908a06d24 100644 --- a/datafusion/src/physical_plan/expressions/average.rs +++ b/datafusion/src/physical_plan/expressions/average.rs @@ -22,6 +22,8 @@ use std::convert::TryFrom; use std::sync::Arc; use crate::error::{DataFusionError, Result}; +use crate::physical_plan::groups_accumulator::GroupsAccumulator; +use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use arrow::compute; @@ -112,6 +114,23 @@ impl AggregateExpr for Avg { )?)) } + fn uses_groups_accumulator(&self) -> bool { + return true; + } + + /// the 
groups accumulator used to accumulate values from the expression. If this returns None, + /// create_accumulator must be used. + fn create_groups_accumulator( + &self, + ) -> arrow::error::Result>> { + Ok(Some(Box::new( + GroupsAccumulatorFlatAdapter::::new(|| { + // avg is f64 (as in create_accumulator) + AvgAccumulator::try_new(&DataType::Float64) + }), + ))) + } + fn expressions(&self) -> Vec> { vec![self.expr.clone()] } diff --git a/datafusion/src/physical_plan/expressions/sum.rs b/datafusion/src/physical_plan/expressions/sum.rs index cee4b41bf953..a3a9a5c73c6d 100644 --- a/datafusion/src/physical_plan/expressions/sum.rs +++ b/datafusion/src/physical_plan/expressions/sum.rs @@ -22,6 +22,8 @@ use std::convert::TryFrom; use std::sync::Arc; use crate::error::{DataFusionError, Result}; +use crate::physical_plan::groups_accumulator::GroupsAccumulator; +use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; use arrow::compute; @@ -42,7 +44,7 @@ use super::format_state_name; use smallvec::smallvec; use smallvec::SmallVec; -// SUM aggregate expression +/// SUM aggregate expression #[derive(Debug)] pub struct Sum { name: String, @@ -118,6 +120,23 @@ impl AggregateExpr for Sum { Ok(Box::new(SumAccumulator::try_new(&self.data_type)?)) } + fn uses_groups_accumulator(&self) -> bool { + return true; + } + + /// the groups accumulator used to accumulate values from the expression. If this returns None, + /// create_accumulator must be used. 
+ fn create_groups_accumulator( + &self, + ) -> arrow::error::Result>> { + let data_type = self.data_type.clone(); + Ok(Some(Box::new( + GroupsAccumulatorFlatAdapter::::new(move || { + SumAccumulator::try_new(&data_type) + }), + ))) + } + fn name(&self) -> &str { &self.name } diff --git a/datafusion/src/physical_plan/groups_accumulator.rs b/datafusion/src/physical_plan/groups_accumulator.rs new file mode 100644 index 000000000000..dfd8aaa5493c --- /dev/null +++ b/datafusion/src/physical_plan/groups_accumulator.rs @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Vectorized [`GroupsAccumulator`] + +use crate::error::{DataFusionError, Result}; +use crate::scalar::ScalarValue; +use arrow::array::{ArrayRef, BooleanArray}; +use smallvec::SmallVec; + +/// From upstream: This replaces a datafusion_common::{not_impl_err} import. +macro_rules! not_impl_err { + ( $x:expr ) => { + Err(DataFusionError::NotImplemented($x.to_owned())) + }; +} + +// TODO: Probably drop the #[macro_export] that was copy/pasted in +// From upstream datafusion. We don't pass the backtrace: +/// Exposes a macro to create `DataFusionError::ArrowError` with optional backtrace +#[macro_export] +macro_rules! 
arrow_datafusion_err { + ($ERR:expr) => { + DataFusionError::ArrowError( + $ERR, /* , Some(DataFusionError::get_back_trace() */ + ) + }; +} + +/// Describes how many rows should be emitted during grouping. +#[derive(Debug, Clone, Copy)] +pub enum EmitTo { + /// Emit all groups + All, + /// Emit only the first `n` groups and shift all existing group + /// indexes down by `n`. + /// + /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted + /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`. + First(usize), +} + +impl EmitTo { + /// Removes the number of rows from `v` required to emit the right + /// number of rows, returning a `Vec` with elements taken, and the + /// remaining values in `v`. + /// + /// This avoids copying if Self::All + pub fn take_needed(&self, v: &mut Vec) -> Vec { + match self { + Self::All => { + // Take the entire vector, leave new (empty) vector + std::mem::take(v) + } + Self::First(n) => { + // get end n+1,.. values into t + let mut t = v.split_off(*n); + // leave n+1,.. in v + std::mem::swap(v, &mut t); + t + } + } + } +} + +/// `GroupsAccumulator` implements a single aggregate (e.g. AVG) and +/// stores the state for *all* groups internally. +/// +/// Logically, a [`GroupsAccumulator`] stores a mapping from each group index to +/// the state of the aggregate for that group. For example an implementation for +/// `min` might look like +/// +/// ```text +/// ┌─────┐ +/// │ 0 │───────────▶ 100 +/// ├─────┤ +/// │ 1 │───────────▶ 200 +/// └─────┘ +/// ... ... +/// ┌─────┐ +/// │ N-2 │───────────▶ 50 +/// ├─────┤ +/// │ N-1 │───────────▶ 200 +/// └─────┘ +/// +/// +/// Logical group Current Min +/// number value for that +/// group +/// ``` +/// +/// # Notes on Implementing `GroupAccumulator` +/// +/// All aggregates must first implement the simpler [`Accumulator`] trait, which +/// handles state for a single group. 
Implementing `GroupsAccumulator` is +/// optional and is harder to implement than `Accumulator`, but can be much +/// faster for queries with many group values. See the [Aggregating Millions of +/// Groups Fast blog] for more background. +/// +/// [`NullState`] can help keep the state for groups that have not seen any +/// values and produce the correct output for those groups. +/// +/// [`NullState`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.NullState.html +/// +/// # Details +/// Each group is assigned a `group_index` by the hash table and each +/// accumulator manages the specific state, one per `group_index`. +/// +/// `group_index`es are contiguous (there aren't gaps), and thus it is +/// expected that each `GroupsAccumulator` will use something like `Vec<..>` +/// to store the group states. +/// +/// [`Accumulator`]: crate::physical_plan::Accumulator +/// [Aggregating Millions of Groups Fast blog]: https://arrow.apache.org/blog/2023/08/05/datafusion_fast_grouping/ +pub trait GroupsAccumulator: Send { + /// Updates the accumulator's state from its arguments, encoded as + /// a vector of [`ArrayRef`]s. + /// + /// * `values`: the input arguments to the accumulator + /// + /// * `group_indices`: The group indices to which each row in `values` belongs. + /// + /// * `opt_filter`: if present, only update aggregate state using + /// `values[i]` if `opt_filter[i]` is true + /// + /// * `total_num_groups`: the number of groups (the largest + /// group_index is thus `total_num_groups - 1`). + /// + /// Note that subsequent calls to update_batch may have larger + /// total_num_groups as new groups are seen. + /// + /// See [`NullState`] to help keep the state for groups that have not seen any + /// values and produce the correct output for those groups. 
+ /// + /// [`NullState`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.NullState.html + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()>; + + /// update_batch but where group_indices is already clumped into groups. `offsets` has the + /// group boundaries, and note that `offsets[0] == 0` (and the last offset is + /// `group_indices.len()`). So offsets[i] .. offsets[i + 1] is a half-open interval of equal + /// group indices. + fn update_batch_preordered( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + _offsets: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + // Extremely wasteful: + self.update_batch(values, group_indices, opt_filter, total_num_groups) + } + + /// Returns the final aggregate value for each group as a single + /// `RecordBatch`, resetting the internal state. + /// + /// The rows returned *must* be in group_index order: The value + /// for group_index 0, followed by 1, etc. Any group_index that + /// did not have values, should be null. + /// + /// For example, a `SUM` accumulator maintains a running sum for + /// each group, and `evaluate` will produce that running sum as + /// its output for all groups, in group_index order + /// + /// If `emit_to` is [`EmitTo::All`], the accumulator should + /// return all groups and release / reset its internal state + /// equivalent to when it was first created. + /// + /// If `emit_to` is [`EmitTo::First`], only the first `n` groups + /// should be emitted and the state for those first groups + /// removed. State for the remaining groups must be retained for + /// future use. The group_indices on subsequent calls to + /// `update_batch` or `merge_batch` will be shifted down by + /// `n`. See [`EmitTo::First`] for more details. + fn evaluate(&mut self, emit_to: EmitTo) -> Result; + + // TODO: Remove this? 
+ /// evaluate for a particular group index. + fn peek_evaluate(&self, group_index: usize) -> Result; + + /// Returns the intermediate aggregate state for this accumulator, + /// used for multi-phase grouping, resetting its internal state. + /// + /// See [`Accumulator::state`] for more information on multi-phase + /// aggregation. + /// + /// For example, `AVG` might return two arrays: `SUM` and `COUNT` + /// but the `MIN` aggregate would just return a single array. + /// + /// Note more sophisticated internal state can be passed as + /// single `StructArray` rather than multiple arrays. + /// + /// See [`Self::evaluate`] for details on the required output + /// order and `emit_to`. + /// + /// [`Accumulator::state`]: crate::accumulator::Accumulator::state + fn state(&mut self, emit_to: EmitTo) -> Result>; + + // TODO: Remove this? + /// Looks at the state for a particular group index. + fn peek_state(&self, group_index: usize) -> Result>; + + /// Merges intermediate state (the output from [`Self::state`]) + /// into this accumulator's current state. + /// + /// For some aggregates (such as `SUM`), `merge_batch` is the same + /// as `update_batch`, but for some aggregates (such as `COUNT`, + /// where the partial counts must be summed) the operations + /// differ. See [`Self::state`] for more details on how state is + /// used and merged. + /// + /// * `values`: arrays produced from previously calling `state` on other accumulators. + /// + /// Other arguments are the same as for [`Self::update_batch`]. + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()>; + + /// merge_batch but where group_indices is already ordered into adjacent groups. `offsets` has + /// the group boundaries, and note that `offsets[0] == 0` (and the last offset is + /// `group_indices.len()`). 
+ fn merge_batch_preordered( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + _offsets: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + // Extremely wasteful: + self.merge_batch(values, group_indices, opt_filter, total_num_groups) + } + + /// Converts an input batch directly to the intermediate aggregate state. + /// + /// This is the equivalent of treating each input row as its own group. It + /// is invoked when the Partial phase of a multi-phase aggregation is not + /// reducing the cardinality enough to warrant spending more effort on + /// pre-aggregation (see `Background` section below), and switches to + /// passing intermediate state directly on to the next aggregation phase. + /// + /// Examples: + /// * `COUNT`: an array of 1s for each row in the input batch. + /// * `SUM/MIN/MAX`: the input values themselves. + /// + /// # Arguments + /// * `values`: the input arguments to the accumulator + /// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored + /// + /// # Background + /// + /// In a multi-phase aggregation (see [`Accumulator::state`]), the initial + /// Partial phase reduces the cardinality of the input data as soon as + /// possible in the plan. + /// + /// This strategy is very effective for queries with a small number of + /// groups, as most of the data is aggregated immediately and only a small + /// amount of data must be repartitioned (see [`Accumulator::state`] for + /// background) + /// + /// However, for queries with a large number of groups, the Partial phase + /// often does not reduce the cardinality enough to warrant the memory and + /// CPU cost of actually performing the aggregation. For such cases, the + /// HashAggregate operator will dynamically switch to passing intermediate + /// state directly to the next aggregation phase with minimal processing + /// using this method. 
+ /// + /// [`Accumulator::state`]: crate::accumulator::Accumulator::state + fn convert_to_state( + &self, + _values: &[ArrayRef], + _opt_filter: Option<&BooleanArray>, + ) -> Result> { + not_impl_err!("Input batch conversion to state not implemented") + } + + /// Returns `true` if [`Self::convert_to_state`] is implemented to support + /// intermediate aggregate state conversion. + fn supports_convert_to_state(&self) -> bool { + false + } + + /// Amount of memory used to store the state of this accumulator, + /// in bytes. + /// + /// This function is called once per batch, so it should be `O(n)` to + /// compute, not `O(num_groups)` + fn size(&self) -> usize; +} diff --git a/datafusion/src/physical_plan/groups_accumulator_adapter.rs b/datafusion/src/physical_plan/groups_accumulator_adapter.rs new file mode 100644 index 000000000000..5b2f62f8c9e5 --- /dev/null +++ b/datafusion/src/physical_plan/groups_accumulator_adapter.rs @@ -0,0 +1,525 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for implementing GroupsAccumulator +//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] + +use std::mem::size_of; +// use std::mem::size_of_val; // TODO: Remove commented Accumulator::size() code? 
+ +use crate::arrow_datafusion_err; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::groups_accumulator::{EmitTo, GroupsAccumulator}; +use crate::physical_plan::Accumulator; +use crate::scalar::ScalarValue; +use arrow::array::{as_boolean_array, new_empty_array, Array}; +use arrow::compute::take_arrays; +use arrow::{ + array::{ArrayRef, /* AsArray, */ BooleanArray, PrimitiveArray}, + compute, + datatypes::UInt32Type, +}; +use smallvec::SmallVec; + +/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`] +/// +/// While [`Accumulator`] are simpler to implement and can support +/// more general calculations (like retractable window functions), +/// they are not as fast as a specialized `GroupsAccumulator`. This +/// interface bridges the gap so the group by operator only operates +/// in terms of [`Accumulator`]. +/// +/// Internally, this adapter creates a new [`Accumulator`] for each group which +/// stores the state for that group. This both requires an allocation for each +/// Accumulator, internal indices, as well as whatever internal allocations the +/// Accumulator itself requires. +/// +/// For example, a `MinAccumulator` that computes the minimum string value with +/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group +/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`). +/// +/// ```text +/// ┌─────────────────────────────────┐ +/// │MinAccumulator { │ +/// ┌─────▶│ min: ScalarValue::Utf8("A") │───────┐ +/// │ │} │ │ +/// │ └─────────────────────────────────┘ └───────▶ "A" +/// ┌─────┐ │ ┌─────────────────────────────────┐ +/// │ 0 │─────┘ │MinAccumulator { │ +/// ├─────┤ ┌─────▶│ min: ScalarValue::Utf8("Z") │───────────────▶ "Z" +/// │ 1 │─────┘ │} │ +/// └─────┘ └─────────────────────────────────┘ ... +/// ... ... 
+/// ┌─────┐ ┌────────────────────────────────┐ +/// │ N-2 │ │MinAccumulator { │ +/// ├─────┤ │ min: ScalarValue::Utf8("A") │────────────────▶ "A" +/// │ N-1 │─────┐ │} │ +/// └─────┘ │ └────────────────────────────────┘ +/// │ ┌────────────────────────────────┐ ┌───────▶ "Q" +/// │ │MinAccumulator { │ │ +/// └─────▶│ min: ScalarValue::Utf8("Q") │────────┘ +/// │} │ +/// └────────────────────────────────┘ +/// +/// +/// Logical group Current Min/Max value for that group stored +/// number as a ScalarValue which points to an +/// indivdually allocated String +/// +///``` +/// +/// # Optimizations +/// +/// The adapter minimizes the number of calls to [`Accumulator::update_batch`] +/// by first collecting the input rows for each group into a contiguous array +/// using [`compute::take`] +/// +pub struct GroupsAccumulatorAdapter { + factory: Box Result> + Send>, + + /// state for each group, stored in group_index order + states: Vec, + + // TODO: Code maintaining this is commented. + /// Current memory usage, in bytes. + /// + /// Note this is incrementally updated with deltas to avoid the + /// call to size() being a bottleneck. We saw size() being a + /// bottleneck in earlier implementations when there were many + /// distinct groups. + allocation_bytes: usize, +} + +struct AccumulatorState { + /// [`Accumulator`] that stores the per-group state + accumulator: Box, + + /// scratch space: indexes in the input array that will be fed to + /// this accumulator. Stores indexes as `u32` to match the arrow + /// `take` kernel input. + indices: Vec, +} + +impl AccumulatorState { + fn new(accumulator: Box) -> Self { + Self { + accumulator, + indices: vec![], + } + } + + /* TODO: Add Accumulator::size? 
+ /// Returns the amount of memory taken by this structure and its accumulator + fn size(&self) -> usize { + self.accumulator.size() + size_of_val(self) + self.indices.allocated_size() + } + */ +} + +impl GroupsAccumulatorAdapter { + /// Create a new adapter that will create a new [`Accumulator`] + /// for each group, using the specified factory function + pub fn new(factory: F) -> Self + where + F: Fn() -> Result> + Send + 'static, + { + Self { + factory: Box::new(factory), + states: vec![], + allocation_bytes: 0, + } + } + + /// Ensure that self.accumulators has total_num_groups + fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> { + // can't shrink + assert!(total_num_groups >= self.states.len()); + let vec_size_pre = self.states.allocated_size(); + + // instantiate new accumulators + let new_accumulators = total_num_groups - self.states.len(); + for _ in 0..new_accumulators { + let accumulator = (self.factory)()?; + let state = AccumulatorState::new(accumulator); + // TODO: Add Accumulator::size()? + // self.add_allocation(state.size()); + self.states.push(state); + } + + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + Ok(()) + } + + /// invokes f(accumulator, values) for each group that has values + /// in group_indices. 
+ /// + /// This function first reorders the input and filter so that + /// values for each group_index are contiguous and then invokes f + /// on the contiguous ranges, to minimize per-row overhead + /// + /// ```text + /// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ + /// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐ + /// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │ + /// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤ + /// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │ + /// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘ + /// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘ + /// + /// logical group values opt_filter logical group values opt_filter + /// + /// ``` + fn invoke_per_accumulator( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + f: F, + ) -> Result<()> + where + F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>, + { + self.make_accumulators_if_needed(total_num_groups)?; + + assert_eq!( + values[0].len(), + group_indices.len(), + "asserting values[0].len() == group_indices.len()" + ); + + // figure out which input rows correspond to which groups. 
+ // Note that self.state.indices starts empty for all groups + // (it is cleared out below) + for (idx, group_index) in group_indices.iter().enumerate() { + self.states[*group_index].indices.push(idx as u32); + } + + // groups_with_rows holds a list of group indexes that have + // any rows that need to be accumulated, stored in order of + // group_index + + let mut groups_with_rows = vec![]; + + // batch_indices holds indices into values, each group is contiguous + let mut batch_indices = vec![]; + + // offsets[i] is index into batch_indices where the rows for + // group_index i starts + let mut offsets = vec![0]; + + let mut offset_so_far = 0; + for (group_index, state) in self.states.iter_mut().enumerate() { + let indices = &state.indices; + if indices.is_empty() { + continue; + } + + groups_with_rows.push(group_index); + batch_indices.extend_from_slice(indices); + offset_so_far += indices.len(); + offsets.push(offset_so_far); + } + let batch_indices = batch_indices.into(); + + // reorder the values and opt_filter by batch_indices so that + // all values for each group are contiguous, then invoke the + // accumulator once per group with values + let values = take_arrays(values, &batch_indices, None)?; + let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; + + // invoke each accumulator with the appropriate rows, first + // pulling the input arguments for this group into their own + // RecordBatch(es) + let iter = groups_with_rows.iter().zip(offsets.windows(2)); + + // let mut sizes_pre = 0; + // let mut sizes_post = 0; + for (&group_idx, offsets) in iter { + let state = &mut self.states[group_idx]; + // sizes_pre += state.size(); // TODO: Add Accumulator::size? 
+ + let values_to_accumulate = slice_and_maybe_filter( + &values, + opt_filter.as_ref().map(|f| as_boolean_array(f)), + offsets, + )?; + f(state.accumulator.as_mut(), &values_to_accumulate)?; + + // clear out the state so they are empty for next + // iteration + state.indices.clear(); + // sizes_post += state.size(); // TODO: Add Accumulator::size? + } + + // self.adjust_allocation(sizes_pre, sizes_post); // TODO: Add Accumulator::size? + Ok(()) + } + + /// Increment the allocation by `size` + /// + /// See [`Self::allocation_bytes`] for rationale. + fn add_allocation(&mut self, size: usize) { + self.allocation_bytes += size; + } + + /// Decrease the allocation by `size` + /// + /// See [`Self::allocation_bytes`] for rationale. + fn free_allocation(&mut self, size: usize) { + // use saturating sub to avoid errors if the accumulators + // report erroneous sizes + self.allocation_bytes = self.allocation_bytes.saturating_sub(size) + } + + /// Adjusts the allocation for something that started with + /// old_size and now has new_size avoiding overflow + /// + /// See [`Self::allocation_bytes`] for rationale. 
+ fn adjust_allocation(&mut self, old_size: usize, new_size: usize) { + if new_size > old_size { + self.add_allocation(new_size - old_size) + } else { + self.free_allocation(old_size - new_size) + } + } +} + +impl GroupsAccumulator for GroupsAccumulatorAdapter { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.update_batch(values_to_accumulate) + }, + )?; + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let vec_size_pre = self.states.allocated_size(); + + let states = emit_to.take_needed(&mut self.states); + + let results: Vec = states + .into_iter() + .map(|state| { + // self.free_allocation(state.size()); // TODO: Add Accumulator::size? + state.accumulator.evaluate() + }) + .collect::>()?; + + let result = ScalarValue::iter_to_array(results); + + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + + result + } + + fn peek_evaluate(&self, group_index: usize) -> Result { + self.states[group_index].accumulator.evaluate() + } + + // filtered_null_mask(opt_filter, &values); + fn state(&mut self, emit_to: EmitTo) -> Result> { + let vec_size_pre = self.states.allocated_size(); + let states = emit_to.take_needed(&mut self.states); + + // each accumulator produces a potential vector of values + // which we need to form into columns + let mut results: Vec> = vec![]; + + for state in states { + // self.free_allocation(state.size()); // TODO: Add Accumulator::size? 
+ let accumulator_state = state.accumulator.state()?; + results.resize_with(accumulator_state.len(), Vec::new); + for (idx, state_val) in accumulator_state.into_iter().enumerate() { + results[idx].push(state_val); + } + } + + // create an array for each intermediate column + let arrays = results + .into_iter() + .map(ScalarValue::iter_to_array) + .collect::>>()?; + + // double check each array has the same length (aka the + // accumulator was implemented correctly + if let Some(first_col) = arrays.first() { + for arr in &arrays { + assert_eq!(arr.len(), first_col.len()) + } + } + self.adjust_allocation(vec_size_pre, self.states.allocated_size()); + + Ok(arrays) + } + + fn peek_state(&self, group_index: usize) -> Result> { + self.states[group_index].accumulator.state() + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.merge_batch(values_to_accumulate)?; + Ok(()) + }, + )?; + Ok(()) + } + + fn size(&self) -> usize { + self.allocation_bytes + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let num_rows = values[0].len(); + + // If there are no rows, return empty arrays + if num_rows == 0 { + // create empty accumulator to get the state types + let empty_state = (self.factory)()?.state()?; + let empty_arrays = empty_state + .into_iter() + .map(|state_val| new_empty_array(&state_val.get_datatype())) + .collect::>(); + + return Ok(empty_arrays); + } + + // Each row has its respective group + let mut results = vec![]; + for row_idx in 0..num_rows { + // Create the empty accumulator for converting + let mut converted_accumulator = (self.factory)()?; + + // Convert row to states + let values_to_accumulate = + slice_and_maybe_filter(values, 
opt_filter, &[row_idx, row_idx + 1])?; + converted_accumulator.update_batch(&values_to_accumulate)?; + let states = converted_accumulator.state()?; + + // Resize results to have enough columns according to the converted states + results.resize_with(states.len(), || Vec::with_capacity(num_rows)); + + // Add the states to results + for (idx, state_val) in states.into_iter().enumerate() { + results[idx].push(state_val); + } + } + + let arrays = results + .into_iter() + .map(ScalarValue::iter_to_array) + .collect::>>()?; + + Ok(arrays) + } + + fn supports_convert_to_state(&self) -> bool { + true + } +} + +/// Extension trait for [`Vec`] to account for allocations. +pub trait VecAllocExt { + /// Item type. + type T; + /// Return the amount of memory allocated by this Vec (not + /// recursively counting any heap allocations contained within the + /// structure). Does not include the size of `self` + fn allocated_size(&self) -> usize; +} + +impl VecAllocExt for Vec { + type T = T; + fn allocated_size(&self) -> usize { + size_of::() * self.capacity() + } +} + +fn get_filter_at_indices( + opt_filter: Option<&BooleanArray>, + indices: &PrimitiveArray, +) -> Result> { + opt_filter + .map(|filter| { + compute::take( + filter, indices, None, // None: no index check + ) + }) + .transpose() + .map_err(|e| arrow_datafusion_err!(e)) +} + +// Copied from physical-plan +pub(crate) fn slice_and_maybe_filter( + aggr_array: &[ArrayRef], + filter_opt: Option<&BooleanArray>, + offsets: &[usize], +) -> Result> { + let (offset, length) = (offsets[0], offsets[1] - offsets[0]); + let sliced_arrays: Vec = aggr_array + .iter() + .map(|array| array.slice(offset, length)) + .collect(); + + if let Some(f) = filter_opt { + let filter = f.slice(offset, length); + + sliced_arrays + .iter() + .map(|array| { + compute::filter(array.as_ref(), as_boolean_array(&filter)) + .map_err(|e| arrow_datafusion_err!(e)) + }) + .collect() + } else { + Ok(sliced_arrays) + } +} diff --git 
a/datafusion/src/physical_plan/groups_accumulator_flat_adapter.rs b/datafusion/src/physical_plan/groups_accumulator_flat_adapter.rs new file mode 100644 index 000000000000..611e3c259e98 --- /dev/null +++ b/datafusion/src/physical_plan/groups_accumulator_flat_adapter.rs @@ -0,0 +1,590 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for implementing GroupsAccumulator +//! Adapter that makes [`GroupsAccumulator`] out of [`Accumulator`] + +use std::mem::size_of; +// use std::mem::size_of_val; // TODO: Remove commented Accumulator::size() code? 
+ +use crate::arrow_datafusion_err; +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::groups_accumulator::{EmitTo, GroupsAccumulator}; +use crate::physical_plan::Accumulator; +use crate::scalar::ScalarValue; +use arrow::array::{as_boolean_array, new_empty_array, Array}; +use arrow::compute::take_arrays; +use arrow::{ + array::{ArrayRef, /* AsArray, */ BooleanArray, PrimitiveArray}, + compute, + datatypes::UInt32Type, +}; +use smallvec::smallvec; +use smallvec::SmallVec; + +/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`] +/// +/// While [`Accumulator`] are simpler to implement and can support +/// more general calculations (like retractable window functions), +/// they are not as fast as a specialized `GroupsAccumulator`. This +/// interface bridges the gap so the group by operator only operates +/// in terms of [`Accumulator`]. +/// +/// Internally, this adapter creates a new [`Accumulator`] for each group which +/// stores the state for that group. This both requires an allocation for each +/// Accumulator, internal indices, as well as whatever internal allocations the +/// Accumulator itself requires. +/// +/// For example, a `MinAccumulator` that computes the minimum string value with +/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group +/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`). +/// +/// ```text +/// ┌─────────────────────────────────┐ +/// │MinAccumulator { │ +/// ┌─────▶│ min: ScalarValue::Utf8("A") │───────┐ +/// │ │} │ │ +/// │ └─────────────────────────────────┘ └───────▶ "A" +/// ┌─────┐ │ ┌─────────────────────────────────┐ +/// │ 0 │─────┘ │MinAccumulator { │ +/// ├─────┤ ┌─────▶│ min: ScalarValue::Utf8("Z") │───────────────▶ "Z" +/// │ 1 │─────┘ │} │ +/// └─────┘ └─────────────────────────────────┘ ... +/// ... ... 
+/// ┌─────┐ ┌────────────────────────────────┐ +/// │ N-2 │ │MinAccumulator { │ +/// ├─────┤ │ min: ScalarValue::Utf8("A") │────────────────▶ "A" +/// │ N-1 │─────┐ │} │ +/// └─────┘ │ └────────────────────────────────┘ +/// │ ┌────────────────────────────────┐ ┌───────▶ "Q" +/// │ │MinAccumulator { │ │ +/// └─────▶│ min: ScalarValue::Utf8("Q") │────────┘ +/// │} │ +/// └────────────────────────────────┘ +/// +/// +/// Logical group Current Min/Max value for that group stored +/// number as a ScalarValue which points to an +/// indivdually allocated String +/// +///``` +/// +/// # Optimizations +/// +/// The adapter minimizes the number of calls to [`Accumulator::update_batch`] +/// by first collecting the input rows for each group into a contiguous array +/// using [`compute::take`] +/// +pub struct GroupsAccumulatorFlatAdapter { + factory: Box Result + Send>, + + /// [`Accumulators`] that store the per-group state + accumulators: Vec, + + /// scratch space: indexes in the input array that will be fed to this accumulator (this is a + /// parallel array to accumulators). Stores indexes as `u32` to match the arrow `take` kernel + /// input. Note that we have indices.len() <= accumulators.len(); it only gets extended when + /// used. + indices: Vec>, + + // TODO: Code maintaining this is commented. + /// Current memory usage, in bytes. + /// + /// Note this is incrementally updated with deltas to avoid the + /// call to size() being a bottleneck. We saw size() being a + /// bottleneck in earlier implementations when there were many + /// distinct groups. + allocation_bytes: usize, +} + +// TODO: Remove this or remember to account for accumulators and indices in a size() calculation. +// impl AccumulatorState { +/* TODO: Add Accumulator::size? 
+/// Returns the amount of memory taken by this structure and its accumulator +fn size(&self) -> usize { + self.accumulator.size() + size_of_val(self) + self.indices.allocated_size() +} +*/ +// } + +impl GroupsAccumulatorFlatAdapter { + /// Create a new adapter that will create a new [`Accumulator`] + /// for each group, using the specified factory function + pub fn new(factory: F) -> Self + where + F: Fn() -> Result + Send + 'static, + { + Self { + factory: Box::new(factory), + accumulators: vec![], + indices: vec![], + allocation_bytes: 0, + } + } + + /// Ensure that self.accumulators has total_num_groups + fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> { + // can't shrink + assert!(total_num_groups >= self.accumulators.len()); + let vec_size_pre = self.accumulators.allocated_size(); + + // instantiate new accumulators + let new_accumulators = total_num_groups - self.accumulators.len(); + for _ in 0..new_accumulators { + let accumulator = (self.factory)()?; + // TODO: Add Accumulator::size()? + // self.add_allocation(state.size()); + self.accumulators.push(accumulator); + } + + self.adjust_allocation(vec_size_pre, self.accumulators.allocated_size()); + Ok(()) + } + + fn make_indices_if_needed(&mut self, total_num_groups: usize) -> Result<()> { + // can't shrink + assert!(total_num_groups >= self.indices.len()); + let vec_size_pre = self.indices.allocated_size(); + + // instantiate new indices + self.indices.resize(total_num_groups, smallvec![]); + + self.adjust_allocation(vec_size_pre, self.indices.allocated_size()); + Ok(()) + } + + /// invokes f(accumulator, values) for each group that has values in group_indices, but unlike + /// invoke_per_accumulator, the group_indices are already clumped together in intervals + /// [offsets[i], offsets[i + 1]). 
+ fn invoke_per_accumulator_preordered( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + offsets_param: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + f: F, + ) -> Result<()> + where + F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>, + { + self.make_accumulators_if_needed(total_num_groups)?; + + assert_eq!( + values[0].len(), + group_indices.len(), + "asserting values[0].len() == group_indices.len()" + ); + + // let mut sizes_pre = 0; + // let mut sizes_post = 0; + for offsets in offsets_param.windows(2) { + let group_idx = group_indices[offsets[0]]; + let accumulator: &mut AccumulatorType = &mut self.accumulators[group_idx]; + // sizes_pre += state.size(); // TODO: Add Accumulator::size? + + let values_to_accumulate = + slice_and_maybe_filter(values, opt_filter, offsets)?; + f(accumulator, &values_to_accumulate)?; + + // sizes_post += state.size(); // TODO: Add Accumulator::size? + } + + // self.adjust_allocation(sizes_pre, sizes_post); // TODO: Add Accumulator::size? + Ok(()) + } + + fn invoke_per_accumulator( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + f: F, + ) -> Result<()> + where + F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>, + { + self.make_accumulators_if_needed(total_num_groups)?; + self.make_indices_if_needed(total_num_groups)?; + + assert_eq!( + values[0].len(), + group_indices.len(), + "asserting values[0].len() == group_indices.len()" + ); + + // figure out which input rows correspond to which groups. 
+ // Note that self.state.indices starts empty for all groups + // (it is cleared out below) + for (idx, group_index) in group_indices.iter().enumerate() { + self.indices[*group_index].push(idx as u32); + } + + // groups_with_rows holds a list of group indexes that have + // any rows that need to be accumulated, stored in order of + // group_index + + let mut groups_with_rows = vec![]; + + // batch_indices holds indices into values, each group is contiguous + let mut batch_indices = vec![]; + + // offsets[i] is index into batch_indices where the rows for + // group_index i starts + let mut offsets = vec![0]; + + let mut offset_so_far = 0; + for (group_index, indices) in self.indices.iter_mut().enumerate() { + if indices.is_empty() { + continue; + } + + groups_with_rows.push(group_index); + batch_indices.extend_from_slice(indices); + offset_so_far += indices.len(); + offsets.push(offset_so_far); + } + let batch_indices = batch_indices.into(); + + // reorder the values and opt_filter by batch_indices so that + // all values for each group are contiguous, then invoke the + // accumulator once per group with values + let values = take_arrays(values, &batch_indices, None)?; + let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; + + // invoke each accumulator with the appropriate rows, first + // pulling the input arguments for this group into their own + // RecordBatch(es) + let iter = groups_with_rows.iter().zip(offsets.windows(2)); + + // let mut sizes_pre = 0; + // let mut sizes_post = 0; + for (&group_idx, offsets) in iter { + // sizes_pre += state.size(); // TODO: Add Accumulator::size? + + let values_to_accumulate = slice_and_maybe_filter( + &values, + opt_filter.as_ref().map(|f| as_boolean_array(f)), + offsets, + )?; + f(&mut self.accumulators[group_idx], &values_to_accumulate)?; + + // clear out the state so they are empty for next + // iteration + self.indices[group_idx].clear(); + // sizes_post += state.size(); // TODO: Add Accumulator::size? 
+ } + + // self.adjust_allocation(sizes_pre, sizes_post); // TODO: Add Accumulator::size? + Ok(()) + } + + /// Increment the allocation by `n` + /// + /// See [`Self::allocation_bytes`] for rationale. + fn add_allocation(&mut self, size: usize) { + self.allocation_bytes += size; + } + + /// Decrease the allocation by `n` + /// + /// See [`Self::allocation_bytes`] for rationale. + fn free_allocation(&mut self, size: usize) { + // use saturating sub to avoid errors if the accumulators + // report erronious sizes + self.allocation_bytes = self.allocation_bytes.saturating_sub(size) + } + + /// Adjusts the allocation for something that started with + /// start_size and now has new_size avoiding overflow + /// + /// See [`Self::allocation_bytes`] for rationale. + fn adjust_allocation(&mut self, old_size: usize, new_size: usize) { + if new_size > old_size { + self.add_allocation(new_size - old_size) + } else { + self.free_allocation(old_size - new_size) + } + } +} + +impl GroupsAccumulator + for GroupsAccumulatorFlatAdapter +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.update_batch(values_to_accumulate) + }, + )?; + Ok(()) + } + + fn update_batch_preordered( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + offsets: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator_preordered( + values, + group_indices, + offsets, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.update_batch(values_to_accumulate) + }, + )?; + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let vec_size_pre = self.accumulators.allocated_size(); + + let accumulators = emit_to.take_needed(&mut 
self.accumulators); + self.indices.truncate(self.accumulators.len()); + + let results: Vec = accumulators + .into_iter() + .map(|accumulator| { + // self.free_allocation(state.size()); // TODO: Add Accumulator::size? + accumulator.evaluate() + }) + .collect::>()?; + + let result = ScalarValue::iter_to_array(results); + + self.adjust_allocation(vec_size_pre, self.accumulators.allocated_size()); + + result + } + + fn peek_evaluate(&self, group_index: usize) -> Result { + self.accumulators[group_index].evaluate() + } + + // filtered_null_mask(opt_filter, &values); + fn state(&mut self, emit_to: EmitTo) -> Result> { + let vec_size_pre = self.accumulators.allocated_size(); + let accumulators = emit_to.take_needed(&mut self.accumulators); + self.indices.truncate(self.accumulators.len()); + + // each accumulator produces a potential vector of values + // which we need to form into columns + let mut results: Vec> = vec![]; + + for accumulator in accumulators { + // self.free_allocation(state.size()); // TODO: Add Accumulator::size? 
+ let accumulator_state = accumulator.state()?; + results.resize_with(accumulator_state.len(), Vec::new); + for (idx, state_val) in accumulator_state.into_iter().enumerate() { + results[idx].push(state_val); + } + } + + // create an array for each intermediate column + let arrays = results + .into_iter() + .map(ScalarValue::iter_to_array) + .collect::>>()?; + + // double check each array has the same length (aka the + // accumulator was implemented correctly + if let Some(first_col) = arrays.first() { + for arr in &arrays { + assert_eq!(arr.len(), first_col.len()) + } + } + self.adjust_allocation(vec_size_pre, self.accumulators.allocated_size()); + + Ok(arrays) + } + + fn peek_state(&self, group_index: usize) -> Result> { + self.accumulators[group_index].state() + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator( + values, + group_indices, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.merge_batch(values_to_accumulate)?; + Ok(()) + }, + )?; + Ok(()) + } + + fn merge_batch_preordered( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + offsets: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.invoke_per_accumulator_preordered( + values, + group_indices, + offsets, + opt_filter, + total_num_groups, + |accumulator, values_to_accumulate| { + accumulator.merge_batch(values_to_accumulate)?; + Ok(()) + }, + )?; + Ok(()) + } + + fn size(&self) -> usize { + self.allocation_bytes + } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let num_rows = values[0].len(); + + // If there are no rows, return empty arrays + if num_rows == 0 { + // create empty accumulator to get the state types + let empty_state = (self.factory)()?.state()?; + let empty_arrays = 
empty_state + .into_iter() + .map(|state_val| new_empty_array(&state_val.get_datatype())) + .collect::>(); + + return Ok(empty_arrays); + } + + // Each row has its respective group + let mut results = vec![]; + for row_idx in 0..num_rows { + // Create the empty accumulator for converting + let mut converted_accumulator = (self.factory)()?; + + // Convert row to states + let values_to_accumulate = + slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?; + converted_accumulator.update_batch(&values_to_accumulate)?; + let states = converted_accumulator.state()?; + + // Resize results to have enough columns according to the converted states + results.resize_with(states.len(), || Vec::with_capacity(num_rows)); + + // Add the states to results + for (idx, state_val) in states.into_iter().enumerate() { + results[idx].push(state_val); + } + } + + let arrays = results + .into_iter() + .map(ScalarValue::iter_to_array) + .collect::>>()?; + + Ok(arrays) + } + + fn supports_convert_to_state(&self) -> bool { + true + } +} + +/// Extension trait for [`Vec`] to account for allocations. +pub trait VecAllocExt { + /// Item type. + type T; + /// Return the amount of memory allocated by this Vec (not + /// recursively counting any heap allocations contained within the + /// structure). 
Does not include the size of `self` + fn allocated_size(&self) -> usize; +} + +impl VecAllocExt for Vec { + type T = T; + fn allocated_size(&self) -> usize { + size_of::() * self.capacity() + } +} + +fn get_filter_at_indices( + opt_filter: Option<&BooleanArray>, + indices: &PrimitiveArray, +) -> Result> { + opt_filter + .map(|filter| { + compute::take( + filter, indices, None, // None: no index check + ) + }) + .transpose() + .map_err(|e| arrow_datafusion_err!(e)) +} + +// Copied from physical-plan +pub(crate) fn slice_and_maybe_filter( + aggr_array: &[ArrayRef], + filter_opt: Option<&BooleanArray>, + offsets: &[usize], +) -> Result> { + let (offset, length) = (offsets[0], offsets[1] - offsets[0]); + let sliced_arrays: Vec = aggr_array + .iter() + .map(|array| array.slice(offset, length)) + .collect(); + + if let Some(f) = filter_opt { + let filter = f.slice(offset, length); + + sliced_arrays + .iter() + .map(|array| { + compute::filter(array.as_ref(), as_boolean_array(&filter)) + .map_err(|e| arrow_datafusion_err!(e)) + }) + .collect() + } else { + Ok(sliced_arrays) + } +} diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index fc0bf4af89d8..bd8e5e93f100 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -71,6 +71,7 @@ use arrow::array::{ }; use async_trait::async_trait; +use super::groups_accumulator::GroupsAccumulator; use super::{ expressions::Column, group_scalar::GroupByScalar, RecordBatchStream, SendableRecordBatchStream, @@ -411,10 +412,10 @@ pub(crate) fn group_aggregate_batch( group_expr: &[Arc], aggr_expr: &[Arc], batch: RecordBatch, - mut accumulators: Accumulators, + mut accumulation_state: AccumulationState, aggregate_expressions: &[Vec>], skip_row: impl Fn(&RecordBatch, /*row_index*/ usize) -> bool, -) -> Result { +) -> Result { // evaluate the grouping expressions let group_values = evaluate(group_expr, &batch)?; @@ -437,6 
+438,9 @@ pub(crate) fn group_aggregate_batch( // Make sure we can create the accumulators or otherwise return an error create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?; + let all_groups_accumulators: bool = + aggr_expr.iter().all(|expr| expr.uses_groups_accumulator()); + // Keys received in this batch let mut batch_keys = BinaryBuilder::new(0); @@ -448,11 +452,12 @@ pub(crate) fn group_aggregate_batch( create_key(&group_values, row, &mut key) .map_err(DataFusionError::into_arrow_external_error)?; - accumulators + accumulation_state + .accumulators .raw_entry_mut() .from_key(&key) // 1.3 - .and_modify(|_, (_, _, v)| { + .and_modify(|_, AccumulationGroupState { indices: v, .. }| { if v.is_empty() { batch_keys.append_value(&key).expect("must not fail"); }; @@ -461,30 +466,44 @@ pub(crate) fn group_aggregate_batch( // 1.2 .or_insert_with(|| { // We can safely unwrap here as we checked we can create an accumulator before - let accumulator_set = create_accumulators(aggr_expr).unwrap(); + let accumulator_set = create_spotty_accumulators(aggr_expr).unwrap(); batch_keys.append_value(&key).expect("must not fail"); let _ = create_group_by_values(&group_values, row, &mut group_by_values); let mut taken_values = smallvec![GroupByScalar::UInt32(0); group_values.len()]; std::mem::swap(&mut taken_values, &mut group_by_values); + let group_index = accumulation_state.next_group_index; + accumulation_state.next_group_index += 1; ( key.clone(), - (taken_values, accumulator_set, smallvec![row as u32]), + AccumulationGroupState { + group_by_values: taken_values, + accumulator_set, + indices: smallvec![row as u32], + group_index, + }, ) }); } // Collect all indices + offsets based on keys in this vec let mut batch_indices: UInt32Builder = UInt32Builder::new(0); + let mut all_group_indices = Vec::new(); let mut offsets = vec![0]; let mut offset_so_far = 0; let batch_keys = batch_keys.finish(); for key in batch_keys.iter() { let key = key.unwrap(); - let 
(_, _, indices) = accumulators.get_mut(key).unwrap(); + let AccumulationGroupState { + indices, + group_index, + .. + } = accumulation_state.accumulators.get_mut(key).unwrap(); batch_indices.append_slice(indices)?; + all_group_indices.extend(std::iter::repeat(*group_index).take(indices.len())); offset_so_far += indices.len(); offsets.push(offset_so_far); + indices.clear(); } let batch_indices = batch_indices.finish(); @@ -507,49 +526,88 @@ pub(crate) fn group_aggregate_batch( }) .collect(); - // 2.1 for each key in this batch - // 2.2 for each aggregation - // 2.3 `slice` from each of its arrays the keys' values - // 2.4 update / merge the accumulator with the values - // 2.5 clear indices - batch_keys - .iter() - .zip(offsets.windows(2)) - .try_for_each(|(key, offsets)| { - let (_, accumulator_set, indices) = - accumulators.get_mut(key.unwrap()).unwrap(); - // 2.2 - accumulator_set - .iter_mut() - .zip(values.iter()) - .map(|(accumulator, aggr_array)| { - ( - accumulator, - aggr_array - .iter() - .map(|array| { - // 2.3 - array.slice(offsets[0], offsets[1] - offsets[0]) - }) - .collect::>(), - ) - }) - .try_for_each(|(accumulator, values)| match mode { - AggregateMode::Partial | AggregateMode::Full => { - accumulator.update_batch(&values) - } - AggregateMode::FinalPartitioned | AggregateMode::Final => { - // note: the aggregation here is over states, not values, thus the merge - accumulator.merge_batch(&values) - } - }) - // 2.5 - .and({ - indices.clear(); - Ok(()) - }) - })?; - Ok(accumulators) + if !all_groups_accumulators { + // 2.1 for each key in this batch + // 2.2 for each aggregation + // 2.3 `slice` from each of its arrays the keys' values + // 2.4 update / merge the accumulator with the values + // 2.5 clear indices + batch_keys + .iter() + .zip(offsets.windows(2)) + .try_for_each(|(key, offsets)| { + let AccumulationGroupState { + accumulator_set, .. 
+ } = accumulation_state + .accumulators + .get_mut(key.unwrap()) + .unwrap(); + // 2.2 + accumulator_set + .iter_mut() + .zip(values.iter()) + .map(|(accumulator, aggr_array)| { + ( + accumulator, + aggr_array + .iter() + .map(|array| { + // 2.3 + array.slice(offsets[0], offsets[1] - offsets[0]) + }) + .collect::>(), + ) + }) + .try_for_each(|(accumulator, values)| { + if let Some(accumulator) = accumulator { + match mode { + AggregateMode::Partial | AggregateMode::Full => { + accumulator.update_batch(&values) + } + AggregateMode::FinalPartitioned + | AggregateMode::Final => { + // note: the aggregation here is over states, not values, thus the merge + accumulator.merge_batch(&values) + } + } + } else { + // We do groups accumulator separately. + Ok(()) + } + }) + })?; + } + + for (accumulator_index, accumulator) in accumulation_state + .groups_accumulators + .iter_mut() + .enumerate() + { + if let Some(accumulator) = accumulator { + match mode { + AggregateMode::Partial | AggregateMode::Full => accumulator + .update_batch_preordered( + &values[accumulator_index], + &all_group_indices, + &offsets, + None, + accumulation_state.next_group_index, + )?, + AggregateMode::FinalPartitioned | AggregateMode::Final => { + // note: the aggregation here is over states, not values, thus the merge + accumulator.merge_batch_preordered( + &values[accumulator_index], + &all_group_indices, + &offsets, + None, + accumulation_state.next_group_index, + )? 
+ } + } + } + } + + Ok(accumulation_state) } /// Appends a sequence of [u8] bytes for the value in `col[row]` to @@ -810,7 +868,7 @@ async fn compute_grouped_hash_aggregate( //let mut accumulators: Accumulators = FnvHashMap::default(); // iterate over all input batches and update the accumulators - let mut accumulators = Accumulators::default(); + let mut accumulators = create_accumulation_state(&aggr_expr)?; while let Some(batch) = input.next().await { let batch = batch?; accumulators = group_aggregate_batch( @@ -882,16 +940,44 @@ pub type KeyVec = SmallVec<[u8; 64]>; type AccumulatorItem = Box; #[allow(missing_docs)] pub type AccumulatorSet = SmallVec<[AccumulatorItem; 2]>; +/// Not really a set. Order matters, as this is a parallel array with some GroupsAccumulator array. +/// There are Nones in place where there is (in some AccumulationState, presumably) a groups +/// accumulator in the parallel array. +pub type SpottyAccumulatorSet = SmallVec<[Option; 2]>; +#[allow(missing_docs)] +pub type Accumulators = HashMap; + #[allow(missing_docs)] -pub type Accumulators = HashMap< - KeyVec, - ( - SmallVec<[GroupByScalar; 2]>, - AccumulatorSet, - SmallVec<[u32; 4]>, - ), - RandomState, ->; +pub struct AccumulationGroupState { + group_by_values: SmallVec<[GroupByScalar; 2]>, + // Each aggregate either has an Accumulator or a GroupsAccumulator. For each i, + // accumulator_set[i].is_some() != groups_accumulators[i].is_some(). + accumulator_set: SpottyAccumulatorSet, + indices: SmallVec<[u32; 4]>, + group_index: usize, +} + +#[allow(missing_docs)] +#[derive(Default)] +pub struct AccumulationState { + accumulators: HashMap, + groups_accumulators: Vec>>, + // For now, always equal to accumulators.len() + next_group_index: usize, +} + +impl AccumulationState { + /// Constructs an initial AccumulationState. 
+ pub fn new( + groups_accumulators: Vec>>, + ) -> AccumulationState { + AccumulationState { + accumulators: HashMap::new(), + groups_accumulators, + next_group_index: 0, + } + } +} impl Stream for GroupedHashAggregateStream { type Item = ArrowResult; @@ -1138,11 +1224,11 @@ impl RecordBatchStream for HashAggregateStream { /// Create a RecordBatch with all group keys and accumulator' states or values. pub(crate) fn create_batch_from_map( mode: &AggregateMode, - accumulators: &Accumulators, + accumulation_state: &AccumulationState, num_group_expr: usize, output_schema: &Schema, ) -> ArrowResult { - if accumulators.is_empty() { + if accumulation_state.accumulators.is_empty() { return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned()))); } // 1. for each key @@ -1153,12 +1239,23 @@ pub(crate) fn create_batch_from_map( let mut key_columns: Vec> = Vec::with_capacity(num_group_expr); let mut value_columns = Vec::new(); - for (_, (group_by_values, accumulator_set, _)) in accumulators { + for ( + _, + AccumulationGroupState { + group_by_values, + accumulator_set, + group_index, + .. + }, + ) in &accumulation_state.accumulators + { // 2 and 3. - write_group_result_row( + write_group_result_row_with_groups_accumulator( *mode, group_by_values, accumulator_set, + &accumulation_state.groups_accumulators, + *group_index, &output_schema.fields()[0..num_group_expr], &mut key_columns, &mut value_columns, @@ -1223,6 +1320,50 @@ pub fn write_group_result_row( finalize_aggregation_into(&accumulator_set, &mode, value_columns) } +// TODO: Dedup with write_group_result_row. 
+#[allow(missing_docs)] +pub fn write_group_result_row_with_groups_accumulator( + mode: AggregateMode, + group_by_values: &[GroupByScalar], + accumulator_set: &SpottyAccumulatorSet, + groups_accumulators: &[Option>], + group_index: usize, + key_fields: &[Field], + key_columns: &mut Vec>, + value_columns: &mut Vec>, +) -> Result<()> { + let add_key_columns = key_columns.is_empty(); + for i in 0..group_by_values.len() { + match &group_by_values[i] { + // Optimization to avoid allocation on conversion to ScalarValue. + GroupByScalar::Utf8(str) => { + if add_key_columns { + key_columns.push(Box::new(StringBuilder::new(0))); + } + key_columns[i] + .as_any_mut() + .downcast_mut::() + .unwrap() + .append_value(str)?; + } + v => { + let scalar = v.to_scalar(key_fields[i].data_type()); + if add_key_columns { + key_columns.push(create_builder(&scalar)); + } + append_value(&mut *key_columns[i], &scalar)?; + } + } + } + finalize_aggregation_into_with_groups_accumulators( + &accumulator_set, + groups_accumulators, + group_index, + &mode, + value_columns, + ) +} + #[allow(missing_docs)] pub fn create_accumulators( aggr_expr: &[Arc], @@ -1233,6 +1374,40 @@ pub fn create_accumulators( .collect::>>() } +// TODO: Name? (Spotty) +#[allow(missing_docs)] +pub fn create_spotty_accumulators( + aggr_expr: &[Arc], +) -> Result { + aggr_expr + .iter() + .map(|expr| { + Ok(if expr.uses_groups_accumulator() { + None + } else { + Some(expr.create_accumulator()?) + }) + }) + .collect::>>() +} + +#[allow(missing_docs)] +pub fn create_accumulation_state( + aggr_expr: &[Arc], +) -> ArrowResult { + let mut groups_accumulators = + Vec::>>::with_capacity(aggr_expr.len()); + for expr in aggr_expr { + if let Some(groups_acc) = expr.create_groups_accumulator()? 
{ + groups_accumulators.push(Some(groups_acc)); + } else { + groups_accumulators.push(None); + } + } + + Ok(AccumulationState::new(groups_accumulators)) +} + #[allow(unused_variables)] pub(crate) fn create_builder(s: &ScalarValue) -> Box { macro_rules! create_list_builder { @@ -1369,6 +1544,61 @@ fn finalize_aggregation_into( Ok(()) } +/// adds aggregation results into columns, creating the required builders when necessary. +/// final value (mode = Final) or states (mode = Partial) +fn finalize_aggregation_into_with_groups_accumulators( + accumulators: &SpottyAccumulatorSet, + groups_accumulators: &[Option>], + group_index: usize, + mode: &AggregateMode, + columns: &mut Vec>, +) -> Result<()> { + let add_columns = columns.is_empty(); + match mode { + AggregateMode::Partial => { + let mut col_i = 0; + for (i, a) in accumulators.iter().enumerate() { + let state = if let Some(a) = a { + a.state() + } else { + groups_accumulators[i] + .as_ref() + .unwrap() + .peek_state(group_index) + }?; + // build the vector of states + for v in state { + if add_columns { + columns.push(create_builder(&v)); + assert_eq!(col_i + 1, columns.len()); + } + append_value(&mut *columns[col_i], &v)?; + col_i += 1; + } + } + } + AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::Full => { + for i in 0..accumulators.len() { + // merge the state to the final value + let v: ScalarValue = if let Some(accumulator) = &accumulators[i] { + accumulator.evaluate()? + } else { + groups_accumulators[i] + .as_ref() + .unwrap() + .peek_evaluate(group_index)? 
+ }; + if add_columns { + columns.push(create_builder(&v)); + assert_eq!(i + 1, columns.len()); + } + append_value(&mut *columns[i], &v)?; + } + } + } + Ok(()) +} + /// returns a vector of ArrayRefs, where each entry corresponds to either the /// final value (mode = Final) or states (mode = Partial) fn finalize_aggregation( diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 0c76b6e55fa7..23e667781109 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -34,6 +34,7 @@ use arrow::{array::ArrayRef, datatypes::Field}; use async_trait::async_trait; pub use display::DisplayFormatType; use futures::stream::Stream; +use groups_accumulator::GroupsAccumulator; use hashbrown::HashMap; use std::fmt; use std::fmt::{Debug, Display}; @@ -464,6 +465,19 @@ pub trait AggregateExpr: Send + Sync + Debug { /// return states with the same description as `state_fields` fn create_accumulator(&self) -> Result>; + /// Returns true if and only if create_groups_accumulator returns Ok(Some(_)) (if not an Err(_)). + fn uses_groups_accumulator(&self) -> bool { + return false; + } + + /// the groups accumulator used to accumulate values from the expression. If this returns None, + /// create_accumulator must be used. + fn create_groups_accumulator( + &self, + ) -> ArrowResult>> { + Ok(None) + } + /// the fields that encapsulate the Accumulator's state /// the number of fields here equals the number of states that the accumulator contains fn state_fields(&self) -> Result>; @@ -638,6 +652,9 @@ pub mod expressions; pub mod filter; pub mod functions; pub mod group_scalar; +pub mod groups_accumulator; +pub mod groups_accumulator_adapter; +pub mod groups_accumulator_flat_adapter; pub mod hash_aggregate; pub mod hash_join; pub mod hash_utils;