@@ -632,16 +632,6 @@ pub(crate) fn create_group_accumulator(
632632 }
633633}
634634
635- /// Extracts a successful Ok(_) or returns Poll::Ready(Some(Err(e))) with errors
636- macro_rules! extract_ok {
637- ( $RES: expr) => { {
638- match $RES {
639- Ok ( v) => v,
640- Err ( e) => return Poll :: Ready ( Some ( Err ( e) ) ) ,
641- }
642- } } ;
643- }
644-
645635impl Stream for GroupedHashAggregateStream {
646636 type Item = Result < RecordBatch > ;
647637
@@ -661,7 +651,7 @@ impl Stream for GroupedHashAggregateStream {
661651 let input_rows = batch. num_rows ( ) ;
662652
663653 // Do the grouping
664- extract_ok ! ( self . group_aggregate_batch( batch) ) ;
654+ self . group_aggregate_batch ( batch) ? ;
665655
666656 self . update_skip_aggregation_probe ( input_rows) ;
667657
@@ -673,26 +663,24 @@ impl Stream for GroupedHashAggregateStream {
673663 // emit all groups and switch to producing output
674664 if self . hit_soft_group_limit ( ) {
675665 timer. done ( ) ;
676- extract_ok ! ( self . set_input_done_and_produce_output( ) ) ;
666+ self . set_input_done_and_produce_output ( ) ? ;
677667 // make sure the exec_state just set is not overwritten below
678668 break ' reading_input;
679669 }
680670
681671 if let Some ( to_emit) = self . group_ordering . emit_to ( ) {
682672 timer. done ( ) ;
683- if let Some ( batch) =
684- extract_ok ! ( self . emit( to_emit, false ) )
685- {
673+ if let Some ( batch) = self . emit ( to_emit, false ) ? {
686674 self . exec_state =
687675 ExecutionState :: ProducingOutput ( batch) ;
688676 } ;
689677 // make sure the exec_state just set is not overwritten below
690678 break ' reading_input;
691679 }
692680
693- extract_ok ! ( self . emit_early_if_necessary( ) ) ;
681+ self . emit_early_if_necessary ( ) ? ;
694682
695- extract_ok ! ( self . switch_to_skip_aggregation( ) ) ;
683+ self . switch_to_skip_aggregation ( ) ? ;
696684
697685 timer. done ( ) ;
698686 }
@@ -703,10 +691,10 @@ impl Stream for GroupedHashAggregateStream {
703691 let timer = elapsed_compute. timer ( ) ;
704692
705693 // Make sure we have enough capacity for `batch`, otherwise spill
706- extract_ok ! ( self . spill_previous_if_necessary( & batch) ) ;
694+ self . spill_previous_if_necessary ( & batch) ? ;
707695
708696 // Do the grouping
709- extract_ok ! ( self . group_aggregate_batch( batch) ) ;
697+ self . group_aggregate_batch ( batch) ? ;
710698
711699 // If we can begin emitting rows, do so,
712700 // otherwise keep consuming input
@@ -716,16 +704,14 @@ impl Stream for GroupedHashAggregateStream {
716704 // emit all groups and switch to producing output
717705 if self . hit_soft_group_limit ( ) {
718706 timer. done ( ) ;
719- extract_ok ! ( self . set_input_done_and_produce_output( ) ) ;
707+ self . set_input_done_and_produce_output ( ) ? ;
720708 // make sure the exec_state just set is not overwritten below
721709 break ' reading_input;
722710 }
723711
724712 if let Some ( to_emit) = self . group_ordering . emit_to ( ) {
725713 timer. done ( ) ;
726- if let Some ( batch) =
727- extract_ok ! ( self . emit( to_emit, false ) )
728- {
714+ if let Some ( batch) = self . emit ( to_emit, false ) ? {
729715 self . exec_state =
730716 ExecutionState :: ProducingOutput ( batch) ;
731717 } ;
@@ -745,7 +731,7 @@ impl Stream for GroupedHashAggregateStream {
745731 // Found end from input stream
746732 None => {
747733 // inner is done, emit all rows and switch to producing output
748- extract_ok ! ( self . set_input_done_and_produce_output( ) ) ;
734+ self . set_input_done_and_produce_output ( ) ? ;
749735 }
750736 }
751737 }
0 commit comments