@@ -135,7 +135,7 @@ impl Spiller {
135135 & mut self ,
136136 mut partition : PartitionedPayload ,
137137 is_row_shuffle : bool ,
138- dispatch_method : & Vec < u64 > ,
138+ dispatch_method : & [ u64 ] ,
139139 ) -> Result < ( ) > {
140140 match self {
141141 Spiller :: Standalone ( spiller) => Self :: spill_partition ( spiller, partition. payloads ) ,
@@ -472,30 +472,28 @@ impl AccumulatingTransform for NewTransformPartialAggregate {
472472
473473 if self . is_row_shuffle {
474474 blocks. extend ( payloads. into_iter ( ) . map ( DataBlock :: empty_with_meta) )
475+ } else if self . dispatch_method . len ( ) == 1 {
476+ blocks. push ( DataBlock :: empty_with_meta ( ExchangeShuffleMeta :: create (
477+ payloads
478+ . into_iter ( )
479+ . map ( DataBlock :: empty_with_meta)
480+ . collect ( ) ,
481+ ) ) )
475482 } else {
476- if self . dispatch_method . len ( ) == 1 {
477- blocks. push ( DataBlock :: empty_with_meta ( ExchangeShuffleMeta :: create (
478- payloads
479- . into_iter ( )
480- . map ( DataBlock :: empty_with_meta)
481- . collect ( ) ,
482- ) ) )
483- } else {
484- let mut chunks = Vec :: with_capacity ( self . dispatch_method . len ( ) ) ;
485- for bucket_num in self . dispatch_method . iter ( ) {
486- let chunk: Vec < AggregateMeta > = payloads
487- . drain ( 0 ..* bucket_num as usize )
488- . map ( |payload| {
489- AggregateMeta :: downcast_from ( payload)
490- . expect ( "AggregateMeta is expected" )
491- } )
492- . collect ( ) ;
493- chunks. push ( AggregateMeta :: create_partitioned ( None , chunk) ) ;
494- }
495- blocks. push ( DataBlock :: empty_with_meta ( ExchangeShuffleMeta :: create (
496- chunks. into_iter ( ) . map ( DataBlock :: empty_with_meta) . collect ( ) ,
497- ) ) )
483+ let mut chunks = Vec :: with_capacity ( self . dispatch_method . len ( ) ) ;
484+ for bucket_num in self . dispatch_method . iter ( ) {
485+ let chunk: Vec < AggregateMeta > = payloads
486+ . drain ( 0 ..* bucket_num as usize )
487+ . map ( |payload| {
488+ AggregateMeta :: downcast_from ( payload)
489+ . expect ( "AggregateMeta is expected" )
490+ } )
491+ . collect ( ) ;
492+ chunks. push ( AggregateMeta :: create_partitioned ( None , chunk) ) ;
498493 }
494+ blocks. push ( DataBlock :: empty_with_meta ( ExchangeShuffleMeta :: create (
495+ chunks. into_iter ( ) . map ( DataBlock :: empty_with_meta) . collect ( ) ,
496+ ) ) )
499497 }
500498 blocks
501499 }
0 commit comments