Skip to content

Commit e33e1a0

Browse files
authored
ESQL: async search responses have CCS metadata while searches are running (#117265)
ES|QL async search responses now include CCS metadata while the query is still running. The CCS metadata is present only if a remote cluster is queried and the user requested it with the `include_ccs_metadata: true` setting on the original request to `POST /_query/async`. The setting cannot be modified in the follow-up request to `GET /_query/async/:id`.

The core change is that the EsqlExecutionInfo object is set on the EsqlQueryTask, which is used for async ES|QL queries, so that calls to `GET /_query/async/:id` have access to the same EsqlExecutionInfo object that is being updated as planning and query execution progress.

Secondly, the overall `took` time is now always present on ES|QL responses, even for async searches while the query is still running. In that case the took time is a "took-so-far" value that changes on each refresh until the query has finished. This is present regardless of the `include_ccs_metadata` setting.

Example response showing the in-progress state of the query:

```
GET _query/async/FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA
```

```json
{
  "id": "FlhaeTBxUU0yU2xhVzM2TlRLY3F1eXcceWlSWWZlRDhUVTJEUGFfZUROaDdtUTo0MDQwNA==",
  "is_running": true,
  "took": 2032,
  "columns": [],
  "values": [],
  "_clusters": {
    "total": 3,
    "successful": 1,
    "running": 2,
    "skipped": 0,
    "partial": 0,
    "failed": 0,
    "details": {
      "(local)": {
        "status": "running",
        "indices": "web_traffic",
        "_shards": { "total": 2, "skipped": 0 }
      },
      "remote1": {
        "status": "running",
        "indices": "web_traffic"
      },
      "remote2": {
        "status": "successful",
        "indices": "web_traffic",
        "took": 180,
        "_shards": { "total": 2, "successful": 2, "skipped": 0, "failed": 0 }
      }
    }
  }
}
```
1 parent 7a98e31 commit e33e1a0

File tree

12 files changed

+634
-33
lines changed

12 files changed

+634
-33
lines changed

docs/changelog/117265.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 117265
2+
summary: Async search responses have CCS metadata while searches are running
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java

Lines changed: 522 additions & 0 deletions
Large diffs are not rendered by default.

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@
6161
public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
6262
private static final String REMOTE_CLUSTER_1 = "cluster-a";
6363
private static final String REMOTE_CLUSTER_2 = "remote-b";
64+
private static String LOCAL_INDEX = "logs-1";
65+
private static String IDX_ALIAS = "alias1";
66+
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
67+
private static String REMOTE_INDEX = "logs-2";
6468

6569
@Override
6670
protected Collection<String> remoteClusterAlias() {
@@ -1278,11 +1282,6 @@ Map<String, Object> setupTwoClusters() {
12781282
return setupClusters(2);
12791283
}
12801284

1281-
private static String LOCAL_INDEX = "logs-1";
1282-
private static String IDX_ALIAS = "alias1";
1283-
private static String FILTERED_IDX_ALIAS = "alias-filtered-1";
1284-
private static String REMOTE_INDEX = "logs-2";
1285-
12861285
Map<String, Object> setupClusters(int numClusters) {
12871286
assert numClusters == 2 || numClusters == 3 : "2 or 3 clusters supported not: " + numClusters;
12881287
int numShardsLocal = randomIntBetween(1, 5);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,17 @@ public TimeValue overallTook() {
169169
return overallTook;
170170
}
171171

172+
/**
173+
* Time elapsed since the query started; returns a zero duration if no start time has been recorded.
174+
*/
175+
public TimeValue tookSoFar() {
176+
if (relativeStartNanos == null) {
177+
return new TimeValue(0);
178+
} else {
179+
return new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
180+
}
181+
}
182+
172183
public Set<String> clusterAliases() {
173184
return clusterInfo.keySet();
174185
}
@@ -478,7 +489,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
478489
{
479490
builder.field(STATUS_FIELD.getPreferredName(), getStatus().toString());
480491
builder.field(INDICES_FIELD.getPreferredName(), indexExpression);
481-
if (took != null) {
492+
if (took != null && status != Status.RUNNING) {
482493
builder.field(TOOK.getPreferredName(), took.millis());
483494
}
484495
if (totalShards != null) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,11 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
196196
}
197197
b.field("is_running", isRunning);
198198
}
199-
if (executionInfo != null && executionInfo.overallTook() != null) {
200-
b.field("took", executionInfo.overallTook().millis());
199+
if (executionInfo != null) {
200+
long tookInMillis = executionInfo.overallTook() == null
201+
? executionInfo.tookSoFar().millis()
202+
: executionInfo.overallTook().millis();
203+
b.field("took", tookInMillis);
201204
}
202205
if (dropNullColumns) {
203206
b.append(ResponseXContentUtils.allColumns(columns, "all_columns"))

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
1919

20+
private EsqlExecutionInfo executionInfo;
21+
2022
public EsqlQueryTask(
2123
long id,
2224
String type,
@@ -29,10 +31,19 @@ public EsqlQueryTask(
2931
TimeValue keepAlive
3032
) {
3133
super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive);
34+
this.executionInfo = null;
35+
}
36+
37+
public void setExecutionInfo(EsqlExecutionInfo executionInfo) {
38+
this.executionInfo = executionInfo;
39+
}
40+
41+
public EsqlExecutionInfo executionInfo() {
42+
return executionInfo;
3243
}
3344

3445
@Override
3546
public EsqlQueryResponse getCurrentResult() {
36-
return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, null);
47+
return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true, executionInfo);
3748
}
3849
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ private ComputeListener(
112112
if (runningOnRemoteCluster()) {
113113
// for remote executions - this ComputeResponse is created on the remote cluster/node and will be serialized and
114114
// received by the acquireCompute method callback on the coordinating cluster
115+
setFinalStatusAndShardCounts(clusterAlias, executionInfo);
115116
EsqlExecutionInfo.Cluster cluster = esqlExecutionInfo.getCluster(clusterAlias);
116117
result = new ComputeResponse(
117118
collectedProfiles.isEmpty() ? List.of() : collectedProfiles.stream().toList(),
@@ -126,19 +127,33 @@ private ComputeListener(
126127
if (coordinatingClusterIsSearchedInCCS()) {
127128
// if not already marked as SKIPPED, mark the local cluster as finished once the coordinator and all
128129
// data nodes have finished processing
129-
executionInfo.swapCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> {
130-
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
131-
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
132-
} else {
133-
return v;
134-
}
135-
});
130+
setFinalStatusAndShardCounts(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, executionInfo);
136131
}
137132
}
138133
delegate.onResponse(result);
139134
}, e -> delegate.onFailure(failureCollector.getFailure())));
140135
}
141136

137+
private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) {
138+
executionInfo.swapCluster(clusterAlias, (k, v) -> {
139+
// TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed
140+
if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
141+
assert v.getTotalShards() != null && v.getSkippedShards() != null : "Null total or skipped shard count: " + v;
142+
return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
143+
/*
144+
* Total and skipped shard counts are set early in execution (after can-match).
145+
* Until ES|QL supports shard-level partial results, we just set all non-skipped shards
146+
* as successful and none are failed.
147+
*/
148+
.setSuccessfulShards(v.getTotalShards())
149+
.setFailedShards(0)
150+
.build();
151+
} else {
152+
return v;
153+
}
154+
});
155+
}
156+
142157
/**
143158
* @return true if the "local" querying/coordinator cluster is being searched in a cross-cluster search
144159
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public void execute(
178178
null
179179
);
180180
String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
181+
updateShardCountForCoordinatorOnlyQuery(execInfo);
181182
try (var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, listener.map(r -> {
182183
updateExecutionInfoAfterCoordinatorOnlyQuery(execInfo);
183184
return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
@@ -260,18 +261,30 @@ public void execute(
260261
}
261262
}
262263

264+
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
265+
private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
266+
if (execInfo.isCrossClusterSearch()) {
267+
for (String clusterAlias : execInfo.clusterAliases()) {
268+
execInfo.swapCluster(
269+
clusterAlias,
270+
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(0)
271+
.setSuccessfulShards(0)
272+
.setSkippedShards(0)
273+
.setFailedShards(0)
274+
.build()
275+
);
276+
}
277+
}
278+
}
279+
263280
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
264281
private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
265282
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
266283
if (execInfo.isCrossClusterSearch()) {
267284
assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
268285
for (String clusterAlias : execInfo.clusterAliases()) {
269286
execInfo.swapCluster(clusterAlias, (k, v) -> {
270-
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook())
271-
.setTotalShards(0)
272-
.setSuccessfulShards(0)
273-
.setSkippedShards(0)
274-
.setFailedShards(0);
287+
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.overallTook());
275288
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
276289
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);
277290
}
@@ -324,9 +337,8 @@ private void startComputeOnDataNodes(
324337
executionInfo.swapCluster(
325338
clusterAlias,
326339
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(dataNodeResult.totalShards())
327-
.setSuccessfulShards(dataNodeResult.totalShards())
340+
// do not set successful or failed shard count here - do it when search is done
328341
.setSkippedShards(dataNodeResult.skippedShards())
329-
.setFailedShards(0)
330342
.build()
331343
);
332344

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener
151151

152152
@Override
153153
public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
154+
// set EsqlExecutionInfo on async-search task so that it is accessible to GET _query/async while the query is still running
155+
task.setExecutionInfo(createEsqlExecutionInfo(request));
154156
ActionListener.run(listener, l -> innerExecute(task, request, l));
155157
}
156158

@@ -170,10 +172,9 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
170172
System.nanoTime()
171173
);
172174
String sessionId = sessionID(task);
173-
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(
174-
clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias),
175-
request.includeCCSMetadata()
176-
);
175+
// async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task
176+
// sync query uses CancellableTask which does not have EsqlExecutionInfo, so create one
177+
EsqlExecutionInfo executionInfo = getOrCreateExecutionInfo(task, request);
177178
PlanRunner planRunner = (plan, resultListener) -> computeService.execute(
178179
sessionId,
179180
(CancellableTask) task,
@@ -194,6 +195,18 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
194195
);
195196
}
196197

198+
private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest request) {
199+
if (task instanceof EsqlQueryTask esqlQueryTask && esqlQueryTask.executionInfo() != null) {
200+
return esqlQueryTask.executionInfo();
201+
} else {
202+
return createEsqlExecutionInfo(request);
203+
}
204+
}
205+
206+
private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
207+
return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias), request.includeCCSMetadata());
208+
}
209+
197210
private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
198211
List<ColumnInfoImpl> columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList();
199212
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
@@ -269,7 +282,7 @@ public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
269282
asyncExecutionId,
270283
true, // is_running
271284
true, // isAsync
272-
null
285+
task.executionInfo()
273286
);
274287
}
275288

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public String sessionId() {
147147
* Execute an ESQL request.
148148
*/
149149
public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, PlanRunner planRunner, ActionListener<Result> listener) {
150+
assert executionInfo != null : "Null EsqlExecutionInfo";
150151
LOGGER.debug("ESQL query:\n{}", request.query());
151152
analyzedPlan(
152153
parse(request.query(), request.params()),

0 commit comments

Comments
 (0)