From ead72b1d4f16f168d0cb379d660d5b1ec8bc5d57 Mon Sep 17 00:00:00 2001 From: tomerqodo Date: Mon, 24 Nov 2025 10:01:59 +0200 Subject: [PATCH] Apply changes for benchmark PR --- docs/changelog/136889.yaml | 6 + muted-tests.yml | 3 - .../SearchQueryThenFetchAsyncAction.java | 156 ++++++++++++++++-- .../search/query/QuerySearchResult.java | 1 - ...sponse_might_include_reduction_failure.csv | 1 + .../resources/transport/upper_bounds/9.1.csv | 2 +- .../resources/transport/upper_bounds/9.2.csv | 2 +- .../resources/transport/upper_bounds/9.3.csv | 2 +- 8 files changed, 153 insertions(+), 20 deletions(-) create mode 100644 docs/changelog/136889.yaml create mode 100644 server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv diff --git a/docs/changelog/136889.yaml b/docs/changelog/136889.yaml new file mode 100644 index 0000000000000..8888eeb316b6c --- /dev/null +++ b/docs/changelog/136889.yaml @@ -0,0 +1,6 @@ +pr: 136889 +summary: Remove early phase failure in batched +area: Search +type: bug +issues: + - 134151 diff --git a/muted-tests.yml b/muted-tests.yml index 8e2b4cfcd68fd..54ceb5af00b38 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -258,9 +258,6 @@ tests: - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item} issue: https://github.com/elastic/elasticsearch/issues/122414 -- class: org.elasticsearch.search.SearchWithRejectionsIT - method: testOpenContextsAfterRejections - issue: https://github.com/elastic/elasticsearch/issues/130821 - class: org.elasticsearch.packaging.test.DockerTests method: test090SecurityCliPackaging issue: https://github.com/elastic/elasticsearch/issues/131107 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 7ad256895ea8f..25f436113baf3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -85,6 +85,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); - this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } // public for tests @@ -240,6 +251,10 @@ public Object[] getResults() { return results; } + Exception getReductionFailure() { + return reductionFailure; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(results.length); @@ -250,7 +265,17 @@ public void writeTo(StreamOutput out) throws IOException { writePerShardResult(out, (QuerySearchResult) result); } } - writeMergeResult(out, mergeResult, topDocsStats); + if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } } @Override @@ -515,7 +540,12 @@ public Executor executor() { @Override public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + Exception reductionFailure = response.getReductionFailure(); + if (reductionFailure != null) { + queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure); + } else { + queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + } } for (int i = 0; i < response.results.length; i++) { var s = request.shards.get(i); @@ -537,6 +567,21 @@ public void handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcHandleException(e); + return; + } + Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); + logger.debug("handling node search exception coming from [" + nodeId + "]", cause); + onNodeQueryFailure(e, request, routing); + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + private void bwcHandleException(TransportException e) { Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { @@ -817,13 +862,101 @@ void onShardDone() { if (countDown.countDown() == false) { return; } + if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcRespond(); + return; + } + var channelListener = new ChannelActionListener<>(channel); + RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); + out.setTransportVersion(channel.getVersion()); + try (queryPhaseResultConsumer) { + Exception reductionFailure = queryPhaseResultConsumer.failure.get(); + if (reductionFailure == null) { + writeSuccessfulResponse(out); + } else { + writeReductionFailureResponse(out, reductionFailure); + } + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) + ); + } + + // Writes the "successful" response (see NodeQueryResponse for the corresponding read logic) + private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException { + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + writeReductionFailureResponse(out, e); + return; + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(false); // does not have a reduction failure + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + } + + // Writes the "reduction failure" response (see NodeQueryResponse for the corresponding read logic) + private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Exception reductionFailure) throws IOException { + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(true); // does have a reduction failure + out.writeException(reductionFailure); + releaseAllResultsContexts(); + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + void bwcRespond() { RecyclerBytesStreamOutput out = null; boolean success = false; var channelListener = new ChannelActionListener<>(channel); try (queryPhaseResultConsumer) { var failure = queryPhaseResultConsumer.failure.get(); if (failure != null) { - handleMergeFailure(failure, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(failure); return; } final QueryPhaseResultConsumer.MergeResult mergeResult; @@ -833,7 +966,8 @@ void onShardDone() { EMPTY_PARTIAL_MERGE_RESULT ); } catch (Exception e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, @@ -865,7 +999,8 @@ void onShardDone() { NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); success = true; } catch (IOException e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } } finally { @@ -897,11 +1032,7 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi } } - private void handleMergeFailure( - Exception e, - ChannelActionListener channelListener, - NamedWriteableRegistry namedWriteableRegistry - ) { + private void releaseAllResultsContexts() { queryPhaseResultConsumer.getSuccessfulResults() .forEach( searchPhaseResult -> releaseLocalContext( @@ -911,7 +1042,6 @@ private void handleMergeFailure( namedWriteableRegistry ) ); - channelListener.onFailure(e); } void consumeResult(QuerySearchResult queryResult) { diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 1b3af6c22ca51..f1f04a483deca 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -504,7 +504,6 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); writeTopDocs(out, topDocsAndMaxScore); } else { - assert isPartiallyReduced(); out.writeBoolean(false); } } else { diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv new file mode 100644 index 0000000000000..eef83daf2840e --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -0,0 +1 @@ +9213000,9185007,9112012 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index bd568bbaa22c0..4a30ba80bbfa3 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_9.1.7,9112011 +batched_response_might_include_reduction_failure,9112012 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 24c87f7fbf43a..b0c31e59ae1b2 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -ilm_searchable_snapshot_opt_out_clone,9185006 +batched_response_might_include_reduction_failure,9185007 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index ce5e1c85f99fd..61602dea24d29 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -resolved_fields_caps,9212000 +batched_response_might_include_reduction_failure,9213000