Skip to content

Commit ffb7782

Browse files
ES|QL: Let include_execution_metadata always return data, also in local only (elastic#137641)
1 parent 7fbb495 commit ffb7782

File tree

11 files changed

+243
-49
lines changed

11 files changed

+243
-49
lines changed

docs/changelog/137641.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137641
2+
summary: "Let `include_execution_metadata` always return data, also in local only"
3+
area: ES|QL
4+
type: enhancement
5+
issues: []
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9216000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
inference_api_eis_authorization_persistent_task,9215000
1+
esql_execution_metadata,9216000

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -354,11 +354,7 @@ protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse
354354
request.profile(randomInt(5) == 2);
355355
request.columnar(randomBoolean());
356356
if (ccsMetadataInResponse != null) {
357-
if (randomBoolean()) {
358-
request.includeExecutionMetadata(ccsMetadataInResponse);
359-
} else {
360-
request.includeCCSMetadata(ccsMetadataInResponse);
361-
}
357+
request.includeCCSMetadata(ccsMetadataInResponse);
362358
}
363359
return runQuery(request);
364360
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,10 +1039,7 @@ public void testNoBothIncludeCcsMetadataAndIncludeExecutionMetadata() throws Exc
10391039

10401040
assertThat(
10411041
expectThrows(VerificationException.class, () -> runQuery(request)).getMessage(),
1042-
containsString(
1043-
"Both [include_execution_metadata] and [include_ccs_metadata] query parameters are set. "
1044-
+ "Use only [include_execution_metadata]"
1045-
)
1042+
containsString("Both [include_execution_metadata] and [include_ccs_metadata] query parameters are set. Use only one")
10461043
);
10471044
}
10481045
}
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.test.XContentTestUtils;
11+
12+
import java.io.IOException;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.Set;
16+
17+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
18+
import static org.hamcrest.Matchers.equalTo;
19+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
20+
import static org.hamcrest.Matchers.hasSize;
21+
import static org.hamcrest.Matchers.is;
22+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
23+
24+
public class QueryExecutionMetadataIT extends AbstractCrossClusterTestCase {
25+
26+
@Override
27+
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
28+
return Map.of(REMOTE_CLUSTER_1, randomBoolean(), REMOTE_CLUSTER_2, randomBoolean());
29+
}
30+
31+
protected void assertClusterInfoSuccess(EsqlExecutionInfo.Cluster clusterInfo, int numShards, long overallTookMillis) {
32+
assertThat(clusterInfo.getIndexExpression(), equalTo("logs-*"));
33+
assertThat(clusterInfo.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
34+
super.assertClusterInfoSuccess(clusterInfo, numShards);
35+
}
36+
37+
protected EsqlQueryResponse runQueryWithMetadata(String query, Boolean includeExecutionMetadata) {
38+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
39+
request.query(query);
40+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
41+
request.profile(randomInt(5) == 2);
42+
request.columnar(randomBoolean());
43+
if (includeExecutionMetadata != null) {
44+
request.includeExecutionMetadata(includeExecutionMetadata);
45+
}
46+
return runQuery(request);
47+
}
48+
49+
public void testLocal() throws Exception {
50+
testQuery(true, false, 45L);
51+
}
52+
53+
public void testRemote() throws Exception {
54+
testQuery(false, true, 285L);
55+
}
56+
57+
public void testLocalAndRemote() throws Exception {
58+
testQuery(true, true, 330L);
59+
}
60+
61+
protected void testQuery(boolean local, boolean remote, long nRecords) throws Exception {
62+
if (local == false && remote == false) {
63+
throw new IllegalArgumentException("At least one of local or remote must be true");
64+
}
65+
StringBuilder query = new StringBuilder("from ");
66+
if (local) {
67+
query.append("logs-*");
68+
if (remote) {
69+
query.append(",");
70+
}
71+
}
72+
if (remote) {
73+
query.append("c*:logs-*");
74+
}
75+
query.append(" | stats sum (v)");
76+
Map<String, Object> testClusterInfo = setupTwoClusters();
77+
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
78+
int remoteNumShards = (Integer) testClusterInfo.get("remote1.num_shards");
79+
80+
boolean includeMetadata = randomBoolean();
81+
try (EsqlQueryResponse resp = runQueryWithMetadata(query.toString(), includeMetadata)) {
82+
List<List<Object>> values = getValuesList(resp);
83+
assertThat(values, hasSize(1));
84+
assertThat(values.get(0), equalTo(List.of(nRecords)));
85+
86+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
87+
assertNotNull(executionInfo);
88+
assertThat(executionInfo.isCrossClusterSearch(), is(remote));
89+
long overallTookMillis = executionInfo.overallTook().millis();
90+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
91+
assertThat(
92+
executionInfo.includeExecutionMetadata(),
93+
equalTo(
94+
includeMetadata ? EsqlExecutionInfo.IncludeExecutionMetadata.ALWAYS : EsqlExecutionInfo.IncludeExecutionMetadata.NEVER
95+
)
96+
);
97+
98+
if (remote && local) {
99+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1, LOCAL_CLUSTER)));
100+
} else if (remote) {
101+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER_1)));
102+
} else {
103+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER)));
104+
}
105+
106+
if (remote) {
107+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER_1);
108+
assertClusterInfoSuccess(remoteCluster, remoteNumShards, overallTookMillis);
109+
}
110+
111+
if (local) {
112+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
113+
assertClusterInfoSuccess(localCluster, localNumShards, overallTookMillis);
114+
}
115+
116+
assertClusterMetadataInResponse(resp, includeMetadata);
117+
}
118+
}
119+
120+
private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean present) {
121+
try {
122+
final Map<String, Object> esqlResponseAsMap = XContentTestUtils.convertToMap(resp);
123+
final Object clusters = esqlResponseAsMap.get("_clusters");
124+
125+
if (present) {
126+
assertNotNull(clusters);
127+
// test a few entries to ensure it looks correct (other tests do a full analysis of the metadata in the response)
128+
@SuppressWarnings("unchecked")
129+
Map<String, Object> inner = (Map<String, Object>) clusters;
130+
assertTrue(inner.containsKey("total"));
131+
assertTrue(inner.containsKey("details"));
132+
} else {
133+
assertNull(clusters);
134+
}
135+
136+
} catch (IOException e) {
137+
fail("Could not convert ESQL response to Map: " + e);
138+
}
139+
}
140+
141+
Map<String, Object> setupTwoClusters() throws IOException {
142+
return setupClusters(2);
143+
}
144+
145+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
6464
public static final ParseField IS_PARTIAL_FIELD = new ParseField("is_partial");
6565

6666
private static final TransportVersion ESQL_QUERY_PLANNING_DURATION = TransportVersion.fromName("esql_query_planning_duration");
67+
public static final TransportVersion EXECUTION_METADATA_VERSION = TransportVersion.fromName("esql_execution_metadata");
6768

6869
// Map key is clusterAlias on the primary querying cluster of a CCS minimize_roundtrips=true query
6970
// The Map itself is immutable after construction - all Clusters will be accounted for at the start of the search.
@@ -72,8 +73,15 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
7273
public final ConcurrentMap<String, Cluster> clusterInfo;
7374
// Is the clusterInfo map iinitialization in progress? If so, we should not try to serialize it.
7475
private transient volatile boolean clusterInfoInitializing;
75-
// whether the user has asked for CCS metadata to be in the JSON response (the overall took will always be present)
76-
private final boolean includeCCSMetadata;
76+
77+
public enum IncludeExecutionMetadata {
78+
ALWAYS,
79+
CCS_ONLY,
80+
NEVER
81+
}
82+
83+
// whether the user has asked for execution/CCS metadata to be in the JSON response (the overall took will always be present)
84+
private final IncludeExecutionMetadata includeExecutionMetadata;
7785

7886
// fields that are not Writeable since they are only needed on the primary CCS coordinator
7987
private final transient Predicate<String> skipOnFailurePredicate; // Predicate to determine if we should skip a cluster on failure
@@ -89,36 +97,53 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
8997
// Are we doing subplans? No need to serialize this because it is only relevant for the coordinator node.
9098
private transient boolean inSubplan = false;
9199

92-
// This is only used is tests.
100+
// FOR TESTS ONLY
93101
public EsqlExecutionInfo(boolean includeCCSMetadata) {
94-
this(Predicates.always(), includeCCSMetadata); // default all clusters to being skippable on failure
102+
// default all clusters to being skippable on failure
103+
this(Predicates.always(), includeCCSMetadata ? IncludeExecutionMetadata.CCS_ONLY : IncludeExecutionMetadata.NEVER);
95104
}
96105

97106
/**
98-
* @param skipOnPlanTimeFailurePredicate Decides whether we should skip the cluster that fails during planning phase.
99-
* @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
107+
* FOR TESTING use with fromXContent parsing ONLY
100108
*/
101-
public EsqlExecutionInfo(Predicate<String> skipOnPlanTimeFailurePredicate, boolean includeCCSMetadata) {
102-
this.clusterInfo = new ConcurrentHashMap<>();
103-
this.skipOnFailurePredicate = skipOnPlanTimeFailurePredicate;
104-
this.includeCCSMetadata = includeCCSMetadata;
105-
this.relativeStart = TimeSpan.start();
109+
EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo, boolean includeCCSMetadata) {
110+
this(
111+
clusterInfo,
112+
Predicates.always(),
113+
includeCCSMetadata ? IncludeExecutionMetadata.CCS_ONLY : IncludeExecutionMetadata.NEVER,
114+
null
115+
);
106116
}
107117

108118
/**
109-
* For testing use with fromXContent parsing only
119+
* @param skipOnPlanTimeFailurePredicate Decides whether we should skip the cluster that fails during planning phase.
120+
* @param includeExecutionMetadata (user defined setting) whether to include the execution/CCS metadata in the HTTP response
110121
*/
111-
EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo, boolean includeCCSMetadata) {
122+
public EsqlExecutionInfo(Predicate<String> skipOnPlanTimeFailurePredicate, IncludeExecutionMetadata includeExecutionMetadata) {
123+
this(new ConcurrentHashMap<>(), skipOnPlanTimeFailurePredicate, includeExecutionMetadata, TimeSpan.start());
124+
}
125+
126+
EsqlExecutionInfo(
127+
ConcurrentMap<String, Cluster> clusterInfo,
128+
Predicate<String> skipOnPlanTimeFailurePredicate,
129+
IncludeExecutionMetadata includeExecutionMetadata,
130+
TimeSpan.Builder relativeStart
131+
) {
132+
assert includeExecutionMetadata != null;
112133
this.clusterInfo = clusterInfo;
113-
this.includeCCSMetadata = includeCCSMetadata;
114-
this.skipOnFailurePredicate = Predicates.always();
115-
this.relativeStart = null;
134+
this.skipOnFailurePredicate = skipOnPlanTimeFailurePredicate;
135+
this.includeExecutionMetadata = includeExecutionMetadata;
136+
this.relativeStart = relativeStart;
116137
}
117138

118139
public EsqlExecutionInfo(StreamInput in) throws IOException {
119140
this.overallTook = in.readOptionalTimeValue();
120141
this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new);
121-
this.includeCCSMetadata = in.readBoolean();
142+
if (in.getTransportVersion().supports(EXECUTION_METADATA_VERSION)) {
143+
this.includeExecutionMetadata = in.readEnum(IncludeExecutionMetadata.class);
144+
} else {
145+
this.includeExecutionMetadata = in.readBoolean() ? IncludeExecutionMetadata.CCS_ONLY : IncludeExecutionMetadata.NEVER;
146+
}
122147
this.isPartial = in.getTransportVersion().supports(TransportVersions.V_8_18_0) ? in.readBoolean() : false;
123148
this.skipOnFailurePredicate = Predicates.always();
124149
this.relativeStart = null;
@@ -136,7 +161,11 @@ public void writeTo(StreamOutput out) throws IOException {
136161
} else {
137162
out.writeCollection(Collections.emptyList());
138163
}
139-
out.writeBoolean(includeCCSMetadata);
164+
if (out.getTransportVersion().supports(EXECUTION_METADATA_VERSION)) {
165+
out.writeEnum(includeExecutionMetadata);
166+
} else {
167+
out.writeBoolean(includeExecutionMetadata != IncludeExecutionMetadata.NEVER);
168+
}
140169
if (out.getTransportVersion().supports(TransportVersions.V_8_18_0)) {
141170
out.writeBoolean(isPartial);
142171
}
@@ -147,8 +176,13 @@ public void writeTo(StreamOutput out) throws IOException {
147176
assert inSubplan == false : "Should not be serializing execution info while in subplans";
148177
}
149178

179+
// this is still here for testing only, use includeExecutionMetadata() in production code
150180
public boolean includeCCSMetadata() {
151-
return includeCCSMetadata;
181+
return includeExecutionMetadata == IncludeExecutionMetadata.ALWAYS || includeExecutionMetadata == IncludeExecutionMetadata.CCS_ONLY;
182+
}
183+
184+
public IncludeExecutionMetadata includeExecutionMetadata() {
185+
return includeExecutionMetadata;
152186
}
153187

154188
public Predicate<String> skipOnFailurePredicate() {
@@ -232,7 +266,8 @@ public boolean isCrossClusterSearch() {
232266
* This is true on cross-cluster search with includeCCSMetadata=true or when there are partial failures.
233267
*/
234268
public boolean hasMetadataToReport() {
235-
return isCrossClusterSearch() && includeCCSMetadata
269+
return includeExecutionMetadata == IncludeExecutionMetadata.ALWAYS && clusterInfo.isEmpty() == false
270+
|| isCrossClusterSearch() && includeExecutionMetadata == IncludeExecutionMetadata.CCS_ONLY
236271
|| (isPartial && clusterInfo.values().stream().anyMatch(c -> c.getFailures().isEmpty() == false));
237272
}
238273

@@ -275,10 +310,11 @@ public Cluster swapCluster(String clusterAlias, BiFunction<String, Cluster, Clus
275310

276311
@Override
277312
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
313+
// TODO remote this?
278314
if (clusterInfo.isEmpty()) {
279315
return Collections.emptyIterator();
280316
}
281-
if (includeCCSMetadata == false) {
317+
if (includeExecutionMetadata == IncludeExecutionMetadata.NEVER) {
282318
// If includeCCSMetadata is false, the only reason we're here is partial failures, so just report them.
283319
return onlyFailuresToXContent();
284320
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.function.Function;
7676
import java.util.function.Supplier;
7777

78+
import static org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.IncludeExecutionMetadata.ALWAYS;
7879
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME;
7980

8081
/**
@@ -523,7 +524,7 @@ public void executePlan(
523524

524525
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
525526
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
526-
if (execInfo.isCrossClusterSearch()) {
527+
if (execInfo.isCrossClusterSearch() || execInfo.includeExecutionMetadata() == ALWAYS) {
527528
for (String clusterAlias : execInfo.clusterAliases()) {
528529
execInfo.swapCluster(
529530
clusterAlias,
@@ -540,7 +541,7 @@ private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo ex
540541
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
541542
private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
542543
execInfo.markEndQuery();
543-
if (execInfo.isCrossClusterSearch() && execInfo.isMainPlan()) {
544+
if ((execInfo.isCrossClusterSearch() || execInfo.includeExecutionMetadata() == ALWAYS) && execInfo.isMainPlan()) {
544545
assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
545546
for (String clusterAlias : execInfo.clusterAliases()) {
546547
execInfo.swapCluster(clusterAlias, (k, v) -> {

0 commit comments

Comments
 (0)