diff --git a/docs/changelog/121240.yaml b/docs/changelog/121240.yaml new file mode 100644 index 0000000000000..b0ca8e5e614db --- /dev/null +++ b/docs/changelog/121240.yaml @@ -0,0 +1,5 @@ +pr: 121240 +summary: Implement runtime skip_unavailable=true +area: ES|QL +type: enhancement +issues: [] diff --git a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java index 64c90826fda85..670191676726a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java @@ -24,12 +24,14 @@ public class FailingFieldPlugin extends Plugin implements ScriptPlugin { + public static final String FAILING_FIELD_LANG = "failing_field"; + @Override public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { return new ScriptEngine() { @Override public String getType() { - return "failing_field"; + return FAILING_FIELD_LANG; } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 5f3f135810322..6a1b9f9d051eb 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -31,6 +31,11 @@ public static ElasticsearchCluster remoteCluster() { } public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) { + return localCluster(remoteCluster, true); + } + + // skipUnavailable is written to the local cluster's cluster.remote.*.skip_unavailable setting for REMOTE_CLUSTER_NAME + public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, boolean skipUnavailable) { return ElasticsearchCluster.local() .name(LOCAL_CLUSTER_NAME) .distribution(DistributionType.DEFAULT) @@ 
-41,6 +46,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust .setting("node.roles", "[data,ingest,master,remote_cluster_client]") .setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"") .setting("cluster.remote.connections_per_cluster", "1") + .setting("cluster.remote." + REMOTE_CLUSTER_NAME + ".skip_unavailable", Boolean.toString(skipUnavailable)) .shared(true) .setting("cluster.routing.rebalance.enable", "none") .build(); diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java new file mode 100644 index 0000000000000..4b9957f9f5cea --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnFalseIT.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.esql.ccq; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +// Duplicate of EsqlRestValidationIT test where skip_unavailable is set to false +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class EsqlRestValidationSkipUnFalseIT extends EsqlRestValidationIT { + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false); + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); + + @Override + protected String getTestRestCluster() { + return localCluster.getHttpAddresses(); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java index d9149ee291fdf..1c2ac0e10b792 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java @@ -18,6 +18,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xcontent.XContentBuilder; @@ -63,6 +64,7 @@ protected Collection> nodePlugins(String clusterAlias) { plugins.add(CrossClusterAsyncQueryIT.InternalExchangePlugin.class); plugins.add(SimplePauseFieldPlugin.class); 
plugins.add(FailingPauseFieldPlugin.class); + plugins.add(FailingFieldPlugin.class); plugins.add(CrossClusterAsyncQueryIT.CountingPauseFieldPlugin.class); return plugins; } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java index 9c0447affc754..40ea21371e513 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java @@ -510,11 +510,17 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); assertTrue(executionInfo.isCrossClusterSearch()); + boolean hasPartials = false; for (String clusterAlias : executionInfo.clusterAliases()) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(cluster.getTook().millis(), lessThanOrEqualTo(executionInfo.overallTook().millis())); + if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL + || cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + hasPartials = true; + } } + assertThat(executionInfo.isPartial(), equalTo(hasPartials)); } private void setSkipUnavailable(String clusterAlias, boolean skip) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java index 0ca01ae7ec69e..c1976c9fa2ad8 100644 --- 
a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java @@ -23,9 +23,12 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -433,6 +436,7 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu Set expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet()); assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases)); + boolean hasSkipped = false; for (ExpectedCluster expectedCluster : expected) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias()); String msg = cluster.getClusterAlias(); @@ -451,10 +455,12 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class)); String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]"; assertThat(msg, cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg)); + hasSkipped = true; } // currently failed shards is always zero - change this once we start allowing partial data for individual shard failures assertThat(msg, cluster.getFailedShards(), equalTo(0)); } + assertThat(executionInfo.isPartial(), equalTo(hasSkipped)); } public void 
testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws Exception { @@ -500,6 +506,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() throws assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); @@ -556,6 +563,7 @@ public void testCCSExecutionOnSearchesWithLimit0() throws Exception { long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(false)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); @@ -604,6 +612,7 @@ public void testMetadataIndex() throws Exception { assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.isPartial(), equalTo(false)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); assertThat(remoteCluster.getIndexExpression(), equalTo("logs*")); @@ -799,6 +808,17 @@ public void testWarnings() throws Exception { assertTrue(latch.await(30, TimeUnit.SECONDS)); } + // Non-disconnect remote failures still fail the request even if skip_unavailable is true + public void testRemoteFailureSkipUnavailableTrue() throws IOException { + Map testClusterInfo = setupFailClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remote1Index = (String) 
testClusterInfo.get("remote.index"); + // setupFailClusters() maps a FAILING_FIELD_LANG runtime field on the remote index and sets skip_unavailable=true for that remote + String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index); + IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false)); + assertThat(e.getMessage(), containsString("Accessing failing field")); + } + private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { try { final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); @@ -925,4 +945,46 @@ Map createEmptyIndicesWithNoMappings(int numClusters) { return clusterToEmptyIndexMap; } + + Map setupFailClusters() throws IOException { + int numShardsLocal = randomIntBetween(1, 3); + populateLocalIndices(LOCAL_INDEX, numShardsLocal); + + int numShardsRemote = randomIntBetween(1, 3); + populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", LOCAL_INDEX); + clusterInfo.put("remote.num_shards", numShardsRemote); + clusterInfo.put("remote.index", REMOTE_INDEX); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + return clusterInfo; + } + + void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException { + Client remoteClient = client(clusterAlias); + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("fail_me"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping(mapping.endObject()) + ); + 
remoteClient.prepareIndex(indexName).setSource("id", 0).get(); + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } + } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java index b3e004e37bd5c..667ac23461000 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java @@ -57,6 +57,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); @@ -109,6 +110,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); @@ -161,6 +163,7 @@ public void testCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() throws Exc long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); 
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2, LOCAL_CLUSTER))); @@ -233,6 +236,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1))); @@ -275,6 +279,7 @@ public void testRemoteOnlyCCSAgainstDisconnectedRemoteWithSkipUnavailableTrue() long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertThat(executionInfo.isPartial(), equalTo(true)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2))); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java index 8dcc1ec57e319..4777b41afb036 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.Build; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.TransportCancelTasksAction; @@ -75,6 +76,11 @@ public void resetPlugin() { SimplePauseFieldPlugin.resetPlugin(); } + @Override + protected boolean reuseClusters() { + return false; 
+ } + private void createRemoteIndex(int numDocs) throws Exception { XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); mapping.startObject("runtime"); @@ -96,6 +102,26 @@ private void createRemoteIndex(int numDocs) throws Exception { bulk.get(); } + private void createLocalIndex(int numDocs) throws Exception { + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("const"); + { + mapping.field("type", "long"); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.endObject(); + client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(mapping).get(); + BulkRequestBuilder bulk = client(LOCAL_CLUSTER).prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < numDocs; i++) { + bulk.add(new IndexRequest().source("const", i)); + } + bulk.get(); + } + public void testCancel() throws Exception { createRemoteIndex(between(10, 100)); EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); @@ -208,4 +234,88 @@ public void testTasks() throws Exception { } requestFuture.actionGet(30, TimeUnit.SECONDS).close(); } + + // Check that cancelling remote task with skip_unavailable=true produces failure + public void testCancelSkipUnavailable() throws Exception { + createRemoteIndex(between(10, 100)); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query("FROM *:test | STATS total=sum(const) | LIMIT 1"); + request.pragmas(randomPragmas()); + request.includeCCSMetadata(true); + PlainActionFuture requestFuture = new PlainActionFuture<>(); + client().execute(EsqlQueryAction.INSTANCE, request, requestFuture); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + List rootTasks = new ArrayList<>(); + assertBusy(() -> { + List tasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(ComputeService.CLUSTER_ACTION_NAME) + .get() + 
.getTasks(); + assertThat(tasks, hasSize(1)); + rootTasks.addAll(tasks); + }); + var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed"); + client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest); + try { + assertBusy(() -> { + List drivers = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .get() + .getTasks(); + assertThat(drivers.size(), greaterThanOrEqualTo(1)); + for (TaskInfo driver : drivers) { + assertTrue(driver.cancelled()); + } + }); + } finally { + SimplePauseFieldPlugin.allowEmitting.countDown(); + } + + Exception error = expectThrows(Exception.class, () -> requestFuture.actionGet(30, TimeUnit.SECONDS)); + assertThat(error.getMessage(), containsString("remote failed")); + } + + // Check that closing remote node with skip_unavailable=true produces partial + public void testCloseSkipUnavailable() throws Exception { + // We are using delay() here because closing cluster while inside pause fields doesn't seem to produce clean closure + assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot()); + createRemoteIndex(between(1000, 5000)); + createLocalIndex(10); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(""" + FROM test*,cluster-a:test* METADATA _index + | EVAL cluster=MV_FIRST(SPLIT(_index, ":")) + | WHERE CASE(cluster == "cluster-a", delay(1ms), true) + | STATS total = sum(const) | LIMIT 1 + """); + request.pragmas(randomPragmas()); + var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + SimplePauseFieldPlugin.allowEmitting.countDown(); + cluster(REMOTE_CLUSTER).close(); + try (var resp = requestFuture.actionGet(30, TimeUnit.SECONDS)) { + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isPartial(), equalTo(true)); + + List> values = 
getValuesList(resp); + assertThat(values.get(0).size(), equalTo(1)); + // We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there + assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(45L)); + + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getFailures().size(), equalTo(1)); + } + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java index d7117fb5e0750..ca10e5d3419ac 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncTestUtils.java @@ -98,7 +98,7 @@ public static void waitForCluster(Client client, String clusterName, String asyn } assertThat(clusterInfo.getStatus(), not(equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING))); } - }); + }, 30, TimeUnit.SECONDS); } public static EsqlQueryResponse runAsyncQuery(Client client, EsqlQueryRequest request) { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java index 3ba73dd9a402e..a8a2d27e88569 100644 --- 
a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/SimplePauseFieldPlugin.java @@ -31,6 +31,11 @@ public void onStartExecute() { @Override public boolean onWait() throws InterruptedException { - return allowEmitting.await(30, TimeUnit.SECONDS); + try { + return allowEmitting.await(30, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + // Report the wait as satisfied instead of propagating; interrupt status is not restored. NOTE(review): presumably for node shutdown - confirm + return true; + } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 19ed77405daa2..5953be62e8315 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -25,9 +25,11 @@ import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; import java.util.ArrayList; import java.util.List; @@ -71,34 +73,56 @@ void startComputeOnRemoteCluster( ExchangeSourceHandler exchangeSource, RemoteCluster cluster, Runnable cancelQueryOnFailure, + EsqlExecutionInfo executionInfo, ActionListener listener ) { var queryPragmas = configuration.pragmas(); listener = ActionListener.runBefore(listener, exchangeSource.addEmptySink()::close); final var childSessionId = computeService.newChildSession(sessionId); final AtomicReference finalResponse = new AtomicReference<>(); + 
final String clusterAlias = cluster.clusterAlias(); try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> { var resp = finalResponse.get(); return Objects.requireNonNullElseGet(resp, () -> new ComputeResponse(profiles)); }))) { + var openExchangeListener = computeListener.acquireAvoid(); ExchangeService.openExchange( transportService, cluster.connection, childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, - computeListener.acquireCompute().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection); + EsqlCCSUtils.skipUnavailableListener( + openExchangeListener, + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.SKIPPED + ).delegateFailureAndWrap((l, unused) -> { + var listenerGroup = new RemoteListenerGroup( + transportService, + rootTask, + computeListener, + clusterAlias, + executionInfo, + openExchangeListener + ); + + var remoteSink = exchangeService.newRemoteSink( + listenerGroup.getGroupTask(), + childSessionId, + transportService, + cluster.connection + ); exchangeSource.addRemoteSink( remoteSink, - true, + executionInfo.isSkipUnavailable(clusterAlias) == false, () -> {}, queryPragmas.concurrentExchangeClients(), - computeListener.acquireAvoid() + listenerGroup.getExchangeRequestListener() ); var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); - var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); - final ActionListener clusterListener = l.map(r -> { + var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan); + final ActionListener clusterListener = listenerGroup.getClusterRequestListener().map(r -> { finalResponse.set(r); return r.getProfiles(); }); @@ -106,13 +130,14 @@ void startComputeOnRemoteCluster( 
cluster.connection, ComputeService.CLUSTER_ACTION_NAME, clusterRequest, - rootTask, + listenerGroup.getGroupTask(), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) ); }) ); } + } List getRemoteClusters( diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 71c2a65037e9a..55c093554d7a5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -276,6 +276,7 @@ public void execute( exchangeSource, cluster, cancelQueryOnFailure, + execInfo, computeListener.acquireCompute().map(r -> { updateExecutionInfo(execInfo, cluster.clusterAlias(), r); return r.getProfiles(); @@ -309,11 +310,10 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster } else { // if the cluster is an older version and does not send back took time, then calculate it here on the coordinator // and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response - var tookTime = TimeValue.timeValueNanos(System.nanoTime() - executionInfo.getRelativeStartNanos()); executionInfo.swapCluster( clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(runningToSuccess.apply(v.getStatus())) - .setTook(tookTime) + .setTook(executionInfo.tookSoFar()) .build() ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java new file mode 100644 index 0000000000000..9ef085257b87b --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.session.EsqlCCSUtils; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Create group task for this cluster. This group task ensures that two branches of the computation: + * the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. + * runAfter listeners below ensure that the group is finalized when both branches are done. + * The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. 
+ */ +class RemoteListenerGroup { + private final CancellableTask groupTask; + private final ActionListener exchangeRequestListener; + private final ActionListener> clusterRequestListener; + private final TaskManager taskManager; + private final String clusterAlias; + private final EsqlExecutionInfo executionInfo; + private final TransportService transportService; + + RemoteListenerGroup( + TransportService transportService, + Task rootTask, + ComputeListener computeListener, + String clusterAlias, + EsqlExecutionInfo executionInfo, + ActionListener delegate + ) { + this.transportService = transportService; + this.taskManager = transportService.getTaskManager(); + this.clusterAlias = clusterAlias; + this.executionInfo = executionInfo; + groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); + CountDown countDown = new CountDown(2); + // The group is done when both the sink and the cluster request are done + Runnable finishGroup = () -> { + if (countDown.countDown()) { + taskManager.unregister(groupTask); + delegate.onResponse(null); + } + }; + // Cancel the group on sink failure + exchangeRequestListener = createCancellingListener("exchange sink failure", computeListener.acquireAvoid(), finishGroup); + + // Cancel the group on cluster request failure + clusterRequestListener = createCancellingListener("exchange cluster action failure", computeListener.acquireCompute(), finishGroup); + } + + /** + * Create a listener that: + * 1. Cancels the group task on failure + * 2. 
Marks the cluster as partial if the error is ignorable, otherwise propagates the error + */ + private ActionListener createCancellingListener(String reason, ActionListener delegate, Runnable finishGroup) { + return ActionListener.runAfter(delegate.delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants(groupTask, reason, true, ActionListener.running(() -> { + EsqlCCSUtils.skipUnavailableListener(delegate, executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL) + .onFailure(e); + })); + }), finishGroup); + } + + public CancellableTask getGroupTask() { + return groupTask; + } + + public ActionListener getExchangeRequestListener() { + return exchangeRequestListener; + } + + public ActionListener> getClusterRequestListener() { + return clusterRequestListener; + } + + private CancellableTask createGroupTask(Task parentTask, Supplier description) { + return (CancellableTask) taskManager.register( + "transport", + "esql_compute_group", + new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) + ); + } + + private static class ComputeGroupTaskRequest extends TransportRequest { + private final Supplier parentDescription; + + ComputeGroupTaskRequest(TaskId parentTask, Supplier description) { + this.parentDescription = description; + setParentTask(parentTask); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + assert parentTaskId.isSet(); + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public String getDescription() { + return "group [" + parentDescription.get() + "]"; + } + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index b3a2c403137f3..caf70063e08fa 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -267,6 +267,16 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener { + // If we had any skipped or partial clusters, the result is partial + if (executionInfo.getClusters() + .values() + .stream() + .anyMatch( + c -> c.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED + || c.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL + )) { + executionInfo.markAsPartial(); + } recordCCSTelemetry(task, executionInfo, request, null); listener.onResponse(toResponse(task, request, configuration, result)); }, ex -> { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java similarity index 85% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 6be243456e040..64e5c6647e9ca 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; @@ -25,6 +26,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import 
org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.analysis.TableInfo; import org.elasticsearch.xpack.esql.index.IndexResolution; @@ -35,11 +37,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; -class EsqlSessionCCSUtils { +public class EsqlCCSUtils { - private EsqlSessionCCSUtils() {} + private EsqlCCSUtils() {} static Map determineUnavailableRemoteClusters(List failures) { Map unavailableRemotes = new HashMap<>(); @@ -171,16 +174,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf entry.getValue().getException() ); if (skipUnavailable) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .setFailures(List.of(new ShardSearchFailure(e))) - .build() - ); + markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); } else { throw e; } @@ -338,4 +332,60 @@ public static void checkForCcsLicense( } } } + + /** + * Mark cluster with a final status (success or failure). + * Most metrics are set to 0 if not set yet, except for "took" which is set to the total time taken so far. + * The status must be the final status of the cluster, not RUNNING. 
+     */
+    public static void markClusterWithFinalStateAndNoShards(
+        EsqlExecutionInfo executionInfo,
+        String clusterAlias,
+        Cluster.Status status,
+        @Nullable Exception ex // optional cause; when non-null it is recorded as the cluster's single failure
+    ) {
+        assert status != Cluster.Status.RUNNING : "status must be a final state, not RUNNING";
+        executionInfo.swapCluster(clusterAlias, (k, v) -> {
+            Cluster.Builder builder = new Cluster.Builder(v).setStatus(status) // start from the current entry v, then set status, took and counters
+                .setTook(executionInfo.tookSoFar())
+                .setTotalShards(Objects.requireNonNullElse(v.getTotalShards(), 0))
+                .setSuccessfulShards(Objects.requireNonNullElse(v.getSuccessfulShards(), 0)) // each counter keeps its own value; only null becomes 0
+                .setSkippedShards(Objects.requireNonNullElse(v.getSkippedShards(), 0))
+                .setFailedShards(Objects.requireNonNullElse(v.getFailedShards(), 0));
+            if (ex != null) {
+                builder.setFailures(List.of(new ShardSearchFailure(ex)));
+            }
+            return builder.build();
+        });
+    }
+
+    /**
+     * Returns true only when the cluster has skip_unavailable enabled and the error is a remote-unavailable exception; such errors are ignored.
+     */
+    public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) {
+        if (executionInfo.isSkipUnavailable(clusterAlias) == false) {
+            return false;
+        }
+
+        return ExceptionsHelper.isRemoteUnavailableException(e);
+    }
+
+    /**
+     * Wrap a listener so that an ignorable error marks the cluster with {@code status} and completes it with null; other errors are forwarded.
+     */
+    public static ActionListener skipUnavailableListener(
+        ActionListener delegate,
+        EsqlExecutionInfo executionInfo,
+        String clusterAlias,
+        EsqlExecutionInfo.Cluster.Status status
+    ) {
+        return delegate.delegateResponse((l, e) -> {
+            if (shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
+                markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, status, e);
+                l.onResponse(null);
+            } else {
+                l.onFailure(e);
+            }
+        });
+    }
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index 8e64d3fe84fec..46b4c5cb281d9 100644
--- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -161,7 +161,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P parse(request.query(), request.params()), executionInfo, request.filter(), - new EsqlSessionCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { + new EsqlCCSUtils.CssPartialErrorsActionListener(executionInfo, listener) { @Override public void onResponse(LogicalPlan analyzedPlan) { preMapper.preMapper( @@ -188,7 +188,7 @@ public void executeOptimizedPlan( ) { PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request); // TODO: this could be snuck into the underlying listener - EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); // execute any potential subplans executeSubPlans(physicalPlan, planRunner, executionInfo, request, listener); } @@ -315,7 +315,7 @@ public void analyzedPlan( .collect(Collectors.toSet()); final List indices = preAnalysis.indices; - EsqlSessionCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState()); + EsqlCCSUtils.checkForCcsLicense(executionInfo, indices, indicesExpressionGrouper, verifier.licenseState()); final Set targetClusters = enrichPolicyResolver.groupIndicesPerCluster( indices.stream() @@ -430,7 +430,7 @@ private void preAnalyzeIndices( } // if the preceding call to the enrich policy API found unavailable clusters, recreate the index expression to search // based only on available clusters (which could now be an empty list) - String indexExpressionToResolve = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); if (indexExpressionToResolve.isEmpty()) { // if 
this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( @@ -464,8 +464,8 @@ private boolean analyzeCCSIndices( ActionListener l ) { IndexResolution indexResolution = result.indices; - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStateCount(EsqlExecutionInfo.Cluster.Status.RUNNING) == 0) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception // to let the LogicalPlanActionListener decide how to proceed diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index 3e4dd6849478a..3e59b5218e7f8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -140,7 +140,7 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp fields.put(name, field); } - Map unavailableRemotes = EsqlSessionCCSUtils.determineUnavailableRemoteClusters( + Map unavailableRemotes = EsqlCCSUtils.determineUnavailableRemoteClusters( fieldCapsResponse.getFailures() ); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java similarity index 71% rename from 
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java rename to x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java index a84e5b144e64c..49cfbba5c7610 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java @@ -8,7 +8,9 @@ package org.elasticsearch.xpack.esql.session; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure; import org.elasticsearch.action.search.ShardSearchFailure; @@ -20,6 +22,7 @@ import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.internal.XPackLicenseStatus; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NoSeedNodeLeftException; @@ -47,26 +50,30 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; -import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.checkForCcsLicense; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.checkForCcsLicense; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError; +import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.skipUnavailableListener; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; -public class 
EsqlSessionCCSUtilsTests extends ESTestCase { +public class EsqlCCSUtilsTests extends ESTestCase { + + private final String LOCAL_CLUSTER_ALIAS = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + private final String REMOTE1_ALIAS = "remote1"; + private final String REMOTE2_ALIAS = "remote2"; public void testCreateIndexExpressionFromAvailableClusters() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; // no clusters marked as skipped { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true)); - String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(5)); assertThat( @@ -78,19 +85,19 @@ public void testCreateIndexExpressionFromAvailableClusters() { // one cluster marked as skipped, so not present in revised index expression { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, 
"logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true)); executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED ) ); - String indexExpr = EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); + String indexExpr = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo); List list = Arrays.stream(Strings.splitStringByCommaToArray(indexExpr)).toList(); assertThat(list.size(), equalTo(3)); assertThat(new HashSet<>(list), equalTo(Strings.commaDelimitedListToSet("logs*,remote1:*,remote1:foo"))); @@ -99,73 +106,70 @@ public void testCreateIndexExpressionFromAvailableClusters() { // two clusters marked as skipped, so only local cluster present in revised index expression { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + REMOTE1_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED ) ); - 
assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); + assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("logs*")); } // only remotes present and all marked as skipped, so in revised index expression should be empty string { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + REMOTE1_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*,foo", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED ) ); - assertThat(EsqlSessionCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); + assertThat(EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo), equalTo("")); } } public void testUpdateExecutionInfoWithUnavailableClusters() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; // skip_unavailable=true clusters are unavailable, both marked as SKIPPED { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) 
-> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", true)); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - var unvailableClusters = Map.of(remote1Alias, failure, remote2Alias, failure); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); + var unvailableClusters = Map.of(REMOTE1_ALIAS, failure, REMOTE2_ALIAS, failure); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, unvailableClusters); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS))); assertNull(executionInfo.overallTook()); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.SKIPPED); } @@ -173,14 +177,17 @@ public void 
testUpdateExecutionInfoWithUnavailableClusters() { // skip_unavailable=false cluster is unavailable, throws Exception { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster( + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false) + ); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); RemoteTransportException e = expectThrows( RemoteTransportException.class, - () -> EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(remote2Alias, failure)) + () -> EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of(REMOTE2_ALIAS, failure)) ); assertThat(e.status().getStatus(), equalTo(500)); assertThat( @@ -193,42 +200,42 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { // all clusters available, no Clusters in ExecutionInfo should be modified { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", 
false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster( + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false) + ); - EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, Map.of()); - assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER_ALIAS, REMOTE1_ALIAS, REMOTE2_ALIAS))); assertNull(executionInfo.overallTook()); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } } public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { - final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - final 
String remote1Alias = "remote1"; - final String remote2Alias = "remote2"; // all clusters had matching indices from field-caps call, so no updates to EsqlExecutionInfo should happen { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -251,17 +258,17 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of()); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), 
equalTo("*")); assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } @@ -270,11 +277,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // marked as SKIPPED with 0 total shards, 0 took time, etc. { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -295,13 +302,13 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { Map unavailableClusters = Map.of(); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = 
executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); @@ -310,7 +317,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); assertThat(remote1Cluster.getFailedShards(), equalTo(0)); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.RUNNING); } @@ -320,11 +327,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // marked as SKIPPED { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new 
EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1*,mylogs2*,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1*,mylogs2*,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -334,22 +341,22 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(remote1Alias, failure); + Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); 
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote2Cluster.getTook().millis(), equalTo(0L)); @@ -363,11 +370,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // but had no matching indices and since a concrete index was requested, a VerificationException is thrown { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*")); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*")); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); executionInfo.swapCluster( - remote2Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", randomBoolean()) + REMOTE2_ALIAS, + (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", randomBoolean()) ); EsIndex esIndex = new EsIndex( @@ -377,11 +384,11 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { ); var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(remote1Alias, failure); + Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); VerificationException ve = expectThrows( VerificationException.class, - () -> EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution) + () -> EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, 
indexResolution) ); assertThat(ve.getDetailedMessage(), containsString("Unknown index [remote2:mylogs1,mylogs2,logs*]")); } @@ -390,13 +397,13 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // (the EsqlSessionCCSUtils.updateExecutionInfoWithUnavailableClusters() method handles that case not the one tested here) { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*")); - executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", randomBoolean())); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*")); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", randomBoolean())); // remote2 is already marked as SKIPPED (simulating failed enrich policy lookup due to unavailable cluster) executionInfo.swapCluster( - remote2Alias, + REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster( - remote2Alias, + REMOTE2_ALIAS, "mylogs1*,mylogs2*,logs*", randomBoolean(), EsqlExecutionInfo.Cluster.Status.SKIPPED @@ -411,22 +418,22 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { // remote1 is unavailable var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect")); - Map unavailableClusters = Map.of(remote1Alias, failure); + Map unavailableClusters = Map.of(REMOTE1_ALIAS, failure); IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters); - EsqlSessionCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); - EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + 
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER_ALIAS); assertThat(localCluster.getIndexExpression(), equalTo("logs*")); assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); // since remote1 is in the unavailable Map (passed to IndexResolution.valid), it's status will not be changed // by updateExecutionInfoWithClustersWithNoMatchingIndices (it is handled in updateExecutionInfoWithUnavailableClusters) assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1*,mylogs2*,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); } @@ -444,7 +451,7 @@ public void testDetermineUnavailableRemoteClusters() { ) ); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote1", "remote2"))); } @@ -454,7 +461,7 @@ public void testDetermineUnavailableRemoteClusters() { failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSuchRemoteClusterException("remote2"))); failures.add(new FieldCapabilitiesFailure(new String[] { "remote2:mylogs1" }, new NoSeedNodeLeftException("no seed node"))); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); 
assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } @@ -468,7 +475,7 @@ public void testDetermineUnavailableRemoteClusters() { new IllegalStateException("Unable to open any connections") ) ); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of("remote2"))); } @@ -476,29 +483,28 @@ public void testDetermineUnavailableRemoteClusters() { { List failures = new ArrayList<>(); failures.add(new FieldCapabilitiesFailure(new String[] { "remote1:mylogs1" }, new RuntimeException("foo"))); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } // empty failures list { List failures = new ArrayList<>(); - Map unavailableClusters = EsqlSessionCCSUtils.determineUnavailableRemoteClusters(failures); + Map unavailableClusters = EsqlCCSUtils.determineUnavailableRemoteClusters(failures); assertThat(unavailableClusters.keySet(), equalTo(Set.of())); } } public void testUpdateExecutionInfoAtEndOfPlanning() { - String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - String remote1Alias = "remote1"; - String remote2Alias = "remote2"; + String REMOTE1_ALIAS = "remote1"; + String REMOTE2_ALIAS = "remote2"; EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); - executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); executionInfo.swapCluster( - remote1Alias, - (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) + REMOTE1_ALIAS, + (k, 
v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true, EsqlExecutionInfo.Cluster.Status.SKIPPED) ); - executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); assertNull(executionInfo.planningTookTime()); assertNull(executionInfo.overallTook()); @@ -506,7 +512,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { Thread.sleep(1); } catch (InterruptedException e) {} - EsqlSessionCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); + EsqlCCSUtils.updateExecutionInfoAtEndOfPlanning(executionInfo); assertThat(executionInfo.planningTookTime().millis(), greaterThanOrEqualTo(0L)); assertNull(executionInfo.overallTook()); @@ -517,7 +523,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { assertNull(localCluster.getTotalShards()); assertNull(localCluster.getTook()); - EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(REMOTE1_ALIAS); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); assertThat(remote1Cluster.getTotalShards(), equalTo(0)); assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); @@ -526,7 +532,7 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote1Cluster.getTook().millis(), equalTo(executionInfo.planningTookTime().millis())); - EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(REMOTE2_ALIAS); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); assertNull(remote2Cluster.getTotalShards()); assertNull(remote2Cluster.getTook()); @@ -534,7 +540,10 @@ public 
void testUpdateExecutionInfoAtEndOfPlanning() { private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { assertThat(cluster.getStatus(), equalTo(status)); - assertNull(cluster.getTook()); + if (cluster.getTook() != null) { + // It is also ok if it's null in some tests + assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L)); + } if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { assertNull(cluster.getTotalShards()); assertNull(cluster.getSuccessfulShards()); @@ -545,6 +554,11 @@ private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster assertThat(cluster.getSuccessfulShards(), equalTo(0)); assertThat(cluster.getSkippedShards(), equalTo(0)); assertThat(cluster.getFailedShards(), equalTo(0)); + } else if (status == EsqlExecutionInfo.Cluster.Status.PARTIAL) { + assertThat(cluster.getTotalShards(), equalTo(0)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); } else { fail("Unexpected status: " + status); } @@ -560,35 +574,32 @@ private static Map randomMapping() { } public void testReturnSuccessWithEmptyResult() { - String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - String remote1Alias = "remote1"; - String remote2Alias = "remote2"; String remote3Alias = "remote3"; NoClustersToSearchException noClustersException = new NoClustersToSearchException(); Predicate skipUnPredicate = s -> { - if (s.equals("remote2") || s.equals("remote3")) { + if (s.equals(REMOTE2_ALIAS) || s.equals("remote3")) { return true; } return false; }; - EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); - EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", false); - EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + 
EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "logs*", false); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "logs*", true); EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); // not a cross-cluster cluster search, so do not return empty result { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); - assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> localCluster); + assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } // local cluster is present, so do not return empty result { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(localClusterAlias, (k, v) -> localCluster); - executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> localCluster); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> remote1); // TODO: this logic will be added in the follow-on PR that handles missing indices // assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } @@ -596,16 +607,16 @@ public void testReturnSuccessWithEmptyResult() { // remote-only, one cluster is skip_unavailable=false, so do not return empty result { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(remote1Alias, (k, v) -> remote1); - executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); - assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); + 
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> remote1); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2); + assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, noClustersException)); } // remote-only, all clusters are skip_unavailable=true, so should return empty result with // NoSuchClustersException or "remote unavailable" type exception { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2); executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); Exception e = randomFrom( new NoSuchRemoteClusterException("foo"), @@ -613,23 +624,22 @@ public void testReturnSuccessWithEmptyResult() { new NoSeedNodeLeftException("foo"), new IllegalStateException("unknown host") ); - assertTrue(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)); + assertTrue(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, e)); } // remote-only, all clusters are skip_unavailable=true, but exception is not "remote unavailable" so return false // Note: this functionality may change in follow-on PRs, so remove this test in that case { EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); - executionInfo.swapCluster(remote2Alias, (k, v) -> remote2); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> remote2); executionInfo.swapCluster(remote3Alias, (k, v) -> remote3); - assertFalse(EsqlSessionCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); + assertFalse(EsqlCCSUtils.returnSuccessWithEmptyResult(executionInfo, new NullPointerException())); } } public void testUpdateExecutionInfoToReturnEmptyResult() { - String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; - String remote1Alias = "remote1"; - String remote2Alias = "remote2"; + String REMOTE1_ALIAS = "remote1"; + String REMOTE2_ALIAS = 
"remote2"; String remote3Alias = "remote3"; ConnectTransportException transportEx = new ConnectTransportException(null, "foo"); Predicate skipUnPredicate = s -> { @@ -639,9 +649,9 @@ public void testUpdateExecutionInfoToReturnEmptyResult() { return false; }; - EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false); - EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(remote1Alias, "logs*", true); - EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(remote2Alias, "logs*", true); + EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false); + EsqlExecutionInfo.Cluster remote1 = new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "logs*", true); + EsqlExecutionInfo.Cluster remote2 = new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "logs*", true); EsqlExecutionInfo.Cluster remote3 = new EsqlExecutionInfo.Cluster(remote3Alias, "logs*", true); EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, randomBoolean()); @@ -652,13 +662,13 @@ public void testUpdateExecutionInfoToReturnEmptyResult() { assertNull(executionInfo.overallTook()); - EsqlSessionCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); + EsqlCCSUtils.updateExecutionInfoToReturnEmptyResult(executionInfo, transportEx); assertNotNull(executionInfo.overallTook()); - assertThat(executionInfo.getCluster(localClusterAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); - assertThat(executionInfo.getCluster(localClusterAlias).getFailures().size(), equalTo(0)); + assertThat(executionInfo.getCluster(LOCAL_CLUSTER_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(executionInfo.getCluster(LOCAL_CLUSTER_ALIAS).getFailures().size(), equalTo(0)); - for (String remoteAlias : Set.of(remote1Alias, remote2Alias, remote3Alias)) { + for (String remoteAlias : Set.of(REMOTE1_ALIAS, REMOTE2_ALIAS, remote3Alias)) { 
assertThat(executionInfo.getCluster(remoteAlias).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); List remoteFailures = executionInfo.getCluster(remoteAlias).getFailures(); assertThat(remoteFailures.size(), equalTo(1)); @@ -667,11 +677,11 @@ public void testUpdateExecutionInfoToReturnEmptyResult() { } public void testConcreteIndexRequested() { - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("logs*"), equalTo(false)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("mylogs1,mylogs2,logs*"), equalTo(true)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("x*,logs"), equalTo(true)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("logs,metrics"), equalTo(true)); - assertThat(EsqlSessionCCSUtils.concreteIndexRequested("*"), equalTo(false)); + assertThat(EsqlCCSUtils.concreteIndexRequested("logs*"), equalTo(false)); + assertThat(EsqlCCSUtils.concreteIndexRequested("mylogs1,mylogs2,logs*"), equalTo(true)); + assertThat(EsqlCCSUtils.concreteIndexRequested("x*,logs"), equalTo(true)); + assertThat(EsqlCCSUtils.concreteIndexRequested("logs,metrics"), equalTo(true)); + assertThat(EsqlCCSUtils.concreteIndexRequested("*"), equalTo(false)); } public void testCheckForCcsLicense() { @@ -758,6 +768,64 @@ public void testCheckForCcsLicense() { } } + public void testShouldIgnoreRuntimeError() { + Predicate skipUnPredicate = s -> s.equals(REMOTE1_ALIAS); + + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); + + // remote1: skip_unavailable=true, so should ignore connect errors, but not others + assertThat( + 
shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new IllegalStateException("Unable to open any connections")), + is(true) + ); + assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new TaskCancelledException("task cancelled")), is(false)); + assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new ElasticsearchException("something is wrong")), is(false)); + // remote2: skip_unavailable=false, so should not ignore any errors + assertThat( + shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new IllegalStateException("Unable to open any connections")), + is(false) + ); + assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new TaskCancelledException("task cancelled")), is(false)); + // same for local + assertThat( + shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new IllegalStateException("Unable to open any connections")), + is(false) + ); + assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false)); + } + + public void testSkipUnavailableListener() { + Predicate skipUnPredicate = s -> s.equals(REMOTE1_ALIAS); + + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true); + executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false)); + executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true)); + executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false)); + + ActionListener expectResult = ActionListener.wrap(unused -> {}, (e) -> fail("Listener should not have failed")); + ActionListener expectFailure = ActionListener.wrap(unused -> fail("Listener should have failed"), (e) -> {}); + + // skip_unavailable=true but not a connect exception, so should fail + skipUnavailableListener(expectFailure, executionInfo, REMOTE1_ALIAS,
EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( + new ElasticsearchException("something is wrong") + ); + assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING)); + + // skip_unavailable=true, so should not fail + skipUnavailableListener(expectResult, executionInfo, REMOTE1_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( + new IllegalStateException("Unable to open any connections") + ); + assertThat(executionInfo.getCluster(REMOTE1_ALIAS).getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + // skip_unavailable=false, so should fail + skipUnavailableListener(expectFailure, executionInfo, REMOTE2_ALIAS, EsqlExecutionInfo.Cluster.Status.PARTIAL).onFailure( + new IllegalStateException("Unable to open any connections") + ); + + } + private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) { return new XPackLicenseStatus(operationMode, true, null); }