Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f627d45
ESQL CCS metadata in responses is opt-in, off by default
quux00 Oct 9, 2024
30b65b4
Cleanup and updated end user API docs
quux00 Oct 9, 2024
9d619f4
Added includeCCSMetadata to EsqlExecutionInfo
quux00 Oct 9, 2024
587500e
Restored jdk-deprecated.txt
quux00 Oct 9, 2024
3afa679
PR feedback: fixed code comment
quux00 Oct 9, 2024
2cc8c38
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 9, 2024
03d37e5
Request.includeCCSMetadata passed into EsqlExecutionInfo ctor
quux00 Oct 9, 2024
ae46e37
Fixed failing MultiClustersIT. Adding randomization for include_ccs_m…
quux00 Oct 9, 2024
f487781
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 9, 2024
3e6849d
Updated the CrossCluster IT tests to set random values for includeCCS…
quux00 Oct 10, 2024
4cd2513
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 10, 2024
6703610
Added restriction that include_ccs_metadata can only be used with for…
quux00 Oct 10, 2024
b0807b8
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 10, 2024
6f7bb2a
PR feedback: include_ccs_metadata works with all XContent types, but …
quux00 Oct 10, 2024
ee53b6b
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 10, 2024
6da4841
PR feedback: Added include_ccs_metadata into RestEsqlIT
quux00 Oct 11, 2024
1191432
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 11, 2024
2d2dd09
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 11, 2024
f8ea2bf
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 11, 2024
e6d95bb
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 11, 2024
c35af29
Merge remote-tracking branch 'elastic/main' into esql-ccs/ccs-metadat…
quux00 Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ java.lang.String#getBytes(int,int,byte[],int)
# java.lang.System#getSecurityManager()
# java.lang.System#setSecurityManager(java.lang.SecurityManager)
java.lang.Thread#checkAccess()
java.lang.Thread#countStackFrames()
java.lang.Thread#resume()
java.lang.Thread#stop()
java.lang.Thread#suspend()
Expand Down
20 changes: 10 additions & 10 deletions docs/reference/esql/esql-across-clusters.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ FROM *:my-index-000001
[[ccq-cluster-details]]
==== Cross-cluster metadata

ES|QL {ccs} responses include metadata about the search on each cluster when the response format is JSON.
Using the `"include_ccs_metadata": true` option, users can request that
ES|QL {ccs} responses include metadata about the search on each cluster (when the response format is JSON).
Here we show an example using the async search endpoint. {ccs-cap} metadata is also present in the synchronous
search endpoint.
search endpoint response when requested.

[source,console]
----
Expand All @@ -200,7 +201,8 @@ POST /_query/async?format=json
FROM my-index-000001,cluster_one:my-index-000001,cluster_two:my-index*
| STATS COUNT(http.response.status_code) BY user.id
| LIMIT 2
"""
""",
"include_ccs_metadata": true
}
----
// TEST[setup:my_index]
Expand Down Expand Up @@ -238,7 +240,7 @@ Which returns:
"(local)": { <4>
"status": "successful",
"indices": "blogs",
"took": 36, <5>
"took": 41, <5>
"_shards": { <6>
"total": 13,
"successful": 13,
Expand All @@ -260,7 +262,7 @@ Which returns:
"cluster_two": {
"status": "successful",
"indices": "cluster_two:my-index*",
"took": 41,
"took": 40,
"_shards": {
"total": 18,
"successful": 18,
Expand All @@ -286,17 +288,14 @@ it is identified as "(local)".
<5> How long (in milliseconds) the search took on each cluster. This can be useful to determine
which clusters have slower response times than others.
<6> The shard details for the search on that cluster, including a count of shards that were
skipped due to the can-match phase. Shards are skipped when they cannot have any matching data
skipped due to the can-match phase results. Shards are skipped when they cannot have any matching data
and therefore are not included in the full ES|QL query.


The cross-cluster metadata can be used to determine whether any data came back from a cluster.
For instance, in the query below, the wildcard expression for `cluster-two` did not resolve
to a concrete index (or indices). The cluster is, therefore, marked as 'skipped' and the total
number of shards searched is set to zero.
Since the other cluster did have a matching index, the search did not return an error, but
instead returned all the matching data it could find.


[source,console]
----
Expand All @@ -306,7 +305,8 @@ POST /_query/async?format=json
FROM cluster_one:my-index*,cluster_two:logs*
| STATS COUNT(http.response.status_code) BY user.id
| LIMIT 2
"""
""",
"include_ccs_metadata": true
}
----
// TEST[continued]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ static TransportVersion def(int id) {
public static final TransportVersion FAST_REFRESH_RCO = def(8_762_00_0);
public static final TransportVersion TEXT_SIMILARITY_RERANKER_QUERY_REWRITE = def(8_763_00_0);
public static final TransportVersion SIMULATE_INDEX_TEMPLATES_SUBSTITUTIONS = def(8_764_00_0);
public static final TransportVersion OPT_IN_ESQL_CCS_EXECUTION_INFO = def(8_765_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -63,24 +64,29 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
private final transient Predicate<String> skipUnavailablePredicate;
private TimeValue overallTook;

public EsqlExecutionInfo() {
this(Predicates.always()); // default all clusters to skip_unavailable=true
// whether the user has asked for CCS metadata to be the JSON response (the overall took will always be present)
private final boolean includeCCSMetadata;

public EsqlExecutionInfo(boolean includeCCSMetadata) {
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
}

/**
* @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false
*/
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate) {
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
this.clusterInfo = ConcurrentCollections.newConcurrentMap();
this.skipUnavailablePredicate = skipUnavailablePredicate;
this.includeCCSMetadata = includeCCSMetadata;
}

/**
* For testing use with fromXContent parsing only
* @param clusterInfo
*/
EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo) {
EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo, boolean includeCCSMetadata) {
this.clusterInfo = clusterInfo;
this.includeCCSMetadata = includeCCSMetadata;
this.skipUnavailablePredicate = Predicates.always();
}

Expand All @@ -94,6 +100,11 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
clusterList.forEach(c -> m.put(c.getClusterAlias(), c));
this.clusterInfo = m;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.OPT_IN_ESQL_CCS_EXECUTION_INFO)) {
this.includeCCSMetadata = in.readBoolean();
} else {
this.includeCCSMetadata = false;
}
this.skipUnavailablePredicate = Predicates.always();
}

Expand All @@ -105,6 +116,13 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeCollection(Collections.emptyList());
}
if (out.getTransportVersion().onOrAfter(TransportVersions.OPT_IN_ESQL_CCS_EXECUTION_INFO)) {
out.writeBoolean(includeCCSMetadata);
}
}

public boolean includeCCSMetadata() {
return includeCCSMetadata;
}

public void overallTook(TimeValue took) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private String query;
private boolean columnar;
private boolean profile;
private boolean includeCCSMetadata;
private Locale locale;
private QueryBuilder filter;
private QueryPragmas pragmas = new QueryPragmas(Settings.EMPTY);
Expand Down Expand Up @@ -128,6 +129,14 @@ public void profile(boolean profile) {
this.profile = profile;
}

public void includeCCSMetadata(boolean include) {
this.includeCCSMetadata = include;
}

public boolean includeCCSMetadata() {
return includeCCSMetadata;
}

/**
* Is profiling enabled?
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
b.append(ResponseXContentUtils.allColumns(columns, "columns"));
}
b.array("values", ResponseXContentUtils.columnValues(this.columns, this.pages, columnar, nullColumns));
if (executionInfo != null && executionInfo.isCrossClusterSearch()) {
if (executionInfo != null && executionInfo.isCrossClusterSearch() && executionInfo.includeCCSMetadata()) {
b.field("_clusters", executionInfo);
}
if (profile != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ String fields() {
private static final ParseField LOCALE_FIELD = new ParseField("locale");
private static final ParseField PROFILE_FIELD = new ParseField("profile");
private static final ParseField ACCEPT_PRAGMA_RISKS = new ParseField("accept_pragma_risks");
private static final ParseField INCLUDE_CCS_METADATA_FIELD = new ParseField("include_ccs_metadata");
static final ParseField TABLES_FIELD = new ParseField("tables");

static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
Expand All @@ -94,6 +95,7 @@ private static void objectParserCommon(ObjectParser<EsqlQueryRequest, ?> parser)
parser.declareBoolean(EsqlQueryRequest::columnar, COLUMNAR_FIELD);
parser.declareObject(EsqlQueryRequest::filter, (p, c) -> AbstractQueryBuilder.parseTopLevelQuery(p), FILTER_FIELD);
parser.declareBoolean(EsqlQueryRequest::acceptedPragmaRisks, ACCEPT_PRAGMA_RISKS);
parser.declareBoolean(EsqlQueryRequest::includeCCSMetadata, INCLUDE_CCS_METADATA_FIELD);
parser.declareObject(
EsqlQueryRequest::pragmas,
(p, c) -> new QueryPragmas(Settings.builder().loadFromMap(p.map()).build()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ public void messageReceived(ClusterComputeRequest request, TransportChannel chan
* execution metadata for ES|QL processing local to this cluster. The execution info will be copied into the
* ComputeResponse that is sent back to the primary coordinating cluster.
*/
EsqlExecutionInfo execInfo = new EsqlExecutionInfo();
EsqlExecutionInfo execInfo = new EsqlExecutionInfo(true);
execInfo.swapCluster(clusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(clusterAlias, Arrays.toString(request.indices())));
CancellableTask cancellable = (CancellableTask) task;
long start = request.configuration().getQueryStartTimeNanos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
System.nanoTime()
);
String sessionId = sessionID(task);
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias));
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias), true);
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase = (physicalPlan, resultListener) -> computeService.execute(
sessionId,
(CancellableTask) task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ private ActualResults executePlan(BigArrays bigArrays) throws Exception {

session.executeOptimizedPlan(
new EsqlQueryRequest(),
new EsqlExecutionInfo(),
new EsqlExecutionInfo(randomBoolean()),
runPhase(bigArrays, physicalOperationProviders),
session.optimizedPlan(analyzed),
listener.delegateFailureAndWrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ EsqlQueryResponse randomResponseAsync(boolean columnar, EsqlQueryResponse.Profil
}

EsqlExecutionInfo createExecutionInfo() {
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
executionInfo.overallTook(new TimeValue(5000));
executionInfo.swapCluster(
"",
Expand Down Expand Up @@ -426,9 +426,9 @@ static EsqlExecutionInfo parseClusters(XContentParser parser) throws IOException
}
}
if (clusterInfoMap.isEmpty()) {
return new EsqlExecutionInfo();
return new EsqlExecutionInfo(true);
} else {
return new EsqlExecutionInfo(clusterInfoMap);
return new EsqlExecutionInfo(clusterInfoMap, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class TextFormatterTests extends ESTestCase {
null,
randomBoolean(),
randomBoolean(),
new EsqlExecutionInfo()
new EsqlExecutionInfo(randomBoolean())
);

TextFormatter formatter = new TextFormatter(esqlResponse);
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testFormatWithoutHeader() {
null,
randomBoolean(),
randomBoolean(),
new EsqlExecutionInfo()
new EsqlExecutionInfo(randomBoolean())
);

String[] result = getTextBodyContent(new TextFormatter(response).format(false)).split("\n");
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testVeryLongPadding() {
null,
randomBoolean(),
randomBoolean(),
new EsqlExecutionInfo()
new EsqlExecutionInfo(randomBoolean())
)
).format(false)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private ComputeResponse randomResponse(boolean includeExecutionInfo) {

public void testEmpty() {
PlainActionFuture<ComputeResponse> results = new PlainActionFuture<>();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean());
try (
ComputeListener ignored = ComputeListener.create(
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
Expand All @@ -145,7 +145,7 @@ public void testEmpty() {
public void testCollectComputeResults() {
PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
List<DriverProfile> allProfiles = new ArrayList<>();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean());
try (
ComputeListener computeListener = ComputeListener.create(
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
Expand Down Expand Up @@ -194,7 +194,7 @@ public void testAcquireComputeCCSListener() {
PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
List<DriverProfile> allProfiles = new ArrayList<>();
String remoteAlias = "rc1";
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
executionInfo.swapCluster(remoteAlias, (k, v) -> new EsqlExecutionInfo.Cluster(remoteAlias, "logs*", false));
try (
ComputeListener computeListener = ComputeListener.create(
Expand Down Expand Up @@ -248,7 +248,7 @@ public void testAcquireComputeCCSListener() {
public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() {
PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
List<DriverProfile> allProfiles = new ArrayList<>();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
String remoteAlias = "rc1";
executionInfo.swapCluster(
remoteAlias,
Expand Down Expand Up @@ -318,7 +318,7 @@ public void testAcquireComputeRunningOnRemoteClusterFillsInTookTime() {
public void testAcquireComputeRunningOnQueryingClusterFillsInTookTime() {
PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
List<DriverProfile> allProfiles = new ArrayList<>();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true);
String localCluster = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
// we need a remote cluster in the ExecutionInfo in order to simulate a CCS, since ExecutionInfo is only
// fully filled in for cross-cluster searches
Expand Down Expand Up @@ -372,7 +372,7 @@ public void testCancelOnFailure() throws Exception {
int failedTasks = between(1, 100);
PlainActionFuture<ComputeResponse> rootListener = new PlainActionFuture<>();
CancellableTask rootTask = newTask();
EsqlExecutionInfo execInfo = new EsqlExecutionInfo();
EsqlExecutionInfo execInfo = new EsqlExecutionInfo(randomBoolean());
try (
ComputeListener computeListener = ComputeListener.create(
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
Expand Down Expand Up @@ -436,7 +436,7 @@ public void onFailure(Exception e) {
}
};
CountDownLatch latch = new CountDownLatch(1);
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo();
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(randomBoolean());
try (
ComputeListener computeListener = ComputeListener.create(
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
Expand Down
Loading
Loading