-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Fail request when all target shards fail in runtime #131177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
|
|
||
| package org.elasticsearch.xpack.esql.plugin; | ||
|
|
||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.OriginalIndices; | ||
| import org.elasticsearch.action.search.SearchRequest; | ||
|
|
@@ -375,9 +376,10 @@ public void executePlan( | |
| var computeListener = new ComputeListener( | ||
| transportService.getThreadPool(), | ||
| cancelQueryOnFailure, | ||
| listener.map(completionInfo -> { | ||
| listener.delegateFailureAndWrap((l, completionInfo) -> { | ||
| failIfAllShardsFailed(execInfo, collectedPages); | ||
| execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements | ||
| return new Result(outputAttributes, collectedPages, completionInfo, execInfo); | ||
| l.onResponse(new Result(outputAttributes, collectedPages, completionInfo, execInfo)); | ||
| }) | ||
| ) | ||
| ) { | ||
|
|
@@ -540,6 +542,46 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * If all of target shards excluding the skipped shards failed, then we should fail the entire query regardless of the partial_results | ||
| * configuration or skip_unavailable setting. This behavior doesn't fully align with the search API as the skip_unavailable | ||
| * would ignore all the failures from the remote clusters; hence, only fail the request when all shards in the local cluster failed. | ||
| */ | ||
| static void failIfAllShardsFailed(EsqlExecutionInfo execInfo, List<Page> finalResults) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this method concerning only remote shards/failures or local too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method treats both local and remote failures uniformly. This is where the behavior in ES|QL differs slightly from the search API. Here, we iterate over all execution info in each cluster (both local and remotes) to accumulate successful shards (excluding skipped shards) and failed shards. The request only fails if there are no successful shards, some failed shards, and no rows produced in the final results. |
||
| // do not fail if any final result has results | ||
| if (finalResults.stream().anyMatch(p -> p.getPositionCount() > 0)) { | ||
| return; | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure if this will behave correctly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The request will not fail as long as the single target shard does not fail. |
||
| int totalFailedShards = 0; | ||
| for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) { | ||
| final Integer successfulShards = cluster.getSuccessfulShards(); | ||
| if (successfulShards != null && successfulShards > 0) { | ||
| return; | ||
| } | ||
| if (cluster.getFailedShards() != null) { | ||
| totalFailedShards += cluster.getFailedShards(); | ||
| } | ||
| } | ||
| if (totalFailedShards == 0) { | ||
| return; | ||
| } | ||
| final var failureCollector = new FailureCollector(); | ||
| for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) { | ||
| var failedShards = cluster.getFailedShards(); | ||
| if (failedShards != null && failedShards > 0) { | ||
| assert cluster.getFailures().isEmpty() == false : "expected failures for cluster [" + cluster.getClusterAlias() + "]"; | ||
| for (ShardSearchFailure failure : cluster.getFailures()) { | ||
| if (failure.getCause() instanceof Exception e) { | ||
smalyshev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| failureCollector.unwrapAndCollect(e); | ||
| } else { | ||
| failureCollector.unwrapAndCollect(failure); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| ExceptionsHelper.reThrowIfNotNull(failureCollector.getFailure()); | ||
| } | ||
|
|
||
| void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<DriverCompletionInfo> listener) { | ||
| listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts())); | ||
| List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size()); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.