From a8662d892533e66f607a8fc63639e6994aa7bfab Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 29 Jan 2025 14:39:27 -0700 Subject: [PATCH 01/17] Implement runtime skip_unavailable=true --- .../esql/plugin/ClusterComputeHandler.java | 40 +++++- .../xpack/esql/plugin/ComputeService.java | 24 +++- .../esql/plugin/RemoteListenerGroup.java | 129 ++++++++++++++++++ .../esql/session/EsqlSessionCCSUtils.java | 41 ++++-- 4 files changed, 212 insertions(+), 22 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 19ed77405daa2..bfee85184aab3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -25,6 +25,7 @@ import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.session.Configuration; @@ -37,6 +38,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; + /** * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes. * This handler delegates the execution of computes on data nodes within each remote cluster to {@link DataNodeComputeHandler}. @@ -71,34 +74,51 @@ void startComputeOnRemoteCluster( ExchangeSourceHandler exchangeSource, RemoteCluster cluster, Runnable cancelQueryOnFailure, + EsqlExecutionInfo executionInfo, ActionListener listener ) { var queryPragmas = configuration.pragmas(); listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); final var childSessionId = computeService.newChildSession(sessionId); final AtomicReference finalResponse = new AtomicReference<>(); + final String clusterAlias = cluster.clusterAlias(); try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { var resp = finalResponse.get(); return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles)); }))) { + var openExchangeListener = computeListener.acquireAvoid(); ExchangeService.openExchange( transportService, cluster.connection, childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, - computeListener.acquireCompute().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection); + ActionListener.wrap(unused -> { + var listenerGroup = new RemoteListenerGroup( + transportService, + rootTask, + computeListener, + clusterAlias, + executionInfo, + openExchangeListener + ); + + var remoteSink = exchangeService.newRemoteSink( + listenerGroup.getGroupTask(), + childSessionId, + transportService, + cluster.connection + ); exchangeSource.addRemoteSink( remoteSink, - true, + executionInfo.isSkipUnavailable(clusterAlias) == false, () -> {}, queryPragmas.concurrentExchangeClients(), - computeListener.acquireAvoid() + listenerGroup.getExchangeRequestListener() ); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); - var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); - final ActionListener clusterListener = l.map(r -> { + var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan); + final ActionListener clusterListener = listenerGroup.getClusterRequestListener().map(r -> { finalResponse.set(r); return r.getProfiles(); }); @@ -110,9 +130,17 @@ void startComputeOnRemoteCluster( TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); + }, e -> { + if (executionInfo.isSkipUnavailable(clusterAlias)) { + markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); + openExchangeListener.onResponse(null); + } else { + openExchangeListener.onFailure(e); + } }) ); } + } List getRemoteClusters( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 71c2a65037e9a..4177d7db037a5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -61,6 +61,7 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; /** * Computes the result of a {@link PhysicalPlan}. @@ -268,6 +269,8 @@ public void execute( // starts computes on remote clusters final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices); for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) { + var remoteListener = computeListener.acquireCompute(); + String clusterAlias = cluster.clusterAlias(); clusterComputeHandler.startComputeOnRemoteCluster( sessionId, rootTask, @@ -276,9 +279,17 @@ public void execute( exchangeSource, cluster, cancelQueryOnFailure, - computeListener.acquireCompute().map(r -> { - updateExecutionInfo(execInfo, cluster.clusterAlias(), r); - return r.getProfiles(); + execInfo, + ActionListener.wrap((ComputeResponse r) -> { + updateExecutionInfo(execInfo, clusterAlias, r); + remoteListener.onResponse(r.getProfiles()); + }, e -> { + if (shouldIgnoreRemoteError(clusterAlias, e)) { + markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); + remoteListener.onResponse(Collections.emptyList()); + } else { + remoteListener.onFailure(e); + } }) ); } @@ -286,6 +297,10 @@ public void execute( } } + private boolean shouldIgnoreRemoteError(String clusterAlias, Exception e) { + return true; + } + private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { Function runningToSuccess = status -> { if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { @@ -309,11 +324,10 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster } else { // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response - var tookTime = TimeValue.timeValueNanos(System.nanoTime() - executionInfo.getRelativeStartNanos()); executionInfo.swapCluster( clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) - .setTook(tookTime) + .setTook(executionInfo.tookSoFar()) .build() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java new file mode 100644 index 0000000000000..fe5132c6dd4d7 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; + +// Create group task for this cluster. This group task ensures that two branches of the computation: +// the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. +// runAfter listeners below ensure that the group is finalized when both branches are done. +// The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. +class RemoteListenerGroup { + private final CancellableTask groupTask; + private final ActionListener exchangeRequestListener; + private final ActionListener> clusterRequestListener; + private final TaskManager taskManager; + private final String clusterAlias; + private final EsqlExecutionInfo executionInfo; + private final TransportService transportService; + + RemoteListenerGroup( + TransportService transportService, + Task rootTask, + ComputeListener computeListener, + String clusterAlias, + EsqlExecutionInfo executionInfo, + ActionListener delegate + ) { + this.transportService = transportService; + this.taskManager = transportService.getTaskManager(); + this.clusterAlias = clusterAlias; + this.executionInfo = executionInfo; + groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); + CountDown countDown = new CountDown(2); + // The group is done when both the sink and the cluster request are done + Runnable finishGroup = () -> { + if (countDown.countDown()) { + taskManager.unregister(groupTask); + delegate.onResponse(null); + } + }; + // Cancel the group on sink failure + exchangeRequestListener = createCancellingListener("exchange sink failure", computeListener.acquireAvoid(), finishGroup); + + // Cancel the group on cluster request failure + clusterRequestListener = createCancellingListener("exchange cluster action failure", computeListener.acquireCompute(), finishGroup); + } + + /** + * Create a listener that: + * 1. Cancels the group task on failure + * 2. Marks the cluster as partial if the error is ignorable, otherwise propagates the error + */ + private ActionListener createCancellingListener(String reason, ActionListener delegate, Runnable finishGroup) { + return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> { + if (shouldIgnoreRemoteError(clusterAlias, e)) { + markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); + delegate.onResponse(null); + } else { + delegate.onFailure(e); + } + })); + }), finishGroup); + } + + private boolean shouldIgnoreRemoteError(String clusterAlias, Exception e) { + return executionInfo.isSkipUnavailable(clusterAlias); + } + + public CancellableTask getGroupTask() { + return groupTask; + } + + public ActionListener getExchangeRequestListener() { + return exchangeRequestListener; + } + + public ActionListener> getClusterRequestListener() { + return clusterRequestListener; + } + + private CancellableTask createGroupTask(Task parentTask, Supplier description) { + return (CancellableTask) taskManager.register( + "transport", + "esql_compute_group", + new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) + ); + } + + private static class ComputeGroupTaskRequest extends TransportRequest { + private final Supplier parentDescription; + + ComputeGroupTaskRequest(TaskId parentTask, Supplier description) { + this.parentDescription = description; + setParentTask(parentTask); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + assert parentTaskId.isSet(); + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public String getDescription() { + return "group [" + parentDescription.get() + "]"; + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 6be243456e040..0ed5dc7dc5bf9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; @@ -25,6 +26,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.analysis.TableInfo; import org.elasticsearch.xpack.esql.index.IndexResolution; @@ -37,7 +39,7 @@ import java.util.Map; import java.util.Set; -class EsqlSessionCCSUtils { +public class EsqlSessionCCSUtils { private EsqlSessionCCSUtils() {} @@ -171,16 +173,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf entry.getValue().getException() ); if (skipUnavailable) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .setFailures(List.of(new ShardSearchFailure(e))) - .build() - ); + markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); } else { throw e; } @@ -338,4 +331,30 @@ public static void checkForCcsLicense( } } } + + /** + * Mark cluster with a default cluster state with the given status and potentially failure from exception. + * Most metrics are set to 0 except for "took" which is set to the total time taken so far. + * The status must be the final state of the cluster, not RUNNING. + */ + public static void markClusterWithFinalStateAndNoShards( + EsqlExecutionInfo executionInfo, + String clusterAlias, + Cluster.Status status, + @Nullable Exception ex + ) { + assert status != Cluster.Status.RUNNING : "status must be a final state, not RUNNING"; + executionInfo.swapCluster(clusterAlias, (k, v) -> { + Cluster.Builder builder = new Cluster.Builder(v).setStatus(status) + .setTook(executionInfo.tookSoFar()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (ex != null) { + builder.setFailures(List.of(new ShardSearchFailure(ex))); + } + return builder.build(); + }); + } } From 49ed90e462933f4cf6ddeeb3cf778b537b99237f Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 30 Jan 2025 10:24:26 -0700 Subject: [PATCH 02/17] Better exception check --- .../xpack/esql/plugin/ClusterComputeHandler.java | 3 ++- .../elasticsearch/xpack/esql/plugin/ComputeService.java | 7 ++----- .../xpack/esql/session/EsqlSessionCCSUtils.java | 7 +++++++ .../xpack/esql/session/EsqlSessionCCSUtilsTests.java | 2 +- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index bfee85184aab3..ac04eab778ca9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.shouldIgnoreRuntimeError; /** * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes. @@ -131,7 +132,7 @@ void startComputeOnRemoteCluster( new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); }, e -> { - if (executionInfo.isSkipUnavailable(clusterAlias)) { + if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); openExchangeListener.onResponse(null); } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4177d7db037a5..9af87c5100020 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -62,6 +62,7 @@ import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.shouldIgnoreRuntimeError; /** * Computes the result of a {@link PhysicalPlan}. @@ -284,7 +285,7 @@ public void execute( updateExecutionInfo(execInfo, clusterAlias, r); remoteListener.onResponse(r.getProfiles()); }, e -> { - if (shouldIgnoreRemoteError(clusterAlias, e)) { + if (shouldIgnoreRuntimeError(execInfo, clusterAlias, e)) { markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); remoteListener.onResponse(Collections.emptyList()); } else { @@ -297,10 +298,6 @@ public void execute( } } - private boolean shouldIgnoreRemoteError(String clusterAlias, Exception e) { - return true; - } - private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String clusterAlias, ComputeResponse resp) { Function runningToSuccess = status -> { if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 0ed5dc7dc5bf9..f1487b416beea 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -357,4 +357,11 @@ public static void markClusterWithFinalStateAndNoShards( return builder.build(); }); } + + public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) { + if (executionInfo.isSkipUnavailable(clusterAlias) == false) { + return false; + } + return ExceptionsHelper.isRemoteUnavailableException(e); + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index a84e5b144e64c..9845474774c4d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -534,7 +534,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { assertThat(cluster.getStatus(), equalTo(status)); - assertNull(cluster.getTook()); + assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { assertNull(cluster.getTotalShards()); assertNull(cluster.getSuccessfulShards()); From 0848dd1f97b9de8f843a523e978f35c89f72a97a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 30 Jan 2025 11:55:12 -0700 Subject: [PATCH 03/17] Refine monitored exceptions --- .../esql/plugin/ClusterComputeHandler.java | 13 ++++++++----- .../xpack/esql/plugin/ComputeService.java | 18 ++++-------------- .../xpack/esql/plugin/RemoteListenerGroup.java | 16 ++++++++-------- .../esql/session/EsqlSessionCCSUtils.java | 14 +++++++++----- .../esql/session/EsqlSessionCCSUtilsTests.java | 10 +++++++++- 5 files changed, 38 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index ac04eab778ca9..4fa6867ea5b72 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils; import java.util.ArrayList; import java.util.List; @@ -38,9 +39,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.shouldIgnoreRuntimeError; - /** * Manages computes across multiple clusters by sending {@link ClusterComputeRequest} to remote clusters and executing the computes. * This handler delegates the execution of computes on data nodes within each remote cluster to {@link DataNodeComputeHandler}. @@ -132,8 +130,13 @@ void startComputeOnRemoteCluster( new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); }, e -> { - if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { - markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); + if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { + EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.SKIPPED, + e + ); openExchangeListener.onResponse(null); } else { openExchangeListener.onFailure(e); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 9af87c5100020..b552e021f4b49 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils; import org.elasticsearch.xpack.esql.session.Result; import java.util.ArrayList; @@ -61,8 +62,6 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.shouldIgnoreRuntimeError; /** * Computes the result of a {@link PhysicalPlan}. @@ -270,8 +269,6 @@ public void execute( // starts computes on remote clusters final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices); for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) { - var remoteListener = computeListener.acquireCompute(); - String clusterAlias = cluster.clusterAlias(); clusterComputeHandler.startComputeOnRemoteCluster( sessionId, rootTask, @@ -281,16 +278,9 @@ public void execute( cluster, cancelQueryOnFailure, execInfo, - ActionListener.wrap((ComputeResponse r) -> { - updateExecutionInfo(execInfo, clusterAlias, r); - remoteListener.onResponse(r.getProfiles()); - }, e -> { - if (shouldIgnoreRuntimeError(execInfo, clusterAlias, e)) { - markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); - remoteListener.onResponse(Collections.emptyList()); - } else { - remoteListener.onFailure(e); - } + computeListener.acquireCompute().map(r -> { + updateExecutionInfo(execInfo, cluster.clusterAlias(), r); + return r.getProfiles(); }) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java index fe5132c6dd4d7..d0262c33b3c9a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java @@ -17,13 +17,12 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils; import java.util.List; import java.util.Map; import java.util.function.Supplier; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; - // Create group task for this cluster. This group task ensures that two branches of the computation: // the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. // runAfter listeners below ensure that the group is finalized when both branches are done. @@ -73,8 +72,13 @@ class RemoteListenerGroup { private ActionListener createCancellingListener(String reason, ActionListener delegate, Runnable finishGroup) { return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> { taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> { - if (shouldIgnoreRemoteError(clusterAlias, e)) { - markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); + if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { + EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.PARTIAL, + e + ); delegate.onResponse(null); } else { delegate.onFailure(e); @@ -83,10 +87,6 @@ private ActionListener createCancellingListener(String reason, ActionList }), finishGroup); } - private boolean shouldIgnoreRemoteError(String clusterAlias, Exception e) { - return executionInfo.isSkipUnavailable(clusterAlias); - } - public CancellableTask getGroupTask() { return groupTask; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index f1487b416beea..b50c8952e1e2c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -19,6 +19,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; @@ -37,6 +38,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; public class EsqlSessionCCSUtils { @@ -347,10 +349,10 @@ public static void markClusterWithFinalStateAndNoShards( executionInfo.swapCluster(clusterAlias, (k, v) -> { Cluster.Builder builder = new Cluster.Builder(v).setStatus(status) .setTook(executionInfo.tookSoFar()) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0); + .setTotalShards(Objects.requireNonNullElse(v.getTotalShards(), 0)) + .setSuccessfulShards(Objects.requireNonNullElse(v.getTotalShards(), 0)) + .setSkippedShards(Objects.requireNonNullElse(v.getTotalShards(), 0)) + .setFailedShards(Objects.requireNonNullElse(v.getTotalShards(), 0)); if (ex != null) { builder.setFailures(List.of(new ShardSearchFailure(ex))); } @@ -362,6 +364,8 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, if (executionInfo.isSkipUnavailable(clusterAlias) == false) { return false; } - return ExceptionsHelper.isRemoteUnavailableException(e); + + return ExceptionsHelper.isRemoteUnavailableException(e) + || (e instanceof RemoteTransportException && e.getCause() instanceof TaskCancelledException); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 9845474774c4d..2d2673601f66d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -534,7 +534,10 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { assertThat(cluster.getStatus(), equalTo(status)); - assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + if (cluster.getTook() != null) { + // It is also ok if it's null in some tests + assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + } if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { assertNull(cluster.getTotalShards()); assertNull(cluster.getSuccessfulShards()); @@ -545,6 +548,11 @@ private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster assertThat(cluster.getSuccessfulShards(), equalTo(0)); assertThat(cluster.getSkippedShards(), equalTo(0)); assertThat(cluster.getFailedShards(), equalTo(0)); + } else if (status == EsqlExecutionInfo.Cluster.Status.PARTIAL) { + assertThat(cluster.getTotalShards(), equalTo(0)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); } else { fail("Unexpected status: " + status); } From 1629251be0c877ead4d09f749770e2c47a5498f3 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 30 Jan 2025 19:02:47 +0000 Subject: [PATCH 04/17] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/xpack/esql/plugin/ComputeService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index b552e021f4b49..55c093554d7a5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -48,7 +48,6 @@ import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.session.Configuration; -import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils; import org.elasticsearch.xpack.esql.session.Result; import java.util.ArrayList; From dcd0d780ed52fc2e122b022f12fb7041420441b9 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 30 Jan 2025 12:11:43 -0700 Subject: [PATCH 05/17] Update docs/changelog/121240.yaml --- docs/changelog/121240.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/121240.yaml diff --git a/docs/changelog/121240.yaml b/docs/changelog/121240.yaml new file mode 100644 index 0000000000000..b0ca8e5e614db --- /dev/null +++ b/docs/changelog/121240.yaml @@ -0,0 +1,5 @@ +pr: 121240 +summary: Implement runtime skip_unavailable=true +area: ES|QL +type: enhancement +issues: [] From d3c8ef56c70db9afcb6d5c91a3dc64eed0663e47 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 30 Jan 2025 16:02:16 -0700 Subject: [PATCH 06/17] Add tests --- .../test/FailingFieldPlugin.java | 4 +- .../xpack/esql/ccq/Clusters.java | 5 + .../xpack/esql/ccq/EsqlRestValidationIT.java | 16 +++ .../action/CrossClustersCancellationIT.java | 117 ++++++++++++++++++ .../esql/action/CrossClustersQueryIT.java | 57 +++++++++ .../esql/action/SimplePauseFieldPlugin.java | 6 +- 6 files changed, 203 insertions(+), 2 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java index 64c90826fda85..670191676726a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java @@ -24,12 +24,14 @@ public class FailingFieldPlugin extends Plugin implements ScriptPlugin { + public static final String FAILING_FIELD_LANG = "failing_field"; + @Override public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { return new ScriptEngine() { @Override public String getType() { - return "failing_field"; + return FAILING_FIELD_LANG; } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 5f3f135810322..6a1b9f9d051eb 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() { } public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) { + return localCluster(remoteCluster, true); + } + + public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) { return ElasticsearchCluster.local() .name(LOCAL_CLUSTER_NAME) .distribution(DistributionType.DEFAULT) @@ -41,6 +45,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust .setting("node.roles", "[data,ingest,master,remote_cluster_client]") .setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"") .setting("cluster.remote.connections_per_cluster", "1") + .setting("cluster.remote." + REMOTE_CLUSTER_NAME + ".skip_unavailable", skipUnavailable.toString()) .shared(true) .setting("cluster.routing.rebalance.enable", "none") .build(); diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java index 55500aa1c9537..a37e663547187 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java @@ -11,6 +11,8 @@ import org.apache.http.HttpHost; import org.elasticsearch.Version; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.core.IOUtils; import org.elasticsearch.test.TestClustersThreadFilter; @@ -50,6 +52,20 @@ public static void closeRemoteClients() throws IOException { } } + @Before + public void randomizeSkipUnavailable() throws IOException { + Request request = new Request("PUT", "_cluster/settings"); + request.setJsonEntity(""" + { + "persistent": { + "cluster.remote.remote_cluster.skip_unavailable": $SKIP_UNAVAILABLE$ + } + } + """.replace("$SKIP_UNAVAILABLE$", String.valueOf(randomBoolean()))); + Response response = client().performRequest(request); + assertOK(response); + } + @Override protected String clusterSpecificIndexName(String pattern) { StringJoiner sj = new StringJoiner(","); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index cfe6fdeccb190..e9c488bd89c59 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; @@ -20,6 +21,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.transport.TransportService; @@ -39,6 +41,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER = "cluster-a"; @@ -75,6 +78,11 @@ public void resetPlugin() { SimplePauseFieldPlugin.resetPlugin(); } + @Override + protected boolean reuseClusters() { + return false; + } + private void createRemoteIndex(int numDocs) throws Exception { XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); mapping.startObject("runtime"); @@ -96,6 +104,26 @@ private void createRemoteIndex(int numDocs) throws Exception { bulk.get(); } + private void createLocalIndex(int numDocs) throws Exception { + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("const"); + { + mapping.field("type", "long"); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.endObject(); + client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(mapping).get(); + BulkRequestBuilder bulk = client(LOCAL_CLUSTER).prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < numDocs; i++) { + bulk.add(new IndexRequest().source("const", i)); + } + bulk.get(); + } + public void testCancel() throws Exception { createRemoteIndex(between(10, 100)); EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); @@ -208,4 +236,93 @@ public void testTasks() throws Exception { } requestFuture.actionGet(30, TimeUnit.SECONDS).close(); } + + // Check that cancelling remote task with skip_unavailable=true produces partial + public void testCancelSkipUnavailable() throws Exception { + createRemoteIndex(between(10, 100)); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query("FROM *:test | STATS total=sum(const) | LIMIT 1"); + request.pragmas(randomPragmas()); + request.includeCCSMetadata(true); + PlainActionFuture requestFuture = new PlainActionFuture<>(); + client().execute(EsqlQueryAction.INSTANCE, request, requestFuture); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + List rootTasks = new ArrayList<>(); + assertBusy(() -> { + List tasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(ComputeService.CLUSTER_ACTION_NAME) + .get() + .getTasks(); + assertThat(tasks, hasSize(1)); + rootTasks.addAll(tasks); + }); + var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed"); + client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest); + try { + assertBusy(() -> { + List drivers = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .get() + .getTasks(); + assertThat(drivers.size(), greaterThanOrEqualTo(1)); + for (TaskInfo driver : drivers) { + assertTrue(driver.cancelled()); + } + }); + } finally { + SimplePauseFieldPlugin.allowEmitting.countDown(); + } + var resp = requestFuture.actionGet(); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + assertNotNull(executionInfo); + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); + + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(cluster.getFailures().size(), equalTo(1)); + assertThat(cluster.getFailures().get(0).getCause(), instanceOf(TaskCancelledException.class)); + } + + // Check that closing remote node with skip_unavailable=true produces partial + public void testCloseSkipUnavailable() throws Exception { + // We are using delay() here because closing cluster while inside pause fields doesn't seem to produce clean closure + assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot()); + createRemoteIndex(between(1000, 5000)); + createLocalIndex(10); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(""" + FROM test*,cluster-a:test* METADATA _index + | EVAL cluster=MV_FIRST(SPLIT(_index, ":")) + | WHERE CASE(cluster == "cluster-a", delay(1ms), true) + | STATS total = sum(const) | LIMIT 1 + """); + request.pragmas(randomPragmas()); + var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + SimplePauseFieldPlugin.allowEmitting.countDown(); + cluster(REMOTE_CLUSTER).close(); + try (var resp = requestFuture.actionGet()) { + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + + List> values = getValuesList(resp); + assertThat(values.get(0).size(), equalTo(1)); + // We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there + assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(45L)); + + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getFailures().size(), equalTo(1)); + } + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 1a99fd9d0383d..903fb3b239ac1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -27,10 +27,13 @@ import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -81,6 +84,7 @@ protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(InternalExchangePlugin.class); + plugins.add(FailingFieldPlugin.class); return plugins; } @@ -836,6 +840,17 @@ public void testWarnings() throws Exception { assertTrue(latch.await(30, TimeUnit.SECONDS)); } + // Non-disconnect remote failures still fail the request + public void testRemoteFailureSkipUnavailable() throws IOException { + Map testClusterInfo = setupFailClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remote1Index = (String) testClusterInfo.get("remote.index"); + int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); + String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index); + IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false)); + assertThat(e.getMessage(), containsString("Accessing failing field")); + } + private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { try { final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); @@ -1058,4 +1073,46 @@ private void clearSkipUnavailable() { .setPersistentSettings(settingsBuilder.build()) .get(); } + + Map setupFailClusters() throws IOException { + int numShardsLocal = randomIntBetween(1, 3); + populateLocalIndices(LOCAL_INDEX, numShardsLocal); + + int numShardsRemote = randomIntBetween(1, 3); + populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", LOCAL_INDEX); + clusterInfo.put("remote.num_shards", numShardsRemote); + clusterInfo.put("remote.index", REMOTE_INDEX); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + return clusterInfo; + } + + void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException { + Client remoteClient = client(clusterAlias); + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("fail_me"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping(mapping.endObject()) + ); + + remoteClient.prepareIndex(indexName).setSource("id", 0).get(); + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } + } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java index 3ba73dd9a402e..a8a2d27e88569 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java @@ -31,6 +31,10 @@ public void onStartExecute() { @Override public boolean onWait() throws InterruptedException { - return allowEmitting.await(30, TimeUnit.SECONDS); + try { + return allowEmitting.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + return true; + } } } From 5833317adeca3a408ef9924e872a6e11a5a40f87 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 31 Jan 2025 10:46:36 -0700 Subject: [PATCH 07/17] Make partial to be set when there's skipped or partial --- .../xpack/esql/ccq/EsqlRestValidationIT.java | 16 ---------------- .../CrossClusterEnrichUnavailableClustersIT.java | 6 ++++++ .../CrossClusterQueryUnavailableRemotesIT.java | 5 +++++ .../esql/action/CrossClustersCancellationIT.java | 2 ++ .../xpack/esql/action/CrossClustersQueryIT.java | 6 ++++++ .../esql/plugin/TransportEsqlQueryAction.java | 10 ++++++++++ 6 files changed, 29 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java index a37e663547187..55500aa1c9537 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java @@ -11,8 +11,6 @@ import org.apache.http.HttpHost; import org.elasticsearch.Version; -import org.elasticsearch.client.Request; -import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.core.IOUtils; import org.elasticsearch.test.TestClustersThreadFilter; @@ -52,20 +50,6 @@ public static void closeRemoteClients() throws IOException { } } - @Before - public void randomizeSkipUnavailable() throws IOException { - Request request = new Request("PUT", "_cluster/settings"); - request.setJsonEntity(""" - { - "persistent": { - "cluster.remote.remote_cluster.skip_unavailable": $SKIP_UNAVAILABLE$ - } - } - """.replace("$SKIP_UNAVAILABLE$", String.valueOf(randomBoolean()))); - Response response = client().performRequest(request); - assertOK(response); - } - @Override protected String clusterSpecificIndexName(String pattern) { StringJoiner sj = new StringJoiner(","); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java index 9c0447affc754..40ea21371e513 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java @@ -510,11 +510,17 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); assertTrue(executionInfo.isCrossClusterSearch()); + boolean hasPartials = false; for (String clusterAlias : executionInfo.clusterAliases()) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis())); + if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL + || cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + hasPartials = true; + } } + assertThat(executionInfo.isPartial(), equalTo(hasPartials)); } private void setSkipUnavailable(String clusterAlias, boolean skip) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java index eb728895cd00c..8b2ea2856fa21 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java @@ -99,6 +99,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); @@ -151,6 +152,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); @@ -203,6 +205,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); @@ -275,6 +278,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1))); @@ -317,6 +321,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index e9c488bd89c59..63a2542411e72 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -280,6 +280,7 @@ public void testCancelSkipUnavailable() throws Exception { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); + assertThat(executionInfo.isPartial(), equalTo(true)); EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); @@ -308,6 +309,7 @@ public void testCloseSkipUnavailable() throws Exception { try (var resp = requestFuture.actionGet()) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); + assertThat(executionInfo.isPartial(), equalTo(true)); List> values = getValuesList(resp); assertThat(values.get(0).size(), equalTo(1)); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 903fb3b239ac1..24a93dba35897 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -474,6 +474,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu Set expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet()); assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases)); + boolean hasSkipped = false; for (ExpectedCluster expectedCluster : expected) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias()); String msg = cluster.getClusterAlias(); @@ -492,10 +493,12 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class)); String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]"; assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg)); + hasSkipped = true; } // currently failed shards is always zero - change this once we start allowing partial data for individual shard failures assertThat(msg, cluster.getFailedShards(), equalTo(0)); } + assertThat(executionInfo.isPartial(), equalTo(hasSkipped)); } public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { @@ -541,6 +544,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); @@ -597,6 +601,7 @@ public void testCCSExecutionOnSearchesWithLimit0() { long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(false)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); @@ -645,6 +650,7 @@ public void testMetadataIndex() { assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.isPartial(), equalTo(false)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); assertThat(remoteCluster.getIndexExpression(), equalTo("logs*")); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index a32b4591943f4..bcec730fb8fc9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -260,6 +260,16 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { + // If we had any skipped or partial clusters, the result is partial + if (executionInfo.getClusters() + .values() + .stream() + .anyMatch( + c -> c.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED + || c.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL + )) { + executionInfo.markAsPartial(); + } recordCCSTelemetry(task, executionInfo, request, null); listener.onResponse(toResponse(task, request, configuration, result)); }, ex -> { From 31e0eb46dd11b7f9513ff2d06712c5096a3dcd3d Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 31 Jan 2025 12:49:05 -0700 Subject: [PATCH 08/17] Add skip_unavailable=false test --- .../ccq/EsqlRestValidationSkipUnFalseIT.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java new file mode 100644 index 0000000000000..4b9957f9f5cea --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.ccq; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +// Duplicate of EsqlRestValidationIT test where skip_unavailable is set to false +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class EsqlRestValidationSkipUnFalseIT extends EsqlRestValidationIT { + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false); + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); + + @Override + protected String getTestRestCluster() { + return localCluster.getHttpAddresses(); + } +} From d3b27a148f4edb4d9d20dce09cd45a1c502f4230 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 31 Jan 2025 13:09:54 -0700 Subject: [PATCH 09/17] Pull feedback --- .../esql/plugin/ClusterComputeHandler.java | 6 +- .../esql/plugin/RemoteListenerGroup.java | 16 ++--- ...SessionCCSUtils.java => EsqlCCSUtils.java} | 12 ++-- .../xpack/esql/session/EsqlSession.java | 12 ++-- .../xpack/esql/session/IndexResolver.java | 2 +- ...UtilsTests.java => EsqlCCSUtilsTests.java} | 60 +++++++++---------- 6 files changed, 55 insertions(+), 53 deletions(-) rename x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/{EsqlSessionCCSUtils.java => EsqlCCSUtils.java} (97%) rename x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/{EsqlSessionCCSUtilsTests.java => EsqlCCSUtilsTests.java} (93%) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 4fa6867ea5b72..467d6f3307c8d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -29,7 +29,7 @@ import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.session.Configuration; -import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils; +import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; import java.util.ArrayList; import java.util.List; @@ -130,8 +130,8 @@ void startComputeOnRemoteCluster( new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); }, e -> { - if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { - EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( + if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { + EsqlCCSUtils.markClusterWithFinalStateAndNoShards( executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java index d0262c33b3c9a..0f3b05b1b02fa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java @@ -17,16 +17,18 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; -import org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils; +import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; import java.util.List; import java.util.Map; import java.util.function.Supplier; -// Create group task for this cluster. This group task ensures that two branches of the computation: -// the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. -// runAfter listeners below ensure that the group is finalized when both branches are done. -// The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. +/** + * Create group task for this cluster. This group task ensures that two branches of the computation: + * the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. + * runAfter listeners below ensure that the group is finalized when both branches are done. + * The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. + */ class RemoteListenerGroup { private final CancellableTask groupTask; private final ActionListener exchangeRequestListener; @@ -72,8 +74,8 @@ class RemoteListenerGroup { private ActionListener createCancellingListener(String reason, ActionListener delegate, Runnable finishGroup) { return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> { taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> { - if (EsqlSessionCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { - EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( + if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { + EsqlCCSUtils.markClusterWithFinalStateAndNoShards( executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java similarity index 97% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index b50c8952e1e2c..69a2f995da761 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -41,9 +41,9 @@ import java.util.Objects; import java.util.Set; -public class EsqlSessionCCSUtils { +public class EsqlCCSUtils { - private EsqlSessionCCSUtils() {} + private EsqlCCSUtils() {} static Map determineUnavailableRemoteClusters(List failures) { Map unavailableRemotes = new HashMap<>(); @@ -335,9 +335,9 @@ public static void checkForCcsLicense( } /** - * Mark cluster with a default cluster state with the given status and potentially failure from exception. - * Most metrics are set to 0 except for "took" which is set to the total time taken so far. - * The status must be the final state of the cluster, not RUNNING. + * Mark cluster with a final status (success or failure). + * Most metrics are set to 0 if not set yet, except for "took" which is set to the total time taken so far. + * The status must be the final status of the cluster, not RUNNING. */ public static void markClusterWithFinalStateAndNoShards( EsqlExecutionInfo executionInfo, @@ -366,6 +366,6 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, } return ExceptionsHelper.isRemoteUnavailableException(e) - || (e instanceof RemoteTransportException && e.getCause() instanceof TaskCancelledException); + || (e instanceof RemoteTransportException && ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index ff1b599ef191e..04ee667833757 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -161,7 +161,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P parse(request.query(), request.params()), executionInfo, request.filter(), - new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { + new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { preMapper.preMapper( @@ -188,7 +188,7 @@ public void executeOptimizedPlan( ) { PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); // TODO: this could be snuck into the underlying listener - EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); // execute any potential subplans executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); } @@ -315,7 +315,7 @@ public void analyzedPlan( .collect(Collectors.toSet()); final List indices = preAnalysis.indices; - EsqlSessionCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState()); + EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState()); final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream() @@ -430,7 +430,7 @@ private void preAnalyzeIndices( } // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search // based only on available clusters (which could now be an empty list) - String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( @@ -464,8 +464,8 @@ private boolean analyzeCCSIndices( ActionListener l ) { IndexResolution indexResolution = result.indices; - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index b11a8580a1e18..b7b3b437be93f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -139,7 +139,7 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp fields.put(name, field); } - Map unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters( + Map unavailableRemotes = EsqlCCSUtils.determineUnavailableRemoteClusters( fieldCapsResponse.getFailures() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java similarity index 93% rename from x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 2d2673601f66d..83e990f41241e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -47,12 +47,12 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.checkForCcsLicense; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.checkForCcsLicense; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -public class EsqlSessionCCSUtilsTests extends ESTestCase { +public class EsqlCCSUtilsTests extends ESTestCase { public void testCreateIndexExpressionFromAvailableClusters() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; @@ -66,7 +66,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(5)); assertThat( @@ -90,7 +90,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(3)); assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); @@ -114,7 +114,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); + assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); } // only remotes present and all marked as skipped, so in revised index expression should be empty string @@ -134,7 +134,7 @@ public void testCreateIndexExpressionFromAvailableClusters() { ) ); - assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); + assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); } } @@ -152,7 +152,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); @@ -180,7 +180,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); RemoteTransportException e = expectThrows( RemoteTransportException.class, - () -> EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) + () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) ); assertThat(e.status().getStatus(), equalTo(500)); assertThat( @@ -197,7 +197,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); assertNull(executionInfo.overallTook()); @@ -251,7 +251,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of()); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -295,7 +295,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { Map unavailableClusters = Map.of(); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -337,7 +337,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { Map unavailableClusters = Map.of(remote1Alias, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -381,7 +381,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); VerificationException ve = expectThrows( VerificationException.class, - () -> EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution) + () -> EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution) ); assertThat(ve.getDetailedMessage(), containsString("Unknown index [remote2:mylogs1,mylogs2,logs*]")); } @@ -414,7 +414,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { Map unavailableClusters = Map.of(remote1Alias, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); @@ -444,7 +444,7 @@ public void testDetermineUnavailableRemoteClusters() { ) ); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); } @@ -454,7 +454,7 @@ public void testDetermineUnavailableRemoteClusters() { failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } @@ -468,7 +468,7 @@ public void testDetermineUnavailableRemoteClusters() { new IllegalStateException("Unable to open any connections") ) ); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } @@ -476,14 +476,14 @@ public void testDetermineUnavailableRemoteClusters() { { List failures = new ArrayList<>(); failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } // empty failures list { List failures = new ArrayList<>(); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } } @@ -506,7 +506,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { Thread.sleep(1); } catch (InterruptedException e) {} - EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L)); assertNull(executionInfo.overallTook()); @@ -589,7 +589,7 @@ public void testReturnSuccessWithEmptyResult() { { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); - assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } // local cluster is present, so do not return empty result @@ -606,7 +606,7 @@ public void testReturnSuccessWithEmptyResult() { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); - assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } // remote-only, all clusters are skip_unavailable=true, so should return empty result with @@ -621,7 +621,7 @@ public void testReturnSuccessWithEmptyResult() { new NoSeedNodeLeftException("foo"), new IllegalStateException("unknown host") ); - assertTrue(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)); + assertTrue(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)); } // remote-only, all clusters are skip_unavailable=true, but exception is not "remote unavailable" so return false @@ -630,7 +630,7 @@ public void testReturnSuccessWithEmptyResult() { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); - assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); + assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); } } @@ -660,7 +660,7 @@ public void testUpdateExecutionInfoToReturnEmptyResult() { assertNull(executionInfo.overallTook()); - EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); + EsqlCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); assertNotNull(executionInfo.overallTook()); assertThat(executionInfo.getCluster(localClusterAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); @@ -675,11 +675,11 @@ public void testUpdateExecutionInfoToReturnEmptyResult() { } public void testConcreteIndexRequested() { - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("logs*"), equalTo(false)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("mylogs1,mylogs2,logs*"), equalTo(true)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("x*,logs"), equalTo(true)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("logs,metrics"), equalTo(true)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("*"), equalTo(false)); + assertThat(EsqlCCSUtils.concreteIndexRequested("logs*"), equalTo(false)); + assertThat(EsqlCCSUtils.concreteIndexRequested("mylogs1,mylogs2,logs*"), equalTo(true)); + assertThat(EsqlCCSUtils.concreteIndexRequested("x*,logs"), equalTo(true)); + assertThat(EsqlCCSUtils.concreteIndexRequested("logs,metrics"), equalTo(true)); + assertThat(EsqlCCSUtils.concreteIndexRequested("*"), equalTo(false)); } public void testCheckForCcsLicense() { From 99cc226238250d9d33338798c5b2ba25d1ffddd4 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 31 Jan 2025 13:51:17 -0700 Subject: [PATCH 10/17] Make cancel to fail and other feedback --- .../esql/action/CrossClustersCancellationIT.java | 13 +++---------- .../xpack/esql/action/CrossClustersQueryIT.java | 4 ++-- .../xpack/esql/session/EsqlCCSUtils.java | 6 ++++-- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index 63a2542411e72..374b8d2128e74 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -237,7 +237,7 @@ public void testTasks() throws Exception { requestFuture.actionGet(30, TimeUnit.SECONDS).close(); } - // Check that cancelling remote task with skip_unavailable=true produces partial + // Check that cancelling remote task with skip_unavailable=true produces failure public void testCancelSkipUnavailable() throws Exception { createRemoteIndex(between(10, 100)); EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); @@ -276,16 +276,9 @@ public void testCancelSkipUnavailable() throws Exception { } finally { SimplePauseFieldPlugin.allowEmitting.countDown(); } - var resp = requestFuture.actionGet(); - EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - assertNotNull(executionInfo); - assertThat(executionInfo.isPartial(), equalTo(true)); - EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); - - assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); - assertThat(cluster.getFailures().size(), equalTo(1)); - assertThat(cluster.getFailures().get(0).getCause(), instanceOf(TaskCancelledException.class)); + Exception error = expectThrows(Exception.class, requestFuture::actionGet); + assertThat(error.getMessage(), containsString("remote failed")); } // Check that closing remote node with skip_unavailable=true produces partial diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 24a93dba35897..7dc903f49747f 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -846,8 +846,8 @@ public void testWarnings() throws Exception { assertTrue(latch.await(30, TimeUnit.SECONDS)); } - // Non-disconnect remote failures still fail the request - public void testRemoteFailureSkipUnavailable() throws IOException { + // Non-disconnect remote failures still fail the request even if skip_unavailable is true + public void testRemoteFailureSkipUnavailableTrue() throws IOException { Map testClusterInfo = setupFailClusters(); String localIndex = (String) testClusterInfo.get("local.index"); String remote1Index = (String) testClusterInfo.get("remote.index"); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 69a2f995da761..c18d6d072986f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -360,12 +360,14 @@ public static void markClusterWithFinalStateAndNoShards( }); } + /** + * We will ignore the error if it's remote unavailable and the cluster is marked to skip unavailable. + */ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) { if (executionInfo.isSkipUnavailable(clusterAlias) == false) { return false; } - return ExceptionsHelper.isRemoteUnavailableException(e) - || (e instanceof RemoteTransportException && ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null); + return ExceptionsHelper.isRemoteUnavailableException(e); } } From 027b156ffd0ab4ee17129aebe62077c0210d008e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 31 Jan 2025 20:58:14 +0000 Subject: [PATCH 11/17] [CI] Auto commit changes from spotless --- .../xpack/esql/action/CrossClustersCancellationIT.java | 2 -- .../java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java | 1 - 2 files changed, 3 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index 374b8d2128e74..91610817af543 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -21,7 +21,6 @@ import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.transport.TransportService; @@ -41,7 +40,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER = "cluster-a"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index c18d6d072986f..89cd4b3d4d7cd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -19,7 +19,6 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.transport.RemoteClusterAware; From 8ef56ebd97ec23d554921009e60cb017a73e458c Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 31 Jan 2025 14:57:26 -0700 Subject: [PATCH 12/17] Refactor the listener code --- .../esql/plugin/ClusterComputeHandler.java | 19 ++++++------------- .../esql/plugin/RemoteListenerGroup.java | 13 ++----------- .../xpack/esql/session/EsqlCCSUtils.java | 19 +++++++++++++++++++ 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 467d6f3307c8d..da643f7d50a33 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -92,7 +92,12 @@ void startComputeOnRemoteCluster( childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, - ActionListener.wrap(unused -> { + EsqlCCSUtils.skipUnavailableListener( + openExchangeListener, + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.SKIPPED + ).delegateFailureAndWrap((l, unused) -> { var listenerGroup = new RemoteListenerGroup( transportService, rootTask, @@ -129,18 +134,6 @@ void startComputeOnRemoteCluster( TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); - }, e -> { - if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { - EsqlCCSUtils.markClusterWithFinalStateAndNoShards( - executionInfo, - clusterAlias, - EsqlExecutionInfo.Cluster.Status.SKIPPED, - e - ); - openExchangeListener.onResponse(null); - } else { - openExchangeListener.onFailure(e); - } }) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java index 0f3b05b1b02fa..9ef085257b87b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java @@ -74,17 +74,8 @@ class RemoteListenerGroup { private ActionListener createCancellingListener(String reason, ActionListener delegate, Runnable finishGroup) { return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> { taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> { - if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { - EsqlCCSUtils.markClusterWithFinalStateAndNoShards( - executionInfo, - clusterAlias, - EsqlExecutionInfo.Cluster.Status.PARTIAL, - e - ); - delegate.onResponse(null); - } else { - delegate.onFailure(e); - } + EsqlCCSUtils.skipUnavailableListener(delegate, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL) + .onFailure(e); })); }), finishGroup); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 89cd4b3d4d7cd..63cd6e8bba810 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -369,4 +369,23 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, return ExceptionsHelper.isRemoteUnavailableException(e); } + + /** + * Wrap a listener so that it will skip errors that are ignorable + */ + public static ActionListener skipUnavailableListener( + ActionListener delegate, + EsqlExecutionInfo executionInfo, + String clusterAlias, + EsqlExecutionInfo.Cluster.Status status + ) { + return delegate.delegateResponse((l, e) -> { + if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { + EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e); + l.onResponse(null); + } else { + l.onFailure(e); + } + }); + } } From 23bdac02117b0638f31466d279599468f079ebeb Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 3 Feb 2025 10:59:42 -0700 Subject: [PATCH 13/17] Fix task for child request --- .../elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index da643f7d50a33..5953be62e8315 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -130,7 +130,7 @@ void startComputeOnRemoteCluster( cluster.connection, ComputeService.CLUSTER_ACTION_NAME, clusterRequest, - rootTask, + listenerGroup.getGroupTask(), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); From 3eb5b68d5b0a619fbc3ce05ae10415f2f4cb4c0a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 3 Feb 2025 13:23:25 -0700 Subject: [PATCH 14/17] Add more tests --- .../xpack/esql/session/EsqlCCSUtils.java | 4 +- .../xpack/esql/session/EsqlCCSUtilsTests.java | 282 +++++++++++------- 2 files changed, 173 insertions(+), 113 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 63cd6e8bba810..64e5c6647e9ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -380,8 +380,8 @@ public static ActionListener skipUnavailableListener( EsqlExecutionInfo.Cluster.Status status ) { return delegate.delegateResponse((l, e) -> { - if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { - EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e); + if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { + markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e); l.onResponse(null); } else { l.onFailure(e); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index 83e990f41241e..49cfbba5c7610 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -8,7 +8,9 @@ package org.elasticsearch.xpack.esql.session; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; @@ -20,6 +22,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.internal.XPackLicenseStatus; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSeedNodeLeftException; @@ -48,23 +51,27 @@ import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.checkForCcsLicense; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.skipUnavailableListener; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; public class EsqlCCSUtilsTests extends ESTestCase { + private final String LOCAL_CLUSTER_ALIAS = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + private final String REMOTE1_ALIAS = "remote1"; + private final String REMOTE2_ALIAS = "remote2"; + public void testCreateIndexExpressionFromAvailableClusters() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; // no clusters marked as skipped { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true)); String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); @@ -78,12 +85,12 @@ public void testCreateIndexExpressionFromAvailableClusters() { // one cluster marked as skipped, so not present in revised index expression { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true)); executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED @@ -99,15 +106,15 @@ public void testCreateIndexExpressionFromAvailableClusters() { // two clusters marked as skipped, so only local cluster present in revised index expression { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + REMOTE1_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED @@ -121,13 +128,13 @@ public void testCreateIndexExpressionFromAvailableClusters() { { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + REMOTE1_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED @@ -139,33 +146,30 @@ public void testCreateIndexExpressionFromAvailableClusters() { } public void testUpdateExecutionInfoWithUnavailableClusters() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; // skip_unavailable=true clusters are unavailable, both marked as SKIPPED { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true)); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); + var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS))); assertNull(executionInfo.overallTook()); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); } @@ -173,14 +177,17 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { // skip_unavailable=false cluster is unavailable, throws Exception { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster( + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false) + ); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); RemoteTransportException e = expectThrows( RemoteTransportException.class, - () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) + () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure)) ); assertThat(e.status().getStatus(), equalTo(500)); assertThat( @@ -193,42 +200,42 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { // all clusters available, no Clusters in ExecutionInfo should be modified { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster( + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false) + ); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS))); assertNull(executionInfo.overallTook()); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } } public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; // all clusters had matching indices from field-caps call, so no updates to EsqlExecutionInfo should happen { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -253,15 +260,15 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } @@ -270,11 +277,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // marked as SKIPPED with 0 total shards, 0 took time, etc. { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -297,11 +304,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); @@ -310,7 +317,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); assertThat(remote1Cluster.getFailedShards(), equalTo(0)); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } @@ -320,11 +327,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // marked as SKIPPED { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1*,mylogs2*,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1*,mylogs2*,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -334,22 +341,22 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(remote1Alias, failure); + Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote2Cluster.getTook().millis(), equalTo(0L)); @@ -363,11 +370,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // but had no matching indices and since a concrete index was requested, a VerificationException is thrown { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*")); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*")); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -377,7 +384,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(remote1Alias, failure); + Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); VerificationException ve = expectThrows( VerificationException.class, @@ -390,13 +397,13 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // (the EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters() method handles that case not the one tested here) { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*")); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*")); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); // remote2 is already marked as SKIPPED (simulating failed enrich policy lookup due to unavailable cluster) executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1*,mylogs2*,logs*", randomBoolean(), EsqlExecutionInfo.Cluster.Status.SKIPPED @@ -411,22 +418,22 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(remote1Alias, failure); + Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); } @@ -489,16 +496,15 @@ public void testDetermineUnavailableRemoteClusters() { } public void testUpdateExecutionInfoAtEndOfPlanning() { - String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - String remote1Alias = "remote1"; - String remote2Alias = "remote2"; + String REMOTE1_ALIAS = "remote1"; + String REMOTE2_ALIAS = "remote2"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + REMOTE1_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); assertNull(executionInfo.planningTookTime()); assertNull(executionInfo.overallTook()); @@ -517,7 +523,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { assertNull(localCluster.getTotalShards()); assertNull(localCluster.getTook()); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote1Cluster.getTotalShards(), equalTo(0)); assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); @@ -526,7 +532,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote1Cluster.getTook().millis(), equalTo(executionInfo.planningTookTime().millis())); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); assertNull(remote2Cluster.getTotalShards()); assertNull(remote2Cluster.getTook()); @@ -568,35 +574,32 @@ private static Map randomMapping() { } public void testReturnSuccessWithEmptyResult() { - String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - String remote1Alias = "remote1"; - String remote2Alias = "remote2"; String remote3Alias = "remote3"; NoClustersToSearchException noClustersException = new NoClustersToSearchException(); Predicate skipUnPredicate = s -> { - if (s.equals("remote2") || s.equals("remote3")) { + if (s.equals(REMOTE2_ALIAS) || s.equals("remote3")) { return true; } return false; }; - EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); - EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", false); - EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "logs*", false); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "logs*", true); EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); // not a cross-cluster cluster search, so do not return empty result { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> localCluster); assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } // local cluster is present, so do not return empty result { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); - executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> localCluster); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> remote1); // TODO: this logic will be added in the follow-on PR that handles missing indices // assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } @@ -604,8 +607,8 @@ public void testReturnSuccessWithEmptyResult() { // remote-only, one cluster is skip_unavailable=false, so do not return empty result { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); - executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> remote1); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2); assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } @@ -613,7 +616,7 @@ public void testReturnSuccessWithEmptyResult() { // NoSuchClustersException or "remote unavailable" type exception { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2); executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); Exception e = randomFrom( new NoSuchRemoteClusterException("foo"), @@ -628,16 +631,15 @@ public void testReturnSuccessWithEmptyResult() { // Note: this functionality may change in follow-on PRs, so remove this test in that case { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2); executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); } } public void testUpdateExecutionInfoToReturnEmptyResult() { - String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - String remote1Alias = "remote1"; - String remote2Alias = "remote2"; + String REMOTE1_ALIAS = "remote1"; + String REMOTE2_ALIAS = "remote2"; String remote3Alias = "remote3"; ConnectTransportException transportEx = new ConnectTransportException(null, "foo"); Predicate skipUnPredicate = s -> { @@ -647,9 +649,9 @@ public void testUpdateExecutionInfoToReturnEmptyResult() { return false; }; - EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); - EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", true); - EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "logs*", true); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "logs*", true); EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); @@ -663,10 +665,10 @@ public void testUpdateExecutionInfoToReturnEmptyResult() { EsqlCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); assertNotNull(executionInfo.overallTook()); - assertThat(executionInfo.getCluster(localClusterAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); - assertThat(executionInfo.getCluster(localClusterAlias).getFailures().size(), equalTo(0)); + assertThat(executionInfo.getCluster(LOCAL_CLUSTER_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(executionInfo.getCluster(LOCAL_CLUSTER_ALIAS).getFailures().size(), equalTo(0)); - for (String remoteAlias : Set.of(remote1Alias, remote2Alias, remote3Alias)) { + for (String remoteAlias : Set.of(REMOTE1_ALIAS, REMOTE2_ALIAS, remote3Alias)) { assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); List remoteFailures = executionInfo.getCluster(remoteAlias).getFailures(); assertThat(remoteFailures.size(), equalTo(1)); @@ -766,6 +768,64 @@ public void testCheckForCcsLicense() { } } + public void testShouldIgnoreRuntimeError() { + Predicate skipUnPredicate = s -> s.equals(REMOTE1_ALIAS); + + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); + + // remote1: skip_unavailable=true, so should ignore connect errors, but not others + assertThat( + shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new IllegalStateException("Unable to open any connections")), + is(true) + ); + assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new TaskCancelledException("task cancelled")), is(false)); + assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new ElasticsearchException("something is wrong")), is(false)); + // remote2: skip_unavailable=false, so should not ignore any errors + assertThat( + shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new IllegalStateException("Unable to open any connections")), + is(false) + ); + assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new TaskCancelledException("task cancelled")), is(false)); + // same for local + assertThat( + shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new IllegalStateException("Unable to open any connections")), + is(false) + ); + assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false)); + } + + public void testSkipUnavailableListener() { + Predicate skipUnPredicate = s -> s.equals(REMOTE1_ALIAS); + + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); + + ActionListener expectResult = ActionListener.wrap(unused -> {}, (e) -> fail("Listener should not have failed")); + ActionListener expectFailure = ActionListener.wrap(unused -> fail("Listener should have failed"), (e) -> {}); + + // snip_unavailable=true but not connect exception, so should fail + skipUnavailableListener(expectFailure, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( + new ElasticsearchException("something is wrong") + ); + assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + + // snip_unavailable=true, so should not fail + skipUnavailableListener(expectResult, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( + new IllegalStateException("Unable to open any connections") + ); + assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + // snip_unavailable=false, so should fail + skipUnavailableListener(expectFailure, executionInfo, REMOTE2_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( + new IllegalStateException("Unable to open any connections") + ); + + } + private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) { return new XPackLicenseStatus(operationMode, true, null); } From ff431f9c9dddefe18780a4cb1a06e59cb1ab41d3 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 3 Feb 2025 13:25:39 -0700 Subject: [PATCH 15/17] Bump timeout to give slow clusters more time --- .../org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java index d7117fb5e0750..ca10e5d3419ac 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java @@ -98,7 +98,7 @@ public static void waitForCluster(Client client, String clusterName, String asyn } assertThat(clusterInfo.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING))); } - }); + }, 30, TimeUnit.SECONDS); } public static EsqlQueryResponse runAsyncQuery(Client client, EsqlQueryRequest request) { From c9be7dc157dfd3b30af2ba88f3e0fccc215e76f4 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 3 Feb 2025 13:43:25 -0700 Subject: [PATCH 16/17] Test fixes --- .../xpack/esql/action/AbstractCrossClusterTestCase.java | 2 ++ .../elasticsearch/xpack/esql/action/CrossClusterQueryIT.java | 1 + 2 files changed, 3 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java index d9149ee291fdf..1c2ac0e10b792 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -18,6 +18,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.XContentBuilder; @@ -63,6 +64,7 @@ protected Collection> nodePlugins(String clusterAlias) { plugins.add(CrossClusterAsyncQueryIT.InternalExchangePlugin.class); plugins.add(SimplePauseFieldPlugin.class); plugins.add(FailingPauseFieldPlugin.class); + plugins.add(FailingFieldPlugin.class); plugins.add(CrossClusterAsyncQueryIT.CountingPauseFieldPlugin.class); return plugins; } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index 35eb4ce410aae..c1976c9fa2ad8 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.TransportService; From 39216e2be97daacc2276fb5bc4f79ee2d1d40c92 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 3 Feb 2025 14:32:05 -0700 Subject: [PATCH 17/17] Fix test --- .../esql/action/CrossClusterQueryUnavailableRemotesIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java index 5536959b6d854..667ac23461000 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java @@ -251,7 +251,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() assertThat(remoteCluster.getFailedShards(), equalTo(0)); // ensure that the _clusters metadata is present only if requested - assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters); + assertClusterMetadataInResponse(resp, responseExpectMeta, 1); } // close remote cluster 2 so that it is also unavailable @@ -304,7 +304,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() assertThat(remote2Cluster.getFailedShards(), equalTo(0)); // ensure that the _clusters metadata is present only if requested - assertClusterMetadataInResponse(resp, responseExpectMeta, numClusters); + assertClusterMetadataInResponse(resp, responseExpectMeta, 2); } } finally {