2626import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
2727import org .elasticsearch .core .Nullable ;
2828import org .elasticsearch .core .Releasable ;
29+ import org .elasticsearch .core .Releasables ;
2930import org .elasticsearch .core .Tuple ;
3031import org .elasticsearch .search .SearchPhaseResult ;
3132import org .elasticsearch .search .SearchService ;
@@ -162,7 +163,7 @@ public void consumeResult(SearchPhaseResult result, Runnable next) {
162163 consume (querySearchResult , next );
163164 }
164165
165- private final List <Tuple <TopDocsStats , MergeResult >> batchedResults = new ArrayList <>();
166+ private final ArrayDeque <Tuple <TopDocsStats , MergeResult >> batchedResults = new ArrayDeque <>();
166167
167168 /**
168169 * Unlinks partial merge results from this instance and returns them as a partial merge result to be sent to the coordinating node.
@@ -214,7 +215,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
214215 buffer .sort (RESULT_COMPARATOR );
215216 final TopDocsStats topDocsStats = this .topDocsStats ;
216217 var mergeResult = this .mergeResult ;
217- final List <Tuple <TopDocsStats , MergeResult >> batchedResults ;
218+ final ArrayDeque <Tuple <TopDocsStats , MergeResult >> batchedResults ;
218219 synchronized (this .batchedResults ) {
219220 batchedResults = this .batchedResults ;
220221 }
@@ -226,8 +227,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
226227 if (mergeResult != null ) {
227228 consumePartialMergeResult (mergeResult , topDocsList , aggsList );
228229 }
229- for ( int i = 0 ; i < batchedResults . size (); i ++) {
230- Tuple < TopDocsStats , MergeResult > batchedResult = batchedResults .set ( i , null );
230+ Tuple < TopDocsStats , MergeResult > batchedResult ;
231+ while (( batchedResult = batchedResults .poll ()) != null ) {
231232 topDocsStats .add (batchedResult .v1 ());
232233 consumePartialMergeResult (batchedResult .v2 (), topDocsList , aggsList );
233234 }
@@ -528,6 +529,12 @@ private void releaseBuffer() {
528529 querySearchResult .releaseAggs ();
529530 }
530531 }
532+ synchronized (this .batchedResults ) {
533+ Tuple <TopDocsStats , MergeResult > batchedResult ;
534+ while ((batchedResult = batchedResults .poll ()) != null ) {
535+ Releasables .close (batchedResult .v2 ().reducedAggs ());
536+ }
537+ }
531538 }
532539
533540 private synchronized void onMergeFailure (Exception exc ) {
0 commit comments