diff --git a/muted-tests.yml b/muted-tests.yml index 1d60d6b1aebdc..665d03570523d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -408,27 +408,6 @@ tests: - class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests method: testStdinWithMultipleValues issue: https://github.com/elastic/elasticsearch/issues/126882 -- class: org.elasticsearch.xpack.search.AsyncSearchActionIT - method: testDeleteCancelRunningTask - issue: https://github.com/elastic/elasticsearch/issues/126994 -- class: org.elasticsearch.xpack.search.AsyncSearchActionIT - method: testMaxResponseSize - issue: https://github.com/elastic/elasticsearch/issues/126995 -- class: org.elasticsearch.xpack.search.AsyncSearchActionIT - method: testRemoveAsyncIndex - issue: https://github.com/elastic/elasticsearch/issues/126975 -- class: org.elasticsearch.xpack.search.AsyncSearchActionIT - method: testCleanupOnFailure - issue: https://github.com/elastic/elasticsearch/issues/126999 -- class: org.elasticsearch.xpack.search.AsyncSearchActionIT - method: testUpdateStoreKeepAlive - issue: https://github.com/elastic/elasticsearch/issues/127001 -- class: org.elasticsearch.xpack.search.AsyncSearchActionIT - method: testRestartAfterCompletion - issue: https://github.com/elastic/elasticsearch/issues/126974 -- class: org.elasticsearch.xpack.search.AsyncSearchActionIT - method: testDeleteCleanupIndex - issue: https://github.com/elastic/elasticsearch/issues/127008 - class: org.elasticsearch.packaging.test.DockerTests method: test024InstallPluginFromArchiveUsingConfigFile issue: https://github.com/elastic/elasticsearch/issues/126936 diff --git a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java index 3bde579a823f6..ec63d38616153 100644 --- a/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java +++ b/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Tuple; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchService; @@ -162,7 +163,7 @@ public void consumeResult(SearchPhaseResult result, Runnable next) { consume(querySearchResult, next); } - private final List> batchedResults = new ArrayList<>(); + private final ArrayDeque> batchedResults = new ArrayDeque<>(); /** * Unlinks partial merge results from this instance and returns them as a partial merge result to be sent to the coordinating node. @@ -214,7 +215,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { buffer.sort(RESULT_COMPARATOR); final TopDocsStats topDocsStats = this.topDocsStats; var mergeResult = this.mergeResult; - final List> batchedResults; + final ArrayDeque> batchedResults; synchronized (this.batchedResults) { batchedResults = this.batchedResults; } @@ -226,8 +227,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception { if (mergeResult != null) { consumePartialMergeResult(mergeResult, topDocsList, aggsList); } - for (int i = 0; i < batchedResults.size(); i++) { - Tuple batchedResult = batchedResults.set(i, null); + Tuple batchedResult; + while ((batchedResult = batchedResults.poll()) != null) { topDocsStats.add(batchedResult.v1()); consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList); } @@ -528,6 +529,12 @@ private void releaseBuffer() { querySearchResult.releaseAggs(); } } + synchronized (this.batchedResults) { + Tuple batchedResult; + while ((batchedResult = batchedResults.poll()) != null) { + Releasables.close(batchedResult.v2().reducedAggs()); + } + } } private synchronized void onMergeFailure(Exception exc) {