From 318d42458f65636e261d1bc6e3ddf97569080914 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 6 Nov 2024 14:49:36 -0700 Subject: [PATCH 01/24] Ignore failures on skip_unavailable --- .../xpack/esql/plugin/ComputeListener.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 49af4a593e6e5..317adb0217f99 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.plugin; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.FailureCollector; @@ -188,6 +189,27 @@ ActionListener acquireAvoid() { }); } + ActionListener acquireSkipUnavailable(@Nullable String computeClusterAlias) { + if (computeClusterAlias == null + || esqlExecutionInfo.isCrossClusterSearch() == false + || runningOnRemoteCluster() + || computeClusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) + || esqlExecutionInfo.isSkipUnavailable(computeClusterAlias) == false) { + return acquireAvoid(); + } + return refs.acquire().delegateResponse((l, e) -> { + LOGGER.error("Skipping unavailable cluster {} in ESQL query: {}", computeClusterAlias, e); + esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { + assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED + : "We shouldn't be running compute on a cluster that's already marked as skipped"; + return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL) + .setFailures(List.of(new ShardSearchFailure(e))) + .build(); + }); + l.onResponse(null); + }); + } + /** * Acquires a new listener that collects compute result. This listener will also collect warnings emitted during compute * @param computeClusterAlias The cluster alias where the compute is happening. Used when metadata needs to be gathered @@ -198,7 +220,7 @@ ActionListener acquireCompute(@Nullable String computeClusterAl assert computeClusterAlias == null || (esqlExecutionInfo != null && esqlExecutionInfo.getRelativeStartNanos() != null) : "When clusterAlias is provided to acquireCompute, executionInfo and relativeStartTimeNanos must be non-null"; - return acquireAvoid().map(resp -> { + return acquireSkipUnavailable(computeClusterAlias).map(resp -> { responseHeaders.collect(); var profiles = resp.getProfiles(); if (profiles != null && profiles.isEmpty() == false) { From 378f71a9df0455c09f18e116407b4311a077a586 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 7 Nov 2024 11:59:51 -0700 Subject: [PATCH 02/24] Cover more cases for skip --- .../xpack/esql/plugin/ComputeListener.java | 36 ++++++++++++------- .../xpack/esql/plugin/ComputeService.java | 20 ++++++++--- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 317adb0217f99..a874203b9846b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -189,23 +189,33 @@ ActionListener acquireAvoid() { }); } + boolean shouldIgnoreRemoteErrors(@Nullable String computeClusterAlias) { + return computeClusterAlias != null + && esqlExecutionInfo.isCrossClusterSearch() + && computeClusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false + && runningOnRemoteCluster() == false + && esqlExecutionInfo.isSkipUnavailable(computeClusterAlias); + } + + void markAsPartial(String computeClusterAlias, Exception e) { + // We use PARTIAL here because we can not know whether the cluster have already sent any data. + esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { + assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED + : "We shouldn't be running compute on a cluster that's already marked as skipped"; + return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL) + .setFailures(List.of(new ShardSearchFailure(e))) + .build(); + }); + } + ActionListener acquireSkipUnavailable(@Nullable String computeClusterAlias) { - if (computeClusterAlias == null - || esqlExecutionInfo.isCrossClusterSearch() == false - || runningOnRemoteCluster() - || computeClusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) - || esqlExecutionInfo.isSkipUnavailable(computeClusterAlias) == false) { + if (shouldIgnoreRemoteErrors(computeClusterAlias) == false) { return acquireAvoid(); } return refs.acquire().delegateResponse((l, e) -> { - LOGGER.error("Skipping unavailable cluster {} in ESQL query: {}", computeClusterAlias, e); - esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { - assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED - : "We shouldn't be running compute on a cluster that's already marked as skipped"; - return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL) - .setFailures(List.of(new ShardSearchFailure(e))) - .build(); - }); + // TODO: drop this in final patch + LOGGER.error("Marking failed cluster {} as partial: {}", computeClusterAlias, e); + markAsPartial(computeClusterAlias, e); l.onResponse(null); }); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ffad379001ed0..e67ed21cffd4d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -376,20 +376,23 @@ private void startComputeOnRemoteClusters( var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink()); try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) { for (RemoteCluster cluster : clusters) { + final String clusterAlias = cluster.clusterAlias(); + final boolean shouldSkipOnFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); + final var exchangeListener = refs.acquire(); ExchangeService.openExchange( transportService, cluster.connection, sessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, - refs.acquire().delegateFailureAndWrap((l, unused) -> { + ActionListener.wrap(unused -> { var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection); exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients()); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); - var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, sessionId, configuration, remotePlan); + var clusterRequest = new ClusterComputeRequest(clusterAlias, sessionId, configuration, remotePlan); var clusterListener = ActionListener.runBefore( - computeListener.acquireCompute(cluster.clusterAlias()), - () -> l.onResponse(null) + computeListener.acquireCompute(clusterAlias), + () -> exchangeListener.onResponse(null) ); transportService.sendChildRequest( cluster.connection, @@ -399,6 +402,15 @@ private void startComputeOnRemoteClusters( TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); + }, e -> { + if (shouldSkipOnFailure) { + // TODO: drop this in final patch + LOGGER.error("Marking failed cluster {} as partial: {}", clusterAlias, e); + computeListener.markAsPartial(clusterAlias, e); + exchangeListener.onResponse(null); + } else { + exchangeListener.onFailure(e); + } }) ); } From dc8ec22cc690460c577d8df6aa64d71fc58c034a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 7 Nov 2024 13:12:53 -0700 Subject: [PATCH 03/24] Fix existing tests by defaulting skip_un to false --- .../java/org/elasticsearch/xpack/esql/ccq/Clusters.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index fa8cb49c59aed..0734bbf0d98ef 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() { } public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) { + return localCluster(remoteCluster, false); + } + + public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) { return ElasticsearchCluster.local() .name(LOCAL_CLUSTER_NAME) .distribution(DistributionType.DEFAULT) @@ -40,6 +44,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust .setting("xpack.license.self_generated.type", "trial") .setting("node.roles", "[data,ingest,master,remote_cluster_client]") .setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"") + .setting("cluster.remote.remote_cluster.skip_unavailable", skipUnavailable.toString()) .setting("cluster.remote.connections_per_cluster", "1") .shared(true) .setting("cluster.routing.rebalance.enable", "none") From dc1e4fa001ea58221d891b658bd5305a9d22e843 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 7 Nov 2024 18:03:31 -0700 Subject: [PATCH 04/24] Add tests for both skip_un settings --- .../xpack/esql/ccq/Clusters.java | 2 +- .../xpack/esql/ccq/EsqlRestValidationIT.java | 2 +- .../EsqlRestValidationSkipUnavailableIT.java | 37 +++++++++++++++++++ .../qa/rest/EsqlRestValidationTestCase.java | 14 ++++--- 4 files changed, 48 insertions(+), 7 deletions(-) create mode 100644 x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnavailableIT.java diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 0734bbf0d98ef..019fd42a4fa19 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -31,7 +31,7 @@ public static ElasticsearchCluster remoteCluster() { } public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) { - return localCluster(remoteCluster, false); + return localCluster(remoteCluster, true); } public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) { diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java index 21307c5362417..ba03e0286e610 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java @@ -28,7 +28,7 @@ @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class EsqlRestValidationIT extends EsqlRestValidationTestCase { static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); - static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false); @ClassRule public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnavailableIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnavailableIT.java new file mode 100644 index 0000000000000..eb2d722d0a482 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnavailableIT.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.ccq; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class EsqlRestValidationSkipUnavailableIT extends EsqlRestValidationIT { + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, true); + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); + + @Override + protected String getTestRestCluster() { + return localCluster.getHttpAddresses(); + } + + @Override + protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException { + assertValidRequestOnIndices(new String[] { indexName }); + } + +} diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java index 9ec4f60f4c843..ef63bd10504ef 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java @@ -87,7 +87,7 @@ public void testInexistentIndexNameWithoutWildcard() throws IOException { public void testExistentIndexWithoutWildcard() throws IOException { for (String indexName : existentIndexWithoutWildcard) { - assertErrorMessage(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); + assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); } } @@ -95,18 +95,22 @@ public void testExistentIndexWithWildcard() throws IOException { assertValidRequestOnIndices(existentIndexWithWildcard); } + protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException { + assertErrorMessage(indexName, errorMessage, statusCode); + } + public void testAlias() throws IOException { createAlias(); for (String indexName : existentAliasWithoutWildcard) { - assertErrorMessage(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); + assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); } assertValidRequestOnIndices(existentAliasWithWildcard); deleteAlias(); } - private void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException { + protected void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException { for (String indexName : indices) { assertErrorMessage(indexName, errorMessage + "[" + clusterSpecificIndexName(indexName) + "]", statusCode); } @@ -116,7 +120,7 @@ protected String clusterSpecificIndexName(String indexName) { return indexName; } - private void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException { + protected void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException { var specificName = clusterSpecificIndexName(indexName); final var request = createRequest(specificName); ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request)); @@ -138,7 +142,7 @@ private Request createRequest(String indexName) throws IOException { return request; } - private void assertValidRequestOnIndices(String[] indices) throws IOException { + protected void assertValidRequestOnIndices(String[] indices) throws IOException { for (String indexName : indices) { final var request = createRequest(clusterSpecificIndexName(indexName)); Response response = client().performRequest(request); From 6c09cb431fc63687c06a248dc98826d6091b32dd Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 12 Nov 2024 14:53:45 -0700 Subject: [PATCH 05/24] Add handling remote sink failures --- .../operator/exchange/ExchangeService.java | 23 ++++++++++++++++--- .../exchange/ExchangeServiceTests.java | 4 ++-- .../xpack/esql/plugin/ComputeListener.java | 3 +++ .../xpack/esql/plugin/ComputeService.java | 14 +++++++---- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index 06059944f1310..42808d0564a1c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -42,6 +42,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; /** * {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes. @@ -277,9 +278,16 @@ protected void doRun() { * @param exchangeId the exchange ID * @param transportService the transport service * @param conn the connection to the remote node where the remote exchange sink is located + * @param failureCollector if not null, the failures will be sent to this consumer and the sink will be marked as finished */ - public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, Transport.Connection conn) { - return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor); + public RemoteSink newRemoteSink( + Task parentTask, + String exchangeId, + TransportService transportService, + Transport.Connection conn, + Consumer failureCollector + ) { + return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor, failureCollector); } static final class TransportRemoteSink implements RemoteSink { @@ -289,6 +297,7 @@ static final class TransportRemoteSink implements RemoteSink { final Task parentTask; final String exchangeId; final Executor responseExecutor; + final Consumer failureCollector; final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L); @@ -298,7 +307,8 @@ static final class TransportRemoteSink implements RemoteSink { Transport.Connection connection, Task parentTask, String exchangeId, - Executor responseExecutor + Executor responseExecutor, + Consumer failureCollector ) { this.transportService = transportService; this.blockFactory = blockFactory; @@ -306,6 +316,7 @@ static final class TransportRemoteSink implements RemoteSink { this.parentTask = parentTask; this.exchangeId = exchangeId; this.responseExecutor = responseExecutor; + this.failureCollector = failureCollector; } @Override @@ -316,6 +327,12 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener blockFactory.breaker().addWithoutBreaking(-reservedBytes)); } + if (failureCollector != null) { + listener = listener.delegateResponse((l, ex) -> { + failureCollector.accept(ex); + l.onResponse(new ExchangeResponse(blockFactory, null, true)); + }); + } transportService.sendChildRequest( connection, EXCHANGE_ACTION_NAME, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 9e07f9c8f5faf..6ad91dee9d790 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -383,7 +383,7 @@ public void testConcurrentWithTransportActions() { sourceHandler.addCompletionListener(sourceCompletionFuture); ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomExchangeBuffer()); Transport.Connection connection = node0.getConnection(node1.getLocalNode()); - sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection), randomIntBetween(1, 5)); + sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection, null), randomIntBetween(1, 5)); final int maxInputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); final int maxOutputSeqNo = rarely() ? -1 : randomIntBetween(0, 50_000); runConcurrentTest(maxInputSeqNo, maxOutputSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink); @@ -442,7 +442,7 @@ public void sendResponse(TransportResponse transportResponse) { sourceHandler.addCompletionListener(sourceCompletionFuture); ExchangeSinkHandler sinkHandler = exchange1.createSinkHandler(exchangeId, randomIntBetween(1, 128)); Transport.Connection connection = node0.getConnection(node1.getLocalNode()); - sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection), randomIntBetween(1, 5)); + sourceHandler.addRemoteSink(exchange0.newRemoteSink(task, exchangeId, node0, connection, null), randomIntBetween(1, 5)); Exception err = expectThrows( Exception.class, () -> runConcurrentTest(maxSeqNo, maxSeqNo, sourceHandler::createExchangeSource, sinkHandler::createExchangeSink) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index a874203b9846b..ee1049fff3ad2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -202,6 +202,9 @@ void markAsPartial(String computeClusterAlias, Exception e) { esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED : "We shouldn't be running compute on a cluster that's already marked as skipped"; + if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL) { + return v; + } return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL) .setFailures(List.of(new ShardSearchFailure(e))) .build(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index e67ed21cffd4d..e04144d61d671 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -335,7 +335,7 @@ private void startComputeOnDataNodes( queryPragmas.exchangeBufferSize(), esqlExecutor, refs.acquire().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection); + var remoteSink = exchangeService.newRemoteSink(parentTask, sessionId, transportService, node.connection, null); exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients()); ActionListener computeResponseListener = computeListener.acquireCompute(clusterAlias); var dataNodeListener = ActionListener.runBefore(computeResponseListener, () -> l.onResponse(null)); @@ -377,7 +377,7 @@ private void startComputeOnRemoteClusters( try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) { for (RemoteCluster cluster : clusters) { final String clusterAlias = cluster.clusterAlias(); - final boolean shouldSkipOnFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); + final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); final var exchangeListener = refs.acquire(); ExchangeService.openExchange( transportService, @@ -386,7 +386,13 @@ private void startComputeOnRemoteClusters( queryPragmas.exchangeBufferSize(), esqlExecutor, ActionListener.wrap(unused -> { - var remoteSink = exchangeService.newRemoteSink(rootTask, sessionId, transportService, cluster.connection); + var remoteSink = exchangeService.newRemoteSink( + rootTask, + sessionId, + transportService, + cluster.connection, + suppressRemoteFailure ? ex -> computeListener.markAsPartial(clusterAlias, ex) : null + ); exchangeSource.addRemoteSink(remoteSink, queryPragmas.concurrentExchangeClients()); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(clusterAlias, sessionId, configuration, remotePlan); @@ -403,7 +409,7 @@ private void startComputeOnRemoteClusters( new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); }, e -> { - if (shouldSkipOnFailure) { + if (suppressRemoteFailure) { // TODO: drop this in final patch LOGGER.error("Marking failed cluster {} as partial: {}", clusterAlias, e); computeListener.markAsPartial(clusterAlias, e); From 2842278f84302678c5b90b0072c1e7cc0d60ebe0 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 13 Nov 2024 16:53:38 -0700 Subject: [PATCH 06/24] Update docs/changelog/116365.yaml --- docs/changelog/116365.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/116365.yaml diff --git a/docs/changelog/116365.yaml b/docs/changelog/116365.yaml new file mode 100644 index 0000000000000..b799be4e86f30 --- /dev/null +++ b/docs/changelog/116365.yaml @@ -0,0 +1,5 @@ +pr: 116365 +summary: Ignore remote ES|QL execution failures when skip_unavailable=true +area: ES|QL +type: enhancement +issues: [] From 1135803d36452b17a580efab28861d248fbd7c9c Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 14 Nov 2024 11:08:59 -0700 Subject: [PATCH 07/24] Enable runtime missing index tests --- .../esql/action/CrossClustersQueryIT.java | 78 ++++++++++++------- .../xpack/esql/plugin/ComputeListener.java | 20 +++-- .../xpack/esql/plugin/ComputeService.java | 2 - .../esql/session/EsqlSessionCCSUtils.java | 71 +++++++++-------- 4 files changed, 98 insertions(+), 73 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 6801e1f4eb404..42a0a9c1066c1 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -506,34 +506,47 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() { // since cluster-a is skip_unavailable=true and at least one cluster has a matching indices, no error is thrown { - // TODO solve in follow-on PR which does skip_unavailable handling at execution time - // String q = Strings.format("FROM %s,cluster-a:nomatch,cluster-a:%s*", localIndex, remote1Index); - // try (EsqlQueryResponse resp = runQuery(q, requestIncludeMeta)) { - // assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); - // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - // assertThat(executionInfo.isCrossClusterSearch(), is(true)); - // assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); - // assertExpectedClustersForMissingIndicesTests(executionInfo, List.of( - // // local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched - // new ExpectedCluster(REMOTE_CLUSTER_1, "nomatch", EsqlExecutionInfo.Cluster.Status.SKIPPED, 0), - // new ExpectedCluster(REMOTE_CLUSTER_1, "*", EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, remote2NumShards) - // )); - // } - - // TODO: handle LIMIT 0 for this case in follow-on PR - // String limit0 = q + " | LIMIT 0"; - // try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) { - // assertThat(resp.columns().size(), greaterThanOrEqualTo(1)); - // assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(0)); - // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - // assertThat(executionInfo.isCrossClusterSearch(), is(true)); - // assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); - // assertExpectedClustersForMissingIndicesTests(executionInfo, List.of( - // // local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched - // new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 0), - // new ExpectedCluster(REMOTE_CLUSTER_1, "nomatch," + remote1Index + "*", EsqlExecutionInfo.Cluster.Status.SKIPPED, 0) - // )); - // } + String q = Strings.format("FROM %s,cluster-a:nomatch,cluster-a:%s*", localIndex, remote1Index); + try (EsqlQueryResponse resp = runQuery(q, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertExpectedClustersForMissingIndicesTests( + executionInfo, + List.of( + new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, localNumShards), + new ExpectedCluster( + REMOTE_CLUSTER_1, + "nomatch," + remote1Index + "*", + EsqlExecutionInfo.Cluster.Status.PARTIAL, + 0 + ) + ) + ); + } + + String limit0 = q + " | LIMIT 0"; + try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) { + assertThat(resp.columns().size(), greaterThanOrEqualTo(1)); + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertExpectedClustersForMissingIndicesTests( + executionInfo, + List.of( + new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 0), + new ExpectedCluster( + REMOTE_CLUSTER_1, + "nomatch," + remote1Index + "*", + // TODO: this probably should be PARTIAL instead + EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, + 0 + ) + ) + ); + } } // tests with three clusters --- @@ -844,12 +857,15 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - Set expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet()); + Set expectedClusterAliases = expected.stream().map(ExpectedCluster::clusterAlias).collect(Collectors.toSet()); assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases)); for (ExpectedCluster expectedCluster : expected) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias()); String msg = cluster.getClusterAlias(); + if (msg.equals(LOCAL_CLUSTER)) { + msg = "(local)"; + } assertThat(msg, cluster.getIndexExpression(), equalTo(expectedCluster.indexExpression())); assertThat(msg, cluster.getStatus(), equalTo(expectedCluster.status())); assertThat(msg, cluster.getTook().millis(), greaterThanOrEqualTo(0L)); @@ -865,6 +881,10 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class)); String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]"; assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg)); + } else if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL) { + assertThat(msg, cluster.getSuccessfulShards(), equalTo(0)); + assertThat(msg, cluster.getSkippedShards(), equalTo(expectedCluster.totalShards())); + assertThat(msg, cluster.getFailures().size(), equalTo(1)); } // currently failed shards is always zero - change this once we start allowing partial data for individual shard failures assertThat(msg, cluster.getFailedShards(), equalTo(0)); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index ee1049fff3ad2..86a212dbfbdcf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.plugin; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.FailureCollector; @@ -29,6 +28,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.emptyClusterState; + /** * A variant of {@link RefCountingListener} with the following differences: * 1. Automatically cancels sub tasks on failure. @@ -189,6 +190,9 @@ ActionListener acquireAvoid() { }); } + /** + * Should we ignore a failure from the remote cluster due to skip_unavailable=true setting? + */ boolean shouldIgnoreRemoteErrors(@Nullable String computeClusterAlias) { return computeClusterAlias != null && esqlExecutionInfo.isCrossClusterSearch() @@ -197,27 +201,31 @@ && runningOnRemoteCluster() == false && esqlExecutionInfo.isSkipUnavailable(computeClusterAlias); } + /** + * Marks the cluster as PARTIAL and adds the exception to the cluster's failures record. + * Currently, additional failures are not recorded, TODO: check if this should be the case. + */ void markAsPartial(String computeClusterAlias, Exception e) { // We use PARTIAL here because we can not know whether the cluster have already sent any data. esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED : "We shouldn't be running compute on a cluster that's already marked as skipped"; + LOGGER.debug("Marking failed cluster {} as partial: {}", computeClusterAlias, e); if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL) { return v; } - return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL) - .setFailures(List.of(new ShardSearchFailure(e))) - .build(); + return emptyClusterState(v, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); }); } + /** + * Acquire a listener that respects skip_unavailable setting for this cluster. + */ ActionListener acquireSkipUnavailable(@Nullable String computeClusterAlias) { if (shouldIgnoreRemoteErrors(computeClusterAlias) == false) { return acquireAvoid(); } return refs.acquire().delegateResponse((l, e) -> { - // TODO: drop this in final patch - LOGGER.error("Marking failed cluster {} as partial: {}", computeClusterAlias, e); markAsPartial(computeClusterAlias, e); l.onResponse(null); }); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index cdb63994b1694..ef5e735240d09 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -408,8 +408,6 @@ private void startComputeOnRemoteClusters( ); }, e -> { if (suppressRemoteFailure) { - // TODO: drop this in final patch - LOGGER.error("Marking failed cluster {} as partial: {}", clusterAlias, e); computeListener.markAsPartial(clusterAlias, e); exchangeListener.onResponse(null); } else { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 4fe2fef7e3f45..1cf1ff8a65f2a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -13,12 +13,14 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -30,7 +32,7 @@ import java.util.Map; import java.util.Set; -class EsqlSessionCCSUtils { +public class EsqlSessionCCSUtils { private EsqlSessionCCSUtils() {} @@ -112,16 +114,16 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn } for (String clusterAlias : executionInfo.clusterAliases()) { executionInfo.swapCluster(clusterAlias, (k, v) -> { - EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook()) + Cluster.Builder builder = new Cluster.Builder(v).setTook(executionInfo.overallTook()) .setTotalShards(0) .setSuccessfulShards(0) .setSkippedShards(0) .setFailedShards(0); if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { // never mark local cluster as skipped - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); + builder.setStatus(Cluster.Status.SUCCESSFUL); } else { - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); + builder.setStatus(Cluster.Status.SKIPPED); // add this exception to the failures list only if there is no failure already recorded there if (v.getFailures() == null || v.getFailures().size() == 0) { builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); @@ -135,8 +137,8 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { StringBuilder sb = new StringBuilder(); for (String clusterAlias : executionInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); - if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + Cluster cluster = executionInfo.getCluster(clusterAlias); + if (cluster.getStatus() != Cluster.Status.SKIPPED) { if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); } else { @@ -164,16 +166,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf entry.getValue().getException() ); if (skipUnavailable) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .setFailures(List.of(new ShardSearchFailure(e))) - .build() - ); + execInfo.swapCluster(clusterAlias, (k, v) -> emptyClusterState(v, Cluster.Status.SKIPPED, e)); } else { throw e; } @@ -217,27 +210,17 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } } else { // handles local cluster (when no concrete indices requested) and skip_unavailable=true clusters - EsqlExecutionInfo.Cluster.Status status; + Cluster.Status status; ShardSearchFailure failure; + Exception failureException; if (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - status = EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; - failure = null; + status = Cluster.Status.SUCCESSFUL; + failureException = null; } else { - status = EsqlExecutionInfo.Cluster.Status.SKIPPED; - failure = new ShardSearchFailure(new VerificationException("Unknown index [" + indexExpression + "]")); + status = Cluster.Status.SKIPPED; + failureException = new VerificationException("Unknown index [" + indexExpression + "]"); } - executionInfo.swapCluster(c, (k, v) -> { - var builder = new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status) - .setTook(new TimeValue(0)) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0); - if (failure != null) { - builder.setFailures(List.of(failure)); - } - return builder.build(); - }); + executionInfo.swapCluster(c, (k, v) -> emptyClusterState(v, status, failureException)); } } if (fatalErrorMessage != null) { @@ -245,6 +228,22 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } } + /** + * Create an empty cluster state with the given status and potentially failure from exception. + */ + public static Cluster emptyClusterState(Cluster prev, Cluster.Status status, @Nullable Exception ex) { + var builder = new Cluster.Builder(prev).setStatus(status) + .setTook(new TimeValue(0)) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (ex != null) { + builder.setFailures(List.of(new ShardSearchFailure(ex))); + } + return builder.build(); + } + // visible for testing static boolean missingIndicesIsFatal(String clusterAlias, EsqlExecutionInfo executionInfo) { // missing indices on local cluster is fatal only if a concrete index requested @@ -273,11 +272,11 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { if (execInfo.isCrossClusterSearch()) { execInfo.markEndPlanning(); for (String clusterAlias : execInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); - if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + Cluster cluster = execInfo.getCluster(clusterAlias); + if (cluster.getStatus() == Cluster.Status.SKIPPED) { execInfo.swapCluster( clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) + (k, v) -> new Cluster.Builder(v).setTook(execInfo.planningTookTime()) .setTotalShards(0) .setSuccessfulShards(0) .setSkippedShards(0) From a61ed17e1555cd61ec1745ba709d676bb6db6d06 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 14 Nov 2024 14:07:47 -0700 Subject: [PATCH 08/24] More runtime missing index tests --- .../xpack/esql/action/EsqlExecutionInfo.java | 11 +++ .../xpack/esql/plugin/ComputeListener.java | 16 ++-- .../esql/session/EsqlSessionCCSUtils.java | 36 +++++---- .../session/EsqlSessionCCSUtilsTests.java | 38 ++++++++- .../CrossClusterEsqlRCS1MissingIndicesIT.java | 77 ++++++++++--------- 5 files changed, 118 insertions(+), 60 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 80bb2afe57122..9c37ce23ab2dd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -160,6 +160,17 @@ public void markEndQuery() { overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); } + /** + * How much time the query took since starting. + */ + public TimeValue tookSoFar() { + if (relativeStartNanos == null) { + return new TimeValue(0); + } else { + return new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); + } + } + // for testing only - use markEndQuery in production code void overallTook(TimeValue took) { this.overallTook = took; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 86a212dbfbdcf..8c568e437a9fd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.emptyClusterState; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterEmptyInfo; /** * A variant of {@link RefCountingListener} with the following differences: @@ -206,16 +206,14 @@ && runningOnRemoteCluster() == false * Currently, additional failures are not recorded, TODO: check if this should be the case. */ void markAsPartial(String computeClusterAlias, Exception e) { + var status = esqlExecutionInfo.getCluster(computeClusterAlias).getStatus(); + assert status != EsqlExecutionInfo.Cluster.Status.SKIPPED + : "We shouldn't be running compute on a cluster that's already marked as skipped"; // We use PARTIAL here because we can not know whether the cluster have already sent any data. - esqlExecutionInfo.swapCluster(computeClusterAlias, (k, v) -> { - assert v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED - : "We shouldn't be running compute on a cluster that's already marked as skipped"; + if (status != EsqlExecutionInfo.Cluster.Status.PARTIAL) { LOGGER.debug("Marking failed cluster {} as partial: {}", computeClusterAlias, e); - if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL) { - return v; - } - return emptyClusterState(v, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); - }); + markClusterEmptyInfo(esqlExecutionInfo, computeClusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); + } } /** diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 1cf1ff8a65f2a..da967e1303c31 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -166,7 +166,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf entry.getValue().getException() ); if (skipUnavailable) { - execInfo.swapCluster(clusterAlias, (k, v) -> emptyClusterState(v, Cluster.Status.SKIPPED, e)); + markClusterEmptyInfo(execInfo, clusterAlias, Cluster.Status.SKIPPED, e); } else { throw e; } @@ -211,7 +211,6 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } else { // handles local cluster (when no concrete indices requested) and skip_unavailable=true clusters Cluster.Status status; - ShardSearchFailure failure; Exception failureException; if (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { status = Cluster.Status.SUCCESSFUL; @@ -220,7 +219,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn status = Cluster.Status.SKIPPED; failureException = new VerificationException("Unknown index [" + indexExpression + "]"); } - executionInfo.swapCluster(c, (k, v) -> emptyClusterState(v, status, failureException)); + markClusterEmptyInfo(executionInfo, c, status, failureException); } } if (fatalErrorMessage != null) { @@ -229,19 +228,26 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } /** - * Create an empty cluster state with the given status and potentially failure from exception. + * Mark cluster with an empty cluster state with the given status and potentially failure from exception. */ - public static Cluster emptyClusterState(Cluster prev, Cluster.Status status, @Nullable Exception ex) { - var builder = new Cluster.Builder(prev).setStatus(status) - .setTook(new TimeValue(0)) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0); - if (ex != null) { - builder.setFailures(List.of(new ShardSearchFailure(ex))); - } - return builder.build(); + public static void markClusterEmptyInfo( + EsqlExecutionInfo executionInfo, + String clusterAlias, + Cluster.Status status, + @Nullable Exception ex + ) { + executionInfo.swapCluster(clusterAlias, (k, v) -> { + Cluster.Builder builder = new Cluster.Builder(v).setStatus(status) + .setTook(executionInfo.tookSoFar()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (ex != null) { + builder.setFailures(List.of(new ShardSearchFailure(ex))); + } + return builder.build(); + }); } // visible for testing diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 60b632c443f8e..71022ab042f57 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -156,6 +156,35 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); } + // skip_unavailable=true clusters are unavailable, both marked as PARTIAL + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + + Exception ex = new NoSeedNodeLeftException("unable to connect"); + var unvailableClusters = Map.of(remote1Alias, ex, remote2Alias, ex); + unvailableClusters.forEach( + (c, f) -> EsqlSessionCCSUtils.markClusterEmptyInfo(executionInfo, c, EsqlExecutionInfo.Cluster.Status.PARTIAL, ex) + ); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertNull(executionInfo.overallTook()); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); + } + // skip_unavailable=false cluster is unavailable, throws Exception { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); @@ -454,13 +483,20 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { assertThat(cluster.getStatus(), equalTo(status)); - assertNull(cluster.getTook()); if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { + assertNull(cluster.getTook()); assertNull(cluster.getTotalShards()); assertNull(cluster.getSuccessfulShards()); assertNull(cluster.getSkippedShards()); assertNull(cluster.getFailedShards()); } else if (status == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + assertNotNull(cluster.getTook()); + assertThat(cluster.getTotalShards(), equalTo(0)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); + } else if (status == EsqlExecutionInfo.Cluster.Status.PARTIAL) { + assertNotNull(cluster.getTook()); assertThat(cluster.getTotalShards(), equalTo(0)); assertThat(cluster.getSuccessfulShards(), equalTo(0)); assertThat(cluster.getSkippedShards(), equalTo(0)); diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java index 0f39104511be0..c48f08632e03b 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java @@ -92,7 +92,6 @@ public void assertExpectedClustersForMissingIndicesTests(Map res Map clusterDetails = (Map) detailsMap.get(expectedCluster.clusterAlias()); String msg = expectedCluster.clusterAlias(); - assertThat(msg, (int) clusterDetails.get("took"), greaterThan(0)); assertThat(msg, clusterDetails.get("status"), is(expectedCluster.status())); Map shards = (Map) clusterDetails.get("_shards"); if (expectedCluster.totalShards() == null) { @@ -102,6 +101,7 @@ public void assertExpectedClustersForMissingIndicesTests(Map res } if (expectedCluster.status().equals("successful")) { + assertThat(msg, (int) clusterDetails.get("took"), greaterThan(0)); assertThat((int) shards.get("successful"), is((int) shards.get("total"))); assertThat((int) shards.get("skipped"), is(0)); @@ -116,6 +116,11 @@ public void assertExpectedClustersForMissingIndicesTests(Map res assertThat(innerReason.get("reason").toString(), containsString(expectedMsg)); assertThat(innerReason.get("type").toString(), containsString("verification_exception")); + } else if (expectedCluster.status().equals("partial")) { + assertThat((int) shards.get("successful"), is(0)); + assertThat((int) shards.get("skipped"), is(0)); + ArrayList failures = (ArrayList) clusterDetails.get("failures"); + assertThat(failures.size(), is(1)); } else { fail(msg + "; Unexpected status: " + expectedCluster.status()); } @@ -302,40 +307,42 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() throw assertThat(e.getMessage(), containsString(Strings.format("%s:%s", REMOTE_CLUSTER_ALIAS, remoteExpr))); } - // TODO uncomment and test in follow-on PR which does skip_unavailable handling at execution time - // { - // String q = Strings.format("FROM %s,%s:nomatch,%s:%s*", INDEX1, REMOTE_CLUSTER_ALIAS, REMOTE_CLUSTER_ALIAS, INDEX2); - // - // String limit1 = q + " | LIMIT 1"; - // Response response = client().performRequest(esqlRequest(limit1)); - // assertOK(response); - // - // Map map = responseAsMap(response); - // assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); - // assertThat(((ArrayList) map.get("values")).size(), greaterThanOrEqualTo(1)); - // - // assertExpectedClustersForMissingIndicesTests(map, - // List.of( - // new ExpectedCluster("(local)", INDEX1, "successful", null), - // new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "skipped", 0) - // ) - // ); - // - // String limit0 = q + " | LIMIT 0"; - // response = client().performRequest(esqlRequest(limit0)); - // assertOK(response); - // - // map = responseAsMap(response); - // assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); - // assertThat(((ArrayList) map.get("values")).size(), is(0)); - // - // assertExpectedClustersForMissingIndicesTests(map, - // List.of( - // new ExpectedCluster("(local)", INDEX1, "successful", 0), - // new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "skipped", 0) - // ) - // ); - // } + { + String q = Strings.format("FROM %s,%s:nomatch,%s:%s*", INDEX1, REMOTE_CLUSTER_ALIAS, REMOTE_CLUSTER_ALIAS, INDEX2); + + String limit1 = q + " | LIMIT 1"; + Response response = client().performRequest(esqlRequest(limit1)); + assertOK(response); + + Map map = responseAsMap(response); + assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); + assertThat(((ArrayList) map.get("values")).size(), greaterThanOrEqualTo(1)); + + assertExpectedClustersForMissingIndicesTests( + map, + List.of( + new ExpectedCluster("(local)", INDEX1, "successful", 1), + new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "partial", 0) + ) + ); + + String limit0 = q + " | LIMIT 0"; + response = client().performRequest(esqlRequest(limit0)); + assertOK(response); + + map = responseAsMap(response); + assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); + assertThat(((ArrayList) map.get("values")).size(), is(0)); + + assertExpectedClustersForMissingIndicesTests( + map, + List.of( + new ExpectedCluster("(local)", INDEX1, "successful", 0), + // TODO: this should actually be partial, but LIMIT 0 is not processed properly yet + new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "successful", 0) + ) + ); + } } @SuppressWarnings("unchecked") From 9a81820df395315eaaa229e870387ab50f533464 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 14 Nov 2024 17:19:59 -0700 Subject: [PATCH 09/24] Add cancellation/shutdown tests --- docs/changelog/116365.yaml | 2 +- .../operator/exchange/ExchangeService.java | 6 +- .../action/CrossClustersCancellationIT.java | 128 ++++++++++++++++++ .../esql/session/EsqlSessionCCSUtils.java | 1 - 4 files changed, 132 insertions(+), 5 deletions(-) diff --git a/docs/changelog/116365.yaml b/docs/changelog/116365.yaml index b799be4e86f30..9903587bf4cee 100644 --- a/docs/changelog/116365.yaml +++ b/docs/changelog/116365.yaml @@ -2,4 +2,4 @@ pr: 116365 summary: Ignore remote ES|QL execution failures when skip_unavailable=true area: ES|QL type: enhancement -issues: [] +issues: [ 112886 ] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index 42808d0564a1c..0b7bd969c4efe 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -285,7 +285,7 @@ public RemoteSink newRemoteSink( String exchangeId, TransportService transportService, Transport.Connection conn, - Consumer failureCollector + @Nullable Consumer failureCollector ) { return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor, failureCollector); } @@ -297,7 +297,7 @@ static final class TransportRemoteSink implements RemoteSink { final Task parentTask; final String exchangeId; final Executor responseExecutor; - final Consumer failureCollector; + final @Nullable Consumer failureCollector; final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L); @@ -308,7 +308,7 @@ static final class TransportRemoteSink implements RemoteSink { Task parentTask, String exchangeId, Executor responseExecutor, - Consumer failureCollector + @Nullable Consumer failureCollector ) { this.transportService = transportService; this.blockFactory = blockFactory; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index df6a1e00b0212..0173ccab506a4 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.Build; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -25,10 +26,12 @@ import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptEngine; import org.elasticsearch.search.lookup.SearchLookup; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.junit.Before; @@ -40,10 +43,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomPragmas; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER = "cluster-a"; @@ -62,6 +68,11 @@ protected Collection> nodePlugins(String clusterAlias) { return plugins; } + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of("cluster-a", true, "cluster-b", false); + } + public static class InternalExchangePlugin extends Plugin { @Override public List> getSettings() { @@ -157,6 +168,26 @@ private void createRemoteIndex(int numDocs) throws Exception { bulk.get(); } + private void createLocalIndex(int numDocs) throws Exception { + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("const"); + { + mapping.field("type", "long"); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.endObject(); + client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(mapping).get(); + BulkRequestBuilder bulk = client(LOCAL_CLUSTER).prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < numDocs; i++) { + bulk.add(new IndexRequest().source("const", i)); + } + bulk.get(); + } + public void testCancel() throws Exception { createRemoteIndex(between(10, 100)); EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); @@ -189,4 +220,101 @@ public void testCancel() throws Exception { Exception error = expectThrows(Exception.class, requestFuture::actionGet); assertThat(error.getMessage(), containsString("proxy timeout")); } + + // Check that cancelling remote task with skip_unavailable=true produces partial + public void testCancelSkipUnavailable() throws Exception { + createRemoteIndex(between(10, 100)); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query("FROM *:test | STATS total=sum(const) | LIMIT 1"); + request.pragmas(randomPragmas()); + request.includeCCSMetadata(true); + PlainActionFuture requestFuture = new PlainActionFuture<>(); + client().execute(EsqlQueryAction.INSTANCE, request, requestFuture); + assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + List rootTasks = new ArrayList<>(); + assertBusy(() -> { + List tasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(ComputeService.CLUSTER_ACTION_NAME) + .get() + .getTasks(); + assertThat(tasks, hasSize(1)); + rootTasks.addAll(tasks); + }); + var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed"); + client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest); + assertBusy(() -> { + List drivers = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .get() + .getTasks(); + assertThat(drivers.size(), greaterThanOrEqualTo(1)); + for (TaskInfo driver : drivers) { + assertTrue(driver.cancellable()); + } + }); + PauseFieldPlugin.allowEmitting.countDown(); + var resp = requestFuture.actionGet(); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + assertNotNull(executionInfo); + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); + + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(cluster.getFailures().size(), equalTo(1)); + assertThat(cluster.getFailures().get(0).getCause(), instanceOf(TaskCancelledException.class)); + } + + // Check that closing remote node with skip_unavailable=true produces partial + public void testCloseSkipUnavailable() throws Exception { + assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot()); + createRemoteIndex(between(1000, 5000)); + createLocalIndex(100); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(""" + FROM test*,cluster-a:test* METADATA _index + | EVAL cluster=MV_FIRST(SPLIT(_index, ":")) + | WHERE CASE(cluster == "cluster-a", delay(1ms), true) + | STATS total = sum(const) | LIMIT 1 + """); + request.pragmas(randomPragmas()); + var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request); + assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + assertBusy(() -> { + List drivers = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .get() + .getTasks(); + assertThat(drivers.size(), greaterThanOrEqualTo(1)); + for (TaskInfo driver : drivers) { + assertTrue(driver.cancellable()); + } + }); + PauseFieldPlugin.allowEmitting.countDown(); + cluster(REMOTE_CLUSTER).close(); + try (var resp = requestFuture.actionGet()) { + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + + List> values = getValuesList(resp); + assertThat(values.get(0).size(), equalTo(1)); + // We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there + assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(4950L)); + + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getFailures().size(), equalTo(1)); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index da967e1303c31..8bd3a273aa8f6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteTransportException; From 7c25b358bb0bd78144e4df42bf09dcb7b2357fe0 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 20 Nov 2024 13:29:42 -0700 Subject: [PATCH 10/24] Fix utils test --- .../xpack/esql/session/EsqlSessionCCSUtilsTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 71022ab042f57..daf9a894ccbeb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -308,7 +308,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote1Cluster.getTotalShards(), equalTo(0)); assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); @@ -350,7 +350,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remote2Cluster.getTook().millis(), equalTo(0L)); + assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote2Cluster.getTotalShards(), equalTo(0)); assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); From 14b6a4cd270156316e77dba91ed5b368c48c1f10 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 26 Nov 2024 13:22:51 -0700 Subject: [PATCH 11/24] Update for new exchange code --- .../operator/exchange/ExchangeService.java | 23 +---- .../xpack/esql/plugin/ComputeService.java | 88 ++++++++++++++++--- .../RemoteClusterSecurityEsqlIT.java | 17 ++-- 3 files changed, 88 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index f8130745738a6..d633270b5c595 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -43,7 +43,6 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; /** * {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes. @@ -279,16 +278,9 @@ protected void doRun() { * @param exchangeId the exchange ID * @param transportService the transport service * @param conn the connection to the remote node where the remote exchange sink is located - * @param failureCollector if not null, the failures will be sent to this consumer and the sink will be marked as finished */ - public RemoteSink newRemoteSink( - Task parentTask, - String exchangeId, - TransportService transportService, - Transport.Connection conn, - @Nullable Consumer failureCollector - ) { - return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor, failureCollector); + public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, Transport.Connection conn) { + return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor); } static final class TransportRemoteSink implements RemoteSink { @@ -298,7 +290,6 @@ static final class TransportRemoteSink implements RemoteSink { final Task parentTask; final String exchangeId; final Executor responseExecutor; - final @Nullable Consumer failureCollector; final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L); @@ -308,8 +299,7 @@ static final class TransportRemoteSink implements RemoteSink { Transport.Connection connection, Task parentTask, String exchangeId, - Executor responseExecutor, - @Nullable Consumer failureCollector + Executor responseExecutor ) { this.transportService = transportService; this.blockFactory = blockFactory; @@ -317,7 +307,6 @@ static final class TransportRemoteSink implements RemoteSink { this.parentTask = parentTask; this.exchangeId = exchangeId; this.responseExecutor = responseExecutor; - this.failureCollector = failureCollector; } @Override @@ -328,12 +317,6 @@ public void fetchPageAsync(boolean allSourcesFinished, ActionListener blockFactory.breaker().addWithoutBreaking(-reservedBytes)); } - if (failureCollector != null) { - listener = listener.delegateResponse((l, ex) -> { - failureCollector.accept(ex); - l.onResponse(new ExchangeResponse(blockFactory, null, true)); - }); - } transportService.sendChildRequest( connection, EXCHANGE_ACTION_NAME, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4f4779d6465b2..84e87f2c04c39 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -21,6 +21,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Driver; @@ -48,11 +49,14 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -104,6 +108,7 @@ public class ComputeService { private final LookupFromIndexService lookupFromIndexService; private final ClusterService clusterService; private final AtomicLong childSessionIdGenerator = new AtomicLong(); + private final TaskManager taskManager; public ComputeService( SearchService searchService, @@ -133,6 +138,7 @@ public ComputeService( this.enrichLookupService = enrichLookupService; this.lookupFromIndexService = lookupFromIndexService; this.clusterService = clusterService; + this.taskManager = transportService.getTaskManager(); } public void execute( @@ -383,6 +389,8 @@ private void startComputeOnRemoteClusters( var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink()); try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) { for (RemoteCluster cluster : clusters) { + final String clusterAlias = cluster.clusterAlias(); + final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); final var childSessionId = newChildSession(sessionId); ExchangeService.openExchange( transportService, @@ -391,29 +399,53 @@ private void startComputeOnRemoteClusters( queryPragmas.exchangeBufferSize(), esqlExecutor, refs.acquire().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection); - exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop()); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); - var clusterListener = ActionListener.runBefore( - computeListener.acquireCompute(clusterAlias), - () -> exchangeListener.onResponse(null) + + // Create group task for this cluster + CancellableTask groupTask = createGroupTask(rootTask, clusterRequest.getDescription()); + CountDown countDown = new CountDown(2); + // The group is done when both the sink and the cluster request are done + Runnable finishGroup = () -> { + if (countDown.countDown()) { + taskManager.unregister(groupTask); + l.onResponse(null); + } + }; + // Cancel the group on sink failure + ActionListener exchangeListener = computeListener.acquireAvoid().delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants(groupTask, "exchange sink failure", true, ActionListener.noop()); + if (suppressRemoteFailure) { + computeListener.markAsPartial(clusterAlias, e); + inner.onResponse(null); + } else { + inner.onFailure(e); + } + }); + var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); + exchangeSource.addRemoteSink( + remoteSink, + suppressRemoteFailure == false, + queryPragmas.concurrentExchangeClients(), + ActionListener.runAfter(exchangeListener, finishGroup) ); + // Cancel the group on cluster request failure + var clusterRequestListener = computeListener.acquireCompute(clusterAlias).delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants(groupTask, "exchange cluster action failure", true, ActionListener.noop()); + inner.onFailure(e); + }); transportService.sendChildRequest( cluster.connection, CLUSTER_ACTION_NAME, clusterRequest, - rootTask, + groupTask, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) + new ActionListenerResponseHandler<>( + ActionListener.runAfter(clusterRequestListener, finishGroup), + ComputeResponse::new, + esqlExecutor + ) ); - }, e -> { - if (suppressRemoteFailure) { - computeListener.markAsPartial(clusterAlias, e); - exchangeListener.onResponse(null); - } else { - exchangeListener.onFailure(e); - } }) ); } @@ -937,4 +969,32 @@ public List searchExecutionContexts() { private String newChildSession(String session) { return session + "/" + childSessionIdGenerator.incrementAndGet(); } + + private CancellableTask createGroupTask(Task parentTask, String description) { + return (CancellableTask) taskManager.register( + "transport", + "esql_compute_group", + new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) + ); + } + + private static class ComputeGroupTaskRequest extends TransportRequest { + private final String parentDescription; + + ComputeGroupTaskRequest(TaskId parentTask, String description) { + this.parentDescription = description; + setParentTask(parentTask); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + assert parentTaskId.isSet(); + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public String getDescription() { + return "group [" + parentDescription + "]"; + } + } } diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java index 09449f81121fd..4ef57dc0e4bc9 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java @@ -996,7 +996,6 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() throw } String remoteSearchUserAPIKey = createRemoteSearchUserAPIKey(); - // sanity check - init queries to ensure we can query employees on local and employees,employees2 on remote { Request request = esqlRequest(""" @@ -1194,15 +1193,15 @@ public void accept(Response response, Boolean limit0) throws Exception { } final List expectedClusters = List.of( new ExpectedCluster("(local)", "employees", "successful", limit0 ? 0 : null), - new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "employees_nomatch,employees*", "successful", 0) + // TODO: LIMIT0 query should be partial too, but current implementation does not mark it as such + new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "employees_nomatch,employees*", limit0 ? "successful" : "partial", 0) ); assertExpectedClustersForMissingIndicesTests(map, expectedClusters); }; - // TODO: uncomment in follow on PR handling skip_unavailable errors at execution time - // Request limit1 = esqlRequest(q + " | LIMIT 1"); - // verifier.accept(performRequestWithRemoteSearchUser(limit1), false); - // verifier.accept(performRequestWithRemoteSearchUserViaAPIKey(limit1, remoteSearchUserAPIKey), false); + Request limit1 = esqlRequest(q + " | LIMIT 1"); + verifier.accept(performRequestWithRemoteSearchUser(limit1), false); + verifier.accept(performRequestWithRemoteSearchUserViaAPIKey(limit1, remoteSearchUserAPIKey), false); Request limit0 = esqlRequest(q + " | LIMIT 0"); verifier.accept(performRequestWithRemoteSearchUser(limit0), true); @@ -1653,6 +1652,12 @@ void assertExpectedClustersForMissingIndicesTests(Map responseMa assertThat(innerReason.get("reason").toString(), containsString(expectedMsg)); assertThat(innerReason.get("type").toString(), containsString("verification_exception")); + } else if (expectedCluster.status().equals("partial")) { + assertThat((int) shards.get("successful"), is(0)); + assertThat((int) shards.get("skipped"), is(0)); + ArrayList failures = (ArrayList) clusterDetails.get("failures"); + assertThat(failures.size(), is(1)); + } else { fail(msg + "; Unexpected status: " + expectedCluster.status()); } From 82542f84f46b1606028530cd903026654b5e5c36 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 26 Nov 2024 14:30:21 -0700 Subject: [PATCH 12/24] Fix build --- .../xpack/esql/action/CrossClustersCancellationIT.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index fdee779cf3001..e053807539380 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; @@ -56,6 +57,11 @@ public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER = "cluster-a"; + @Override + protected boolean reuseClusters() { + return false; + } + @Override protected Collection remoteClusterAlias() { return List.of(REMOTE_CLUSTER); From d2274f590babb3004bfdc59437cd40b821fc0f1a Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 26 Nov 2024 14:38:37 -0700 Subject: [PATCH 13/24] Rename method --- .../elasticsearch/xpack/esql/plugin/ComputeListener.java | 4 ++-- .../xpack/esql/session/EsqlSessionCCSUtils.java | 9 +++++---- .../xpack/esql/session/EsqlSessionCCSUtilsTests.java | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 8c568e437a9fd..c1615ba43cf60 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterEmptyInfo; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterNoShards; /** * A variant of {@link RefCountingListener} with the following differences: @@ -212,7 +212,7 @@ void markAsPartial(String computeClusterAlias, Exception e) { // We use PARTIAL here because we can not know whether the cluster have already sent any data. if (status != EsqlExecutionInfo.Cluster.Status.PARTIAL) { LOGGER.debug("Marking failed cluster {} as partial: {}", computeClusterAlias, e); - markClusterEmptyInfo(esqlExecutionInfo, computeClusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); + markClusterNoShards(esqlExecutionInfo, computeClusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 8bd3a273aa8f6..5773c28ccd2aa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -165,7 +165,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf entry.getValue().getException() ); if (skipUnavailable) { - markClusterEmptyInfo(execInfo, clusterAlias, Cluster.Status.SKIPPED, e); + markClusterNoShards(execInfo, clusterAlias, Cluster.Status.SKIPPED, e); } else { throw e; } @@ -218,7 +218,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn status = Cluster.Status.SKIPPED; failureException = new VerificationException("Unknown index [" + indexExpression + "]"); } - markClusterEmptyInfo(executionInfo, c, status, failureException); + markClusterNoShards(executionInfo, c, status, failureException); } } if (fatalErrorMessage != null) { @@ -227,9 +227,10 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } /** - * Mark cluster with an empty cluster state with the given status and potentially failure from exception. + * Mark cluster with a default cluster state with the given status and potentially failure from exception. + * Most metrics are set to 0 except for "took" which is set to the total time taken so far. */ - public static void markClusterEmptyInfo( + public static void markClusterNoShards( EsqlExecutionInfo executionInfo, String clusterAlias, Cluster.Status status, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index daf9a894ccbeb..c6a4bf4294215 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -166,7 +166,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { Exception ex = new NoSeedNodeLeftException("unable to connect"); var unvailableClusters = Map.of(remote1Alias, ex, remote2Alias, ex); unvailableClusters.forEach( - (c, f) -> EsqlSessionCCSUtils.markClusterEmptyInfo(executionInfo, c, EsqlExecutionInfo.Cluster.Status.PARTIAL, ex) + (c, f) -> EsqlSessionCCSUtils.markClusterNoShards(executionInfo, c, EsqlExecutionInfo.Cluster.Status.PARTIAL, ex) ); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); From 177d2dcd9894ed8f537425353551178e891fcf9b Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 27 Nov 2024 14:17:08 -0700 Subject: [PATCH 14/24] Some more comments --- .../elasticsearch/xpack/esql/plugin/ComputeListener.java | 5 +++-- .../elasticsearch/xpack/esql/plugin/ComputeService.java | 8 ++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index c1615ba43cf60..d1e462082197c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -203,13 +203,14 @@ && runningOnRemoteCluster() == false /** * Marks the cluster as PARTIAL and adds the exception to the cluster's failures record. - * Currently, additional failures are not recorded, TODO: check if this should be the case. + * Currently, additional failures are not recorded. + * TODO: add accumulating failures. */ void markAsPartial(String computeClusterAlias, Exception e) { var status = esqlExecutionInfo.getCluster(computeClusterAlias).getStatus(); assert status != EsqlExecutionInfo.Cluster.Status.SKIPPED : "We shouldn't be running compute on a cluster that's already marked as skipped"; - // We use PARTIAL here because we can not know whether the cluster have already sent any data. + // We use PARTIAL here because we can not know whether the cluster has already sent any data. if (status != EsqlExecutionInfo.Cluster.Status.PARTIAL) { LOGGER.debug("Marking failed cluster {} as partial: {}", computeClusterAlias, e); markClusterNoShards(esqlExecutionInfo, computeClusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 84e87f2c04c39..436bcfc2aca51 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -401,8 +401,10 @@ private void startComputeOnRemoteClusters( refs.acquire().delegateFailureAndWrap((l, unused) -> { var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); - - // Create group task for this cluster + // Create group task for this cluster. This group task ensures that two branches of the computation: + // the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. + // runAfter listeners below ensure that the group is finalized when both branches are done. + // The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. CancellableTask groupTask = createGroupTask(rootTask, clusterRequest.getDescription()); CountDown countDown = new CountDown(2); // The group is done when both the sink and the cluster request are done @@ -425,6 +427,8 @@ private void startComputeOnRemoteClusters( var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); exchangeSource.addRemoteSink( remoteSink, + // This applies to all the subrequests for this cluster, and all the failures. + // If we need to change it depending on the kind of failure, we'll have to replace this with a predicate. suppressRemoteFailure == false, queryPragmas.concurrentExchangeClients(), ActionListener.runAfter(exchangeListener, finishGroup) From 0128877c0fe6752655878898fdf0d79f85c86da8 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 27 Nov 2024 14:35:10 -0700 Subject: [PATCH 15/24] Post-sync updates --- .../xpack/esql/action/EsqlExecutionInfo.java | 11 ----------- .../xpack/esql/plugin/ComputeListener.java | 3 +-- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 9a993445ce99a..ba7a7e8266845 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -160,17 +160,6 @@ public void markEndQuery() { overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); } - /** - * How much time the query took since starting. - */ - public TimeValue tookSoFar() { - if (relativeStartNanos == null) { - return new TimeValue(0); - } else { - return new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS); - } - } - // for testing only - use markEndQuery in production code void overallTook(TimeValue took) { this.overallTook = took; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index dfde94795313b..3b829e6897894 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -138,8 +138,7 @@ private ComputeListener( private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) { executionInfo.swapCluster(clusterAlias, (k, v) -> { - // TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed - if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED && v.getStatus() != EsqlExecutionInfo.Cluster.Status.PARTIAL) { assert v.getTotalShards() != null && v.getSkippedShards() != null : "Null total or skipped shard count: " + v; return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) /* From a431cf9f036dc4e6313c7a4dfabf056de1ffb177 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 27 Nov 2024 18:43:27 -0700 Subject: [PATCH 16/24] Add test for failure --- .../esql/action/CrossClustersQueryIT.java | 72 +++++++++++++++++-- 1 file changed, 68 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 370c30c0d5276..686b1006f2bc6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -31,6 +31,8 @@ import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -61,10 +63,10 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER_1 = "cluster-a"; private static final String REMOTE_CLUSTER_2 = "remote-b"; - private static String LOCAL_INDEX = "logs-1"; - private static String IDX_ALIAS = "alias1"; - private static String FILTERED_IDX_ALIAS = "alias-filtered-1"; - private static String REMOTE_INDEX = "logs-2"; + private static final String LOCAL_INDEX = "logs-1"; + private static final String IDX_ALIAS = "alias1"; + private static final String FILTERED_IDX_ALIAS = "alias-filtered-1"; + private static final String REMOTE_INDEX = "logs-2"; @Override protected Collection remoteClusterAlias() { @@ -81,6 +83,7 @@ protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); plugins.add(EsqlPlugin.class); plugins.add(InternalExchangePlugin.class); + plugins.add(EsqlNodeFailureIT.FailingFieldPlugin.class); return plugins; } @@ -1237,6 +1240,26 @@ public void testWarnings() throws Exception { assertTrue(latch.await(30, TimeUnit.SECONDS)); } + public void testRemoteFailureSkipUnavailable() throws IOException { + Map testClusterInfo = setupFailClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remote1Index = (String) testClusterInfo.get("remote.index"); + int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); + String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index); + try (EsqlQueryResponse resp = runQuery(q, false)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertExpectedClustersForMissingIndicesTests( + executionInfo, + List.of( + new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, localNumShards), + new ExpectedCluster(REMOTE_CLUSTER_1, remote1Index + "*", EsqlExecutionInfo.Cluster.Status.PARTIAL, 0) + ) + ); + } + } + private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { try { final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); @@ -1333,6 +1356,22 @@ Map setupClusters(int numClusters) { return clusterInfo; } + Map setupFailClusters() throws IOException { + int numShardsLocal = randomIntBetween(1, 5); + populateLocalIndices(LOCAL_INDEX, numShardsLocal); + + int numShardsRemote = randomIntBetween(1, 5); + populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", LOCAL_INDEX); + clusterInfo.put("remote.num_shards", numShardsRemote); + clusterInfo.put("remote.index", REMOTE_INDEX); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + return clusterInfo; + } + /** * For the local cluster and REMOTE_CLUSTER_1 it creates a standard alias to the index created in populateLocalIndices * and populateRemoteIndices. It also creates a filtered alias against those indices that looks like: @@ -1455,6 +1494,31 @@ void populateRemoteIndices(String clusterAlias, String indexName, int numShards) remoteClient.admin().indices().prepareRefresh(indexName).get(); } + void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException { + Client remoteClient = client(clusterAlias); + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("fail_me"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", "fail").endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping(mapping.endObject()) + ); + + remoteClient.prepareIndex(indexName).setSource("id", 0).get(); + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } + private void setSkipUnavailable(String clusterAlias, boolean skip) { client(LOCAL_CLUSTER).admin() .cluster() From 2a64e304718bf287a3229e91427ffb902ba06d96 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 27 Nov 2024 18:44:22 -0700 Subject: [PATCH 17/24] Update docs/changelog/116365.yaml --- docs/changelog/116365.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/116365.yaml b/docs/changelog/116365.yaml index 9903587bf4cee..b799be4e86f30 100644 --- a/docs/changelog/116365.yaml +++ b/docs/changelog/116365.yaml @@ -2,4 +2,4 @@ pr: 116365 summary: Ignore remote ES|QL execution failures when skip_unavailable=true area: ES|QL type: enhancement -issues: [ 112886 ] +issues: [] From 8a32ddec412b7e7a9ed36019635640b37274cc00 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 2 Dec 2024 12:46:29 -0700 Subject: [PATCH 18/24] Organize listeners & add exchange failure handling --- .../xpack/esql/plugin/ComputeService.java | 122 ++++++++++++------ 1 file changed, 81 insertions(+), 41 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 9e13ec8671498..48902c6a998d6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -404,70 +404,118 @@ private void startComputeOnRemoteClusters( final String clusterAlias = cluster.clusterAlias(); final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); final var childSessionId = newChildSession(sessionId); + var openExchangeListener = refs.acquire(); ExchangeService.openExchange( transportService, cluster.connection, childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, - refs.acquire().delegateFailureAndWrap((l, unused) -> { + ActionListener.wrap(unused -> { + // Exchange opening is successful var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); - // Create group task for this cluster. This group task ensures that two branches of the computation: - // the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. - // runAfter listeners below ensure that the group is finalized when both branches are done. - // The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. - CancellableTask groupTask = createGroupTask(rootTask, clusterRequest.getDescription()); - CountDown countDown = new CountDown(2); - // The group is done when both the sink and the cluster request are done - Runnable finishGroup = () -> { - if (countDown.countDown()) { - taskManager.unregister(groupTask); - l.onResponse(null); - } - }; - // Cancel the group on sink failure - ActionListener exchangeListener = computeListener.acquireAvoid().delegateResponse((inner, e) -> { - taskManager.cancelTaskAndDescendants(groupTask, "exchange sink failure", true, ActionListener.noop()); - if (suppressRemoteFailure) { - computeListener.markAsPartial(clusterAlias, e); - inner.onResponse(null); - } else { - inner.onFailure(e); - } - }); - var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection); + var listenerGroup = new RemoteListenerGroup(rootTask, computeListener, clusterAlias, openExchangeListener); + var remoteSink = exchangeService.newRemoteSink( + listenerGroup.getGroupTask(), + childSessionId, + transportService, + cluster.connection + ); exchangeSource.addRemoteSink( remoteSink, // This applies to all the subrequests for this cluster, and all the failures. // If we need to change it depending on the kind of failure, we'll have to replace this with a predicate. suppressRemoteFailure == false, queryPragmas.concurrentExchangeClients(), - ActionListener.runAfter(exchangeListener, finishGroup) + listenerGroup.getExchangeRequestListener() ); - // Cancel the group on cluster request failure - var clusterRequestListener = computeListener.acquireCompute(clusterAlias).delegateResponse((inner, e) -> { - taskManager.cancelTaskAndDescendants(groupTask, "exchange cluster action failure", true, ActionListener.noop()); - inner.onFailure(e); - }); transportService.sendChildRequest( cluster.connection, CLUSTER_ACTION_NAME, clusterRequest, - groupTask, + listenerGroup.getGroupTask(), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( - ActionListener.runAfter(clusterRequestListener, finishGroup), + listenerGroup.getClusterRequestListener(), ComputeResponse::new, esqlExecutor ) ); + }, e -> { + // Exchange opening failed + if (suppressRemoteFailure) { + computeListener.markAsPartial(clusterAlias, e); + openExchangeListener.onResponse(null); + } else { + openExchangeListener.onFailure(e); + } }) ); } } } + // Create group task for this cluster. This group task ensures that two branches of the computation: + // the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. + // runAfter listeners below ensure that the group is finalized when both branches are done. + // The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. + private class RemoteListenerGroup { + private final CancellableTask groupTask; + private final ActionListener exchangeRequestListener; + private final ActionListener clusterRequestListener; + + public CancellableTask getGroupTask() { + return groupTask; + } + + public ActionListener getExchangeRequestListener() { + return exchangeRequestListener; + } + + public ActionListener getClusterRequestListener() { + return clusterRequestListener; + } + + RemoteListenerGroup(Task rootTask, ComputeListener computeListener, String clusterAlias, ActionListener delegate) { + final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); + groupTask = createGroupTask(rootTask, rootTask.getDescription() + "[" + clusterAlias + "]"); + CountDown countDown = new CountDown(2); + // The group is done when both the sink and the cluster request are done + Runnable finishGroup = () -> { + if (countDown.countDown()) { + taskManager.unregister(groupTask); + delegate.onResponse(null); + } + }; + // Cancel the group on sink failure + ActionListener exchangeListener = computeListener.acquireAvoid().delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants(groupTask, "exchange sink failure", true, ActionListener.noop()); + if (suppressRemoteFailure) { + computeListener.markAsPartial(clusterAlias, e); + inner.onResponse(null); + } else { + inner.onFailure(e); + } + }); + exchangeRequestListener = ActionListener.runAfter(exchangeListener, finishGroup); + // Cancel the group on cluster request failure + var clusterListener = computeListener.acquireCompute(clusterAlias).delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants(groupTask, "exchange cluster action failure", true, ActionListener.noop()); + inner.onFailure(e); + }); + clusterRequestListener = ActionListener.runAfter(clusterListener, finishGroup); + } + + private CancellableTask createGroupTask(Task parentTask, String description) { + return (CancellableTask) taskManager.register( + "transport", + "esql_compute_group", + new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) + ); + } + } + void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener listener) { listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts)); List contexts = new ArrayList<>(context.searchContexts.size()); @@ -986,14 +1034,6 @@ private String newChildSession(String session) { return session + "/" + childSessionIdGenerator.incrementAndGet(); } - private CancellableTask createGroupTask(Task parentTask, String description) { - return (CancellableTask) taskManager.register( - "transport", - "esql_compute_group", - new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) - ); - } - private static class ComputeGroupTaskRequest extends TransportRequest { private final String parentDescription; From 441b3908dca38aaee6f1f1ec5b14b457acc87921 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 2 Dec 2024 13:14:25 -0700 Subject: [PATCH 19/24] Test fixes --- .../action/CrossClustersCancellationIT.java | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index 030ee6c38635f..cea9a5f7b7b5b 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -295,19 +295,22 @@ public void testCancelSkipUnavailable() throws Exception { }); var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed"); client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest); - assertBusy(() -> { - List drivers = client(REMOTE_CLUSTER).admin() - .cluster() - .prepareListTasks() - .setActions(DriverTaskRunner.ACTION_NAME) - .get() - .getTasks(); - assertThat(drivers.size(), greaterThanOrEqualTo(1)); - for (TaskInfo driver : drivers) { - assertTrue(driver.cancellable()); - } - }); - PauseFieldPlugin.allowEmitting.countDown(); + try { + assertBusy(() -> { + List drivers = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .get() + .getTasks(); + assertThat(drivers.size(), greaterThanOrEqualTo(1)); + for (TaskInfo driver : drivers) { + assertTrue(driver.cancelled()); + } + }); + } finally { + PauseFieldPlugin.allowEmitting.countDown(); + } var resp = requestFuture.actionGet(); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); @@ -334,18 +337,6 @@ public void testCloseSkipUnavailable() throws Exception { request.pragmas(randomPragmas()); var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request); assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); - assertBusy(() -> { - List drivers = client(REMOTE_CLUSTER).admin() - .cluster() - .prepareListTasks() - .setActions(DriverTaskRunner.ACTION_NAME) - .get() - .getTasks(); - assertThat(drivers.size(), greaterThanOrEqualTo(1)); - for (TaskInfo driver : drivers) { - assertTrue(driver.cancellable()); - } - }); PauseFieldPlugin.allowEmitting.countDown(); cluster(REMOTE_CLUSTER).close(); try (var resp = requestFuture.actionGet()) { From 1e73b14da7b02fc7b8b7b5790aa22ee6a10b18f8 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 3 Dec 2024 10:49:43 -0700 Subject: [PATCH 20/24] Pull feedback --- .../esql/action/CrossClustersQueryIT.java | 3 +- .../xpack/esql/plugin/ComputeListener.java | 4 +- .../xpack/esql/plugin/ComputeService.java | 58 ++++++++++--------- .../esql/session/EsqlSessionCCSUtils.java | 8 ++- .../session/EsqlSessionCCSUtilsTests.java | 7 ++- 5 files changed, 47 insertions(+), 33 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 686b1006f2bc6..7511bdc99bbe6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -526,6 +526,7 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() { new ExpectedCluster( REMOTE_CLUSTER_1, "nomatch," + remote1Index + "*", + // The nomatch index handling happens at runtime, hence we get PARTIAL instead of SUCCESSFUL EsqlExecutionInfo.Cluster.Status.PARTIAL, 0 ) @@ -547,7 +548,7 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() { new ExpectedCluster( REMOTE_CLUSTER_1, "nomatch," + remote1Index + "*", - // TODO: this probably should be PARTIAL instead + // TODO: this probably should be PARTIAL instead, matching the above case EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 0 ) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 3b829e6897894..736543553d244 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterNoShards; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; /** * A variant of {@link RefCountingListener} with the following differences: @@ -227,7 +227,7 @@ void markAsPartial(String computeClusterAlias, Exception e) { // We use PARTIAL here because we can not know whether the cluster has already sent any data. if (status != EsqlExecutionInfo.Cluster.Status.PARTIAL) { LOGGER.debug("Marking failed cluster {} as partial: {}", computeClusterAlias, e); - markClusterNoShards(esqlExecutionInfo, computeClusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); + markClusterWithFinalStateAndNoShards(esqlExecutionInfo, computeClusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 48902c6a998d6..ca0c7dd11ada7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -88,6 +88,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME; @@ -465,21 +466,9 @@ private class RemoteListenerGroup { private final ActionListener exchangeRequestListener; private final ActionListener clusterRequestListener; - public CancellableTask getGroupTask() { - return groupTask; - } - - public ActionListener getExchangeRequestListener() { - return exchangeRequestListener; - } - - public ActionListener getClusterRequestListener() { - return clusterRequestListener; - } - RemoteListenerGroup(Task rootTask, ComputeListener computeListener, String clusterAlias, ActionListener delegate) { final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); - groupTask = createGroupTask(rootTask, rootTask.getDescription() + "[" + clusterAlias + "]"); + groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); CountDown countDown = new CountDown(2); // The group is done when both the sink and the cluster request are done Runnable finishGroup = () -> { @@ -490,24 +479,41 @@ public ActionListener getClusterRequestListener() { }; // Cancel the group on sink failure ActionListener exchangeListener = computeListener.acquireAvoid().delegateResponse((inner, e) -> { - taskManager.cancelTaskAndDescendants(groupTask, "exchange sink failure", true, ActionListener.noop()); - if (suppressRemoteFailure) { - computeListener.markAsPartial(clusterAlias, e); - inner.onResponse(null); - } else { - inner.onFailure(e); - } + taskManager.cancelTaskAndDescendants(groupTask, "exchange sink failure", true, ActionListener.running(() -> { + if (suppressRemoteFailure) { + computeListener.markAsPartial(clusterAlias, e); + inner.onResponse(null); + } else { + inner.onFailure(e); + } + })); }); exchangeRequestListener = ActionListener.runAfter(exchangeListener, finishGroup); // Cancel the group on cluster request failure var clusterListener = computeListener.acquireCompute(clusterAlias).delegateResponse((inner, e) -> { - taskManager.cancelTaskAndDescendants(groupTask, "exchange cluster action failure", true, ActionListener.noop()); - inner.onFailure(e); + taskManager.cancelTaskAndDescendants( + groupTask, + "exchange cluster action failure", + true, + ActionListener.running(() -> inner.onFailure(e)) + ); }); clusterRequestListener = ActionListener.runAfter(clusterListener, finishGroup); } - private CancellableTask createGroupTask(Task parentTask, String description) { + public CancellableTask getGroupTask() { + return groupTask; + } + + public ActionListener getExchangeRequestListener() { + return exchangeRequestListener; + } + + public ActionListener getClusterRequestListener() { + return clusterRequestListener; + } + + private CancellableTask createGroupTask(Task parentTask, Supplier description) { return (CancellableTask) taskManager.register( "transport", "esql_compute_group", @@ -1035,9 +1041,9 @@ private String newChildSession(String session) { } private static class ComputeGroupTaskRequest extends TransportRequest { - private final String parentDescription; + private final Supplier parentDescription; - ComputeGroupTaskRequest(TaskId parentTask, String description) { + ComputeGroupTaskRequest(TaskId parentTask, Supplier description) { this.parentDescription = description; setParentTask(parentTask); } @@ -1050,7 +1056,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, @Override public String getDescription() { - return "group [" + parentDescription + "]"; + return "group [" + parentDescription.get() + "]"; } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 5773c28ccd2aa..8dbe81575b7a7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -165,7 +165,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf entry.getValue().getException() ); if (skipUnavailable) { - markClusterNoShards(execInfo, clusterAlias, Cluster.Status.SKIPPED, e); + markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, Cluster.Status.SKIPPED, e); } else { throw e; } @@ -218,7 +218,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn status = Cluster.Status.SKIPPED; failureException = new VerificationException("Unknown index [" + indexExpression + "]"); } - markClusterNoShards(executionInfo, c, status, failureException); + markClusterWithFinalStateAndNoShards(executionInfo, c, status, failureException); } } if (fatalErrorMessage != null) { @@ -229,13 +229,15 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn /** * Mark cluster with a default cluster state with the given status and potentially failure from exception. * Most metrics are set to 0 except for "took" which is set to the total time taken so far. + * The status must be the final state of the cluster, not RUNNING. */ - public static void markClusterNoShards( + public static void markClusterWithFinalStateAndNoShards( EsqlExecutionInfo executionInfo, String clusterAlias, Cluster.Status status, @Nullable Exception ex ) { + assert status != Cluster.Status.RUNNING : "status must be a final state, not RUNNING"; executionInfo.swapCluster(clusterAlias, (k, v) -> { Cluster.Builder builder = new Cluster.Builder(v).setStatus(status) .setTook(executionInfo.tookSoFar()) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index c6a4bf4294215..2f99c679598d2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -166,7 +166,12 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { Exception ex = new NoSeedNodeLeftException("unable to connect"); var unvailableClusters = Map.of(remote1Alias, ex, remote2Alias, ex); unvailableClusters.forEach( - (c, f) -> EsqlSessionCCSUtils.markClusterNoShards(executionInfo, c, EsqlExecutionInfo.Cluster.Status.PARTIAL, ex) + (c, f) -> EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( + executionInfo, + c, + EsqlExecutionInfo.Cluster.Status.PARTIAL, + ex + ) ); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); From cbae22282560863fb4b089d9deb91cf147fe6f3e Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 12 Dec 2024 16:12:15 -0700 Subject: [PATCH 21/24] spotless --- .../elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index 3ebc7462d7501..e98a40dd20bd5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -16,7 +16,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; From 1f1601bf675ca219cbc03f6e708fdaaf762b8a0b Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 14 Jan 2025 12:22:43 -0700 Subject: [PATCH 22/24] fix test --- .../org/elasticsearch/test/FailingFieldPlugin.java | 4 +++- .../esql/action/AbstractPausableIntegTestCase.java | 4 +++- .../xpack/esql/action/AbstractPauseFieldPlugin.java | 4 +++- .../xpack/esql/action/CrossClusterAsyncQueryIT.java | 2 +- .../esql/action/CrossClustersCancellationIT.java | 12 +++++++----- .../xpack/esql/action/CrossClustersQueryIT.java | 5 +++-- 6 files changed, 20 insertions(+), 11 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java index 64c90826fda85..670191676726a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java @@ -24,12 +24,14 @@ public class FailingFieldPlugin extends Plugin implements ScriptPlugin { + public static final String FAILING_FIELD_LANG = "failing_field"; + @Override public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { return new ScriptEngine() { @Override public String getType() { - return "failing_field"; + return FAILING_FIELD_LANG; } @Override diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java index 8054b260f0060..7b68969c09642 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java @@ -22,6 +22,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.esql.action.AbstractPauseFieldPlugin.PAUSE_FIELD_LANG; + /** A pausable testcase. Subclasses extend this testcase to simulate slow running queries. * * Uses the evaluation of a runtime field in the mappings "pause_me" of type long, along @@ -64,7 +66,7 @@ public void setupIndex() throws IOException { mapping.startObject("pause_me"); { mapping.field("type", "long"); - mapping.startObject("script").field("source", "").field("lang", "pause").endObject(); + mapping.startObject("script").field("source", "").field("lang", PAUSE_FIELD_LANG).endObject(); } mapping.endObject(); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java index 5554f7e571dfb..05a1e41c596e7 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java @@ -28,6 +28,8 @@ */ public abstract class AbstractPauseFieldPlugin extends Plugin implements ScriptPlugin { + public static final String PAUSE_FIELD_LANG = "pause"; + // Called when the engine enters the execute() method. protected void onStartExecute() {} @@ -39,7 +41,7 @@ public ScriptEngine getScriptEngine(Settings settings, Collection requestFuture = new PlainActionFuture<>(); client().execute(EsqlQueryAction.INSTANCE, request, requestFuture); - assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); List rootTasks = new ArrayList<>(); assertBusy(() -> { List tasks = client(REMOTE_CLUSTER).admin() @@ -278,7 +280,7 @@ public void testCancelSkipUnavailable() throws Exception { } }); } finally { - PauseFieldPlugin.allowEmitting.countDown(); + SimplePauseFieldPlugin.allowEmitting.countDown(); } var resp = requestFuture.actionGet(); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); @@ -305,8 +307,8 @@ public void testCloseSkipUnavailable() throws Exception { """); request.pragmas(randomPragmas()); var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request); - assertTrue(PauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); - PauseFieldPlugin.allowEmitting.countDown(); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + SimplePauseFieldPlugin.allowEmitting.countDown(); cluster(REMOTE_CLUSTER).close(); try (var resp = requestFuture.actionGet()) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 2c852a56d97b9..1264ba508d08e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; @@ -83,7 +84,7 @@ protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(InternalExchangePlugin.class); - plugins.add(EsqlNodeFailureIT.FailingFieldPlugin.class); + plugins.add(FailingFieldPlugin.class); return plugins; } @@ -1489,7 +1490,7 @@ void populateRemoteIndicesFail(String clusterAlias, String indexName, int numSha mapping.startObject("fail_me"); { mapping.field("type", "long"); - mapping.startObject("script").field("source", "").field("lang", "fail").endObject(); + mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject(); } mapping.endObject(); } From 4c198021aa68f546bd158973c10d3378a012d148 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 14 Jan 2025 12:56:27 -0700 Subject: [PATCH 23/24] Move test --- .../session/EsqlSessionCCSUtilsTests.java | 75 ++++++++++--------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 0498cf8715fde..82c72d372da1d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -170,40 +170,6 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); } - // skip_unavailable=true clusters are unavailable, both marked as PARTIAL - { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); - - Exception ex = new NoSeedNodeLeftException("unable to connect"); - var unvailableClusters = Map.of(remote1Alias, ex, remote2Alias, ex); - unvailableClusters.forEach( - (c, f) -> EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( - executionInfo, - c, - EsqlExecutionInfo.Cluster.Status.PARTIAL, - ex - ) - ); - - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); - assertNull(executionInfo.overallTook()); - - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); - assertThat(localCluster.getIndexExpression(), equalTo("logs*")); - assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); - assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); - assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); - - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); - assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); - assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); - } - // skip_unavailable=false cluster is unavailable, throws Exception { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); @@ -250,6 +216,47 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { } } + public void testUpdateExecutionInfoWithUnavailableClustersPartial() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; + + // skip_unavailable=true clusters are unavailable, both marked as PARTIAL + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + + Exception ex = new NoSeedNodeLeftException("unable to connect"); + var unvailableClusters = Map.of(remote1Alias, ex, remote2Alias, ex); + unvailableClusters.forEach( + (c, f) -> EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( + executionInfo, + c, + EsqlExecutionInfo.Cluster.Status.PARTIAL, + ex + ) + ); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertNull(executionInfo.overallTook()); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); + } + + } + public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; From 4f360ef4366ecb6bd28b1d69db804bc6954b7c39 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 14 Jan 2025 13:55:17 -0700 Subject: [PATCH 24/24] Fix test --- .../elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java index 7c81f97714a6f..8362bcea5c251 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java @@ -30,7 +30,7 @@ public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); - static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false); @ClassRule public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster);