1212import org .apache .logging .log4j .LogManager ;
1313import org .apache .logging .log4j .Logger ;
1414import org .apache .lucene .search .TopDocs ;
15+ import org .elasticsearch .TransportVersions ;
1516import org .elasticsearch .action .search .SearchPhaseController .TopDocsStats ;
1617import org .elasticsearch .common .breaker .CircuitBreaker ;
1718import org .elasticsearch .common .breaker .CircuitBreakingException ;
2526import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
2627import org .elasticsearch .core .Nullable ;
2728import org .elasticsearch .core .Releasable ;
28- import org .elasticsearch .core .Releasables ;
2929import org .elasticsearch .core .Tuple ;
3030import org .elasticsearch .search .SearchPhaseResult ;
3131import org .elasticsearch .search .SearchService ;
@@ -220,7 +220,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
220220 }
221221 final int resultSize = buffer .size () + (mergeResult == null ? 0 : 1 ) + batchedResults .size ();
222222 final List <TopDocs > topDocsList = hasTopDocs ? new ArrayList <>(resultSize ) : null ;
223- final Deque <InternalAggregations > aggsList = hasAggs ? new ArrayDeque <>(resultSize ) : null ;
223+ final Deque <DelayableWriteable < InternalAggregations > > aggsList = hasAggs ? new ArrayDeque <>(resultSize ) : null ;
224224 // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
225225 // execution for shards on the coordinating node itself
226226 if (mergeResult != null ) {
@@ -253,7 +253,7 @@ public boolean hasNext() {
253253 }
254254
255255 @ Override
256- public InternalAggregations next () {
256+ public DelayableWriteable < InternalAggregations > next () {
257257 return aggsList .pollFirst ();
258258 }
259259 },
@@ -300,7 +300,7 @@ public InternalAggregations next() {
300300 private static void consumePartialMergeResult (
301301 MergeResult partialResult ,
302302 List <TopDocs > topDocsList ,
303- Collection <InternalAggregations > aggsList
303+ Collection <DelayableWriteable < InternalAggregations > > aggsList
304304 ) {
305305 if (topDocsList != null ) {
306306 topDocsList .add (partialResult .reducedTopDocs );
@@ -310,7 +310,7 @@ private static void consumePartialMergeResult(
310310 }
311311 }
312312
313- private static void addAggsToList (MergeResult partialResult , Collection <InternalAggregations > aggsList ) {
313+ private static void addAggsToList (MergeResult partialResult , Collection <DelayableWriteable < InternalAggregations > > aggsList ) {
314314 var aggs = partialResult .reducedAggs ;
315315 if (aggs != null ) {
316316 aggsList .add (aggs );
@@ -382,45 +382,34 @@ private MergeResult partialReduce(
382382 }
383383 // we leave the results un-serialized because serializing is slow but we compute the serialized
384384 // size as an estimate of the memory used by the newly reduced aggregations.
385- return new MergeResult (processedShards , newTopDocs , newAggs , newAggs != null ? DelayableWriteable .getSerializedSize (newAggs ) : 0 );
385+ return new MergeResult (
386+ processedShards ,
387+ newTopDocs ,
388+ newAggs == null ? null : DelayableWriteable .referencing (newAggs ),
389+ newAggs != null ? DelayableWriteable .getSerializedSize (newAggs ) : 0
390+ );
386391 }
387392
388393 private static InternalAggregations aggregate (
389394 Iterator <QuerySearchResult > toConsume ,
390- Iterator <InternalAggregations > partialResults ,
395+ Iterator <DelayableWriteable < InternalAggregations > > partialResults ,
391396 int resultSetSize ,
392397 AggregationReduceContext reduceContext
393398 ) {
394- interface ReleasableIterator extends Iterator <InternalAggregations >, Releasable {}
395- try (var aggsIter = new ReleasableIterator () {
396-
397- private Releasable toRelease ;
398-
399- @ Override
400- public void close () {
401- Releasables .close (toRelease );
402- }
403-
404- @ Override
405- public boolean hasNext () {
406- return toConsume .hasNext ();
407- }
408-
409- @ Override
410- public InternalAggregations next () {
411- var res = toConsume .next ().consumeAggs ();
412- Releasables .close (toRelease );
413- toRelease = res ;
414- return res .expand ();
415- }
416- }) {
417- return InternalAggregations .topLevelReduce (
418- partialResults .hasNext () ? Iterators .concat (partialResults , aggsIter ) : aggsIter ,
419- resultSetSize ,
420- reduceContext
421- );
399+ try {
400+ Iterator <InternalAggregations > aggsIter = Iterators .map (toConsume , r -> {
401+ try (var res = r .consumeAggs ()) {
402+ return res .expand ();
403+ }
404+ });
405+ return InternalAggregations .topLevelReduce (partialResults .hasNext () ? Iterators .concat (Iterators .map (partialResults , r -> {
406+ try (r ) {
407+ return r .expand ();
408+ }
409+ }), aggsIter ) : aggsIter , resultSetSize , reduceContext );
422410 } finally {
423411 toConsume .forEachRemaining (QuerySearchResult ::releaseAggs );
412+ partialResults .forEachRemaining (Releasable ::close );
424413 }
425414 }
426415
@@ -648,23 +637,30 @@ private static void releaseAggs(List<QuerySearchResult> toConsume) {
648637 record MergeResult (
649638 List <SearchShard > processedShards ,
650639 TopDocs reducedTopDocs ,
651- @ Nullable InternalAggregations reducedAggs ,
640+ @ Nullable DelayableWriteable < InternalAggregations > reducedAggs ,
652641 long estimatedSize
653642 ) implements Writeable {
654643
655644 static MergeResult readFrom (StreamInput in ) throws IOException {
656- return new MergeResult (
657- List .of (),
658- Lucene .readTopDocsIncludingShardIndex (in ),
659- in .readOptionalWriteable (InternalAggregations ::readFrom ),
660- in .readVLong ()
661- );
645+ return new MergeResult (List .of (), Lucene .readTopDocsIncludingShardIndex (in ), in .readOptionalWriteable (i -> {
646+ if (i .getTransportVersion ().onOrAfter (TransportVersions .BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE )) {
647+ return DelayableWriteable .delayed (InternalAggregations ::readFrom , i );
648+ } else {
649+ return DelayableWriteable .referencing (InternalAggregations .readFrom (i ));
650+ }
651+ }), in .readVLong ());
662652 }
663653
664654 @ Override
665655 public void writeTo (StreamOutput out ) throws IOException {
666656 Lucene .writeTopDocsIncludingShardIndex (out , reducedTopDocs );
667- out .writeOptionalWriteable (reducedAggs );
657+ out .writeOptionalWriteable (
658+ reducedAggs == null
659+ ? null
660+ : (out .getTransportVersion ().onOrAfter (TransportVersions .BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE )
661+ ? reducedAggs
662+ : reducedAggs .expand ())
663+ );
668664 out .writeVLong (estimatedSize );
669665 }
670666 }
0 commit comments