Skip to content

Commit 18dd8e3

Browse files
committed
Close consumer on listener completion
1 parent bcb1402 commit 18dd8e3

File tree

2 files changed

+10
-13
lines changed

2 files changed

+10
-13
lines changed

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,6 @@ tests:
152152
- class: org.elasticsearch.action.RejectionActionIT
153153
method: testSimulatedSearchRejectionLoad
154154
issue: https://github.com/elastic/elasticsearch/issues/125901
155-
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
156-
method: testSearchWithRandomDisconnects
157-
issue: https://github.com/elastic/elasticsearch/issues/122707
158155
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
159156
method: test {p0=transform/transforms_reset/Test force reseting a running transform}
160157
issue: https://github.com/elastic/elasticsearch/issues/126240

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,7 @@ private static final class QueryPerNodeState {
836836
private final int topDocsSize;
837837
private final CountDown countDown;
838838
private final TransportChannel channel;
839+
private final ActionListener<TransportResponse> listener;
839840
private volatile BottomSortValuesCollector bottomSortCollector;
840841
private final NamedWriteableRegistry namedWriteableRegistry;
841842

@@ -854,6 +855,7 @@ private QueryPerNodeState(
854855
this.task = task;
855856
this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards());
856857
this.channel = channel;
858+
this.listener = ActionListener.releaseBefore(queryPhaseResultConsumer, new ChannelActionListener<>(channel));
857859
this.dependencies = dependencies;
858860
this.namedWriteableRegistry = namedWriteableRegistry;
859861
}
@@ -866,10 +868,9 @@ void onShardDone() {
866868
bwcRespond();
867869
return;
868870
}
869-
var channelListener = new ChannelActionListener<>(channel);
870871
RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream();
871872
out.setTransportVersion(channel.getVersion());
872-
try (queryPhaseResultConsumer) {
873+
try {
873874
Exception reductionFailure = queryPhaseResultConsumer.failure.get();
874875
if (reductionFailure == null) {
875876
writeSuccessfulResponse(out);
@@ -878,11 +879,11 @@ void onShardDone() {
878879
}
879880
} catch (IOException e) {
880881
releaseAllResultsContexts();
881-
channelListener.onFailure(e);
882+
listener.onFailure(e);
882883
return;
883884
}
884885
ActionListener.respondAndRelease(
885-
channelListener,
886+
listener,
886887
new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())
887888
);
888889
}
@@ -951,12 +952,11 @@ private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Except
951952
void bwcRespond() {
952953
RecyclerBytesStreamOutput out = null;
953954
boolean success = false;
954-
var channelListener = new ChannelActionListener<>(channel);
955-
try (queryPhaseResultConsumer) {
955+
try {
956956
var failure = queryPhaseResultConsumer.failure.get();
957957
if (failure != null) {
958958
releaseAllResultsContexts();
959-
channelListener.onFailure(failure);
959+
listener.onFailure(failure);
960960
return;
961961
}
962962
final QueryPhaseResultConsumer.MergeResult mergeResult;
@@ -967,7 +967,7 @@ void bwcRespond() {
967967
);
968968
} catch (Exception e) {
969969
releaseAllResultsContexts();
970-
channelListener.onFailure(e);
970+
listener.onFailure(e);
971971
return;
972972
}
973973
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
@@ -1000,7 +1000,7 @@ void bwcRespond() {
10001000
success = true;
10011001
} catch (IOException e) {
10021002
releaseAllResultsContexts();
1003-
channelListener.onFailure(e);
1003+
listener.onFailure(e);
10041004
return;
10051005
}
10061006
} finally {
@@ -1009,7 +1009,7 @@ void bwcRespond() {
10091009
}
10101010
}
10111011
ActionListener.respondAndRelease(
1012-
channelListener,
1012+
listener,
10131013
new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())
10141014
);
10151015
}

0 commit comments

Comments
 (0)