diff --git a/muted-tests.yml b/muted-tests.yml index f754a00797d4f..247b0d976cb4a 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -152,9 +152,6 @@ tests: - class: org.elasticsearch.action.RejectionActionIT method: testSimulatedSearchRejectionLoad issue: https://github.com/elastic/elasticsearch/issues/125901 -- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT - method: testSearchWithRandomDisconnects - issue: https://github.com/elastic/elasticsearch/issues/122707 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_reset/Test force reseting a running transform} issue: https://github.com/elastic/elasticsearch/issues/126240 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 25f436113baf3..133108b64a981 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -836,6 +836,7 @@ private static final class QueryPerNodeState { private final int topDocsSize; private final CountDown countDown; private final TransportChannel channel; + private final ActionListener listener; private volatile BottomSortValuesCollector bottomSortCollector; private final NamedWriteableRegistry namedWriteableRegistry; @@ -854,6 +855,7 @@ private QueryPerNodeState( this.task = task; this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards()); this.channel = channel; + this.listener = ActionListener.releaseBefore(queryPhaseResultConsumer, new ChannelActionListener<>(channel)); this.dependencies = dependencies; this.namedWriteableRegistry = namedWriteableRegistry; } @@ -866,10 +868,9 @@ void onShardDone() { bwcRespond(); return; } - var channelListener = new ChannelActionListener<>(channel); RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); out.setTransportVersion(channel.getVersion()); - try (queryPhaseResultConsumer) { + try { Exception reductionFailure = queryPhaseResultConsumer.failure.get(); if (reductionFailure == null) { writeSuccessfulResponse(out); @@ -878,13 +879,10 @@ void onShardDone() { } } catch (IOException e) { releaseAllResultsContexts(); - channelListener.onFailure(e); + listener.onFailure(e); return; } - ActionListener.respondAndRelease( - channelListener, - new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) - ); + ActionListener.respondAndRelease(listener, new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())); } // Writes the "successful" response (see NodeQueryResponse for the corresponding read logic) @@ -951,12 +949,11 @@ private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Except void bwcRespond() { RecyclerBytesStreamOutput out = null; boolean success = false; - var channelListener = new ChannelActionListener<>(channel); - try (queryPhaseResultConsumer) { + try { var failure = queryPhaseResultConsumer.failure.get(); if (failure != null) { releaseAllResultsContexts(); - channelListener.onFailure(failure); + listener.onFailure(failure); return; } final QueryPhaseResultConsumer.MergeResult mergeResult; @@ -967,7 +964,7 @@ void bwcRespond() { ); } catch (Exception e) { releaseAllResultsContexts(); - channelListener.onFailure(e); + listener.onFailure(e); return; } // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, @@ -1000,7 +997,7 @@ void bwcRespond() { success = true; } catch (IOException e) { releaseAllResultsContexts(); - channelListener.onFailure(e); + listener.onFailure(e); return; } } finally { @@ -1008,10 +1005,7 @@ void bwcRespond() { out.close(); } } - ActionListener.respondAndRelease( - channelListener, - new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) - ); + ActionListener.respondAndRelease(listener, new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())); } private void maybeFreeContext(