From f29aeb29e56f365018327f4782146b3f138fa7d1 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 13 Aug 2025 11:08:00 -0600 Subject: [PATCH] Rename skipping logic to remove hard link to skip_unavailable --- .../xpack/esql/action/EsqlExecutionInfo.java | 24 +++++++------- .../esql/plugin/ClusterComputeHandler.java | 4 +-- .../xpack/esql/session/EsqlCCSUtils.java | 18 ++++------ .../xpack/esql/session/EsqlSession.java | 4 +-- .../xpack/esql/session/EsqlCCSUtilsTests.java | 33 ------------------- 5 files changed, 23 insertions(+), 60 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 1596bd3f64d91..f115d0ac8bf2c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -71,7 +71,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { private final boolean includeCCSMetadata; // fields that are not Writeable since they are only needed on the primary CCS coordinator - private final transient Predicate skipUnavailablePredicate; + private final transient Predicate skipOnFailurePredicate; // Predicate to determine if we should skip a cluster on failure private volatile boolean isPartial; // Does this request have partial results? private transient volatile boolean isStopped; // Have we received stop command? @@ -81,17 +81,18 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { private transient TimeSpan planningTimeSpan; // time elapsed since start of query to calling ComputeService.execute private TimeValue overallTook; + // This is only used is tests. public EsqlExecutionInfo(boolean includeCCSMetadata) { - this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true + this(Predicates.always(), includeCCSMetadata); // default all clusters to being skippable on failure } /** - * @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false + * @param skipOnPlanTimeFailurePredicate Decides whether we should skip the cluster that fails during planning phase. * @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response */ - public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean includeCCSMetadata) { + public EsqlExecutionInfo(Predicate skipOnPlanTimeFailurePredicate, boolean includeCCSMetadata) { this.clusterInfo = new ConcurrentHashMap<>(); - this.skipUnavailablePredicate = skipUnavailablePredicate; + this.skipOnFailurePredicate = skipOnPlanTimeFailurePredicate; this.includeCCSMetadata = includeCCSMetadata; this.relativeStart = TimeSpan.start(); } @@ -102,7 +103,7 @@ public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean inc EsqlExecutionInfo(ConcurrentMap clusterInfo, boolean includeCCSMetadata) { this.clusterInfo = clusterInfo; this.includeCCSMetadata = includeCCSMetadata; - this.skipUnavailablePredicate = Predicates.always(); + this.skipOnFailurePredicate = Predicates.always(); this.relativeStart = null; } @@ -111,7 +112,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new); this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false; this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false; - this.skipUnavailablePredicate = Predicates.always(); + this.skipOnFailurePredicate = Predicates.always(); this.relativeStart = null; if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_QUERY_PLANNING_DURATION) || in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_QUERY_PLANNING_DURATION_8_19)) { @@ -200,15 +201,16 @@ public Set clusterAliases() { } /** - * @param clusterAlias to lookup skip_unavailable from - * @return skip_unavailable setting (true/false) + * @param clusterAlias to check if we should skip this cluster on failure + * @return whether it's OK to skip the cluster on failure. * @throws NoSuchRemoteClusterException if clusterAlias is unknown to this node's RemoteClusterService */ - public boolean isSkipUnavailable(String clusterAlias) { + public boolean shouldSkipOnFailure(String clusterAlias) { if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { + // local cluster is not skippable for now return false; } - return skipUnavailablePredicate.test(clusterAlias); + return skipOnFailurePredicate.test(clusterAlias); } public boolean isCrossClusterSearch() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 4e8a89d024b71..e93cfc45ea972 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -85,7 +85,7 @@ void startComputeOnRemoteCluster( final AtomicReference finalResponse = new AtomicReference<>(); listener = listener.delegateResponse((l, e) -> { final boolean receivedResults = finalResponse.get() != null || pagesFetched.get(); - if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e) + if (executionInfo.shouldSkipOnFailure(clusterAlias) || (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e))) { EsqlCCSUtils.markClusterWithFinalStateAndNoShards( executionInfo, @@ -107,7 +107,7 @@ void startComputeOnRemoteCluster( listener.delegateFailure((l, unused) -> { final CancellableTask groupTask; final Runnable onGroupFailure; - boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false; + boolean failFast = executionInfo.shouldSkipOnFailure(clusterAlias) == false && configuration.allowPartialResults() == false; if (failFast) { groupTask = rootTask; onGroupFailure = cancelQueryOnFailure; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 80c08d145d9bb..ce8915af0fc69 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -109,8 +109,9 @@ static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exc if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) { for (String clusterAlias : executionInfo.clusterAliases()) { - if (executionInfo.isSkipUnavailable(clusterAlias) == false - && clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { + // Check if we have any remotes that can't be skipped on failure. + if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false + && executionInfo.shouldSkipOnFailure(clusterAlias) == false) { return false; } } @@ -227,7 +228,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( "Unknown index [%s]", (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression) ); - if (executionInfo.isSkipUnavailable(c) == false || usedFilter) { + if (executionInfo.shouldSkipOnFailure(c) == false || usedFilter) { if (fatalErrorMessage == null) { fatalErrorMessage = error; } else { @@ -239,7 +240,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( markClusterWithFinalStateAndNoShards( executionInfo, c, - executionInfo.isSkipUnavailable(c) ? Cluster.Status.SKIPPED : Cluster.Status.FAILED, + executionInfo.shouldSkipOnFailure(c) ? Cluster.Status.SKIPPED : Cluster.Status.FAILED, new VerificationException(error) ); } @@ -344,7 +345,7 @@ public static void initCrossClusterState( final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices()); executionInfo.swapCluster(clusterAlias, (k, v) -> { assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet"; - return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias)); + return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias)); }); } @@ -389,13 +390,6 @@ public static void markClusterWithFinalStateAndNoShards( }); } - /** - * We will ignore the error if it's remote unavailable and the cluster is marked to skip unavailable. - */ - public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) { - return executionInfo.isSkipUnavailable(clusterAlias); - } - /** * Check whether this exception can be tolerated when partial results are on, or should be treated as fatal. * @return true if the exception can be tolerated, false if it should be treated as fatal. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 307be48de1a9e..9282de4a459ae 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -344,7 +344,7 @@ static void handleFieldCapsFailures( assert cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SUCCESSFUL : "can't mark a cluster success with failures"; continue; } - if (allowPartialResults == false && executionInfo.isSkipUnavailable(clusterAlias) == false) { + if (allowPartialResults == false && executionInfo.shouldSkipOnFailure(clusterAlias) == false) { for (FieldCapabilitiesFailure failure : e.getValue()) { failureCollector.unwrapAndCollect(failure.getException()); } @@ -487,7 +487,7 @@ private void preAnalyzeLookupIndex( private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) { VerificationException error = new VerificationException(message); // If we can, skip the cluster and mark it as such - if (executionInfo.isSkipUnavailable(clusterAlias)) { + if (executionInfo.shouldSkipOnFailure(clusterAlias)) { EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error); } else { throw error; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index dc7cf37559bfb..a12d26f48b608 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.session; import org.apache.lucene.index.CorruptIndexException; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; @@ -21,7 +20,6 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.internal.XPackLicenseStatus; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSeedNodeLeftException; @@ -49,13 +47,11 @@ import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.initCrossClusterState; -import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; public class EsqlCCSUtilsTests extends ESTestCase { @@ -766,35 +762,6 @@ private void assertLicenseCheckFails( assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); } - public void testShouldIgnoreRuntimeError() { - Predicate skipUnPredicate = s -> s.equals(REMOTE1_ALIAS); - - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true); - executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); - executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); - executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); - - // remote1: skip_unavailable=true, so should ignore connect errors, but not others - assertThat( - shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new IllegalStateException("Unable to open any connections")), - is(true) - ); - assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new TaskCancelledException("task cancelled")), is(true)); - assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new ElasticsearchException("something is wrong")), is(true)); - // remote2: skip_unavailable=false, so should not ignore any errors - assertThat( - shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new IllegalStateException("Unable to open any connections")), - is(false) - ); - assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new TaskCancelledException("task cancelled")), is(false)); - // same for local - assertThat( - shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new IllegalStateException("Unable to open any connections")), - is(false) - ); - assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false)); - } - private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) { return new XPackLicenseStatus(operationMode, true, null); }