From 436a831838f7534d3b5bb8ea87039b7524c7c1e4 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 9 Apr 2025 14:20:43 -0600 Subject: [PATCH 01/22] Fix missing index handling for partial-enabled queries --- ...CrossClusterQueryWithPartialResultsIT.java | 40 ++++++++++--------- .../esql/plugin/ClusterComputeHandler.java | 20 +++++++--- .../xpack/esql/plugin/ComputeService.java | 5 ++- 3 files changed, 39 insertions(+), 26 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 463dbb81304a1..810a00c4e06e7 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -135,29 +135,31 @@ public void testPartialResults() throws Exception { } } - public void testOneRemoteClusterPartial() throws Exception { + public void testLocalIndexMissing() throws Exception { populateIndices(); EsqlQueryRequest request = new EsqlQueryRequest(); - request.query("FROM ok*,cluster-a:ok*,*-b:fail* | KEEP id, fail_me"); - request.allowPartialResults(true); + request.query("FROM ok-local,no_such_index | LIMIT 1"); request.includeCCSMetadata(randomBoolean()); - try (var resp = runQuery(request)) { - assertTrue(resp.isPartial()); - Set allIds = Stream.of(local.okIds, remote1.okIds).flatMap(Collection::stream).collect(Collectors.toSet()); - List> rows = getValuesList(resp); - assertThat(rows.size(), equalTo(allIds.size())); - Set returnedIds = new HashSet<>(); - for (List row : rows) { - assertThat(row.size(), equalTo(2)); - String id = (String) row.get(0); - assertTrue(returnedIds.add(id)); - } - assertThat(returnedIds, equalTo(allIds)); + for (boolean allowPartial : Set.of(true, false)) { + request.allowPartialResults(allowPartial); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); + assertThat(error.getMessage(), containsString("no such index")); + assertThat(error.getMessage(), containsString("[no_such_index]")); + } + } - assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards); - assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards); - assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2.failingShards, 0); - assertClusterFailure(resp, REMOTE_CLUSTER_2, "Accessing failing field"); + public void testRemoteIndexMissing() throws Exception { + populateIndices(); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM cluster-a:ok-cluster1,cluster-a:no_such_index | LIMIT 1"); + request.includeCCSMetadata(randomBoolean()); + for (boolean allowPartial : Set.of(true, false)) { + request.allowPartialResults(allowPartial); + Exception error = expectThrows(Exception.class, () -> runQuery(request).close()); + error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error); + assertThat(error.getMessage(), containsString("no such index")); + assertThat(error.getMessage(), containsString("[no_such_index]")); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 5ad81177a6a44..bf25a781610c0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.OriginalIndices; @@ -16,6 +17,7 @@ import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; @@ -88,12 +90,18 @@ void startComputeOnRemoteCluster( if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) { EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e); l.onResponse(List.of()); - } else if (configuration.allowPartialResults()) { - EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); - l.onResponse(List.of()); - } else { - l.onFailure(e); - } + } else if (configuration.allowPartialResults() + && (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) { + EsqlCCSUtils.markClusterWithFinalStateAndNoShards( + executionInfo, + clusterAlias, + EsqlExecutionInfo.Cluster.Status.PARTIAL, + e + ); + l.onResponse(List.of()); + } else { + l.onFailure(e); + } }); ExchangeService.openExchange( transportService, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 5724587f0573b..3ccb22fdfee78 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; @@ -27,6 +28,7 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; @@ -314,7 +316,8 @@ public void execute( ); dataNodesListener.onResponse(r.getProfiles()); }, e -> { - if (configuration.allowPartialResults()) { + if (configuration.allowPartialResults() + && (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) { execInfo.swapCluster( LOCAL_CLUSTER, (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus( From 39141cc7cca72501a2f652930780750ed294e71c Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 9 Apr 2025 14:31:24 -0600 Subject: [PATCH 02/22] Oops shouldn't delete that --- ...CrossClusterQueryWithPartialResultsIT.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java index 810a00c4e06e7..c6d45486549e2 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java @@ -135,6 +135,32 @@ public void testPartialResults() throws Exception { } } + public void testOneRemoteClusterPartial() throws Exception { + populateIndices(); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM ok*,cluster-a:ok*,*-b:fail* | KEEP id, fail_me"); + request.allowPartialResults(true); + request.includeCCSMetadata(randomBoolean()); + try (var resp = runQuery(request)) { + assertTrue(resp.isPartial()); + Set allIds = Stream.of(local.okIds, remote1.okIds).flatMap(Collection::stream).collect(Collectors.toSet()); + List> rows = getValuesList(resp); + assertThat(rows.size(), equalTo(allIds.size())); + Set returnedIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(2)); + String id = (String) row.get(0); + assertTrue(returnedIds.add(id)); + } + assertThat(returnedIds, equalTo(allIds)); + + assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards); + assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards); + assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2.failingShards, 0); + assertClusterFailure(resp, REMOTE_CLUSTER_2, "Accessing failing field"); + } + } + public void testLocalIndexMissing() throws Exception { populateIndices(); EsqlQueryRequest request = new EsqlQueryRequest(); From d3421a1b1c51be55e3510bad506f05799c629512 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 11 Apr 2025 13:28:52 -0600 Subject: [PATCH 03/22] Set ignore_unavaliable to false --- .../org/elasticsearch/xpack/esql/session/IndexResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index d2f79ceb1316f..a378949edea2a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -56,7 +56,7 @@ public class IndexResolver { public static final String UNMAPPED = "unmapped"; public static final IndicesOptions FIELD_CAPS_INDICES_OPTIONS = IndicesOptions.builder() - .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ALLOW_UNAVAILABLE_TARGETS) + .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS) .wildcardOptions( IndicesOptions.WildcardOptions.builder() .matchOpen(true) From 06ee627464ca12ae4eee98659cfd846671372f3e Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 11 Apr 2025 18:20:05 -0600 Subject: [PATCH 04/22] Revert --- .../org/elasticsearch/xpack/esql/session/IndexResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java index a378949edea2a..d2f79ceb1316f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java @@ -56,7 +56,7 @@ public class IndexResolver { public static final String UNMAPPED = "unmapped"; public static final IndicesOptions FIELD_CAPS_INDICES_OPTIONS = IndicesOptions.builder() - .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ERROR_WHEN_UNAVAILABLE_TARGETS) + .concreteTargetOptions(IndicesOptions.ConcreteTargetOptions.ALLOW_UNAVAILABLE_TARGETS) .wildcardOptions( IndicesOptions.WildcardOptions.builder() .matchOpen(true) From bb890ec159db403f21d5fd46d86f18cbdd9071b0 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 11 Apr 2025 18:23:31 -0600 Subject: [PATCH 05/22] Update docs/changelog/126738.yaml --- docs/changelog/126738.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/126738.yaml diff --git a/docs/changelog/126738.yaml b/docs/changelog/126738.yaml new file mode 100644 index 0000000000000..6ad81ea204c47 --- /dev/null +++ b/docs/changelog/126738.yaml @@ -0,0 +1,5 @@ +pr: 126738 +summary: Fix missing index exception +area: ES|QL +type: bug +issues: [] From 9f05ded3f00d7cae97ffc29fd0fbc8c8a89463a9 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 14 Apr 2025 15:57:20 -0600 Subject: [PATCH 06/22] Test for CCS with filters --- .../CrossClusterQueryWithFiltersIT.java | 203 ++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java new file mode 100644 index 0000000000000..6252c0f21fa33 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -0,0 +1,203 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase { + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, false); + } + + protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { + assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); + assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertThat(clusterMetatata.getTotalShards(), equalTo(shards)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); + assertThat(clusterMetatata.getFailedShards(), equalTo(0)); + } + + protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { + assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); + assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertThat(clusterMetatata.getTotalShards(), equalTo(0)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); + assertThat(clusterMetatata.getFailedShards(), equalTo(0)); + } + + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse, QueryBuilder filter) { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.profile(randomInt(5) == 2); + request.columnar(randomBoolean()); + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } + if (filter != null) { + request.filter(filter); + } + return runQuery(request); + } + + public void testTimestampFilterFromQuery() throws IOException { + int docsTest1 = 50; + int docsTest2 = 30; + int localShards = randomIntBetween(1, 5); + int remoteShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26"); + + // Both indices are included + var filter = new RangeQueryBuilder("@timestamp").from("2023-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest1 + docsTest2)); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + } + + // Only local is included + filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest1)); +// assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + } + + // + // // Only local is included + // filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"); + // try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { + // List> values = getValuesList(resp); + // assertThat(values, hasSize(docsTest1)); + // + // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + // assertNotNull(executionInfo); + // assertThat(executionInfo.isCrossClusterSearch(), is(true)); + // long overallTookMillis = executionInfo.overallTook().millis(); + // assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + // + // assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + // + // EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // // Remote has no shards due to filter + // assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); + // + // EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + // assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-*"); + // } + + // Both indices are filtered out + // var filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now"); + // try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { + // List> values = getValuesList(resp); + // assertThat(values, hasSize(0)); + // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + // + // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + // assertNotNull(executionInfo); + // assertThat(executionInfo.isCrossClusterSearch(), is(true)); + // long overallTookMillis = executionInfo.overallTook().millis(); + // assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + // + // assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + // + // EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // // Remote has no shards due to filter + // assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); + // + // EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + // assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-*"); + // } + + } + + protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) { + Client client = client(clusterAlias); + String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; + assertAcked( + client.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping( + "id", + "type=keyword", + "tag-" + tag, + "type=keyword", + "v", + "type=long", + "const", + "type=long", + "@timestamp", + "type=date" + ) + ); + Set ids = new HashSet<>(); + for (int i = 0; i < numDocs; i++) { + String id = Long.toString(i); + client.prepareIndex(indexName).setSource("id", id, "tag-" + tag, tag, "v", i, "@timestamp", date).get(); + } + client.admin().indices().prepareRefresh(indexName).get(); + } + +} From 6f348b9a5a7aeb7292d12d43ae48ab7b6e297e79 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 15 Apr 2025 00:58:52 +0000 Subject: [PATCH 07/22] [CI] Auto commit changes from spotless --- .../xpack/esql/action/CrossClusterQueryWithFiltersIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 6252c0f21fa33..5424e06cf4467 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -106,7 +106,7 @@ public void testTimestampFilterFromQuery() throws IOException { try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { List> values = getValuesList(resp); assertThat(values, hasSize(docsTest1)); -// assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); From 7313daf2de7fbaf867b7bf8f6fd10407769b9841 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 17 Apr 2025 17:08:43 -0600 Subject: [PATCH 08/22] Partial fix for CCS/filters problems --- .../CrossClusterQueryWithFiltersIT.java | 195 +++++++++++++----- .../xpack/esql/action/ColumnInfoImpl.java | 4 + .../xpack/esql/action/EsqlExecutionInfo.java | 1 + .../xpack/esql/plugin/ComputeService.java | 4 + .../xpack/esql/session/EsqlCCSUtils.java | 36 +++- .../xpack/esql/session/EsqlSession.java | 14 +- 6 files changed, 197 insertions(+), 57 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 5424e06cf4467..6679d7bf7240d 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -12,8 +12,9 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.xpack.esql.VerificationException; -import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -21,6 +22,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItems; @@ -28,6 +30,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +@TestLogging(value = "org.elasticsearch.xpack.esql:DEBUG", reason = "debug") public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase { @Override protected Map skipUnavailableForRemoteClusters() { @@ -56,6 +59,17 @@ protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMe assertThat(clusterMetatata.getFailedShards(), equalTo(0)); } + protected void assertClusterMetadataSkipped(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { + assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); + assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertThat(clusterMetatata.getTotalShards(), equalTo(shards)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(shards)); + assertThat(clusterMetatata.getFailedShards(), equalTo(0)); + } + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse, QueryBuilder filter) { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query(query); @@ -71,7 +85,7 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse return runQuery(request); } - public void testTimestampFilterFromQuery() throws IOException { + public void testTimestampFilterFromQuery() { int docsTest1 = 50; int docsTest2 = 30; int localShards = randomIntBetween(1, 5); @@ -106,7 +120,7 @@ public void testTimestampFilterFromQuery() throws IOException { try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { List> values = getValuesList(resp); assertThat(values, hasSize(docsTest1)); - // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); @@ -123,52 +137,141 @@ public void testTimestampFilterFromQuery() throws IOException { assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); } - // - // // Only local is included - // filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"); - // try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { - // List> values = getValuesList(resp); - // assertThat(values, hasSize(docsTest1)); - // - // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - // assertNotNull(executionInfo); - // assertThat(executionInfo.isCrossClusterSearch(), is(true)); - // long overallTookMillis = executionInfo.overallTook().millis(); - // assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - // - // assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); - // - // EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - // // Remote has no shards due to filter - // assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); - // - // EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - // assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-*"); - // } + // Only remote is included + filter = new RangeQueryBuilder("@timestamp").from("2023-01-01").to("2024-01-01"); + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest2)); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSkipped(localCluster, localShards, overallTookMillis, "logs-1"); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + } + + // Only local is included - wildcards + filter = new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docsTest1)); + // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-*"); + } // Both indices are filtered out - // var filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now"); - // try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { - // List> values = getValuesList(resp); - // assertThat(values, hasSize(0)); - // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); - // - // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - // assertNotNull(executionInfo); - // assertThat(executionInfo.isCrossClusterSearch(), is(true)); - // long overallTookMillis = executionInfo.overallTook().millis(); - // assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - // - // assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); - // - // EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - // // Remote has no shards due to filter - // assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); - // - // EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - // assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-*"); - // } + filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-1,c*:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // Remote has no shards due to filter + assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + // Local cluster can not be filtered out for now + assertClusterMetadataSkipped(localCluster, localShards, overallTookMillis, "logs-1"); + } + + // Both indices are filtered out - wildcards + filter = new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now"); + try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + // Remote has no shards due to filter + assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + // Local cluster can not be filtered out for now + assertClusterMetadataSkipped(localCluster, localShards, overallTookMillis, "logs-*"); + } + + } + public void testFilterWithMissingIndex() { + int docsTest1 = 50; + int docsTest2 = 30; + int localShards = randomIntBetween(1, 5); + int remoteShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26"); + + int docSize = docsTest1; + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + // Local index missing + VerificationException e = expectThrows( + VerificationException.class, + () -> runQuery("from missing", randomBoolean(), filter).close() + ); + assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Local index missing + wildcards + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs*", randomBoolean(), filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Local index missing + existing index + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs-1", randomBoolean(), filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Local index missing + existing remote + e = expectThrows(VerificationException.class, () -> runQuery("from missing,c*:logs-2", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // Wildcard index missing + e = expectThrows(VerificationException.class, () -> runQuery("from missing*", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [missing*]")); + // Wildcard index missing + existing index + try (EsqlQueryResponse resp = runQuery("from missing*,logs-1", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(docSize)); + // for the second round + docSize = 0; + } + } } protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java index b37c1a30ae0a3..8de0996e32041 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ColumnInfoImpl.java @@ -132,4 +132,8 @@ public DataType type() { public List originalTypes() { return originalTypes; } + + public String toString() { + return "ColumnInfoImpl{" + "name='" + name + '\'' + ", type=" + type + ", originalTypes=" + originalTypes + '}'; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 9726782a35d05..640e13b45a7e6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -384,6 +384,7 @@ public static class Cluster implements ToXContentFragment, Writeable { public enum Status { RUNNING, // still running SUCCESSFUL, // all shards completed search + FILTERED, // all shards were filtered out PARTIAL, // only some shards completed the search, partial results from cluster SKIPPED, // entire cluster was skipped FAILED; // search was failed due to errors on this cluster diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4332e4d6e7054..16c2d25a4e835 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -336,6 +336,10 @@ public void execute( // starts computes on remote clusters final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices); for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) { + if (execInfo.getCluster(cluster.clusterAlias()).getStatus() != EsqlExecutionInfo.Cluster.Status.RUNNING) { + // if the cluster is already in the terminal state, no need to call it + continue; + } clusterComputeHandler.startComputeOnRemoteCluster( sessionId, rootTask, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index fd20911184ae8..e7a92de1a7899 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; @@ -145,7 +146,7 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu StringBuilder sb = new StringBuilder(); for (String clusterAlias : executionInfo.clusterAliases()) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); - if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) { if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); } else { @@ -180,7 +181,11 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf } } - static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + static void updateExecutionInfoWithClustersWithNoMatchingIndices( + EsqlExecutionInfo executionInfo, + IndexResolution indexResolution, + QueryBuilder filter + ) { Set clustersWithResolvedIndices = new HashSet<>(); // determine missing clusters for (String indexName : indexResolution.resolvedIndices()) { @@ -202,8 +207,9 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { - if (executionInfo.getCluster(c).getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { - // if cluster was already marked SKIPPED during enrich policy resolution, do not overwrite + if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING + && executionInfo.getCluster(c).getStatus() != Cluster.Status.FILTERED) { + // if cluster was already in the terminal state and not filtered, do not overwrite continue; } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); @@ -217,9 +223,21 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } else { fatalErrorMessage += "; " + error; } + if (filter != null) { + // Use filtered status here because we still need to check it on the second lookup without the filter + // to ensure this index is real. + markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.FILTERED, null); + } else { + markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.FAILED, new VerificationException(error)); + } } else { // no matching indices and no concrete index requested - just mark it as done, no error - markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null); + markClusterWithFinalStateAndNoShards( + executionInfo, + c, + filter != null ? Cluster.Status.FILTERED : Cluster.Status.SUCCESSFUL, + null + ); } } if (fatalErrorMessage != null) { @@ -227,6 +245,11 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } } + // Filter-less version + static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { + updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, null); + } + // visible for testing static boolean concreteIndexRequested(String indexExpression) { if (Strings.isNullOrBlank(indexExpression)) { @@ -262,6 +285,9 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { .build() ); } + if (cluster.getStatus() == Cluster.Status.FILTERED) { + markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, Cluster.Status.SUCCESSFUL, null); + } } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 9330df751c00d..71a9f188eba10 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -380,15 +380,13 @@ public void analyzedPlan( }).andThen((l, result) -> { // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for // invalid index resolution to updateExecutionInfo - if (result.indices.isValid()) { - // CCS indices and skip_unavailable cluster values can stop the analysis right here - if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return; - } + // If we run out of clusters to search due to unavailability we can stop the analysis right here + if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return; // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step l.onResponse(result); }).andThen((l, result) -> { // first attempt (maybe the only one) at analyzing the plan - analyzeAndMaybeRetry(analyzeAction, requestFilter, result, logicalPlanListener, l); + analyzeAndMaybeRetry(analyzeAction, requestFilter, result, executionInfo, logicalPlanListener, l); }).andThen((l, result) -> { assert requestFilter != null : "The second pre-analysis shouldn't take place when there is no index filter in the request"; @@ -399,6 +397,8 @@ public void analyzedPlan( LOGGER.debug("Analyzing the plan (second attempt, without filter)"); LogicalPlan plan; try { + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null); + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.unavailableClusters()); plan = analyzeAction.apply(result); } catch (Exception e) { l.onFailure(e); @@ -508,12 +508,12 @@ private boolean allCCSClustersSkipped( ActionListener logicalPlanListener ) { IndexResolution indexResolution = result.indices; - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution); EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.unavailableClusters()); if (executionInfo.isCrossClusterSearch() && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) { // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception // to let the LogicalPlanActionListener decide how to proceed + LOGGER.debug("No more clusters to search, ending analysis stage"); logicalPlanListener.onFailure(new NoClustersToSearchException()); return true; } @@ -525,6 +525,7 @@ private static void analyzeAndMaybeRetry( Function analyzeAction, QueryBuilder requestFilter, PreAnalysisResult result, + EsqlExecutionInfo executionInfo, ActionListener logicalPlanListener, ActionListener l ) { @@ -534,6 +535,7 @@ private static void analyzeAndMaybeRetry( LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); try { + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter); plan = analyzeAction.apply(result); } catch (Exception e) { if (e instanceof VerificationException ve) { From fb7fb6a1fa537df2e618538b52b36f18df75b3e9 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Thu, 17 Apr 2025 18:19:34 -0600 Subject: [PATCH 09/22] fix tests --- .../qa/rest/EsqlRestValidationTestCase.java | 2 +- .../CrossClusterQueryWithFiltersIT.java | 25 ++++++++++++++++--- .../xpack/esql/action/EsqlExecutionInfo.java | 2 +- .../xpack/esql/session/EsqlSession.java | 11 +++++--- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java index 9ec4f60f4c843..18feb0d32a899 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java @@ -74,7 +74,7 @@ public void wipeTestData() throws IOException { } private String getInexistentIndexErrorMessage() { - return "\"reason\" : \"Found 1 problem\\nline 1:1: Unknown index "; + return "Unknown index "; } public void testInexistentIndexNameWithWildcard() throws IOException { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 6679d7bf7240d..7391378283daa 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -59,7 +59,12 @@ protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMe assertThat(clusterMetatata.getFailedShards(), equalTo(0)); } - protected void assertClusterMetadataSkipped(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { + protected void assertClusterMetadataSkippedShards( + EsqlExecutionInfo.Cluster clusterMetatata, + int shards, + long took, + String indexExpression + ) { assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); @@ -153,7 +158,7 @@ public void testTimestampFilterFromQuery() { assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); - assertClusterMetadataSkipped(localCluster, localShards, overallTookMillis, "logs-1"); + assertClusterMetadataSkippedShards(localCluster, localShards, overallTookMillis, "logs-1"); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); assertClusterMetadataSuccess(remoteCluster, remoteShards, overallTookMillis, "logs-2"); @@ -202,7 +207,7 @@ public void testTimestampFilterFromQuery() { EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); // Local cluster can not be filtered out for now - assertClusterMetadataSkipped(localCluster, localShards, overallTookMillis, "logs-1"); + assertClusterMetadataSkippedShards(localCluster, localShards, overallTookMillis, "logs-1"); } // Both indices are filtered out - wildcards @@ -226,7 +231,7 @@ public void testTimestampFilterFromQuery() { EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); // Local cluster can not be filtered out for now - assertClusterMetadataSkipped(localCluster, localShards, overallTookMillis, "logs-*"); + assertClusterMetadataSkippedShards(localCluster, localShards, overallTookMillis, "logs-*"); } } @@ -274,6 +279,18 @@ public void testFilterWithMissingIndex() { } } + public void testFilterWithMissingRemoteIndex() { + // TODO + } + + public void testFilterWithUnavailableRemote() { + // TODO + } + + public void testFilterWithUnavailableRemoteAndSkipUnavailable() { + // TODO + } + protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) { Client client = client(clusterAlias); String tag = Strings.isEmpty(clusterAlias) ? "local" : clusterAlias; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 640e13b45a7e6..104bdb900cabf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -384,7 +384,7 @@ public static class Cluster implements ToXContentFragment, Writeable { public enum Status { RUNNING, // still running SUCCESSFUL, // all shards completed search - FILTERED, // all shards were filtered out + FILTERED, // all shards were filtered out, temporary status for planning PARTIAL, // only some shards completed the search, partial results from cluster SKIPPED, // entire cluster was skipped FAILED; // search was failed due to errors on this cluster diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 71a9f188eba10..84262b5234b02 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -381,7 +381,7 @@ public void analyzedPlan( // TODO in follow-PR (for skip_unavailable handling of missing concrete indexes) add some tests for // invalid index resolution to updateExecutionInfo // If we run out of clusters to search due to unavailability we can stop the analysis right here - if (allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return; + if (result.indices.isValid() && allCCSClustersSkipped(executionInfo, result, logicalPlanListener)) return; // whatever tuple we have here (from CCS-special handling or from the original pre-analysis), pass it on to the next step l.onResponse(result); }).andThen((l, result) -> { @@ -397,8 +397,10 @@ public void analyzedPlan( LOGGER.debug("Analyzing the plan (second attempt, without filter)"); LogicalPlan plan; try { - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null); + // the order here is tricky - if the cluster has been filtered and later became unavailable, + // do we want to declare it successful or skipped? For now, unavailability takes precedence. EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.unavailableClusters()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, null); plan = analyzeAction.apply(result); } catch (Exception e) { l.onFailure(e); @@ -535,7 +537,10 @@ private static void analyzeAndMaybeRetry( LOGGER.debug("Analyzing the plan ({} attempt, {} filter)", attemptMessage, filterPresentMessage); try { - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter); + if (result.indices.isValid() || requestFilter != null) { + // Capture filtered out indices + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter); + } plan = analyzeAction.apply(result); } catch (Exception e) { if (e instanceof VerificationException ve) { From a51bf2a6e3c41a1e2e30cee5875843fa05da8d1f Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 18 Apr 2025 12:50:39 -0600 Subject: [PATCH 10/22] More tests --- .../CrossClusterQueryWithFiltersIT.java | 170 ++++++++++++++++-- 1 file changed, 160 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 7391378283daa..d9477df827749 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; @@ -15,6 +16,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.esql.VerificationException; +import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -37,6 +39,11 @@ protected Map skipUnavailableForRemoteClusters() { return Map.of(REMOTE_CLUSTER_1, false, REMOTE_CLUSTER_2, false); } + @Override + protected boolean reuseClusters() { + return false; + } + protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); @@ -75,6 +82,17 @@ protected void assertClusterMetadataSkippedShards( assertThat(clusterMetatata.getFailedShards(), equalTo(0)); } + protected void assertClusterMetadataSkipped(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression) { + assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); + assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertThat(clusterMetatata.getTotalShards(), equalTo(0)); + assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0)); + assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); + assertThat(clusterMetatata.getFailedShards(), equalTo(0)); + } + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse, QueryBuilder filter) { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query(query); @@ -244,11 +262,12 @@ public void testFilterWithMissingIndex() { populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26"); - int docSize = docsTest1; + int count = 0; for (var filter : List.of( new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") )) { + count++; // Local index missing VerificationException e = expectThrows( VerificationException.class, @@ -264,7 +283,7 @@ public void testFilterWithMissingIndex() { // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs-1", randomBoolean(), filter).close()); // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); // Local index missing + existing remote - e = expectThrows(VerificationException.class, () -> runQuery("from missing,c*:logs-2", randomBoolean(), filter).close()); + e = expectThrows(VerificationException.class, () -> runQuery("from missing,cluster-a:logs-2", randomBoolean(), filter).close()); assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); // Wildcard index missing e = expectThrows(VerificationException.class, () -> runQuery("from missing*", randomBoolean(), filter).close()); @@ -272,23 +291,154 @@ public void testFilterWithMissingIndex() { // Wildcard index missing + existing index try (EsqlQueryResponse resp = runQuery("from missing*,logs-1", randomBoolean(), filter)) { List> values = getValuesList(resp); - assertThat(values, hasSize(docSize)); - // for the second round - docSize = 0; + assertThat(values, hasSize(count > 1 ? 0 : docsTest1)); } } } public void testFilterWithMissingRemoteIndex() { - // TODO + int docsTest1 = 50; + int docsTest2 = 30; + int localShards = randomIntBetween(1, 5); + int remoteShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + populateDateIndex(REMOTE_CLUSTER_1, REMOTE_INDEX, remoteShards, docsTest2, "2023-11-26"); + + int count = 0; + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2023-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + count++; + // Local index missing + VerificationException e = expectThrows( + VerificationException.class, + () -> runQuery("from cluster-a:missing", randomBoolean(), filter).close() + ); + assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // Local index missing + wildcards + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing,cluster-a:logs*", randomBoolean(), + // filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // Local index missing + existing index + // FIXME: planner does not catch this now + // e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing,cluster-a:logs-2", randomBoolean(), + // filter).close()); + // assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // Local index + missing remote + e = expectThrows(VerificationException.class, () -> runQuery("from logs-1,cluster-a:missing", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // Wildcard index missing + e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing*", randomBoolean(), filter).close()); + assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing*]")); + // Wildcard index missing + existing remote index + try (EsqlQueryResponse resp = runQuery("from cluster-a:missing*,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 1 ? 0 : docsTest2)); + } + // Wildcard index missing + existing local index + try (EsqlQueryResponse resp = runQuery("from cluster-a:missing*,logs-1", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 2 ? 0 : docsTest1)); + } + } } - public void testFilterWithUnavailableRemote() { - // TODO + public void testFilterWithUnavailableRemote() throws IOException { + int docsTest1 = 50; + int localShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + cluster(REMOTE_CLUSTER_1).close(); + + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + // One index + var e = expectThrows(ElasticsearchException.class, () -> runQuery("from cluster-a:log-2", randomBoolean(), filter).close()); + // Two indices + e = expectThrows(ElasticsearchException.class, () -> runQuery("from logs-1,cluster-a:log-2", randomBoolean(), filter).close()); + // Wildcard + e = expectThrows(ElasticsearchException.class, () -> runQuery("from logs-1,cluster-a:log*", randomBoolean(), filter).close()); + } } - public void testFilterWithUnavailableRemoteAndSkipUnavailable() { - // TODO + public void testFilterWithUnavailableRemoteAndSkipUnavailable() throws IOException { + setSkipUnavailable(REMOTE_CLUSTER_1, true); + int docsTest1 = 50; + int localShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + cluster(REMOTE_CLUSTER_1).close(); + int count = 0; + + for (var filter : List.of( + new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), + new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") + )) { + count++; + // One index + try (EsqlQueryResponse resp = runQuery("from cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSkipped(remoteCluster, overallTookMillis, "logs-2"); + } + // Two indices + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs-2", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 1 ? 0 : docsTest1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSkipped(remoteCluster, overallTookMillis, "logs-2"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + if (count > 1) { + assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-1"); + } else { + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + } + } + // Wildcard + try (EsqlQueryResponse resp = runQuery("from logs-1,cluster-a:logs*", randomBoolean(), filter)) { + List> values = getValuesList(resp); + assertThat(values, hasSize(count > 1 ? 0 : docsTest1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + long overallTookMillis = executionInfo.overallTook().millis(); + assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1))); + + EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); + assertClusterMetadataSkipped(remoteCluster, overallTookMillis, "logs*"); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + if (count > 1) { + assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-1"); + } else { + assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); + } + } + } + } protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) { From d58d6e6b08b1a937fdde4bb5d179d10772ec9876 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 18 Apr 2025 15:19:19 -0600 Subject: [PATCH 11/22] We can not eliminate some filtered runtime calls since there could be missing indices So we have to drop FILTERED status for now --- .../CrossClusterQueryWithFiltersIT.java | 6 +++--- .../xpack/esql/action/EsqlExecutionInfo.java | 1 - .../xpack/esql/session/EsqlCCSUtils.java | 21 +++++-------------- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index d9477df827749..8dbda1d16b6ff 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -157,7 +157,7 @@ public void testTimestampFilterFromQuery() { assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + assertClusterMetadataSkippedShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); } // Only remote is included @@ -221,7 +221,7 @@ public void testTimestampFilterFromQuery() { EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); // Remote has no shards due to filter - assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); + assertClusterMetadataSkippedShards(remoteCluster, remoteShards, overallTookMillis, "logs-2"); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); // Local cluster can not be filtered out for now @@ -245,7 +245,7 @@ public void testTimestampFilterFromQuery() { EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); // Remote has no shards due to filter - assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); + assertClusterMetadataSkippedShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); // Local cluster can not be filtered out for now diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index bc0f60c774c95..10e0bf29bc1eb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -394,7 +394,6 @@ public static class Cluster implements ToXContentFragment, Writeable { public enum Status { RUNNING, // still running SUCCESSFUL, // all shards completed search - FILTERED, // all shards were filtered out, temporary status for planning PARTIAL, // only some shards completed the search, partial results from cluster SKIPPED, // entire cluster was skipped FAILED; // search was failed due to errors on this cluster diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 636e5f4b0c8dc..133b7a14b20b9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -208,8 +208,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( * Mark it as SKIPPED with 0 shards searched and took=0. */ for (String c : clustersWithNoMatchingIndices) { - if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING - && executionInfo.getCluster(c).getStatus() != Cluster.Status.FILTERED) { + if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) { // if cluster was already in the terminal state and not filtered, do not overwrite continue; } @@ -224,21 +223,14 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( } else { fatalErrorMessage += "; " + error; } - if (filter != null) { - // Use filtered status here because we still need to check it on the second lookup without the filter - // to ensure this index is real. - markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.FILTERED, null); - } else { + if (filter == null) { markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.FAILED, new VerificationException(error)); } } else { // no matching indices and no concrete index requested - just mark it as done, no error - markClusterWithFinalStateAndNoShards( - executionInfo, - c, - filter != null ? Cluster.Status.FILTERED : Cluster.Status.SUCCESSFUL, - null - ); + if (indexResolution.isValid()) { + markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null); + } } } if (fatalErrorMessage != null) { @@ -286,9 +278,6 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { .build() ); } - if (cluster.getStatus() == Cluster.Status.FILTERED) { - markClusterWithFinalStateAndNoShards(execInfo, clusterAlias, Cluster.Status.SUCCESSFUL, null); - } } } } From d2df0e1873e41b47cfa4a0f5cb29a3dde30e956b Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 21 Apr 2025 10:25:46 -0600 Subject: [PATCH 12/22] Improve comments --- .../elasticsearch/xpack/esql/session/EsqlCCSUtils.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java index 133b7a14b20b9..ed324cbc2e959 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java @@ -147,6 +147,7 @@ static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo execu StringBuilder sb = new StringBuilder(); for (String clusterAlias : executionInfo.clusterAliases()) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); + // Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out. if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) { if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); @@ -209,7 +210,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( */ for (String c : clustersWithNoMatchingIndices) { if (executionInfo.getCluster(c).getStatus() != Cluster.Status.RUNNING) { - // if cluster was already in the terminal state and not filtered, do not overwrite + // if cluster was already in a terminal state, we don't need to check it again continue; } final String indexExpression = executionInfo.getCluster(c).getIndexExpression(); @@ -224,11 +225,14 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( fatalErrorMessage += "; " + error; } if (filter == null) { + // Not very useful since we don't send metadata on errors now, but may be useful in the future + // We check for filter since the filter may be the reason why the index is missing, and then it's ok markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.FAILED, new VerificationException(error)); } } else { - // no matching indices and no concrete index requested - just mark it as done, no error if (indexResolution.isValid()) { + // no matching indices and no concrete index requested - just mark it as done, no error + // We check for the valid resolution because if we have empty resolution it's still an error. markClusterWithFinalStateAndNoShards(executionInfo, c, Cluster.Status.SUCCESSFUL, null); } } @@ -238,7 +242,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices( } } - // Filter-less version + // Filter-less version, mainly for testing where we don't need filter support static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionInfo executionInfo, IndexResolution indexResolution) { updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution, null); } From 492daae1fff01faf427c26721e7488f40cd2f62c Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 21 Apr 2025 12:02:10 -0600 Subject: [PATCH 13/22] Add column tests --- .../xpack/esql/action/CrossClusterQueryWithFiltersIT.java | 8 ++++---- .../elasticsearch/xpack/esql/plugin/ComputeService.java | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 8dbda1d16b6ff..c9c0e991a5ea6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.esql.VerificationException; import java.io.IOException; @@ -32,7 +31,6 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; -@TestLogging(value = "org.elasticsearch.xpack.esql:DEBUG", reason = "debug") public class CrossClusterQueryWithFiltersIT extends AbstractCrossClusterTestCase { @Override protected Map skipUnavailableForRemoteClusters() { @@ -187,6 +185,8 @@ public void testTimestampFilterFromQuery() { try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { List> values = getValuesList(resp); assertThat(values, hasSize(docsTest1)); + // FIXME: this is currently inconsistent with the non-wildcard case, since empty wildcard is not an error, + // the second field-caps does not happen and the remote fields are not added to the response. // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); @@ -209,7 +209,7 @@ public void testTimestampFilterFromQuery() { try (EsqlQueryResponse resp = runQuery("from logs-1,c*:logs-2", randomBoolean(), filter)) { List> values = getValuesList(resp); assertThat(values, hasSize(0)); - // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); @@ -233,7 +233,7 @@ public void testTimestampFilterFromQuery() { try (EsqlQueryResponse resp = runQuery("from logs-*,c*:logs-*", randomBoolean(), filter)) { List> values = getValuesList(resp); assertThat(values, hasSize(0)); - // assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); + assertThat(resp.columns().stream().map(ColumnInfoImpl::name).toList(), hasItems("@timestamp", "tag-local", "tag-cluster-a")); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 099483f927124..36ab9fa49861c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -436,7 +436,7 @@ public void executePlan( final var remoteClusters = clusterComputeHandler.getRemoteClusters(clusterToConcreteIndices, clusterToOriginalIndices); for (ClusterComputeHandler.RemoteCluster cluster : remoteClusters) { if (execInfo.getCluster(cluster.clusterAlias()).getStatus() != EsqlExecutionInfo.Cluster.Status.RUNNING) { - // if the cluster is already in the terminal state, no need to call it + // if the cluster is already in the terminal state from the planning stage, no need to call it continue; } clusterComputeHandler.startComputeOnRemoteCluster( From 9cc3fadab01b66ba7d94d013171da4318f7ccd6e Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 21 Apr 2025 14:44:27 -0600 Subject: [PATCH 14/22] More tests for unavailable --- .../CrossClusterQueryWithFiltersIT.java | 86 ++++++++++++++++--- 1 file changed, 76 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index c9c0e991a5ea6..e7bdcd7aa0d47 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -13,6 +13,11 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.VerificationException; import java.io.IOException; @@ -346,12 +351,7 @@ public void testFilterWithMissingRemoteIndex() { } } - public void testFilterWithUnavailableRemote() throws IOException { - int docsTest1 = 50; - int localShards = randomIntBetween(1, 5); - populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); - cluster(REMOTE_CLUSTER_1).close(); - + private void checkRemoteFailures() { for (var filter : List.of( new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), new RangeQueryBuilder("@timestamp").from("2025-01-01").to("now") @@ -365,13 +365,11 @@ public void testFilterWithUnavailableRemote() throws IOException { } } - public void testFilterWithUnavailableRemoteAndSkipUnavailable() throws IOException { - setSkipUnavailable(REMOTE_CLUSTER_1, true); + private void checkRemoteWithSkipUnavailable() { + int count = 0; int docsTest1 = 50; int localShards = randomIntBetween(1, 5); populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); - cluster(REMOTE_CLUSTER_1).close(); - int count = 0; for (var filter : List.of( new RangeQueryBuilder("@timestamp").from("2024-01-01").to("now"), @@ -438,7 +436,75 @@ public void testFilterWithUnavailableRemoteAndSkipUnavailable() throws IOExcepti } } } + } + + public void testFilterWithUnavailableRemote() throws IOException { + int docsTest1 = 50; + int localShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + cluster(REMOTE_CLUSTER_1).close(); + checkRemoteFailures(); + } + private void makeRemoteFailFieldCaps() { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.addRequestHandlingBehavior( + EsqlResolveFieldsAction.NAME, + (handler, request, channel, task) -> handler.messageReceived(request, new TransportChannel() { + @Override + public String getProfileName() { + return channel.getProfileName(); + } + + @Override + public void sendResponse(TransportResponse response) { + sendResponse(new NoSuchRemoteClusterException("cluster [cluster-a] not found, skipping")); + } + + @Override + public void sendResponse(Exception exception) { + channel.sendResponse(exception); + } + }, task) + ); + } + } + + private void clearRemoteRules() { + for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) { + MockTransportService ts = asInstanceOf(MockTransportService.class, transportService); + ts.clearAllRules(); + } + } + + // Test when the disconnect happens on the field-caps call itself + public void testFilterWithUnavailableOnFieldcaps() throws IOException { + int docsTest1 = 50; + int localShards = randomIntBetween(1, 5); + populateDateIndex(LOCAL_CLUSTER, LOCAL_INDEX, localShards, docsTest1, "2024-11-26"); + makeRemoteFailFieldCaps(); + try { + checkRemoteFailures(); + } finally { + clearRemoteRules(); + } + } + + public void testFilterWithUnavailableRemoteAndSkipUnavailable() throws IOException { + setSkipUnavailable(REMOTE_CLUSTER_1, true); + cluster(REMOTE_CLUSTER_1).close(); + checkRemoteWithSkipUnavailable(); + } + + public void testFilterWithUnavailableFieldCapsAndSkipUnavailable() throws IOException { + setSkipUnavailable(REMOTE_CLUSTER_1, true); + makeRemoteFailFieldCaps(); + try { + checkRemoteWithSkipUnavailable(); + } finally { + clearRemoteRules(); + } } protected void populateDateIndex(String clusterAlias, String indexName, int numShards, int numDocs, String date) { From 216bd11c3e610df4c6e64c83bb2a48850578aab5 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 22 Apr 2025 10:14:00 -0600 Subject: [PATCH 15/22] Declutter asserts --- .../CrossClusterQueryWithFiltersIT.java | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index e7bdcd7aa0d47..366dd2f20f189 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster.Status; import java.io.IOException; import java.util.HashSet; @@ -47,26 +48,26 @@ protected boolean reuseClusters() { return false; } - protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { + protected void assertClusterMetadata(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression, Status status) { assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); - assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(clusterMetatata.getStatus(), equalTo(status)); assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertThat(clusterMetatata.getFailedShards(), equalTo(0)); + } + + protected void assertClusterMetadataSuccess(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL); assertThat(clusterMetatata.getTotalShards(), equalTo(shards)); assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards)); assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); - assertThat(clusterMetatata.getFailedShards(), equalTo(0)); } - protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, int shards, long took, String indexExpression) { - assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); - assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); - assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); - assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + protected void assertClusterMetadataNoShards(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression) { + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL); assertThat(clusterMetatata.getTotalShards(), equalTo(0)); assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0)); assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); - assertThat(clusterMetatata.getFailedShards(), equalTo(0)); } protected void assertClusterMetadataSkippedShards( @@ -75,25 +76,17 @@ protected void assertClusterMetadataSkippedShards( long took, String indexExpression ) { - assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); - assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); - assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); - assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SUCCESSFUL); assertThat(clusterMetatata.getTotalShards(), equalTo(shards)); assertThat(clusterMetatata.getSuccessfulShards(), equalTo(shards)); assertThat(clusterMetatata.getSkippedShards(), equalTo(shards)); - assertThat(clusterMetatata.getFailedShards(), equalTo(0)); } protected void assertClusterMetadataSkipped(EsqlExecutionInfo.Cluster clusterMetatata, long took, String indexExpression) { - assertThat(clusterMetatata.getIndexExpression(), equalTo(indexExpression)); - assertThat(clusterMetatata.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(clusterMetatata.getTook().millis(), greaterThanOrEqualTo(0L)); - assertThat(clusterMetatata.getTook().millis(), lessThanOrEqualTo(took)); + assertClusterMetadata(clusterMetatata, took, indexExpression, Status.SKIPPED); assertThat(clusterMetatata.getTotalShards(), equalTo(0)); assertThat(clusterMetatata.getSuccessfulShards(), equalTo(0)); assertThat(clusterMetatata.getSkippedShards(), equalTo(0)); - assertThat(clusterMetatata.getFailedShards(), equalTo(0)); } protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse, QueryBuilder filter) { @@ -203,7 +196,7 @@ public void testTimestampFilterFromQuery() { assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1); - assertClusterMetadataNoShards(remoteCluster, remoteShards, overallTookMillis, "logs-*"); + assertClusterMetadataNoShards(remoteCluster, overallTookMillis, "logs-*"); EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-*"); @@ -408,7 +401,7 @@ private void checkRemoteWithSkipUnavailable() { EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); if (count > 1) { - assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-1"); + assertClusterMetadataNoShards(localCluster, overallTookMillis, "logs-1"); } else { assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); } @@ -430,7 +423,7 @@ private void checkRemoteWithSkipUnavailable() { EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); if (count > 1) { - assertClusterMetadataNoShards(localCluster, localShards, overallTookMillis, "logs-1"); + assertClusterMetadataNoShards(localCluster, overallTookMillis, "logs-1"); } else { assertClusterMetadataSuccess(localCluster, localShards, overallTookMillis, "logs-1"); } From 6cbfe1942aea05031101b7b916cef02083e68653 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 22 Apr 2025 10:19:30 -0600 Subject: [PATCH 16/22] Remove this change, may not be needed --- .../xpack/esql/qa/rest/EsqlRestValidationTestCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java index 18feb0d32a899..9ec4f60f4c843 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java @@ -74,7 +74,7 @@ public void wipeTestData() throws IOException { } private String getInexistentIndexErrorMessage() { - return "Unknown index "; + return "\"reason\" : \"Found 1 problem\\nline 1:1: Unknown index "; } public void testInexistentIndexNameWithWildcard() throws IOException { From 90986956d32e9b01209e0f32509cca3090a3a5e2 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 22 Apr 2025 11:30:32 -0600 Subject: [PATCH 17/22] add async --- .../xpack/esql/action/CrossClusterQueryWithFiltersIT.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 366dd2f20f189..2fe25eae7bc94 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; +import static org.elasticsearch.core.TimeValue.timeValueSeconds; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.hamcrest.Matchers.containsString; @@ -90,7 +91,7 @@ protected void assertClusterMetadataSkipped(EsqlExecutionInfo.Cluster clusterMet } protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse, QueryBuilder filter) { - EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + EsqlQueryRequest request = randomBoolean() ? EsqlQueryRequest.asyncEsqlQueryRequest() : EsqlQueryRequest.syncEsqlQueryRequest(); request.query(query); request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); request.profile(randomInt(5) == 2); @@ -101,6 +102,7 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse if (filter != null) { request.filter(filter); } + request.waitForCompletionTimeout(timeValueSeconds(30)); return runQuery(request); } From c428a432867ab9082724e5a64db94a610721f1fb Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 22 Apr 2025 13:23:05 -0600 Subject: [PATCH 18/22] Update tests --- .../CrossClusterQueryWithFiltersIT.java | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 2fe25eae7bc94..91f1cc12851dc 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -9,8 +9,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.test.transport.MockTransportService; @@ -275,13 +277,13 @@ public void testFilterWithMissingIndex() { ); assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); // Local index missing + wildcards - // FIXME: planner does not catch this now - // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs*", randomBoolean(), filter).close()); - // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // FIXME: planner does not catch this now, it should be VerificationException but for now it's runtime IndexNotFoundException + var ie = expectThrows(IndexNotFoundException.class, () -> runQuery("from missing,logs*", randomBoolean(), filter).close()); + assertThat(ie.getDetailedMessage(), containsString("no such index [missing]")); // Local index missing + existing index - // FIXME: planner does not catch this now - // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs-1", randomBoolean(), filter).close()); - // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // FIXME: planner does not catch this now, it should be VerificationException but for now it's runtime IndexNotFoundException + ie = expectThrows(IndexNotFoundException.class, () -> runQuery("from missing,logs-1", randomBoolean(), filter).close()); + assertThat(ie.getDetailedMessage(), containsString("no such index [missing]")); // Local index missing + existing remote e = expectThrows(VerificationException.class, () -> runQuery("from missing,cluster-a:logs-2", randomBoolean(), filter).close()); assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); @@ -318,15 +320,19 @@ public void testFilterWithMissingRemoteIndex() { ); assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); // Local index missing + wildcards - // FIXME: planner does not catch this now - // e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing,cluster-a:logs*", randomBoolean(), - // filter).close()); - // assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // FIXME: planner does not catch this now, it should be VerificationException but for now it's runtime RemoteException + var ie = expectThrows( + RemoteException.class, + () -> runQuery("from cluster-a:missing,cluster-a:logs*", randomBoolean(), filter).close() + ); + assertThat(ie.getDetailedMessage(), containsString("no such index [missing]")); // Local index missing + existing index - // FIXME: planner does not catch this now - // e = expectThrows(VerificationException.class, () -> runQuery("from cluster-a:missing,cluster-a:logs-2", randomBoolean(), - // filter).close()); - // assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); + // FIXME: planner does not catch this now, it should be VerificationException but for now it's runtime RemoteException + ie = expectThrows( + RemoteException.class, + () -> runQuery("from cluster-a:missing,cluster-a:logs-2", randomBoolean(), filter).close() + ); + assertThat(ie.getDetailedMessage(), containsString("no such index [missing]")); // Local index + missing remote e = expectThrows(VerificationException.class, () -> runQuery("from logs-1,cluster-a:missing", randomBoolean(), filter).close()); assertThat(e.getDetailedMessage(), containsString("Unknown index [cluster-a:missing]")); From 1dad0fbec290bf9dbbd5673cfaa4b9cfdceb024d Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Tue, 22 Apr 2025 14:22:44 -0600 Subject: [PATCH 19/22] Remove the test, no longer needed --- .../action/CrossClusterUsageTelemetryIT.java | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterUsageTelemetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterUsageTelemetryIT.java index fdfbe9c6bf9d5..50976c751c100 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterUsageTelemetryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterUsageTelemetryIT.java @@ -138,32 +138,6 @@ public void testFailed() throws Exception { assertThat(telemetry.getByRemoteCluster().size(), equalTo(0)); } - // TODO: enable when skip-un patch is merged - // public void testSkipAllRemotes() throws Exception { - // var telemetry = getTelemetryFromQuery("from logs-*,c*:no_such_index | stats sum (v)", "unknown"); - // - // assertThat(telemetry.getTotalCount(), equalTo(1L)); - // assertThat(telemetry.getSuccessCount(), equalTo(1L)); - // assertThat(telemetry.getFailureReasons().size(), equalTo(0)); - // assertThat(telemetry.getTook().count(), equalTo(1L)); - // assertThat(telemetry.getTookMrtFalse().count(), equalTo(0L)); - // assertThat(telemetry.getTookMrtTrue().count(), equalTo(0L)); - // assertThat(telemetry.getRemotesPerSearchAvg(), equalTo(2.0)); - // assertThat(telemetry.getRemotesPerSearchMax(), equalTo(2L)); - // assertThat(telemetry.getSearchCountWithSkippedRemotes(), equalTo(1L)); - // assertThat(telemetry.getClientCounts().size(), equalTo(0)); - // - // var perCluster = telemetry.getByRemoteCluster(); - // assertThat(perCluster.size(), equalTo(3)); - // for (String clusterAlias : remoteClusterAlias()) { - // var clusterData = perCluster.get(clusterAlias); - // assertThat(clusterData.getCount(), equalTo(0L)); - // assertThat(clusterData.getSkippedCount(), equalTo(1L)); - // assertThat(clusterData.getTook().count(), equalTo(0L)); - // } - // assertPerClusterCount(perCluster.get(LOCAL_CLUSTER), 1L); - // } - public void testRemoteOnly() throws Exception { setupClusters(); var telemetry = getTelemetryFromQuery("from c*:logs-* | stats sum (v)", "kibana"); From 75913eb05b7cc23ea17e3164908c6d58e326c6fc Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 23 Apr 2025 09:21:55 -0600 Subject: [PATCH 20/22] Delete docs/changelog/126738.yaml --- docs/changelog/126738.yaml | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 docs/changelog/126738.yaml diff --git a/docs/changelog/126738.yaml b/docs/changelog/126738.yaml deleted file mode 100644 index 6ad81ea204c47..0000000000000 --- a/docs/changelog/126738.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 126738 -summary: Fix missing index exception -area: ES|QL -type: bug -issues: [] From a8e3efba963a262a1a5068de19f0496f0a27ac37 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 23 Apr 2025 09:41:31 -0600 Subject: [PATCH 21/22] Fix merge --- .../xpack/esql/action/CrossClusterQueryWithFiltersIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 2db185190aa75..45cd5015b1262 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilder; From 44e559ab7084fd1cc94a17850191b6bacea9b6e6 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Wed, 23 Apr 2025 11:36:05 -0600 Subject: [PATCH 22/22] oops merge dropped this change, restore --- .../esql/action/CrossClusterQueryWithFiltersIT.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java index 45cd5015b1262..91f1cc12851dc 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithFiltersIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.RemoteException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.test.transport.MockTransportService; @@ -276,13 +277,13 @@ public void testFilterWithMissingIndex() { ); assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); // Local index missing + wildcards - // FIXME: planner does not catch this now - // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs*", randomBoolean(), filter).close()); - // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // FIXME: planner does not catch this now, it should be VerificationException but for now it's runtime IndexNotFoundException + var ie = expectThrows(IndexNotFoundException.class, () -> runQuery("from missing,logs*", randomBoolean(), filter).close()); + assertThat(ie.getDetailedMessage(), containsString("no such index [missing]")); // Local index missing + existing index - // FIXME: planner does not catch this now - // e = expectThrows(VerificationException.class, () -> runQuery("from missing,logs-1", randomBoolean(), filter).close()); - // assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]")); + // FIXME: planner does not catch this now, it should be VerificationException but for now it's runtime IndexNotFoundException + ie = expectThrows(IndexNotFoundException.class, () -> runQuery("from missing,logs-1", randomBoolean(), filter).close()); + assertThat(ie.getDetailedMessage(), containsString("no such index [missing]")); // Local index missing + existing remote e = expectThrows(VerificationException.class, () -> runQuery("from missing,cluster-a:logs-2", randomBoolean(), filter).close()); assertThat(e.getDetailedMessage(), containsString("Unknown index [missing]"));