Skip to content

Commit 5855935

Browse files
committed
Make INLINESTATS (and subplans) work with CCS
1 parent 5f06c39 commit 5855935

File tree

5 files changed

+45
-9
lines changed

5 files changed

+45
-9
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
8383
private transient TimeSpan planningTimeSpan; // time elapsed since start of query to calling ComputeService.execute
8484
private TimeValue overallTook;
8585

86+
// Are we doing subplans? No need to serialize this because it is only relevant for the coordinator node.
87+
private transient boolean inSubplan = false;
88+
8689
// This is only used is tests.
8790
public EsqlExecutionInfo(boolean includeCCSMetadata) {
8891
this(Predicates.always(), includeCCSMetadata); // default all clusters to being skippable on failure
@@ -142,6 +145,7 @@ public void writeTo(StreamOutput out) throws IOException {
142145
out.writeOptionalWriteable(overallTimeSpan);
143146
out.writeOptionalWriteable(planningTimeSpan);
144147
}
148+
assert inSubplan == false : "Should not be serializing execution info while in subplans";
145149
}
146150

147151
public boolean includeCCSMetadata() {
@@ -168,8 +172,10 @@ public TimeValue planningTookTime() {
168172
*/
169173
public void markEndQuery() {
170174
assert relativeStart != null : "Relative start time must be set when markEndQuery is called";
171-
overallTimeSpan = relativeStart.stop();
172-
overallTook = overallTimeSpan.toTimeValue();
175+
if (isMainPlan()) {
176+
overallTimeSpan = relativeStart.stop();
177+
overallTook = overallTimeSpan.toTimeValue();
178+
}
173179
}
174180

175181
// for testing only - use markEndQuery in production code
@@ -358,6 +364,18 @@ public void clusterInfoInitializing(boolean clusterInfoInitializing) {
358364
this.clusterInfoInitializing = clusterInfoInitializing;
359365
}
360366

367+
public boolean isMainPlan() {
368+
return inSubplan == false;
369+
}
370+
371+
public void startSubPlan() {
372+
this.inSubplan = true;
373+
}
374+
375+
public void finishSubPlan() {
376+
this.inSubplan = false;
377+
}
378+
361379
/**
362380
* Represents the search metadata about a particular cluster involved in a cross-cluster search.
363381
* The Cluster object can represent either the local cluster or a remote cluster.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,15 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
158158
.setSuccessfulShards(resp.getSuccessfulShards())
159159
.setSkippedShards(resp.getSkippedShards())
160160
.setFailedShards(resp.getFailedShards());
161+
// TODO: correctly aggregate took time from subplans
161162
if (resp.getTook() != null) {
162163
builder.setTook(TimeValue.timeValueNanos(executionInfo.planningTookTime().nanos() + resp.getTook().nanos()));
163164
} else {
164165
// if the cluster is an older version and does not send back took time, then calculate it here on the coordinator
165166
// and leave shard info unset, so it is not shown in the CCS metadata section of the JSON response
166167
builder.setTook(executionInfo.tookSoFar());
167168
}
168-
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
169+
if (executionInfo.isMainPlan() && v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
169170
builder.addFailures(resp.failures);
170171
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
171172
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ public void executePlan(
384384
cancelQueryOnFailure,
385385
listener.delegateFailureAndWrap((l, completionInfo) -> {
386386
failIfAllShardsFailed(execInfo, collectedPages);
387-
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
387+
execInfo.markEndQuery();
388388
l.onResponse(new Result(outputAttributes, collectedPages, completionInfo, execInfo));
389389
})
390390
)
@@ -401,7 +401,7 @@ public void executePlan(
401401
execInfo.swapCluster(LOCAL_CLUSTER, (k, v) -> {
402402
var tookTime = execInfo.tookSoFar();
403403
var builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(tookTime);
404-
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
404+
if (execInfo.isMainPlan() && v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
405405
final Integer failedShards = execInfo.getCluster(LOCAL_CLUSTER).getFailedShards();
406406
// Set the local cluster status (including the final driver) to partial if the query was stopped
407407
// or encountered resolution or execution failures.
@@ -533,8 +533,8 @@ private static void updateShardCountForCoordinatorOnlyQuery(EsqlExecutionInfo ex
533533

534534
// For queries like: FROM logs* | LIMIT 0 (including cross-cluster LIMIT 0 queries)
535535
private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionInfo execInfo) {
536-
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
537-
if (execInfo.isCrossClusterSearch()) {
536+
execInfo.markEndQuery();
537+
if (execInfo.isCrossClusterSearch() && execInfo.isMainPlan()) {
538538
assert execInfo.planningTookTime() != null : "Planning took time should be set on EsqlExecutionInfo but is null";
539539
for (String clusterAlias : execInfo.clusterAliases()) {
540540
execInfo.swapCluster(clusterAlias, (k, v) -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exc
121121
}
122122

123123
static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionInfo, Exception e) {
124+
// This applies even for subplans - if we had an error and have to skip a cluster, then it will remain skipped.
124125
executionInfo.markEndQuery();
125126
Exception exceptionForResponse;
126127
if (e instanceof ConnectTransportException) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,13 @@ private void executeSubPlan(
262262
// Create a physical plan out of the logical sub-plan
263263
var physicalSubPlan = logicalPlanToPhysicalPlan(subPlans.stubReplacedSubPlan(), request);
264264

265-
runner.run(physicalSubPlan, listener.delegateFailureAndWrap((next, result) -> {
265+
executionInfo.startSubPlan();
266+
var listenerForSublan = listener.delegateResponse((l, e) -> {
267+
executionInfo.finishSubPlan();
268+
l.onFailure(e);
269+
});
270+
271+
runner.run(physicalSubPlan, listenerForSublan.delegateFailureAndWrap((next, result) -> {
266272
try {
267273
// Translate the subquery into a separate, coordinator based plan and the results 'broadcasted' as a local relation
268274
completionInfoAccumulator.accumulate(result.completionInfo());
@@ -282,6 +288,8 @@ private void executeSubPlan(
282288
var newSubPlan = firstSubPlan(newLogicalPlan);
283289

284290
if (newSubPlan == null) {// run the final "main" plan
291+
// TODO: failures!
292+
executionInfo.finishSubPlan();
285293
LOGGER.debug("Executing final plan:\n{}", newLogicalPlan);
286294
var newPhysicalPlan = logicalPlanToPhysicalPlan(newLogicalPlan, request);
287295
runner.run(newPhysicalPlan, next.delegateFailureAndWrap((finalListener, finalResult) -> {
@@ -291,7 +299,15 @@ private void executeSubPlan(
291299
);
292300
}));
293301
} else {// continue executing the subplans
294-
executeSubPlan(completionInfoAccumulator, newLogicalPlan, newSubPlan, executionInfo, runner, request, listener);
302+
executeSubPlan(
303+
completionInfoAccumulator,
304+
newLogicalPlan,
305+
newSubPlan,
306+
executionInfo,
307+
runner,
308+
request,
309+
listenerForSublan
310+
);
295311
}
296312
} finally {
297313
Releasables.closeExpectNoException(Releasables.wrap(Iterators.map(result.pages().iterator(), p -> p::releaseBlocks)));

0 commit comments

Comments
 (0)