Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions docs/reference/esql/esql-across-clusters.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ FROM *:my-index-000001
[[ccq-cluster-details]]
==== Cross-cluster metadata

ES|QL {ccs} responses include metadata about the search on each cluster when the response format is JSON.
Using the `"include_ccs_metadata": true` option, users can request that
ES|QL {ccs} responses include metadata about the search on each cluster (when the response format is JSON).
Here we show an example using the async search endpoint. {ccs-cap} metadata is also present in the synchronous
search endpoint.
search endpoint response when requested.

[source,console]
----
Expand All @@ -200,7 +201,8 @@ POST /_query/async?format=json
FROM my-index-000001,cluster_one:my-index-000001,cluster_two:my-index*
| STATS COUNT(http.response.status_code) BY user.id
| LIMIT 2
"""
""",
"include_ccs_metadata": true
}
----
// TEST[setup:my_index]
Expand Down Expand Up @@ -238,7 +240,7 @@ Which returns:
"(local)": { <4>
"status": "successful",
"indices": "blogs",
"took": 36, <5>
"took": 41, <5>
"_shards": { <6>
"total": 13,
"successful": 13,
Expand All @@ -260,7 +262,7 @@ Which returns:
"cluster_two": {
"status": "successful",
"indices": "cluster_two:my-index*",
"took": 41,
"took": 40,
"_shards": {
"total": 18,
"successful": 18,
Expand All @@ -286,17 +288,14 @@ it is identified as "(local)".
<5> How long (in milliseconds) the search took on each cluster. This can be useful to determine
which clusters have slower response times than others.
<6> The shard details for the search on that cluster, including a count of shards that were
skipped due to the can-match phase. Shards are skipped when they cannot have any matching data
skipped due to the can-match phase results. Shards are skipped when they cannot have any matching data
and therefore are not included in the full ES|QL query.


The cross-cluster metadata can be used to determine whether any data came back from a cluster.
For instance, in the query below, the wildcard expression for `cluster-two` did not resolve
to a concrete index (or indices). The cluster is, therefore, marked as 'skipped' and the total
number of shards searched is set to zero.
Since the other cluster did have a matching index, the search did not return an error, but
instead returned all the matching data it could find.


[source,console]
----
Expand All @@ -306,7 +305,8 @@ POST /_query/async?format=json
FROM cluster_one:my-index*,cluster_two:logs*
| STATS COUNT(http.response.status_code) BY user.id
| LIMIT 2
"""
""",
"include_ccs_metadata": true
}
----
// TEST[continued]
Expand Down
13 changes: 13 additions & 0 deletions docs/reference/esql/esql-query-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ precedence.
`false`. The API only supports this parameter for CBOR, JSON, SMILE, and YAML
responses. See <<esql-rest-columnar>>.

`include_ccs_metadata`::
(Optional, boolean) If `true`, cross-cluster searches will include metadata about the query
on each cluster. Defaults to `false`. The API only supports this parameter for CBOR, JSON, SMILE,
and YAML responses. See <<ccq-cluster-details>>.

`locale`::
(Optional, string) Returns results (especially dates) formatted per the conventions of the locale.
For syntax, refer to <<esql-locale-param>>.
Expand All @@ -85,6 +90,7 @@ https://en.wikipedia.org/wiki/Query_plan[EXPLAIN PLAN].
`query`::
(Required, string) {esql} query to run. For syntax, refer to <<esql-syntax>>.


ifeval::["{release-state}"=="unreleased"]
`table`::
(Optional, object) Named "table" parameters that can be referenced by the <<esql-lookup>> command.
Expand All @@ -108,6 +114,13 @@ returned if `drop_null_columns` is sent with the request.
(array of arrays)
Values for the search results.

`_clusters`::
(object)
Metadata about clusters involved in the execution of a cross-cluster query. Only returned (1) for
cross-cluster searches and (2) when `include_ccs_metadata` is sent in the body and set to `true`
and (3) when `format` of the response is set to JSON (the default), CBOR, SMILE, or YAML.
See <<ccq-cluster-details>> for more information.

`profile`::
(object)
Profile describing the execution of the query. Only returned if `profile` was sent in the body.
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ static TransportVersion def(int id) {
public static final TransportVersion SIMULATE_INDEX_TEMPLATES_SUBSTITUTIONS = def(8_764_00_0);
public static final TransportVersion RETRIEVERS_TELEMETRY_ADDED = def(8_765_00_0);
public static final TransportVersion ESQL_CACHED_STRING_SERIALIZATION = def(8_766_00_0);
public static final TransportVersion CHUNK_SENTENCE_OVERLAP_SETTING_ADDE1D = def(8_767_00_0);
public static final TransportVersion OPT_IN_ESQL_CCS_EXECUTION_INFO = def(8_768_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,18 @@ void indexDocs(RestClient client, String index, List<Doc> docs) throws IOExcepti
refresh(client, index);
}

private Map<String, Object> run(String query) throws IOException {
Map<String, Object> resp = runEsql(new RestEsqlTestCase.RequestObjectBuilder().query(query).build());
private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
Map<String, Object> resp = runEsql(
new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(includeCCSMetadata).build()
);
logger.info("--> query {} response {}", query, resp);
return resp;
}

private Map<String, Object> runWithColumnarAndIncludeCCSMetadata(String query) throws IOException {
Map<String, Object> resp = runEsql(
new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(true).columnar(true).build()
);
logger.info("--> query {} response {}", query, resp);
return resp;
}
Expand All @@ -147,62 +157,77 @@ private Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder reques

public void testCount() throws Exception {
{
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)");
boolean includeCCSMetadata = randomBoolean();
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(localDocs.size() + remoteDocs.size()));

MapMatcher mapMatcher = matchesMap();
assertMap(
result,
mapMatcher.entry("columns", columns)
.entry("values", values)
.entry("took", greaterThanOrEqualTo(0))
.entry("_clusters", any(Map.class))
);
assertClusterDetailsMap(result, false);
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, false);
}
}
{
Map<String, Object> result = run("FROM *:test-remote-index | STATS c = COUNT(*)");
boolean includeCCSMetadata = randomBoolean();
Map<String, Object> result = run("FROM *:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(remoteDocs.size()));

MapMatcher mapMatcher = matchesMap();
assertMap(
result,
mapMatcher.entry("columns", columns)
.entry("values", values)
.entry("took", greaterThanOrEqualTo(0))
.entry("_clusters", any(Map.class))
);
assertClusterDetailsMap(result, true);
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, true);
}
}
}

public void testUngroupedAggs() throws Exception {
{
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)");
boolean includeCCSMetadata = randomBoolean();
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
var columns = List.of(Map.of("name", "total", "type", "long"));
long sum = Stream.concat(localDocs.stream(), remoteDocs.stream()).mapToLong(d -> d.data).sum();
var values = List.of(List.of(Math.toIntExact(sum)));

// check all sections of map except _cluster/details
MapMatcher mapMatcher = matchesMap();
assertMap(
result,
mapMatcher.entry("columns", columns)
.entry("values", values)
.entry("took", greaterThanOrEqualTo(0))
.entry("_clusters", any(Map.class))
);
assertClusterDetailsMap(result, false);
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, false);
}
}
{
Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data)");
boolean includeCCSMetadata = randomBoolean();
Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
var columns = List.of(Map.of("name", "total", "type", "long"));
long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
var values = List.of(List.of(Math.toIntExact(sum)));

MapMatcher mapMatcher = matchesMap();
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, true);
}
}
{
Map<String, Object> result = runWithColumnarAndIncludeCCSMetadata("FROM *:test-remote-index | STATS total = SUM(data)");
var columns = List.of(Map.of("name", "total", "type", "long"));
long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
var values = List.of(List.of(Math.toIntExact(sum)));

// check all sections of map except _cluster/details
MapMatcher mapMatcher = matchesMap();
assertMap(
result,
Expand Down Expand Up @@ -269,7 +294,11 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO

public void testGroupedAggs() throws Exception {
{
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color");
boolean includeCCSMetadata = randomBoolean();
Map<String, Object> result = run(
"FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color",
includeCCSMetadata
);
var columns = List.of(Map.of("name", "total", "type", "long"), Map.of("name", "color", "type", "keyword"));
var values = Stream.concat(localDocs.stream(), remoteDocs.stream())
.collect(Collectors.toMap(d -> d.color, Doc::data, Long::sum))
Expand All @@ -280,17 +309,20 @@ public void testGroupedAggs() throws Exception {
.toList();

MapMatcher mapMatcher = matchesMap();
assertMap(
result,
mapMatcher.entry("columns", columns)
.entry("values", values)
.entry("took", greaterThanOrEqualTo(0))
.entry("_clusters", any(Map.class))
);
assertClusterDetailsMap(result, false);
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, false);
}
}
{
Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color");
boolean includeCCSMetadata = randomBoolean();
Map<String, Object> result = run(
"FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color",
includeCCSMetadata
);
var columns = List.of(Map.of("name", "total", "type", "long"), Map.of("name", "color", "type", "keyword"));
var values = remoteDocs.stream()
.collect(Collectors.toMap(d -> d.color, Doc::data, Long::sum))
Expand All @@ -300,16 +332,15 @@ public void testGroupedAggs() throws Exception {
.map(e -> List.of(Math.toIntExact(e.getValue()), e.getKey()))
.toList();

// check all sections of map except _cluster/details
// check all sections of map except _clusters/details
MapMatcher mapMatcher = matchesMap();
assertMap(
result,
mapMatcher.entry("columns", columns)
.entry("values", values)
.entry("took", greaterThanOrEqualTo(0))
.entry("_clusters", any(Map.class))
);
assertClusterDetailsMap(result, true);
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, true);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void testBasicEsql() throws IOException {
indexTimestampData(1);

RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)");
requestObjectBuilder().includeCCSMetadata(randomBoolean());
if (Build.current().isSnapshot()) {
builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ public static class RequestObjectBuilder {
private Boolean keepOnCompletion = null;

private Boolean profile = null;
private Boolean includeCCSMetadata = null;

private CheckedConsumer<XContentBuilder, IOException> filter;

Expand Down Expand Up @@ -197,6 +198,11 @@ public RequestObjectBuilder profile(boolean profile) {
return this;
}

public RequestObjectBuilder includeCCSMetadata(boolean includeCCSMetadata) {
this.includeCCSMetadata = includeCCSMetadata;
return this;
}

public RequestObjectBuilder filter(CheckedConsumer<XContentBuilder, IOException> filter) {
this.filter = filter;
return this;
Expand All @@ -220,6 +226,9 @@ public RequestObjectBuilder build() throws IOException {
if (profile != null) {
builder.field("profile", profile);
}
if (includeCCSMetadata != null) {
builder.field("include_ccs_metadata", includeCCSMetadata);
}
if (filter != null) {
builder.startObject("filter");
filter.accept(builder);
Expand Down
Loading
Loading