@@ -6,81 +6,76 @@ use crate::error::Result;
66use crate :: sql:: planner:: Aggregate ;
77use crate :: sql:: types:: { Expression , Row , Rows , Value } ;
88
9- /// Aggregates row values from the source according to the aggregates, using the
10- /// group_by expressions as buckets. Emits rows with group_by buckets then
11- /// aggregates in the given order.
12- pub fn aggregate (
13- mut source : Rows ,
9+ /// Computes bucketed aggregates for input rows. For example, this query would
10+ /// compute COUNT and SUM aggregates bucketed by category and brand:
11+ ///
12+ /// SELECT COUNT(*), SUM(price) FROM products GROUP BY category, brand
13+ pub struct Aggregator {
14+ /// GROUP BY expressions.
1415 group_by : Vec < Expression > ,
16+ /// Aggregates to compute.
1517 aggregates : Vec < Aggregate > ,
16- ) -> Result < Rows > {
17- let mut aggregator = Aggregator :: new ( group_by, aggregates) ;
18- while let Some ( row) = source. next ( ) . transpose ( ) ? {
19- aggregator. add ( row) ?;
20- }
21- aggregator. into_rows ( )
22- }
23-
24- /// Computes bucketed aggregates for rows.
25- struct Aggregator {
26- /// Bucketed accumulators (by group_by values).
18+ /// Accumulators indexed by group_by bucket.
2719 buckets : BTreeMap < Vec < Value > , Vec < Accumulator > > ,
28- /// The set of empty accumulators. Used to create new buckets.
29- empty : Vec < Accumulator > ,
30- /// Group by expressions. Indexes map to bucket values.
31- group_by : Vec < Expression > ,
32- /// Expressions to accumulate. Indexes map to accumulators.
33- expressions : Vec < Expression > ,
3420}
3521
3622impl Aggregator {
3723 /// Creates a new aggregator for the given GROUP BY buckets and aggregates.
38- fn new ( group_by : Vec < Expression > , aggregates : Vec < Aggregate > ) -> Self {
39- use Aggregate :: * ;
40- let accumulators = aggregates. iter ( ) . map ( Accumulator :: new) . collect ( ) ;
41- let expressions = aggregates
42- . into_iter ( )
43- . map ( |aggregate| match aggregate {
44- Average ( expr) | Count ( expr) | Max ( expr) | Min ( expr) | Sum ( expr) => expr,
45- } )
46- . collect ( ) ;
47- Self { buckets : BTreeMap :: new ( ) , empty : accumulators, group_by, expressions }
24+ pub fn new ( group_by : Vec < Expression > , aggregates : Vec < Aggregate > ) -> Self {
25+ Self { group_by, aggregates, buckets : BTreeMap :: new ( ) }
4826 }
4927
5028 /// Adds a row to the aggregator.
51- fn add ( & mut self , row : Row ) -> Result < ( ) > {
52- // Compute the bucket value.
53- let bucket: Vec < Value > =
54- self . group_by . iter ( ) . map ( |expr| expr. evaluate ( Some ( & row) ) ) . try_collect ( ) ?;
29+ pub fn add ( & mut self , row : & Row ) -> Result < ( ) > {
30+ // Compute the bucket values.
31+ let bucket = self . group_by . iter ( ) . map ( |expr| expr. evaluate ( Some ( row) ) ) . try_collect ( ) ?;
32+
33+ // Look up the bucket accumulators, or create a new bucket.
34+ let accumulators = self
35+ . buckets
36+ . entry ( bucket)
37+ . or_insert_with ( || self . aggregates . iter ( ) . map ( Accumulator :: new) . collect ( ) )
38+ . iter_mut ( ) ;
39+
40+ // Collect expressions to evaluate.
41+ let exprs = self . aggregates . iter ( ) . map ( |a| a. expr ( ) ) ;
42+
43+ // Accumulate the evaluated values.
44+ for ( accumulator, expr) in accumulators. zip_eq ( exprs) {
45+ accumulator. add ( expr. evaluate ( Some ( row) ) ?) ?;
46+ }
47+ Ok ( ( ) )
48+ }
5549
56- // Compute and accumulate the input values .
57- let accumulators = self . buckets . entry ( bucket ) . or_insert_with ( || self . empty . clone ( ) ) ;
58- for ( accumulator , expr ) in accumulators . iter_mut ( ) . zip ( & self . expressions ) {
59- accumulator . add ( expr . evaluate ( Some ( & row) ) ?) ?;
50+ /// Adds rows to the aggregator .
51+ pub fn add_rows ( & mut self , rows : Rows ) -> Result < ( ) > {
52+ for row in rows {
53+ self . add ( & row?) ?;
6054 }
6155 Ok ( ( ) )
6256 }
6357
6458 /// Returns a row iterator over the aggregate result.
65- fn into_rows ( self ) -> Result < Rows > {
59+ pub fn into_rows ( self ) -> Rows {
6660 // If there were no rows and no group_by expressions, return a row of
67- // empty accumulators, e.g. SELECT COUNT(*) FROM t WHERE FALSE
61+ // empty accumulators ( e.g. SELECT COUNT(*) FROM t WHERE FALSE).
6862 if self . buckets . is_empty ( ) && self . group_by . is_empty ( ) {
69- let result = self . empty . into_iter ( ) . map ( |acc| acc. value ( ) ) . collect ( ) ;
70- return Ok ( Box :: new ( std:: iter:: once ( result) ) ) ;
63+ let result =
64+ self . aggregates . iter ( ) . map ( Accumulator :: new) . map ( |acc| acc. value ( ) ) . try_collect ( ) ;
65+ return Box :: new ( std:: iter:: once ( result) ) ;
7166 }
7267
7368 // Emit the group_by and aggregate values for each bucket. We use an
7469 // intermediate vec since btree_map::IntoIter doesn't implement Clone
7570 // (required by Rows).
7671 let buckets = self . buckets . into_iter ( ) . collect_vec ( ) ;
77- Ok ( Box :: new ( buckets. into_iter ( ) . map ( |( bucket, accumulators) | {
72+ Box :: new ( buckets. into_iter ( ) . map ( |( bucket, accumulators) | {
7873 bucket
7974 . into_iter ( )
8075 . map ( Ok )
8176 . chain ( accumulators. into_iter ( ) . map ( |acc| acc. value ( ) ) )
8277 . collect ( )
83- } ) ) )
78+ } ) )
8479 }
8580}
8681
0 commit comments