@@ -393,59 +393,36 @@ private static InternalAggregations aggregate(
393393 if (resultSetSize == 0 ) {
394394 return null ;
395395 }
396- if ( resultSetSize == 1 ) {
397- if (partialResults .hasNext ()) {
398- return InternalAggregations . reduce ( partialResults .next (), reduceContext );
399- }
396+ final InternalAggregations first ;
397+ if (partialResults .hasNext ()) {
398+ first = partialResults .next ();
399+ } else {
400400 try (var delayable = toConsume .next ().consumeAggs ()) {
401- return InternalAggregations . reduce ( delayable .expand (), reduceContext );
401+ first = delayable .expand ();
402402 }
403403 }
404+ if (resultSetSize == 1 ) {
405+ return InternalAggregations .reduce (first , reduceContext );
406+ }
404407 try {
405408 // general case
406- if (partialResults .hasNext ()) {
407- return consumeAggResults (partialResults , toConsume , createReducer (resultSetSize , reduceContext , partialResults .next ()));
408- }
409- AggregatorsReducer reducer ;
410- try (var delayable = toConsume .next ().consumeAggs ()) {
411- reducer = createReducer (resultSetSize , reduceContext , delayable .expand ());
409+ try (var reducer = new AggregatorsReducer (first , reduceContext , resultSetSize )) {
410+ reducer .accept (first );
411+ partialResults .forEachRemaining (reducer ::accept );
412+ while (toConsume .hasNext ()) {
413+ final InternalAggregations next ;
414+ try (var delayable = toConsume .next ().consumeAggs ()) {
415+ next = delayable .expand ();
416+ }
417+ reducer .accept (next );
418+ }
419+ return reducer .get ();
412420 }
413- return consumeAggResults (partialResults , toConsume , reducer );
414421 } finally {
415422 toConsume .forEachRemaining (QuerySearchResult ::releaseAggs );
416423 }
417424 }
418425
419- private static AggregatorsReducer createReducer (int resultSetSize , AggregationReduceContext reduceContext , InternalAggregations first ) {
420- boolean success = false ;
421- var reducer = new AggregatorsReducer (first , reduceContext , resultSetSize );
422- try {
423- reducer .accept (first );
424- success = true ;
425- return reducer ;
426- } finally {
427- if (success == false ) {
428- reducer .close ();
429- }
430- }
431- }
432-
433- private static InternalAggregations consumeAggResults (
434- Iterator <InternalAggregations > partialResults ,
435- Iterator <QuerySearchResult > toConsume ,
436- AggregatorsReducer reducer
437- ) {
438- try (reducer ) {
439- partialResults .forEachRemaining (reducer ::accept );
440- while (toConsume .hasNext ()) {
441- try (var delayable = toConsume .next ().consumeAggs ()) {
442- reducer .accept (delayable .expand ());
443- }
444- }
445- return reducer .get ();
446- }
447- }
448-
449426 public int getNumReducePhases () {
450427 return numReducePhases ;
451428 }
0 commit comments