diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index 1bc23a24fc5ab..9ac259cfa46e5 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -322,6 +322,7 @@ static TransportVersion def(int id) {
public static final TransportVersion CLUSTER_STATE_PROJECTS_SETTINGS = def(9_108_0_00);
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED = def(9_109_00_0);
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
+ public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
/*
* STOP! READ THIS FIRST! No, really,
diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
index 936916428538e..04c2ff20627a7 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java
@@ -2713,9 +2713,11 @@ public static Request newXContentRequest(HttpMethod method, String endpoint, ToX
}
protected static MapMatcher getProfileMatcher() {
- return matchesMap().entry("query", instanceOf(Map.class))
+ return matchesMap() //
+ .entry("query", instanceOf(Map.class))
.entry("planning", instanceOf(Map.class))
- .entry("drivers", instanceOf(List.class));
+ .entry("drivers", instanceOf(List.class))
+ .entry("plans", instanceOf(List.class));
}
protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java
index bafcc076a46e8..7e63fe1681dd3 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java
@@ -7,6 +7,7 @@
package org.elasticsearch.compute.operator;
+import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -24,23 +25,34 @@
* roughly the number of documents times the number of
* fields per document. Except {@code null} values don't count.
* And multivalued fields count as many times as there are values.
- * @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
+ * @param driverProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* not free so this will be empty if the {@code profile} option was not set in
* the request.
*/
-public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List collectedProfiles) implements Writeable {
+public record DriverCompletionInfo(
+ long documentsFound,
+ long valuesLoaded,
+ List driverProfiles,
+ List planProfiles
+) implements Writeable {
/**
* Completion info we use when we didn't properly complete any drivers.
* Usually this is returned with an error, but it's also used when receiving
* responses from very old nodes.
*/
- public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of());
+ public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of(), List.of());
/**
* Build a {@link DriverCompletionInfo} for many drivers including their profile output.
*/
- public static DriverCompletionInfo includingProfiles(List drivers) {
+ public static DriverCompletionInfo includingProfiles(
+ List drivers,
+ String description,
+ String clusterName,
+ String nodeName,
+ String planTree
+ ) {
long documentsFound = 0;
long valuesLoaded = 0;
List collectedProfiles = new ArrayList<>(drivers.size());
@@ -52,7 +64,12 @@ public static DriverCompletionInfo includingProfiles(List drivers) {
}
collectedProfiles.add(p);
}
- return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
+ return new DriverCompletionInfo(
+ documentsFound,
+ valuesLoaded,
+ collectedProfiles,
+ List.of(new PlanProfile(description, clusterName, nodeName, planTree))
+ );
}
/**
@@ -69,33 +86,45 @@ public static DriverCompletionInfo excludingProfiles(List drivers) {
valuesLoaded += o.valuesLoaded();
}
}
- return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of());
+ return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of(), List.of());
}
- public DriverCompletionInfo(StreamInput in) throws IOException {
- this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::readFrom));
+ public static DriverCompletionInfo readFrom(StreamInput in) throws IOException {
+ return new DriverCompletionInfo(
+ in.readVLong(),
+ in.readVLong(),
+ in.readCollectionAsImmutableList(DriverProfile::readFrom),
+ in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
+ ? in.readCollectionAsImmutableList(PlanProfile::readFrom)
+ : List.of()
+ );
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(documentsFound);
out.writeVLong(valuesLoaded);
- out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o));
+ out.writeCollection(driverProfiles);
+ if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
+ out.writeCollection(planProfiles);
+ }
}
public static class Accumulator {
private long documentsFound;
private long valuesLoaded;
- private final List collectedProfiles = new ArrayList<>();
+ private final List driverProfiles = new ArrayList<>();
+ private final List planProfiles = new ArrayList<>();
public void accumulate(DriverCompletionInfo info) {
this.documentsFound += info.documentsFound;
this.valuesLoaded += info.valuesLoaded;
- this.collectedProfiles.addAll(info.collectedProfiles);
+ this.driverProfiles.addAll(info.driverProfiles);
+ this.planProfiles.addAll(info.planProfiles);
}
public DriverCompletionInfo finish() {
- return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
+ return new DriverCompletionInfo(documentsFound, valuesLoaded, driverProfiles, planProfiles);
}
}
@@ -103,15 +132,17 @@ public static class AtomicAccumulator {
private final AtomicLong documentsFound = new AtomicLong();
private final AtomicLong valuesLoaded = new AtomicLong();
private final List collectedProfiles = Collections.synchronizedList(new ArrayList<>());
+ private final List planProfiles = Collections.synchronizedList(new ArrayList<>());
public void accumulate(DriverCompletionInfo info) {
this.documentsFound.addAndGet(info.documentsFound);
this.valuesLoaded.addAndGet(info.valuesLoaded);
- this.collectedProfiles.addAll(info.collectedProfiles);
+ this.collectedProfiles.addAll(info.driverProfiles);
+ this.planProfiles.addAll(info.planProfiles);
}
public DriverCompletionInfo finish() {
- return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles);
+ return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles, planProfiles);
}
}
}
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java
new file mode 100644
index 0000000000000..502fa024d31ea
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject {
+
+ public static PlanProfile readFrom(StreamInput in) throws IOException {
+ return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString());
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ out.writeString(description);
+ out.writeString(clusterName);
+ out.writeString(nodeName);
+ out.writeString(planTree);
+ }
+
+ @Override
+ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+ return builder.startObject()
+ .field("description", description)
+ .field("cluster_name", clusterName)
+ .field("node_name", nodeName)
+ .field("plan", planTree)
+ .endObject();
+ }
+}
diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java
index e14398d9c686a..27dd245121cd9 100644
--- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java
+++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java
@@ -317,7 +317,9 @@ private void testPushQuery(
result,
getResultMatcher(result).entry(
"profile",
- matchesMap().entry("drivers", instanceOf(List.class))
+ matchesMap() //
+ .entry("drivers", instanceOf(List.class))
+ .entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
),
diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java
index 088fb3bbfce56..df4444f5a1e47 100644
--- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java
+++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java
@@ -112,7 +112,9 @@ private void testQuery(Double percent, String query, int documentsFound, boolean
matchesMap().entry("documents_found", documentsFound)
.entry(
"profile",
- matchesMap().entry("drivers", instanceOf(List.class))
+ matchesMap() //
+ .entry("drivers", instanceOf(List.class))
+ .entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
)
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
index d28652c2a0162..ac4960ce9a134 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
@@ -20,6 +20,7 @@
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@@ -279,6 +280,7 @@ public Iterator extends ToXContent> toXContentChunked(ToXContent.Params params
return b;
}));
content.add(ChunkedToXContentHelper.array("drivers", profile.drivers.iterator(), params));
+ content.add(ChunkedToXContentHelper.array("plans", profile.plans.iterator()));
content.add(ChunkedToXContentHelper.endObject());
}
content.add(ChunkedToXContentHelper.endObject());
@@ -387,15 +389,23 @@ public EsqlResponse responseInternal() {
return esqlResponse;
}
- public record Profile(List drivers) implements Writeable {
+ public record Profile(List drivers, List plans) implements Writeable {
public static Profile readFrom(StreamInput in) throws IOException {
- return new Profile(in.readCollectionAsImmutableList(DriverProfile::readFrom));
+ return new Profile(
+ in.readCollectionAsImmutableList(DriverProfile::readFrom),
+ in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
+ ? in.readCollectionAsImmutableList(PlanProfile::readFrom)
+ : List.of()
+ );
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(drivers);
+ if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
+ out.writeCollection(plans);
+ }
}
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java
index 1a9b211d7a487..4f55a2a6e8cec 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java
@@ -59,10 +59,10 @@ final class ComputeResponse extends TransportResponse {
ComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
- completionInfo = new DriverCompletionInfo(in);
+ completionInfo = DriverCompletionInfo.readFrom(in);
} else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
if (in.readBoolean()) {
- completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom));
+ completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
} else {
completionInfo = DriverCompletionInfo.EMPTY;
}
@@ -96,7 +96,7 @@ public void writeTo(StreamOutput out) throws IOException {
completionInfo.writeTo(out);
} else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeBoolean(true);
- out.writeCollection(completionInfo.collectedProfiles());
+ out.writeCollection(completionInfo.driverProfiles());
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeOptionalTimeValue(took);
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
index a16e5f8bd78e0..6d15f88a26f12 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
@@ -19,7 +19,6 @@
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.DataPartitioning;
-import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.FailureCollector;
@@ -194,7 +193,7 @@ public void execute(
List subplans = subplansAndMainPlan.v1();
// we have no sub plans, so we can just execute the given plan
- if (subplans == null || subplans.size() == 0) {
+ if (subplans == null || subplans.isEmpty()) {
executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, null, listener, null);
return;
}
@@ -230,7 +229,6 @@ public void execute(
);
Runnable cancelQueryOnFailure = cancelQueryOnFailure(rootTask);
- PhysicalPlan finalMainPlan = mainPlan;
try (
ComputeListener localListener = new ComputeListener(
@@ -238,11 +236,11 @@ public void execute(
cancelQueryOnFailure,
finalListener.map(profiles -> {
execInfo.markEndQuery();
- return new Result(finalMainPlan.output(), collectedPages, profiles, execInfo);
+ return new Result(mainPlan.output(), collectedPages, profiles, execInfo);
})
)
) {
- runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute());
+ runCompute(rootTask, computeContext, mainPlan, localListener.acquireCompute());
for (int i = 0; i < subplans.size(); i++) {
var subplan = subplans.get(i);
@@ -539,9 +537,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
@Override
public SourceProvider createSourceProvider() {
- final Supplier supplier = () -> super.createSourceProvider();
- return new ReinitializingSourceProvider(supplier);
-
+ return new ReinitializingSourceProvider(super::createSourceProvider);
}
};
contexts.add(
@@ -554,7 +550,6 @@ public SourceProvider createSourceProvider() {
searchService.getIndicesService().getAnalysis(),
defaultDataPartitioning
);
- final List drivers;
try {
LocalExecutionPlanner planner = new LocalExecutionPlanner(
context.sessionId(),
@@ -575,37 +570,40 @@ public SourceProvider createSourceProvider() {
LOGGER.debug("Received physical plan:\n{}", plan);
- plan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan);
+ var localPlan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan);
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
// it's doing this in the planning of EsQueryExec (the source of the data)
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
- LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), plan);
+ LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.description(), context.foldCtx(), localPlan);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
}
- drivers = localExecutionPlan.createDrivers(context.sessionId());
+ var drivers = localExecutionPlan.createDrivers(context.sessionId());
if (drivers.isEmpty()) {
throw new IllegalStateException("no drivers created");
}
LOGGER.debug("using {} drivers", drivers.size());
+ driverRunner.executeDrivers(
+ task,
+ drivers,
+ transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
+ ActionListener.releaseAfter(listener.map(ignored -> {
+ if (context.configuration().profile()) {
+ return DriverCompletionInfo.includingProfiles(
+ drivers,
+ context.description(),
+ clusterService.getClusterName().value(),
+ transportService.getLocalNode().getName(),
+ localPlan.toString()
+ );
+ } else {
+ return DriverCompletionInfo.excludingProfiles(drivers);
+ }
+ }), () -> Releasables.close(drivers))
+ );
} catch (Exception e) {
listener.onFailure(e);
- return;
}
- ActionListener listenerCollectingStatus = listener.map(ignored -> {
- if (context.configuration().profile()) {
- return DriverCompletionInfo.includingProfiles(drivers);
- } else {
- return DriverCompletionInfo.excludingProfiles(drivers);
- }
- });
- listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers));
- driverRunner.executeDrivers(
- task,
- drivers,
- transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
- listenerCollectingStatus
- );
}
static PhysicalPlan reductionPlan(ExchangeSinkExec plan, boolean enable) {
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java
index 4de26e4034d81..cdf418df1526b 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java
@@ -15,6 +15,7 @@
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
@@ -33,12 +34,12 @@ final class DataNodeComputeResponse extends TransportResponse {
DataNodeComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
- this.completionInfo = new DriverCompletionInfo(in);
+ this.completionInfo = DriverCompletionInfo.readFrom(in);
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
return;
}
if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) {
- this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom));
+ this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::readFrom), List.of());
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
return;
}
@@ -54,7 +55,7 @@ public void writeTo(StreamOutput out) throws IOException {
return;
}
if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) {
- out.writeCollection(completionInfo.collectedProfiles(), (o, v) -> v.writeTo(o));
+ out.writeCollection(completionInfo.driverProfiles());
out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
return;
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
index 673a551884912..4cc928fe07cb3 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
@@ -339,7 +339,7 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config
return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
}).toList();
EsqlQueryResponse.Profile profile = configuration.profile()
- ? new EsqlQueryResponse.Profile(result.completionInfo().collectedProfiles())
+ ? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
: null;
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java
index 97407768b9347..f62d065518bdf 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java
@@ -12,8 +12,8 @@
import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverSleeps;
-import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OperatorStatus;
+import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.List;
@@ -26,12 +26,14 @@ protected Writeable.Reader instanceReader() {
@Override
protected EsqlQueryResponse.Profile createTestInstance() {
- return new EsqlQueryResponse.Profile(randomDriverProfiles());
+ return new EsqlQueryResponse.Profile(randomDriverProfiles(), randomPlanProfiles());
}
@Override
protected EsqlQueryResponse.Profile mutateInstance(EsqlQueryResponse.Profile instance) {
- return new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles));
+ return randomBoolean()
+ ? new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles), instance.plans())
+ : new EsqlQueryResponse.Profile(instance.drivers(), randomValueOtherThan(instance.plans(), this::randomPlanProfiles));
}
@Override
@@ -40,34 +42,41 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() {
}
private List randomDriverProfiles() {
- return randomList(10, this::randomDriverProfile);
+ return randomList(
+ 10,
+ () -> new DriverProfile(
+ randomIdentifier(),
+ randomIdentifier(),
+ randomIdentifier(),
+ randomNonNegativeLong(),
+ randomNonNegativeLong(),
+ randomNonNegativeLong(),
+ randomNonNegativeLong(),
+ randomNonNegativeLong(),
+ randomList(10, this::randomOperatorStatus),
+ DriverSleeps.empty()
+ )
+ );
}
- private DriverProfile randomDriverProfile() {
- return new DriverProfile(
- randomIdentifier(),
- randomIdentifier(),
- randomIdentifier(),
- randomNonNegativeLong(),
- randomNonNegativeLong(),
- randomNonNegativeLong(),
- randomNonNegativeLong(),
- randomNonNegativeLong(),
- randomList(10, this::randomOperatorStatus),
- DriverSleeps.empty()
+ private List randomPlanProfiles() {
+ return randomList(
+ 10,
+ () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphanumericOfLength(1024))
);
}
private OperatorStatus randomOperatorStatus() {
- String name = randomAlphaOfLength(4);
- Operator.Status status = randomBoolean()
- ? null
- : new AbstractPageMappingOperator.Status(
- randomNonNegativeLong(),
- randomNonNegativeInt(),
- randomNonNegativeLong(),
- randomNonNegativeLong()
- );
- return new OperatorStatus(name, status);
+ return new OperatorStatus(
+ randomAlphaOfLength(4),
+ randomBoolean()
+ ? new AbstractPageMappingOperator.Status(
+ randomNonNegativeLong(),
+ randomNonNegativeInt(),
+ randomNonNegativeLong(),
+ randomNonNegativeLong()
+ )
+ : null
+ );
}
}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
index cd0389e757e8c..cb669c2d14937 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
@@ -33,6 +33,7 @@
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverSleeps;
import org.elasticsearch.compute.operator.OperatorStatus;
+import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
@@ -973,7 +974,8 @@ public void testProfileXContent() {
List.of(new OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10, 111, 222))),
DriverSleeps.empty()
)
- )
+ ),
+ List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree"))
),
false,
false,
@@ -1028,6 +1030,14 @@ public void testProfileXContent() {
"last" : [ ]
}
}
+ ],
+ "plans" : [
+ {
+ "description" : "test",
+ "cluster_name" : "elasticsearch",
+ "node_name" : "node-1",
+ "plan" : "plan tree"
+ }
]
}
}"""));
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java
index cac20924ed3b4..88d12b163cf01 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java
@@ -16,6 +16,7 @@
import org.elasticsearch.compute.operator.DriverCompletionInfo;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.DriverSleeps;
+import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
@@ -59,11 +60,13 @@ public void shutdownTransportService() {
}
private DriverCompletionInfo randomCompletionInfo() {
- int numProfiles = randomIntBetween(0, 2);
- List profiles = new ArrayList<>(numProfiles);
- for (int i = 0; i < numProfiles; i++) {
- profiles.add(
- new DriverProfile(
+ return new DriverCompletionInfo(
+ randomNonNegativeLong(),
+ randomNonNegativeLong(),
+ randomList(
+ 0,
+ 2,
+ () -> new DriverProfile(
randomIdentifier(),
randomIdentifier(),
randomIdentifier(),
@@ -75,9 +78,13 @@ private DriverCompletionInfo randomCompletionInfo() {
List.of(),
DriverSleeps.empty()
)
- );
- }
- return new DriverCompletionInfo(randomNonNegativeLong(), randomNonNegativeLong(), profiles);
+ ),
+ randomList(
+ 0,
+ 2,
+ () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphaOfLengthBetween(1, 1024))
+ )
+ );
}
public void testEmpty() {
@@ -86,7 +93,7 @@ public void testEmpty() {
assertFalse(results.isDone());
}
assertTrue(results.isDone());
- assertThat(results.actionGet(10, TimeUnit.SECONDS).collectedProfiles(), empty());
+ assertThat(results.actionGet(10, TimeUnit.SECONDS).driverProfiles(), empty());
}
public void testCollectComputeResults() {
@@ -109,7 +116,7 @@ public void testCollectComputeResults() {
var info = randomCompletionInfo();
documentsFound += info.documentsFound();
valuesLoaded += info.valuesLoaded();
- allProfiles.addAll(info.collectedProfiles());
+ allProfiles.addAll(info.driverProfiles());
ActionListener subListener = computeListener.acquireCompute();
threadPool.schedule(
ActionRunnable.wrap(subListener, l -> l.onResponse(info)),
@@ -123,7 +130,7 @@ public void testCollectComputeResults() {
assertThat(actual.documentsFound(), equalTo(documentsFound));
assertThat(actual.valuesLoaded(), equalTo(valuesLoaded));
assertThat(
- actual.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
+ actual.driverProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)))
);
assertThat(onFailure.get(), equalTo(0));
@@ -178,7 +185,7 @@ public void onResponse(DriverCompletionInfo result) {
assertThat(result.documentsFound(), equalTo(documentsFound.get()));
assertThat(result.valuesLoaded(), equalTo(valuesLoaded.get()));
assertThat(
- result.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
+ result.driverProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)),
equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)))
);
Map> responseHeaders = threadPool.getThreadContext()
@@ -216,7 +223,7 @@ public void onFailure(Exception e) {
var resp = randomCompletionInfo();
documentsFound.addAndGet(resp.documentsFound());
valuesLoaded.addAndGet(resp.valuesLoaded());
- allProfiles.addAll(resp.collectedProfiles());
+ allProfiles.addAll(resp.driverProfiles());
int numWarnings = randomIntBetween(1, 5);
Map warnings = new HashMap<>();
for (int i = 0; i < numWarnings; i++) {