3 changes: 0 additions & 3 deletions muted-tests.yml
@@ -152,9 +152,6 @@ tests:
- class: org.elasticsearch.action.RejectionActionIT
method: testSimulatedSearchRejectionLoad
issue: https://github.com/elastic/elasticsearch/issues/125901
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
method: testSearchWithRandomDisconnects
issue: https://github.com/elastic/elasticsearch/issues/122707
- class: org.elasticsearch.xpack.test.rest.XPackRestIT
method: test {p0=transform/transforms_reset/Test force reseting a running transform}
issue: https://github.com/elastic/elasticsearch/issues/126240
@@ -836,6 +836,7 @@ private static final class QueryPerNodeState {
private final int topDocsSize;
private final CountDown countDown;
private final TransportChannel channel;
private final ActionListener<TransportResponse> listener;
private volatile BottomSortValuesCollector bottomSortCollector;
private final NamedWriteableRegistry namedWriteableRegistry;

@@ -854,6 +855,7 @@ private QueryPerNodeState(
this.task = task;
this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards());
this.channel = channel;
this.listener = ActionListener.releaseBefore(queryPhaseResultConsumer, new ChannelActionListener<>(channel));
Contributor

This approach structurally looks better, as it centralizes the consumer’s lifecycle, ensuring that every success/failure path passes through the same wrapper.

Both approaches behave the same:

  • Serialize with an open consumer
  • Close the consumer
  • Send the response (or failure)
  • Let respondAndRelease free the bytes

The code in main:

  • Uses a local ChannelActionListener and a try (queryPhaseResultConsumer) block.
  • On success, the consumer is closed at the end of the try-with-resources (consumer) block (i.e., after serialization finishes, before building/sending the transport response).
  • On failure, the consumer is also closed by the try-with-resources and failure is sent via channelListener.
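To make the ordering on main concrete, here is a minimal, self-contained sketch. It does not use the Elasticsearch classes: `Releasable`, `respond`, and the event strings are hypothetical stand-ins, and the "consumer" is just a lambda that records when it is closed. It relies only on the standard Java rule that a try-with-resources resource is closed before any `catch` block of the same statement runs.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the pattern on main; not the real Elasticsearch code.
class TryWithResourcesSketch {

    // Hypothetical Releasable-like interface: close() without checked exceptions.
    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    // Records the order of events for the success or failure path.
    static List<String> respond(boolean failSerialization) {
        List<String> events = new ArrayList<>();
        Releasable consumer = () -> events.add("close consumer");
        try (consumer) {
            events.add("serialize");
            if (failSerialization) {
                throw new IOException("simulated serialization failure");
            }
        } catch (IOException e) {
            // The consumer has already been closed when this block runs.
            events.add("send failure");
            return events;
        }
        // The consumer is closed here too, before the response is sent.
        events.add("send response");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(respond(false));
        System.out.println(respond(true));
    }
}
```

In both paths the recorded order is serialize, close consumer, then send, which is the behavior described in the bullets above.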

The PR's code:

  • Wraps the channel listener with releaseBefore(consumer, …) so the consumer is always closed before sending success/failure.
  • On success, the consumer is closed right before delegating to the channel (via wrapper). Serialization happens with consumer open; then the wrapper closes it and writes.
  • On failure, listener.onFailure(e) closes the consumer first (via wrapper) and then writes the failure.
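The wrapper behavior can be sketched in the same way. This is an illustration under assumptions, not the actual `ActionListener.releaseBefore` implementation: `Listener`, `Releasable`, and this `releaseBefore` are hypothetical stand-ins that only model "close the resource, then delegate" on both paths.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the wrapper pattern in the PR; not the real Elasticsearch code.
class ReleaseBeforeSketch {

    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    // Hypothetical minimal listener with a success and a failure callback.
    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);
    }

    // Assumed semantics: close the resource first, then notify the delegate.
    static <T> Listener<T> releaseBefore(Releasable resource, Listener<T> delegate) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                resource.close();
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                resource.close();
                delegate.onFailure(e);
            }
        };
    }

    // Records the order of events for the success or failure path.
    static List<String> respond(boolean failSerialization) {
        List<String> events = new ArrayList<>();
        Releasable consumer = () -> events.add("close consumer");
        Listener<String> channel = new Listener<String>() {
            @Override
            public void onResponse(String response) {
                events.add("send response");
            }

            @Override
            public void onFailure(Exception e) {
                events.add("send failure");
            }
        };
        Listener<String> listener = releaseBefore(consumer, channel);

        events.add("serialize"); // the consumer is still open at this point
        if (failSerialization) {
            listener.onFailure(new IOException("simulated serialization failure"));
            return events;
        }
        listener.onResponse("bytes");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(respond(false));
        System.out.println(respond(true));
    }
}
```

Under these assumptions the event order matches the try-with-resources version on both paths; the difference is that every caller of the wrapped listener gets the release step without having to repeat it.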

I prefer the updated code, as releasing the consumer uniformly on both success and failure is cleaner; however, I’m not convinced it addresses the underlying issue.

I ran SearchWithRandomDisconnectsIT#testSearchWithRandomDisconnects locally many times with @Repeat and was unable to reproduce the failure. I'm not sure whether something changed or it's just my machine.

Contributor Author

Hm, you're right, I'm struggling to reproduce on main myself now. I could have sworn I continued to see failures after my #136889 fix, but I might be mistaken - perhaps that solved it.

What do you think? I'm inclined to table this change, which might still be a worthwhile improvement, and just unmute SearchWithRandomDisconnectsIT for now. We can see if it's still failing in CI.

this.dependencies = dependencies;
this.namedWriteableRegistry = namedWriteableRegistry;
}
@@ -866,10 +868,9 @@ void onShardDone() {
bwcRespond();
return;
}
var channelListener = new ChannelActionListener<>(channel);
RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream();
out.setTransportVersion(channel.getVersion());
try (queryPhaseResultConsumer) {
try {
Exception reductionFailure = queryPhaseResultConsumer.failure.get();
if (reductionFailure == null) {
writeSuccessfulResponse(out);
@@ -878,13 +879,10 @@ void onShardDone() {
}
} catch (IOException e) {
releaseAllResultsContexts();
channelListener.onFailure(e);
listener.onFailure(e);
return;
}
ActionListener.respondAndRelease(
channelListener,
new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())
);
ActionListener.respondAndRelease(listener, new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()));
}

// Writes the "successful" response (see NodeQueryResponse for the corresponding read logic)
@@ -951,12 +949,11 @@ private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Except
void bwcRespond() {
RecyclerBytesStreamOutput out = null;
boolean success = false;
var channelListener = new ChannelActionListener<>(channel);
try (queryPhaseResultConsumer) {
try {
var failure = queryPhaseResultConsumer.failure.get();
if (failure != null) {
releaseAllResultsContexts();
channelListener.onFailure(failure);
listener.onFailure(failure);
return;
}
final QueryPhaseResultConsumer.MergeResult mergeResult;
@@ -967,7 +964,7 @@ void bwcRespond() {
);
} catch (Exception e) {
releaseAllResultsContexts();
channelListener.onFailure(e);
listener.onFailure(e);
return;
}
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
@@ -1000,18 +997,15 @@ void bwcRespond() {
success = true;
} catch (IOException e) {
releaseAllResultsContexts();
channelListener.onFailure(e);
listener.onFailure(e);
return;
}
} finally {
if (success == false && out != null) {
out.close();
}
}
ActionListener.respondAndRelease(
channelListener,
new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion())
);
ActionListener.respondAndRelease(listener, new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()));
}

private void maybeFreeContext(