diff --git a/docs/changelog/136889.yaml b/docs/changelog/136889.yaml new file mode 100644 index 0000000000000..8888eeb316b6c --- /dev/null +++ b/docs/changelog/136889.yaml @@ -0,0 +1,6 @@ +pr: 136889 +summary: Remove early phase failure in batched +area: Search +type: bug +issues: + - 134151 diff --git a/muted-tests.yml b/muted-tests.yml index 015a5ec8608cb..237dbf4e3fe8d 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -377,9 +377,6 @@ tests: issue: https://github.com/elastic/elasticsearch/issues/130050 - class: geoip.GeoIpMultiProjectIT issue: https://github.com/elastic/elasticsearch/issues/130073 -- class: org.elasticsearch.search.SearchWithRejectionsIT - method: testOpenContextsAfterRejections - issue: https://github.com/elastic/elasticsearch/issues/130821 - class: org.elasticsearch.index.IndexingPressureIT method: testWriteCanRejectOnPrimaryBasedOnMaxOperationSize issue: https://github.com/elastic/elasticsearch/issues/130281 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a6825ccc81d2c..5db125e08637b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -83,6 +83,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); - this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } // public for tests @@ -235,6 +246,10 @@ public Object[] getResults() { return results; } + Exception getReductionFailure() { + return reductionFailure; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(results.length); @@ -245,7 +260,17 @@ public void writeTo(StreamOutput out) throws IOException { writePerShardResult(out, (QuerySearchResult) result); } } - writeMergeResult(out, mergeResult, topDocsStats); + if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } } @Override @@ -498,7 +523,12 @@ public Executor executor() { @Override public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + Exception reductionFailure = response.getReductionFailure(); + if (reductionFailure != null) { + queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure); + } else { + queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + } } for (int i = 0; i < response.results.length; i++) { var s = request.shards.get(i); @@ -520,6 +550,21 @@ public void handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcHandleException(e); + return; + } + Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); + logger.debug("handling node search exception coming from [" + nodeId + "]", cause); + onNodeQueryFailure(e, request, routing); + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + private void bwcHandleException(TransportException e) { Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { @@ -791,13 +836,98 @@ void onShardDone() { if (countDown.countDown() == false) { return; } + if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcRespond(); + return; + } + var channelListener = new ChannelActionListener<>(channel); + RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); + out.setTransportVersion(channel.getVersion()); + try (queryPhaseResultConsumer) { + Exception reductionFailure = queryPhaseResultConsumer.failure.get(); + if (reductionFailure == null) { + writeSuccessfulResponse(out); + } else { + writeReductionFailureResponse(out, reductionFailure); + } + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference())); + } + + // Writes the "successful" response (see NodeQueryResponse for the corresponding read logic) + private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException { + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + writeReductionFailureResponse(out, e); + return; + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(false); // does not have a reduction failure + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + } + + // Writes the "reduction failure" response (see NodeQueryResponse for the corresponding read logic) + private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Exception reductionFailure) throws IOException { + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(true); // does have a reduction failure + out.writeException(reductionFailure); + releaseAllResultsContexts(); + } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + void bwcRespond() { RecyclerBytesStreamOutput out = null; boolean success = false; var channelListener = new ChannelActionListener<>(channel); try (queryPhaseResultConsumer) { var failure = queryPhaseResultConsumer.failure.get(); if (failure != null) { - handleMergeFailure(failure, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(failure); return; } final QueryPhaseResultConsumer.MergeResult mergeResult; @@ -807,7 +937,8 @@ void onShardDone() { EMPTY_PARTIAL_MERGE_RESULT ); } catch (Exception e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, @@ -839,7 +970,8 @@ void onShardDone() { NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); success = true; } catch (IOException e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); + releaseAllResultsContexts(); + channelListener.onFailure(e); return; } } finally { @@ -868,11 +1000,7 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi } } - private void handleMergeFailure( - Exception e, - ChannelActionListener channelListener, - NamedWriteableRegistry namedWriteableRegistry - ) { + private void releaseAllResultsContexts() { queryPhaseResultConsumer.getSuccessfulResults() .forEach( searchPhaseResult -> releaseLocalContext( @@ -882,7 +1010,6 @@ private void handleMergeFailure( namedWriteableRegistry ) ); - channelListener.onFailure(e); } void consumeResult(QuerySearchResult queryResult) { diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index d11457ca91536..ff811a5f7abfd 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -498,7 +498,6 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeBoolean(true); writeTopDocs(out, topDocsAndMaxScore); } else { - assert isPartiallyReduced(); out.writeBoolean(false); } } else { diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv new file mode 100644 index 0000000000000..eef83daf2840e --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -0,0 +1 @@ +9213000,9185007,9112012 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index bd568bbaa22c0..4a30ba80bbfa3 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_9.1.7,9112011 +batched_response_might_include_reduction_failure,9112012 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 2c15e0254cbe8..b0c31e59ae1b2 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -transform_check_for_dangling_tasks,9170000 +batched_response_might_include_reduction_failure,9185007 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv new file mode 100644 index 0000000000000..61602dea24d29 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -0,0 +1 @@ +batched_response_might_include_reduction_failure,9213000