diff --git a/docs/reference/esql/esql-across-clusters.asciidoc b/docs/reference/esql/esql-across-clusters.asciidoc index cfcb5de73602c..db266fafde9d6 100644 --- a/docs/reference/esql/esql-across-clusters.asciidoc +++ b/docs/reference/esql/esql-across-clusters.asciidoc @@ -188,9 +188,10 @@ FROM *:my-index-000001 [[ccq-cluster-details]] ==== Cross-cluster metadata -ES|QL {ccs} responses include metadata about the search on each cluster when the response format is JSON. +Using the `"include_ccs_metadata": true` option, users can request that +ES|QL {ccs} responses include metadata about the search on each cluster (when the response format is JSON). Here we show an example using the async search endpoint. {ccs-cap} metadata is also present in the synchronous -search endpoint. +search endpoint response when requested. [source,console] ---- @@ -200,7 +201,8 @@ POST /_query/async?format=json FROM my-index-000001,cluster_one:my-index-000001,cluster_two:my-index* | STATS COUNT(http.response.status_code) BY user.id | LIMIT 2 - """ + """, + "include_ccs_metadata": true } ---- // TEST[setup:my_index] @@ -238,7 +240,7 @@ Which returns: "(local)": { <4> "status": "successful", "indices": "blogs", - "took": 36, <5> + "took": 41, <5> "_shards": { <6> "total": 13, "successful": 13, @@ -260,7 +262,7 @@ Which returns: "cluster_two": { "status": "successful", "indices": "cluster_two:my-index*", - "took": 41, + "took": 40, "_shards": { "total": 18, "successful": 18, @@ -286,7 +288,7 @@ it is identified as "(local)". <5> How long (in milliseconds) the search took on each cluster. This can be useful to determine which clusters have slower response times than others. <6> The shard details for the search on that cluster, including a count of shards that were -skipped due to the can-match phase. Shards are skipped when they cannot have any matching data +skipped due to the can-match phase results. Shards are skipped when they cannot have any matching data and therefore are not included in the full ES|QL query. @@ -294,9 +296,6 @@ The cross-cluster metadata can be used to determine whether any data came back f For instance, in the query below, the wildcard expression for `cluster-two` did not resolve to a concrete index (or indices). The cluster is, therefore, marked as 'skipped' and the total number of shards searched is set to zero. -Since the other cluster did have a matching index, the search did not return an error, but -instead returned all the matching data it could find. - [source,console] ---- @@ -306,7 +305,8 @@ POST /_query/async?format=json FROM cluster_one:my-index*,cluster_two:logs* | STATS COUNT(http.response.status_code) BY user.id | LIMIT 2 - """ + """, + "include_ccs_metadata": true } ---- // TEST[continued] diff --git a/docs/reference/esql/esql-query-api.asciidoc b/docs/reference/esql/esql-query-api.asciidoc index d1db21043a5b5..b1582721ad0e0 100644 --- a/docs/reference/esql/esql-query-api.asciidoc +++ b/docs/reference/esql/esql-query-api.asciidoc @@ -67,6 +67,11 @@ precedence. `false`. The API only supports this parameter for CBOR, JSON, SMILE, and YAML responses. See <>. +`include_ccs_metadata`:: +(Optional, boolean) If `true`, cross-cluster searches will include metadata about the query +on each cluster. Defaults to `false`. The API only supports this parameter for CBOR, JSON, SMILE, +and YAML responses. See <>. + `locale`:: (Optional, string) Returns results (especially dates) formatted per the conventions of the locale. For syntax, refer to <>. @@ -85,6 +90,7 @@ https://en.wikipedia.org/wiki/Query_plan[EXPLAIN PLAN]. `query`:: (Required, string) {esql} query to run. For syntax, refer to <>. + ifeval::["{release-state}"=="unreleased"] `table`:: (Optional, object) Named "table" parameters that can be referenced by the <> command. @@ -108,6 +114,13 @@ returned if `drop_null_columns` is sent with the request. (array of arrays) Values for the search results. +`_clusters`:: +(object) +Metadata about clusters involved in the execution of a cross-cluster query. Only returned (1) for +cross-cluster searches and (2) when `include_ccs_metadata` is sent in the body and set to `true` +and (3) when `format` of the response is set to JSON (the default), CBOR, SMILE, or YAML. +See <> for more information. + `profile`:: (object) Profile describing the execution of the query. Only returned if `profile` was sent in the body. diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 0f9c27a7877b8..03186e63240e5 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -241,6 +241,7 @@ static TransportVersion def(int id) { public static final TransportVersion RETRIEVERS_TELEMETRY_ADDED = def(8_765_00_0); public static final TransportVersion ESQL_CACHED_STRING_SERIALIZATION = def(8_766_00_0); public static final TransportVersion CHUNK_SENTENCE_OVERLAP_SETTING_ADDED = def(8_767_00_0); + public static final TransportVersion OPT_IN_ESQL_CCS_EXECUTION_INFO = def(8_768_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java index 454f3962c07ea..1f72827057c5b 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java @@ -127,8 +127,18 @@ void indexDocs(RestClient client, String index, List docs) throws IOExcepti refresh(client, index); } - private Map run(String query) throws IOException { - Map resp = runEsql(new RestEsqlTestCase.RequestObjectBuilder().query(query).build()); + private Map run(String query, boolean includeCCSMetadata) throws IOException { + Map resp = runEsql( + new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(includeCCSMetadata).build() + ); + logger.info("--> query {} response {}", query, resp); + return resp; + } + + private Map runWithColumnarAndIncludeCCSMetadata(String query) throws IOException { + Map resp = runEsql( + new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(true).columnar(true).build() + ); logger.info("--> query {} response {}", query, resp); return resp; } @@ -147,62 +157,77 @@ private Map runEsql(RestEsqlTestCase.RequestObjectBuilder reques public void testCount() throws Exception { { - Map result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)"); + boolean includeCCSMetadata = randomBoolean(); + Map result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata); var columns = List.of(Map.of("name", "c", "type", "long")); var values = List.of(List.of(localDocs.size() + remoteDocs.size())); MapMatcher mapMatcher = matchesMap(); - assertMap( - result, - mapMatcher.entry("columns", columns) - .entry("values", values) - .entry("took", greaterThanOrEqualTo(0)) - .entry("_clusters", any(Map.class)) - ); - assertClusterDetailsMap(result, false); + if (includeCCSMetadata) { + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + } + assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + if (includeCCSMetadata) { + assertClusterDetailsMap(result, false); + } } { - Map result = run("FROM *:test-remote-index | STATS c = COUNT(*)"); + boolean includeCCSMetadata = randomBoolean(); + Map result = run("FROM *:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata); var columns = List.of(Map.of("name", "c", "type", "long")); var values = List.of(List.of(remoteDocs.size())); MapMatcher mapMatcher = matchesMap(); - assertMap( - result, - mapMatcher.entry("columns", columns) - .entry("values", values) - .entry("took", greaterThanOrEqualTo(0)) - .entry("_clusters", any(Map.class)) - ); - assertClusterDetailsMap(result, true); + if (includeCCSMetadata) { + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + } + assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + if (includeCCSMetadata) { + assertClusterDetailsMap(result, true); + } } } public void testUngroupedAggs() throws Exception { { - Map result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)"); + boolean includeCCSMetadata = randomBoolean(); + Map result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)", includeCCSMetadata); var columns = List.of(Map.of("name", "total", "type", "long")); long sum = Stream.concat(localDocs.stream(), remoteDocs.stream()).mapToLong(d -> d.data).sum(); var values = List.of(List.of(Math.toIntExact(sum))); // check all sections of map except _cluster/details MapMatcher mapMatcher = matchesMap(); - assertMap( - result, - mapMatcher.entry("columns", columns) - .entry("values", values) - .entry("took", greaterThanOrEqualTo(0)) - .entry("_clusters", any(Map.class)) - ); - assertClusterDetailsMap(result, false); + if (includeCCSMetadata) { + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + } + assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + if (includeCCSMetadata) { + assertClusterDetailsMap(result, false); + } } { - Map result = run("FROM *:test-remote-index | STATS total = SUM(data)"); + boolean includeCCSMetadata = randomBoolean(); + Map result = run("FROM *:test-remote-index | STATS total = SUM(data)", includeCCSMetadata); + var columns = List.of(Map.of("name", "total", "type", "long")); + long sum = remoteDocs.stream().mapToLong(d -> d.data).sum(); + var values = List.of(List.of(Math.toIntExact(sum))); + + MapMatcher mapMatcher = matchesMap(); + if (includeCCSMetadata) { + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + } + assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + if (includeCCSMetadata) { + assertClusterDetailsMap(result, true); + } + } + { + Map result = runWithColumnarAndIncludeCCSMetadata("FROM *:test-remote-index | STATS total = SUM(data)"); var columns = List.of(Map.of("name", "total", "type", "long")); long sum = remoteDocs.stream().mapToLong(d -> d.data).sum(); var values = List.of(List.of(Math.toIntExact(sum))); - // check all sections of map except _cluster/details MapMatcher mapMatcher = matchesMap(); assertMap( result, @@ -269,7 +294,11 @@ private void assertClusterDetailsMap(Map result, boolean remoteO public void testGroupedAggs() throws Exception { { - Map result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color"); + boolean includeCCSMetadata = randomBoolean(); + Map result = run( + "FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color", + includeCCSMetadata + ); var columns = List.of(Map.of("name", "total", "type", "long"), Map.of("name", "color", "type", "keyword")); var values = Stream.concat(localDocs.stream(), remoteDocs.stream()) .collect(Collectors.toMap(d -> d.color, Doc::data, Long::sum)) @@ -280,17 +309,20 @@ public void testGroupedAggs() throws Exception { .toList(); MapMatcher mapMatcher = matchesMap(); - assertMap( - result, - mapMatcher.entry("columns", columns) - .entry("values", values) - .entry("took", greaterThanOrEqualTo(0)) - .entry("_clusters", any(Map.class)) - ); - assertClusterDetailsMap(result, false); + if (includeCCSMetadata) { + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + } + assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + if (includeCCSMetadata) { + assertClusterDetailsMap(result, false); + } } { - Map result = run("FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color"); + boolean includeCCSMetadata = randomBoolean(); + Map result = run( + "FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color", + includeCCSMetadata + ); var columns = List.of(Map.of("name", "total", "type", "long"), Map.of("name", "color", "type", "keyword")); var values = remoteDocs.stream() .collect(Collectors.toMap(d -> d.color, Doc::data, Long::sum)) @@ -300,16 +332,15 @@ public void testGroupedAggs() throws Exception { .map(e -> List.of(Math.toIntExact(e.getValue()), e.getKey())) .toList(); - // check all sections of map except _cluster/details + // check all sections of map except _clusters/details MapMatcher mapMatcher = matchesMap(); - assertMap( - result, - mapMatcher.entry("columns", columns) - .entry("values", values) - .entry("took", greaterThanOrEqualTo(0)) - .entry("_clusters", any(Map.class)) - ); - assertClusterDetailsMap(result, true); + if (includeCCSMetadata) { + mapMatcher = mapMatcher.entry("_clusters", any(Map.class)); + } + assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0))); + if (includeCCSMetadata) { + assertClusterDetailsMap(result, true); + } } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 3388f6f517bdf..7de4ee4ccae28 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -76,6 +76,7 @@ public void testBasicEsql() throws IOException { indexTimestampData(1); RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)"); + requestObjectBuilder().includeCCSMetadata(randomBoolean()); if (Build.current().isSnapshot()) { builder.pragmas(Settings.builder().put("data_partitioning", "shard").build()); } diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java index 8163e73078c71..4fa6ac3009654 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java @@ -128,6 +128,7 @@ public static class RequestObjectBuilder { private Boolean keepOnCompletion = null; private Boolean profile = null; + private Boolean includeCCSMetadata = null; private CheckedConsumer filter; @@ -197,6 +198,11 @@ public RequestObjectBuilder profile(boolean profile) { return this; } + public RequestObjectBuilder includeCCSMetadata(boolean includeCCSMetadata) { + this.includeCCSMetadata = includeCCSMetadata; + return this; + } + public RequestObjectBuilder filter(CheckedConsumer filter) { this.filter = filter; return this; @@ -220,6 +226,9 @@ public RequestObjectBuilder build() throws IOException { if (profile != null) { builder.field("profile", profile); } + if (includeCCSMetadata != null) { + builder.field("include_ccs_metadata", includeCCSMetadata); + } if (filter != null) { builder.startObject("filter"); filter.accept(builder); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java index 452d56680e8da..7d8bb738098d3 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Tuple; import org.elasticsearch.ingest.common.IngestCommonPlugin; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.license.LicenseService; @@ -220,7 +221,7 @@ static String enrichVendors(Enrich.Mode mode) { public void testWithHostsPolicy() { for (var mode : Enrich.Mode.values()) { String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, null)) { List> rows = getValuesList(resp); assertThat( rows, @@ -237,9 +238,14 @@ public void testWithHostsPolicy() { assertFalse(resp.getExecutionInfo().isCrossClusterSearch()); } } + + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + for (var mode : Enrich.Mode.values()) { String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { List> rows = getValuesList(resp); assertThat( rows, @@ -255,6 +261,7 @@ public void testWithHostsPolicy() { ) ); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of("c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } @@ -262,7 +269,7 @@ public void testWithHostsPolicy() { for (var mode : Enrich.Mode.values()) { String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os"; - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { List> rows = getValuesList(resp); assertThat( rows, @@ -278,6 +285,7 @@ public void testWithHostsPolicy() { ) ); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } @@ -285,6 +293,10 @@ public void testWithHostsPolicy() { } public void testEnrichHostsAggThenEnrichVendorCoordinator() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + for (var hostMode : Enrich.Mode.values()) { String query = String.format(Locale.ROOT, """ FROM *:events,events @@ -295,7 +307,7 @@ public void testEnrichHostsAggThenEnrichVendorCoordinator() { | stats c = SUM(c) by vendor | sort vendor """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR)); - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { assertThat( getValuesList(resp), equalTo( @@ -309,6 +321,7 @@ public void testEnrichHostsAggThenEnrichVendorCoordinator() { ) ); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } @@ -316,6 +329,10 @@ public void testEnrichHostsAggThenEnrichVendorCoordinator() { } public void testEnrichTwiceThenAggs() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + for (var hostMode : Enrich.Mode.values()) { String query = String.format(Locale.ROOT, """ FROM *:events,events @@ -325,7 +342,7 @@ public void testEnrichTwiceThenAggs() { | stats c = COUNT(*) by vendor | sort vendor """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR)); - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { assertThat( getValuesList(resp), equalTo( @@ -339,6 +356,7 @@ public void testEnrichTwiceThenAggs() { ) ); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } @@ -346,6 +364,10 @@ public void testEnrichTwiceThenAggs() { } public void testEnrichCoordinatorThenAny() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + String query = String.format(Locale.ROOT, """ FROM *:events,events | eval ip= TO_STR(host) @@ -354,7 +376,7 @@ public void testEnrichCoordinatorThenAny() { | stats c = COUNT(*) by vendor | sort vendor """, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.ANY)); - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { assertThat( getValuesList(resp), equalTo( @@ -368,12 +390,17 @@ public void testEnrichCoordinatorThenAny() { ) ); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } } public void testEnrichCoordinatorWithVendor() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + for (Enrich.Mode hostMode : Enrich.Mode.values()) { String query = String.format(Locale.ROOT, """ FROM *:events,events @@ -383,7 +410,7 @@ public void testEnrichCoordinatorWithVendor() { | stats c = COUNT(*) by vendor | sort vendor """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.COORDINATOR)); - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { assertThat( getValuesList(resp), equalTo( @@ -397,6 +424,7 @@ public void testEnrichCoordinatorWithVendor() { ) ); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } @@ -405,6 +433,10 @@ public void testEnrichCoordinatorWithVendor() { } public void testEnrichRemoteWithVendor() { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) { var query = String.format(Locale.ROOT, """ FROM *:events,events @@ -414,7 +446,7 @@ public void testEnrichRemoteWithVendor() { | stats c = COUNT(*) by vendor | sort vendor """, enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE)); - try (EsqlQueryResponse resp = runQuery(query)) { + try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) { assertThat( getValuesList(resp), equalTo( @@ -430,6 +462,7 @@ public void testEnrichRemoteWithVendor() { ) ); EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2"))); assertCCSExecutionInfoDetails(executionInfo); } @@ -444,7 +477,7 @@ public void testTopNThenEnrichRemote() { | LIMIT 5 | %s """, enrichHosts(Enrich.Mode.REMOTE)); - var error = expectThrows(VerificationException.class, () -> runQuery(query).close()); + var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT")); } @@ -455,7 +488,7 @@ public void testLimitThenEnrichRemote() { | eval ip= TO_STR(host) | %s """, enrichHosts(Enrich.Mode.REMOTE)); - var error = expectThrows(VerificationException.class, () -> runQuery(query).close()); + var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT")); } @@ -468,7 +501,7 @@ public void testAggThenEnrichRemote() { | %s | sort vendor """, enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE)); - var error = expectThrows(VerificationException.class, () -> runQuery(query).close()); + var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after STATS")); } @@ -480,20 +513,23 @@ public void testEnrichCoordinatorThenEnrichRemote() { | %s | sort vendor """, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE)); - var error = expectThrows(VerificationException.class, () -> runQuery(query).close()); + var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close()); assertThat( error.getMessage(), containsString("ENRICH with remote policy can't be executed after another ENRICH with coordinator policy") ); } - protected EsqlQueryResponse runQuery(String query) { + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query(query); request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); if (randomBoolean()) { request.profile(true); } + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); } @@ -516,6 +552,15 @@ private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInf } } + public static Tuple randomIncludeCCSMetadata() { + return switch (randomIntBetween(1, 3)) { + case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE); + case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE); + case 3 -> new Tuple<>(null, Boolean.FALSE); + default -> throw new AssertionError("should not get here"); + }; + } + public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin { public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 4f4f3d112247e..adfa2fc7273cd 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -10,7 +10,6 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Priority; @@ -21,13 +20,16 @@ import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -79,12 +81,15 @@ public List> getSettings() { } } - public void testSimple() { + public void testSuccessfulPathways() { Map testClusterInfo = setupTwoClusters(); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats sum (v)")) { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats sum (v)", requestIncludeMeta)) { List> values = getValuesList(resp); assertThat(values, hasSize(1)); assertThat(values.get(0), equalTo(List.of(330L))); @@ -93,6 +98,7 @@ public void testSimple() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -113,9 +119,12 @@ public void testSimple() { assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); assertThat(localCluster.getSkippedShards(), equalTo(0)); assertThat(localCluster.getFailedShards(), equalTo(0)); + + // ensure that the _clusters metadata is present only if requested + assertClusterMetadataInResponse(resp, responseExpectMeta); } - try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats count(*) by tag | sort tag | keep tag")) { + try (EsqlQueryResponse resp = runQuery("from logs-*,*:logs-* | stats count(*) by tag | sort tag | keep tag", requestIncludeMeta)) { List> values = getValuesList(resp); assertThat(values, hasSize(2)); assertThat(values.get(0), equalTo(List.of("local"))); @@ -125,6 +134,7 @@ public void testSimple() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -145,6 +155,9 @@ public void testSimple() { assertThat(localCluster.getSuccessfulShards(), equalTo(localNumShards)); assertThat(localCluster.getSkippedShards(), equalTo(0)); assertThat(localCluster.getFailedShards(), equalTo(0)); + + // ensure that the _clusters metadata is present only if requested + assertClusterMetadataInResponse(resp, responseExpectMeta); } } @@ -153,9 +166,13 @@ public void testSearchesWhereMissingIndicesAreSpecified() { int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + // since a valid local index was specified, the invalid index on cluster-a does not throw an exception, // but instead is simply ignored - ensure this is captured in the EsqlExecutionInfo - try (EsqlQueryResponse resp = runQuery("from logs-*,cluster-a:no_such_index | stats sum (v)")) { + try (EsqlQueryResponse resp = runQuery("from logs-*,cluster-a:no_such_index | stats sum (v)", requestIncludeMeta)) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); List> values = getValuesList(resp); assertThat(values, hasSize(1)); @@ -164,6 +181,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -188,7 +206,12 @@ public void testSearchesWhereMissingIndicesAreSpecified() { // since the remote cluster has a valid index expression, the missing local index is ignored // make this is captured in the EsqlExecutionInfo - try (EsqlQueryResponse resp = runQuery("from no_such_index,*:logs-* | stats count(*) by tag | sort tag | keep tag")) { + try ( + EsqlQueryResponse resp = runQuery( + "from no_such_index,*:logs-* | stats count(*) by tag | sort tag | keep tag", + requestIncludeMeta + ) + ) { List> values = getValuesList(resp); assertThat(values, hasSize(1)); assertThat(values.get(0), equalTo(List.of("remote"))); @@ -197,6 +220,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -223,7 +247,8 @@ public void testSearchesWhereMissingIndicesAreSpecified() { // in the index expression of the EsqlExecutionInfo and with an indication that zero shards were searched try ( EsqlQueryResponse resp = runQuery( - "FROM no_such_index*,*:no_such_index1,*:no_such_index2,logs-1 | STATS COUNT(*) by tag | SORT tag | KEEP tag" + "FROM no_such_index*,*:no_such_index1,*:no_such_index2,logs-1 | STATS COUNT(*) by tag | SORT tag | KEEP tag", + requestIncludeMeta ) ) { List> values = getValuesList(resp); @@ -234,6 +259,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -257,7 +283,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { } // wildcard on remote cluster that matches nothing - should be present in EsqlExecutionInfo marked as SKIPPED, no shards searched - try (EsqlQueryResponse resp = runQuery("from cluster-a:no_such_index*,logs-* | stats sum (v)")) { + try (EsqlQueryResponse resp = runQuery("from cluster-a:no_such_index*,logs-* | stats sum (v)", requestIncludeMeta)) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); List> values = getValuesList(resp); assertThat(values, hasSize(1)); @@ -266,6 +292,7 @@ public void testSearchesWhereMissingIndicesAreSpecified() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -293,8 +320,12 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { Map testClusterInfo = setupTwoClusters(); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + // a query which matches no remote cluster is not a cross cluster search - try (EsqlQueryResponse resp = runQuery("from logs-*,x*:no_such_index* | stats sum (v)")) { + try (EsqlQueryResponse resp = runQuery("from logs-*,x*:no_such_index* | stats sum (v)", requestIncludeMeta)) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); List> values = getValuesList(resp); assertThat(values, hasSize(1)); @@ -303,12 +334,18 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { assertNotNull(executionInfo); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER))); assertThat(executionInfo.isCrossClusterSearch(), is(false)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); // since this not a CCS, only the overall took time in the EsqlExecutionInfo matters assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); } // cluster-foo* matches nothing and so should not be present in the EsqlExecutionInfo - try (EsqlQueryResponse resp = runQuery("from logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | stats sum (v)")) { + try ( + EsqlQueryResponse resp = runQuery( + "from logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | stats sum (v)", + requestIncludeMeta + ) + ) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); List> values = getValuesList(resp); assertThat(values, hasSize(1)); @@ -317,6 +354,7 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); @@ -349,8 +387,12 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() { public void testCCSExecutionOnSearchesWithLimit0() { setupTwoClusters(); + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + // Ensure non-cross cluster queries have overall took time - try (EsqlQueryResponse resp = runQuery("FROM logs* | LIMIT 0")) { + try (EsqlQueryResponse resp = runQuery("FROM logs* | LIMIT 0", requestIncludeMeta)) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(false)); @@ -358,12 +400,13 @@ public void testCCSExecutionOnSearchesWithLimit0() { } // ensure cross-cluster searches have overall took time and correct per-cluster details in EsqlExecutionInfo - try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:* | LIMIT 0")) { + try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:* | LIMIT 0", requestIncludeMeta)) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); @@ -387,12 +430,13 @@ public void testCCSExecutionOnSearchesWithLimit0() { assertNull(localCluster.getFailedShards()); } - try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0")) { + try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0", requestIncludeMeta)) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); @@ -415,12 +459,13 @@ public void testCCSExecutionOnSearchesWithLimit0() { assertNull(localCluster.getFailedShards()); } - try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0")) { + try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0", requestIncludeMeta)) { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER))); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); @@ -447,7 +492,16 @@ public void testMetadataIndex() { int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); - try (EsqlQueryResponse resp = runQuery("FROM logs*,*:logs* METADATA _index | stats sum(v) by _index | sort _index")) { + Tuple includeCCSMetadata = randomIncludeCCSMetadata(); + Boolean requestIncludeMeta = includeCCSMetadata.v1(); + boolean responseExpectMeta = includeCCSMetadata.v2(); + + try ( + EsqlQueryResponse resp = runQuery( + "FROM logs*,*:logs* METADATA _index | stats sum(v) by _index | sort _index", + requestIncludeMeta + ) + ) { List> values = getValuesList(resp); assertThat(values.get(0), equalTo(List.of(285L, "cluster-a:logs-2"))); assertThat(values.get(1), equalTo(List.of(45L, "logs-1"))); @@ -455,6 +509,7 @@ public void testMetadataIndex() { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); @@ -477,18 +532,6 @@ public void testMetadataIndex() { } } - void waitForNoInitializingShards(Client client, TimeValue timeout, String... indices) { - ClusterHealthResponse resp = client.admin() - .cluster() - .prepareHealth(TEST_REQUEST_TIMEOUT, indices) - .setWaitForEvents(Priority.LANGUID) - .setWaitForNoRelocatingShards(true) - .setWaitForNoInitializingShards(true) - .setTimeout(timeout) - .get(); - assertFalse(Strings.toString(resp, true, true), resp.isTimedOut()); - } - public void testProfile() { Map testClusterInfo = setupTwoClusters(); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); @@ -529,6 +572,7 @@ public void testProfile() { EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); assertNull(remoteCluster); assertThat(executionInfo.isCrossClusterSearch(), is(false)); + assertThat(executionInfo.includeCCSMetadata(), is(false)); // since this not a CCS, only the overall took time in the EsqlExecutionInfo matters assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); } @@ -550,6 +594,7 @@ public void testProfile() { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), is(false)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); @@ -582,6 +627,7 @@ public void testProfile() { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), is(false)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); @@ -608,14 +654,11 @@ public void testProfile() { public void testWarnings() throws Exception { Map testClusterInfo = setupTwoClusters(); - String localIndex = (String) testClusterInfo.get("local.index"); - String remoteIndex = (String) testClusterInfo.get("remote.index"); int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards"); EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query("FROM logs*,*:logs* | EVAL ip = to_ip(id) | STATS total = sum(v) by ip | LIMIT 10"); - PlainActionFuture future = new PlainActionFuture<>(); InternalTestCluster cluster = cluster(LOCAL_CLUSTER); String node = randomFrom(cluster.getNodeNames()); CountDownLatch latch = new CountDownLatch(1); @@ -634,6 +677,7 @@ public void testWarnings() throws Exception { EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); assertNotNull(executionInfo); assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), is(false)); assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L)); EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER); @@ -662,11 +706,34 @@ public void testWarnings() throws Exception { assertTrue(latch.await(30, TimeUnit.SECONDS)); } - protected EsqlQueryResponse runQuery(String query) { + private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { + try { + final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); + final Object clusters = esqlResponseAsMap.get("_clusters"); + if (responseExpectMeta) { + assertNotNull(clusters); + // test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response) + @SuppressWarnings("unchecked") + Map inner = (Map) clusters; + assertTrue(inner.containsKey("total")); + assertTrue(inner.containsKey("details")); + } else { + assertNull(clusters); + } + } catch (IOException e) { + fail("Could not convert ESQL response to Map: " + e); + } + } + + protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) { EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); request.query(query); request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); - request.profile(true); + request.profile(randomInt(5) == 2); + request.columnar(randomBoolean()); + if (ccsMetadataInResponse != null) { + request.includeCCSMetadata(ccsMetadataInResponse); + } return runQuery(request); } @@ -674,6 +741,32 @@ protected EsqlQueryResponse runQuery(EsqlQueryRequest request) { return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); } + /** + * v1: value to send to runQuery (can be null; null means use default value) + * v2: whether to expect CCS Metadata in the response (cannot be null) + * @return + */ + public static Tuple randomIncludeCCSMetadata() { + return switch (randomIntBetween(1, 3)) { + case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE); + case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE); + case 3 -> new Tuple<>(null, Boolean.FALSE); + default -> throw new AssertionError("should not get here"); + }; + } + + void waitForNoInitializingShards(Client client, TimeValue timeout, String... indices) { + ClusterHealthResponse resp = client.admin() + .cluster() + .prepareHealth(TEST_REQUEST_TIMEOUT, indices) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setWaitForNoInitializingShards(true) + .setTimeout(timeout) + .get(); + assertFalse(Strings.toString(resp, true, true), resp.isTimedOut()); + } + Map setupTwoClusters() { String localIndex = "logs-1"; int numShardsLocal = randomIntBetween(1, 5); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index f7966ff5ae9ec..dabccd4ffeb17 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -63,24 +64,29 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable { private final transient Predicate skipUnavailablePredicate; private TimeValue overallTook; - public EsqlExecutionInfo() { - this(Predicates.always()); // default all clusters to skip_unavailable=true + // whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present) + private final boolean includeCCSMetadata; + + public EsqlExecutionInfo(boolean includeCCSMetadata) { + this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true } /** * @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false */ - public EsqlExecutionInfo(Predicate skipUnavailablePredicate) { + public EsqlExecutionInfo(Predicate skipUnavailablePredicate, boolean includeCCSMetadata) { this.clusterInfo = ConcurrentCollections.newConcurrentMap(); this.skipUnavailablePredicate = skipUnavailablePredicate; + this.includeCCSMetadata = includeCCSMetadata; } /** * For testing use with fromXContent parsing only * @param clusterInfo */ - EsqlExecutionInfo(ConcurrentMap clusterInfo) { + EsqlExecutionInfo(ConcurrentMap clusterInfo, boolean includeCCSMetadata) { this.clusterInfo = clusterInfo; + this.includeCCSMetadata = includeCCSMetadata; this.skipUnavailablePredicate = Predicates.always(); } @@ -94,6 +100,11 @@ public EsqlExecutionInfo(StreamInput in) throws IOException { clusterList.forEach(c -> m.put(c.getClusterAlias(), c)); this.clusterInfo = m; } + if (in.getTransportVersion().onOrAfter(TransportVersions.OPT_IN_ESQL_CCS_EXECUTION_INFO)) { + this.includeCCSMetadata = in.readBoolean(); + } else { + this.includeCCSMetadata = false; + } this.skipUnavailablePredicate = Predicates.always(); } @@ -105,6 +116,13 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeCollection(Collections.emptyList()); } + if (out.getTransportVersion().onOrAfter(TransportVersions.OPT_IN_ESQL_CCS_EXECUTION_INFO)) { + out.writeBoolean(includeCCSMetadata); + } + } + + public boolean includeCCSMetadata() { + return includeCCSMetadata; } public void overallTook(TimeValue took) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index 4ab310863c61d..239f9e2696f88 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -42,6 +42,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E private String query; private boolean columnar; private boolean profile; + private boolean includeCCSMetadata; private Locale locale; private QueryBuilder filter; private QueryPragmas pragmas = new QueryPragmas(Settings.EMPTY); @@ -128,6 +129,14 @@ public void profile(boolean profile) { this.profile = profile; } + public void includeCCSMetadata(boolean include) { + this.includeCCSMetadata = include; + } + + public boolean includeCCSMetadata() { + return includeCCSMetadata; + } + /** * Is profiling enabled? */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index 3232f3a9118d4..4e59d5419fe6f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -206,7 +206,7 @@ public Iterator toXContentChunked(ToXContent.Params params b.append(ResponseXContentUtils.allColumns(columns, "columns")); } b.array("values", ResponseXContentUtils.columnValues(this.columns, this.pages, columnar, nullColumns)); - if (executionInfo != null && executionInfo.isCrossClusterSearch()) { + if (executionInfo != null && executionInfo.isCrossClusterSearch() && executionInfo.includeCCSMetadata()) { b.field("_clusters", executionInfo); } if (profile != null) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java index b930fa5823404..7224aa049093d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java @@ -80,6 +80,7 @@ String fields() { private static final ParseField LOCALE_FIELD = new ParseField("locale"); private static final ParseField PROFILE_FIELD = new ParseField("profile"); private static final ParseField ACCEPT_PRAGMA_RISKS = new ParseField("accept_pragma_risks"); + private static final ParseField INCLUDE_CCS_METADATA_FIELD = new ParseField("include_ccs_metadata"); static final ParseField TABLES_FIELD = new ParseField("tables"); static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout"); @@ -117,6 +118,7 @@ private static void objectParserCommon(ObjectParser parser) parser.declareBoolean(EsqlQueryRequest::columnar, COLUMNAR_FIELD); parser.declareObject(EsqlQueryRequest::filter, (p, c) -> AbstractQueryBuilder.parseTopLevelQuery(p), FILTER_FIELD); parser.declareBoolean(EsqlQueryRequest::acceptedPragmaRisks, ACCEPT_PRAGMA_RISKS); + parser.declareBoolean(EsqlQueryRequest::includeCCSMetadata, INCLUDE_CCS_METADATA_FIELD); parser.declareObject( EsqlQueryRequest::pragmas, (p, c) -> new QueryPragmas(Settings.builder().loadFromMap(p.map()).build()), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ce2a1d7a5f660..f714695504a1d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -834,7 +834,7 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan * execution metadata for ES|QL processing local to this cluster. The execution info will be copied into the * ComputeResponse that is sent back to the primary coordinating cluster. */ - EsqlExecutionInfo execInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true); execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices()))); CancellableTask cancellable = (CancellableTask) task; long start = request.configuration().getQueryStartTimeNanos(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParser.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParser.java index 915efe9302a92..17329ca2e0054 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParser.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParser.java @@ -37,10 +37,15 @@ public class EsqlMediaTypeParser { * format. If there is a {@code format} parameter we use that. If there * isn't but there is a {@code Accept} header then we use that. If there * isn't then we use the {@code Content-Type} header which is required. + * + * Also validates certain parameter combinations and throws IllegalArgumentException if invalid + * combinations are detected. */ public static MediaType getResponseMediaType(RestRequest request, EsqlQueryRequest esqlRequest) { var mediaType = request.hasParam(URL_PARAM_FORMAT) ? mediaTypeFromParams(request) : mediaTypeFromHeaders(request); - return validateColumnarRequest(esqlRequest.columnar(), mediaType, request); + validateColumnarRequest(esqlRequest.columnar(), mediaType); + validateIncludeCCSMetadata(esqlRequest.includeCCSMetadata(), mediaType); + return checkNonNullMediaType(mediaType, request); } private static MediaType mediaTypeFromHeaders(RestRequest request) { @@ -53,7 +58,7 @@ private static MediaType mediaTypeFromParams(RestRequest request) { return MEDIA_TYPE_REGISTRY.queryParamToMediaType(request.param(URL_PARAM_FORMAT)); } - private static MediaType validateColumnarRequest(boolean requestIsColumnar, MediaType fromMediaType, RestRequest request) { + private static void validateColumnarRequest(boolean requestIsColumnar, MediaType fromMediaType) { if (requestIsColumnar && fromMediaType instanceof TextFormat) { throw new IllegalArgumentException( "Invalid use of [columnar] argument: cannot be used in combination with " @@ -61,7 +66,16 @@ private static MediaType validateColumnarRequest(boolean requestIsColumnar, Medi + " formats" ); } - return checkNonNullMediaType(fromMediaType, request); + } + + private static void validateIncludeCCSMetadata(boolean includeCCSMetadata, MediaType fromMediaType) { + if (includeCCSMetadata && fromMediaType instanceof TextFormat) { + throw new IllegalArgumentException( + "Invalid use of [include_ccs_metadata] argument: cannot be used in combination with " + + Arrays.stream(TextFormat.values()).map(MediaType::queryParameter).toList() + + " formats" + ); + } } private static MediaType checkNonNullMediaType(MediaType mediaType, RestRequest request) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 17c795f2de28c..193930cdf711d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -167,7 +167,10 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener remoteClusterService.isSkipUnavailable(clusterAlias)); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo( + clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias), + request.includeCCSMetadata() + ); BiConsumer> runPhase = (physicalPlan, resultListener) -> computeService.execute( sessionId, (CancellableTask) task, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index 3eef31e1cc406..965358c0c3f8c 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -433,7 +433,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception { session.executeOptimizedPlan( new EsqlQueryRequest(), - new EsqlExecutionInfo(), + new EsqlExecutionInfo(randomBoolean()), runPhase(bigArrays, physicalOperationProviders), session.optimizedPlan(analyzed), listener.delegateFailureAndWrap( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index abf03d4fe06dd..b147cfde21721 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -134,7 +134,7 @@ EsqlQueryResponse randomResponseAsync(boolean columnar, EsqlQueryResponse.Profil } EsqlExecutionInfo createExecutionInfo() { - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.overallTook(new TimeValue(5000)); executionInfo.swapCluster( "", @@ -426,9 +426,9 @@ static EsqlExecutionInfo parseClusters(XContentParser parser) throws IOException } } if (clusterInfoMap.isEmpty()) { - return new EsqlExecutionInfo(); + return new EsqlExecutionInfo(true); } else { - return new EsqlExecutionInfo(clusterInfoMap); + return new EsqlExecutionInfo(clusterInfoMap, true); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java index c145d770409da..e735ba83168bb 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java @@ -82,7 +82,7 @@ public class TextFormatterTests extends ESTestCase { null, randomBoolean(), randomBoolean(), - new EsqlExecutionInfo() + new EsqlExecutionInfo(randomBoolean()) ); TextFormatter formatter = new TextFormatter(esqlResponse); @@ -157,7 +157,7 @@ public void testFormatWithoutHeader() { null, randomBoolean(), randomBoolean(), - new EsqlExecutionInfo() + new EsqlExecutionInfo(randomBoolean()) ); String[] result = getTextBodyContent(new TextFormatter(response).format(false)).split("\n"); @@ -198,7 +198,7 @@ public void testVeryLongPadding() { null, randomBoolean(), randomBoolean(), - new EsqlExecutionInfo() + new EsqlExecutionInfo(randomBoolean()) ) ).format(false) ) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index da11a790e6f2f..8cfcb605a19d5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -125,7 +125,7 @@ private ComputeResponse randomResponse(boolean includeExecutionInfo) { public void testEmpty() { PlainActionFuture results = new PlainActionFuture<>(); - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean()); try ( ComputeListener ignored = ComputeListener.create( RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, @@ -145,7 +145,7 @@ public void testEmpty() { public void testCollectComputeResults() { PlainActionFuture future = new PlainActionFuture<>(); List allProfiles = new ArrayList<>(); - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean()); try ( ComputeListener computeListener = ComputeListener.create( RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, @@ -194,7 +194,7 @@ public void testAcquireComputeCCSListener() { PlainActionFuture future = new PlainActionFuture<>(); List allProfiles = new ArrayList<>(); String remoteAlias = "rc1"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(remoteAlias, (k, v) -> new EsqlExecutionInfo.Cluster(remoteAlias, "logs*", false)); try ( ComputeListener computeListener = ComputeListener.create( @@ -248,7 +248,7 @@ public void testAcquireComputeCCSListener() { public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() { PlainActionFuture future = new PlainActionFuture<>(); List allProfiles = new ArrayList<>(); - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); String remoteAlias = "rc1"; executionInfo.swapCluster( remoteAlias, @@ -318,7 +318,7 @@ public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() { public void testAcquireComputeRunningOnQueryingClusterFillsInTookTime() { PlainActionFuture future = new PlainActionFuture<>(); List allProfiles = new ArrayList<>(); - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); String localCluster = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; // we need a remote cluster in the ExecutionInfo in order to simulate a CCS, since ExecutionInfo is only // fully filled in for cross-cluster searches @@ -372,7 +372,7 @@ public void testCancelOnFailure() throws Exception { int failedTasks = between(1, 100); PlainActionFuture rootListener = new PlainActionFuture<>(); CancellableTask rootTask = newTask(); - EsqlExecutionInfo execInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo execInfo = new EsqlExecutionInfo(randomBoolean()); try ( ComputeListener computeListener = ComputeListener.create( RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, @@ -436,7 +436,7 @@ public void onFailure(Exception e) { } }; CountDownLatch latch = new CountDownLatch(1); - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean()); try ( ComputeListener computeListener = ComputeListener.create( RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParserTests.java index 789d6e5adbfc7..4b9166c621940 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/EsqlMediaTypeParserTests.java @@ -80,6 +80,18 @@ public void testColumnarWithAcceptText() { assertEquals(e.getMessage(), "Invalid use of [columnar] argument: cannot be used in combination with [txt, csv, tsv] formats"); } + public void testIncludeCCSMetadataWithAcceptText() { + var accept = randomFrom("text/plain", "text/csv", "text/tab-separated-values"); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> getResponseMediaType(reqWithAccept(accept), createTestInstance(false, true)) + ); + assertEquals( + "Invalid use of [include_ccs_metadata] argument: cannot be used in combination with [txt, csv, tsv] formats", + e.getMessage() + ); + } + public void testColumnarWithParamText() { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -88,6 +100,26 @@ public void testColumnarWithParamText() { assertEquals(e.getMessage(), "Invalid use of [columnar] argument: cannot be used in combination with [txt, csv, tsv] formats"); } + public void testIncludeCCSMetadataWithNonJSONMediaTypesInParams() { + { + RestRequest restRequest = reqWithParams(Map.of("format", randomFrom("txt", "csv", "tsv"))); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> getResponseMediaType(restRequest, createTestInstance(false, true)) + ); + assertEquals( + "Invalid use of [include_ccs_metadata] argument: cannot be used in combination with [txt, csv, tsv] formats", + e.getMessage() + ); + } + { + // check that no exception is thrown for the XContent types + RestRequest restRequest = reqWithParams(Map.of("format", randomFrom("SMILE", "YAML", "CBOR", "JSON"))); + MediaType responseMediaType = getResponseMediaType(restRequest, createTestInstance(true, true)); + assertNotNull(responseMediaType); + } + } + public void testNoFormat() { IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -113,4 +145,10 @@ protected EsqlQueryRequest createTestInstance(boolean columnar) { request.columnar(columnar); return request; } + + protected EsqlQueryRequest createTestInstance(boolean columnar, boolean includeCCSMetadata) { + var request = createTestInstance(columnar); + request.includeCCSMetadata(includeCCSMetadata); + return request; + } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java index 326756ad0b5f4..7e93213fcee21 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java @@ -30,7 +30,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; final String remote2Alias = "remote2"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); @@ -59,7 +59,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; final String remote2Alias = "remote2"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); @@ -87,7 +87,7 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; final String remote2Alias = "remote2"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); @@ -117,7 +117,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; final String remote2Alias = "remote2"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); @@ -160,7 +160,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; final String remote2Alias = "remote2"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); @@ -206,7 +206,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; final String remote2Alias = "remote2"; - EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(); + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", false)); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java index adc449bfc092e..9edc85223e7b3 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java @@ -120,7 +120,7 @@ public void testFailedMetric() { randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, - new EsqlExecutionInfo(), + new EsqlExecutionInfo(randomBoolean()), groupIndicesByCluster, runPhase, new ActionListener<>() { @@ -149,7 +149,7 @@ public void onFailure(Exception e) { randomAlphaOfLength(10), EsqlTestUtils.TEST_CFG, enrichResolver, - new EsqlExecutionInfo(), + new EsqlExecutionInfo(randomBoolean()), groupIndicesByCluster, runPhase, new ActionListener<>() {