@@ -79,7 +79,7 @@ use super::{
7979use crate :: cube_ext;
8080
8181use crate :: cube_ext:: ordfloat:: { OrdF32 , OrdF64 } ;
82- use crate :: physical_plan:: sorted_aggregate:: SortedAggState ;
82+ use crate :: physical_plan:: sorted_aggregate:: { agg_key_equals , SortedAggState } ;
8383use compute:: cast;
8484use smallvec:: smallvec;
8585use smallvec:: SmallVec ;
@@ -440,13 +440,20 @@ pub(crate) fn group_aggregate_batch(
440440 // Keys received in this batch
441441 let mut batch_keys = BinaryBuilder :: new ( 0 ) ;
442442
443- for row in 0 ..batch. num_rows ( ) {
443+ let mut row: usize = 0 ;
444+
445+ while row < batch. num_rows ( ) {
444446 if skip_row ( & batch, row) {
445447 continue ;
446448 }
447449 // 1.1
448450 create_key ( & group_values, row, & mut key)
449451 . map_err ( DataFusionError :: into_arrow_external_error) ?;
452+ let start = row;
453+ row += 1 ;
454+ while row < batch. num_rows ( ) && agg_key_equals ( & key, & group_values, row) ? {
455+ row += 1 ;
456+ }
450457
451458 accumulators
452459 . raw_entry_mut ( )
@@ -456,7 +463,7 @@ pub(crate) fn group_aggregate_batch(
456463 if v. is_empty ( ) {
457464 batch_keys. append_value ( & key) . expect ( "must not fail" ) ;
458465 } ;
459- v. push ( row as u32 )
466+ v. extend ( ( start as u32 .. row as u32 ) . into_iter ( ) )
460467 } )
461468 // 1.2
462469 . or_insert_with ( || {
@@ -469,7 +476,11 @@ pub(crate) fn group_aggregate_batch(
469476 std:: mem:: swap ( & mut taken_values, & mut group_by_values) ;
470477 (
471478 key. clone ( ) ,
472- ( taken_values, accumulator_set, smallvec ! [ row as u32 ] ) ,
479+ (
480+ taken_values,
481+ accumulator_set,
482+ smallvec ! [ ( start as u32 ..row as u32 ) . into_iter( ) ] ,
483+ ) ,
473484 )
474485 } ) ;
475486 }
0 commit comments