Skip to content

Commit 080dcd7

Browse files
Fix AsyncSearchActionIT tests
Missed a spot here when moving this to delayed deserialization, we can leak pending batch results here on exceptions. closes #126994 closes #126995 closes #126975 closes #126999 closes #127001 closes #126974 closes #127008
1 parent cd138b0 commit 080dcd7

File tree

2 files changed

+11
-25
lines changed

2 files changed

+11
-25
lines changed

muted-tests.yml

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -411,27 +411,6 @@ tests:
411411
- class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests
412412
method: testStdinWithMultipleValues
413413
issue: https://github.com/elastic/elasticsearch/issues/126882
414-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
415-
method: testDeleteCancelRunningTask
416-
issue: https://github.com/elastic/elasticsearch/issues/126994
417-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
418-
method: testMaxResponseSize
419-
issue: https://github.com/elastic/elasticsearch/issues/126995
420-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
421-
method: testRemoveAsyncIndex
422-
issue: https://github.com/elastic/elasticsearch/issues/126975
423-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
424-
method: testCleanupOnFailure
425-
issue: https://github.com/elastic/elasticsearch/issues/126999
426-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
427-
method: testUpdateStoreKeepAlive
428-
issue: https://github.com/elastic/elasticsearch/issues/127001
429-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
430-
method: testRestartAfterCompletion
431-
issue: https://github.com/elastic/elasticsearch/issues/126974
432-
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
433-
method: testDeleteCleanupIndex
434-
issue: https://github.com/elastic/elasticsearch/issues/127008
435414

436415
# Examples:
437416
#

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2727
import org.elasticsearch.core.Nullable;
2828
import org.elasticsearch.core.Releasable;
29+
import org.elasticsearch.core.Releasables;
2930
import org.elasticsearch.core.Tuple;
3031
import org.elasticsearch.search.SearchPhaseResult;
3132
import org.elasticsearch.search.SearchService;
@@ -162,7 +163,7 @@ public void consumeResult(SearchPhaseResult result, Runnable next) {
162163
consume(querySearchResult, next);
163164
}
164165

165-
private final List<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayList<>();
166+
private final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayDeque<>();
166167

167168
/**
168169
* Unlinks partial merge results from this instance and returns them as a partial merge result to be sent to the coordinating node.
@@ -214,7 +215,7 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
214215
buffer.sort(RESULT_COMPARATOR);
215216
final TopDocsStats topDocsStats = this.topDocsStats;
216217
var mergeResult = this.mergeResult;
217-
final List<Tuple<TopDocsStats, MergeResult>> batchedResults;
218+
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults;
218219
synchronized (this.batchedResults) {
219220
batchedResults = this.batchedResults;
220221
}
@@ -226,8 +227,8 @@ public SearchPhaseController.ReducedQueryPhase reduce() throws Exception {
226227
if (mergeResult != null) {
227228
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
228229
}
229-
for (int i = 0; i < batchedResults.size(); i++) {
230-
Tuple<TopDocsStats, MergeResult> batchedResult = batchedResults.set(i, null);
230+
Tuple<TopDocsStats, MergeResult> batchedResult;
231+
while ((batchedResult = batchedResults.poll()) != null) {
231232
topDocsStats.add(batchedResult.v1());
232233
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
233234
}
@@ -522,6 +523,12 @@ private void releaseBuffer() {
522523
querySearchResult.releaseAggs();
523524
}
524525
}
526+
synchronized (this.batchedResults) {
527+
Tuple<TopDocsStats, MergeResult> batchedResult;
528+
while ((batchedResult = batchedResults.poll()) != null) {
529+
Releasables.close(batchedResult.v2().reducedAggs());
530+
}
531+
}
525532
}
526533

527534
private synchronized void onMergeFailure(Exception exc) {

0 commit comments

Comments
 (0)