Skip to content

Commit fd6f74f

Browse files
committed
feat(cube): Cube Accumulator extensions
1 parent d0cdc72 commit fd6f74f

File tree

3 files changed

+63
-1
lines changed

3 files changed

+63
-1
lines changed

datafusion/expr-common/src/accumulator.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! Accumulator module contains the trait definition for aggregation function's accumulators.
1919
2020
use arrow::array::ArrayRef;
21-
use datafusion_common::{internal_err, Result, ScalarValue};
21+
use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue};
2222
use std::fmt::Debug;
2323

2424
/// Tracks an aggregate function's state.
@@ -72,6 +72,23 @@ pub trait Accumulator: Send + Sync + Debug {
7272
/// when possible (for example distinct strings)
7373
fn evaluate(&mut self) -> Result<ScalarValue>;
7474

75+
/// Cube: Like `evaluate()`, but takes `&self` and does not modify the accumulator.
76+
fn peek_evaluate(&self) -> Result<ScalarValue> {
77+
not_impl_err!("Accumulator::peek_evaluate not implemented for {}", std::any::type_name::<Self>())
78+
}
79+
/// Cube: Resets the accumulator to its initial (zero-like) state.
80+
fn reset(&mut self) -> Result<()> {
81+
not_impl_err!("Accumulator::reset not implemented for {}", std::any::type_name::<Self>())
82+
}
83+
/// Cube: Like `state()`, but takes `&self` and does not modify the accumulator.
84+
fn peek_state(&self) -> Result<Vec<ScalarValue>> {
85+
not_impl_err!("Accumulator::peek_state not implemented for {}", std::any::type_name::<Self>())
86+
}
87+
/// Cube: Returns `true` if this accumulator supports the Cube extension methods (`peek_evaluate`, `reset`, `peek_state`); the default is `false`.
88+
fn supports_cube_ext(&self) -> bool {
89+
false
90+
}
91+
7592
/// Returns the allocated size required for this accumulator, in
7693
/// bytes, including `Self`.
7794
///

datafusion/functions-aggregate/src/min_max.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,21 @@ impl Accumulator for MaxAccumulator {
846846
fn size(&self) -> usize {
847847
std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size()
848848
}
849+
850+
// Cube extensions (`peek_evaluate`, `reset`, `peek_state`, `supports_cube_ext`):
851+
fn peek_evaluate(&self) -> Result<ScalarValue> {
852+
Ok(self.max.clone())
853+
}
854+
fn reset(&mut self) -> Result<()> {
855+
self.max = ScalarValue::try_from(self.max.data_type())?;
856+
Ok(())
857+
}
858+
fn peek_state(&self) -> Result<Vec<ScalarValue>> {
859+
Ok(vec![self.peek_evaluate()?])
860+
}
861+
fn supports_cube_ext(&self) -> bool {
862+
true
863+
}
849864
}
850865

851866
#[derive(Debug)]
@@ -1104,6 +1119,21 @@ impl Accumulator for MinAccumulator {
11041119
fn size(&self) -> usize {
11051120
std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size()
11061121
}
1122+
1123+
// Cube extensions (`peek_evaluate`, `reset`, `peek_state`, `supports_cube_ext`):
1124+
fn peek_evaluate(&self) -> Result<ScalarValue> {
1125+
Ok(self.min.clone())
1126+
}
1127+
fn reset(&mut self) -> Result<()> {
1128+
self.min = ScalarValue::try_from(self.min.data_type())?;
1129+
Ok(())
1130+
}
1131+
fn peek_state(&self) -> Result<Vec<ScalarValue>> {
1132+
Ok(vec![self.peek_evaluate()?])
1133+
}
1134+
fn supports_cube_ext(&self) -> bool {
1135+
true
1136+
}
11071137
}
11081138

11091139
#[derive(Debug)]

datafusion/functions-aggregate/src/sum.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,21 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
281281
fn size(&self) -> usize {
282282
std::mem::size_of_val(self)
283283
}
284+
285+
// Cube extensions (`peek_evaluate`, `reset`, `peek_state`, `supports_cube_ext`):
286+
fn peek_evaluate(&self) -> Result<ScalarValue> {
287+
ScalarValue::new_primitive::<T>(self.sum, &self.data_type)
288+
}
289+
fn reset(&mut self) -> Result<()> {
290+
self.sum = None;
291+
Ok(())
292+
}
293+
fn peek_state(&self) -> Result<Vec<ScalarValue>> {
294+
Ok(vec![self.peek_evaluate()?])
295+
}
296+
fn supports_cube_ext(&self) -> bool {
297+
true
298+
}
284299
}
285300

286301
/// This accumulator incrementally computes sums over a sliding window

0 commit comments

Comments
 (0)