Skip to content

Commit 6edc606

Browse files
authored
Took time and cluster details get updated for coordinator only query operations (#114075) (#114153)
* Took time and cluster details get updated for coordinator only query operations The ComputeService.runCompute pathway for coordinator only operations (such as `FROM foo | LIMIT 0` or a ROW command) get updated with overall took time. This also includes support for cross-cluster coordinator only operations, which come about with queries like `FROM foo,remote:foo | LIMIT 0`. The _clusters metadata is now properly updated for those cases as well. Fixes #114014
1 parent 519da17 commit 6edc606

File tree

6 files changed

+167
-12
lines changed

6 files changed

+167
-12
lines changed

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ public void testDoNotLogWithInfo() throws IOException {
117117
setLoggingLevel("INFO");
118118
RequestObjectBuilder builder = requestObjectBuilder().query("ROW DO_NOT_LOG_ME = 1");
119119
Map<String, Object> result = runEsql(builder);
120-
assertEquals(2, result.size());
120+
assertEquals(3, result.size());
121+
assertThat(((Integer) result.get("took")).intValue(), greaterThanOrEqualTo(0));
121122
Map<String, String> colA = Map.of("name", "DO_NOT_LOG_ME", "type", "integer");
122123
assertEquals(List.of(colA), result.get("columns"));
123124
assertEquals(List.of(List.of(1)), result.get("values"));
@@ -136,7 +137,8 @@ public void testDoLogWithDebug() throws IOException {
136137
setLoggingLevel("DEBUG");
137138
RequestObjectBuilder builder = requestObjectBuilder().query("ROW DO_LOG_ME = 1");
138139
Map<String, Object> result = runEsql(builder);
139-
assertEquals(2, result.size());
140+
assertEquals(3, result.size());
141+
assertThat(((Integer) result.get("took")).intValue(), greaterThanOrEqualTo(0));
140142
Map<String, String> colA = Map.of("name", "DO_LOG_ME", "type", "integer");
141143
assertEquals(List.of(colA), result.get("columns"));
142144
assertEquals(List.of(List.of(1)), result.get("values"));

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,8 @@ public static RequestObjectBuilder jsonBuilder() throws IOException {
249249

250250
public void testGetAnswer() throws IOException {
251251
Map<String, Object> answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2"));
252-
assertEquals(2, answer.size());
252+
assertEquals(3, answer.size());
253+
assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0));
253254
Map<String, String> colA = Map.of("name", "a", "type", "integer");
254255
Map<String, String> colB = Map.of("name", "b", "type", "integer");
255256
assertEquals(List.of(colA, colB), answer.get("columns"));

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4444
import static org.hamcrest.Matchers.hasSize;
4545
import static org.hamcrest.Matchers.is;
46+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4647

4748
public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
4849
private static final String REMOTE_CLUSTER = "cluster-a";
@@ -339,6 +340,108 @@ public void testSearchesWhereNonExistentClusterIsSpecifiedWithWildcards() {
339340
}
340341
}
341342

343+
/**
344+
* Searches with LIMIT 0 are used by Kibana to get a list of columns. After the initial planning
345+
* (which involves cross-cluster field-caps calls), it is a coordinator only operation at query time
346+
* which uses a different pathway compared to queries that require data node (and remote data node) operations
347+
* at query time.
348+
*/
349+
public void testCCSExecutionOnSearchesWithLimit0() {
350+
setupTwoClusters();
351+
352+
// Ensure non-cross cluster queries have overall took time
353+
try (EsqlQueryResponse resp = runQuery("FROM logs* | LIMIT 0")) {
354+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
355+
assertNotNull(executionInfo);
356+
assertThat(executionInfo.isCrossClusterSearch(), is(false));
357+
assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
358+
}
359+
360+
// ensure cross-cluster searches have overall took time and correct per-cluster details in EsqlExecutionInfo
361+
try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:* | LIMIT 0")) {
362+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
363+
assertNotNull(executionInfo);
364+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
365+
long overallTookMillis = executionInfo.overallTook().millis();
366+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
367+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
368+
369+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
370+
assertThat(remoteCluster.getIndexExpression(), equalTo("*"));
371+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
372+
assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
373+
assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
374+
assertNull(remoteCluster.getTotalShards());
375+
assertNull(remoteCluster.getSuccessfulShards());
376+
assertNull(remoteCluster.getSkippedShards());
377+
assertNull(remoteCluster.getFailedShards());
378+
379+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
380+
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
381+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
382+
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
383+
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
384+
assertNull(localCluster.getTotalShards());
385+
assertNull(localCluster.getSuccessfulShards());
386+
assertNull(localCluster.getSkippedShards());
387+
assertNull(localCluster.getFailedShards());
388+
}
389+
390+
try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0")) {
391+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
392+
assertNotNull(executionInfo);
393+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
394+
long overallTookMillis = executionInfo.overallTook().millis();
395+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
396+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
397+
398+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
399+
assertThat(remoteCluster.getIndexExpression(), equalTo("nomatch*"));
400+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
401+
assertThat(remoteCluster.getTook().millis(), equalTo(0L));
402+
assertThat(remoteCluster.getTotalShards(), equalTo(0));
403+
assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
404+
assertThat(remoteCluster.getSkippedShards(), equalTo(0));
405+
assertThat(remoteCluster.getFailedShards(), equalTo(0));
406+
407+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
408+
assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
409+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
410+
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
411+
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
412+
assertNull(localCluster.getTotalShards());
413+
assertNull(localCluster.getSuccessfulShards());
414+
assertNull(localCluster.getSkippedShards());
415+
assertNull(localCluster.getFailedShards());
416+
}
417+
418+
try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0")) {
419+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
420+
assertNotNull(executionInfo);
421+
assertThat(executionInfo.isCrossClusterSearch(), is(true));
422+
long overallTookMillis = executionInfo.overallTook().millis();
423+
assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
424+
assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
425+
426+
EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
427+
assertThat(remoteCluster.getIndexExpression(), equalTo("*"));
428+
assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
429+
assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
430+
assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
431+
assertNull(remoteCluster.getTotalShards());
432+
assertNull(remoteCluster.getSuccessfulShards());
433+
assertNull(remoteCluster.getSkippedShards());
434+
assertNull(remoteCluster.getFailedShards());
435+
436+
EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
437+
assertThat(localCluster.getIndexExpression(), equalTo("nomatch*"));
438+
// TODO: in https://github.com/elastic/elasticsearch/issues/112886, this will be changed to be SKIPPED
439+
assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
440+
assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
441+
assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
442+
}
443+
}
444+
342445
public void testMetadataIndex() {
343446
Map<String, Object> testClusterInfo = setupTwoClusters();
344447
int localNumShards = (Integer) testClusterInfo.get("local.num_shards");

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -171,17 +171,21 @@ public void execute(
171171
null,
172172
null
173173
);
174+
String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
174175
try (
175176
var computeListener = ComputeListener.create(
176-
RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
177+
local,
177178
transportService,
178179
rootTask,
179180
execInfo,
180181
configuration.getQueryStartTimeNanos(),
181-
listener.map(r -> new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo))
182+
listener.map(r -> {
183+
updateExecutionInfoAfterCoordinatorOnlyQuery(configuration.getQueryStartTimeNanos(), execInfo);
184+
return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
185+
})
182186
)
183187
) {
184-
runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute());
188+
runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute(local));
185189
return;
186190
}
187191
} else {
@@ -247,6 +251,27 @@ public void execute(
247251
}
248252
}
249253

254+
private static void updateExecutionInfoAfterCoordinatorOnlyQuery(long queryStartNanos, EsqlExecutionInfo execInfo) {
255+
long tookTimeNanos = System.nanoTime() - queryStartNanos;
256+
execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS));
257+
if (execInfo.isCrossClusterSearch()) {
258+
for (String clusterAlias : execInfo.clusterAliases()) {
259+
// The local cluster 'took' time gets updated as part of the acquireCompute(local) call in the coordinator, so
260+
// here we only need to update status for remote clusters since there are no remote ComputeListeners in this case.
261+
// This happens in cross cluster searches that use LIMIT 0, e.g, FROM logs*,remote*:logs* | LIMIT 0.
262+
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
263+
execInfo.swapCluster(clusterAlias, (k, v) -> {
264+
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
265+
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
266+
} else {
267+
return v;
268+
}
269+
});
270+
}
271+
}
272+
}
273+
}
274+
250275
private List<RemoteCluster> getRemoteClusters(
251276
Map<String, OriginalIndices> clusterToConcreteIndices,
252277
Map<String, OriginalIndices> clusterToOriginalIndices

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import java.util.List;
7373
import java.util.Map;
7474
import java.util.Set;
75+
import java.util.concurrent.TimeUnit;
7576
import java.util.function.BiConsumer;
7677
import java.util.function.BiFunction;
7778
import java.util.function.Predicate;
@@ -245,6 +246,7 @@ private <T> void preAnalyze(
245246
if (indexResolution.isValid()) {
246247
updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
247248
updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
249+
updateTookTimeForRemoteClusters(executionInfo);
248250
Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
249251
indexResolution.get().concreteIndices().toArray(String[]::new)
250252
).keySet();
@@ -285,6 +287,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn
285287
}
286288
Set<String> clustersRequested = executionInfo.clusterAliases();
287289
Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
290+
clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters());
288291
/*
289292
* These are clusters in the original request that are not present in the field-caps response. They were
290293
* specified with an index or indices that do not exist, so the search on that cluster is done.
@@ -304,6 +307,28 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn
304307
}
305308
}
306309

310+
private void updateTookTimeForRemoteClusters(EsqlExecutionInfo executionInfo) {
311+
if (executionInfo.isCrossClusterSearch()) {
312+
for (String clusterAlias : executionInfo.clusterAliases()) {
313+
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
314+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
315+
if (v.getTook() == null && v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
316+
// set took time in case we are finished with the remote cluster (e.g., FROM foo | LIMIT 0).
317+
// this will be overwritten later if ES|QL operations happen on the remote cluster (the typical scenario)
318+
TimeValue took = new TimeValue(
319+
System.nanoTime() - configuration.getQueryStartTimeNanos(),
320+
TimeUnit.NANOSECONDS
321+
);
322+
return new EsqlExecutionInfo.Cluster.Builder(v).setTook(took).build();
323+
} else {
324+
return v;
325+
}
326+
});
327+
}
328+
}
329+
}
330+
}
331+
307332
private void preAnalyzeIndices(
308333
LogicalPlan parsed,
309334
EsqlExecutionInfo executionInfo,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
216216
randomMapping(),
217217
Map.of("logs-a", IndexMode.STANDARD)
218218
);
219+
// mark remote1 as unavailable
219220
IndexResolution indexResolution = IndexResolution.valid(esIndex, Set.of(remote1Alias));
220221

221222
EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
@@ -226,12 +227,10 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
226227

227228
EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
228229
assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
229-
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
230-
assertThat(remote1Cluster.getTook().millis(), equalTo(0L));
231-
assertThat(remote1Cluster.getTotalShards(), equalTo(0));
232-
assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0));
233-
assertThat(remote1Cluster.getSkippedShards(), equalTo(0));
234-
assertThat(remote1Cluster.getFailedShards(), equalTo(0));
230+
// remote1 is left as RUNNING, since another method (updateExecutionInfoWithUnavailableClusters) not under test changes status
231+
assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
232+
assertNull(remote1Cluster.getTook());
233+
assertNull(remote1Cluster.getTotalShards());
235234

236235
EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
237236
assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*"));

0 commit comments

Comments
 (0)