Skip to content

Commit 03676ee

Browse files
committed
Add query plans to profile output (elastic#128828)
(cherry picked from commit 56d5009)
1 parent f562e63 commit 03676ee

File tree

13 files changed

+202
-90
lines changed

13 files changed

+202
-90
lines changed

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2699,9 +2699,11 @@ public static Request newXContentRequest(HttpMethod method, String endpoint, ToX
26992699
}
27002700

27012701
protected static MapMatcher getProfileMatcher() {
2702-
return matchesMap().entry("query", instanceOf(Map.class))
2702+
return matchesMap() //
2703+
.entry("query", instanceOf(Map.class))
27032704
.entry("planning", instanceOf(Map.class))
2704-
.entry("drivers", instanceOf(List.class));
2705+
.entry("drivers", instanceOf(List.class))
2706+
.entry("plans", instanceOf(List.class));
27052707
}
27062708

27072709
protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.TransportVersions;
1011
import org.elasticsearch.common.io.stream.StreamInput;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
1213
import org.elasticsearch.common.io.stream.Writeable;
@@ -24,23 +25,34 @@
2425
* <strong>roughly</strong> the number of documents times the number of
2526
* fields per document. Except {@code null} values don't count.
2627
* And multivalued fields count as many times as there are values.
27-
* @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
28+
* @param driverProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
2829
* not free so this will be empty if the {@code profile} option was not set in
2930
* the request.
3031
*/
31-
public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable {
32+
public record DriverCompletionInfo(
33+
long documentsFound,
34+
long valuesLoaded,
35+
List<DriverProfile> driverProfiles,
36+
List<PlanProfile> planProfiles
37+
) implements Writeable {
3238

3339
/**
3440
* Completion info we use when we didn't properly complete any drivers.
3541
* Usually this is returned with an error, but it's also used when receiving
3642
* responses from very old nodes.
3743
*/
38-
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of());
44+
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of(), List.of());
3945

4046
/**
4147
* Build a {@link DriverCompletionInfo} for many drivers including their profile output.
4248
*/
43-
public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
49+
public static DriverCompletionInfo includingProfiles(
50+
List<Driver> drivers,
51+
String description,
52+
String clusterName,
53+
String nodeName,
54+
String planTree
55+
) {
4456
long documentsFound = 0;
4557
long valuesLoaded = 0;
4658
List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size());
@@ -52,7 +64,12 @@ public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
5264
}
5365
collectedProfiles.add(p);
5466
}
55-
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
67+
return new DriverCompletionInfo(
68+
documentsFound,
69+
valuesLoaded,
70+
collectedProfiles,
71+
List.of(new PlanProfile(description, clusterName, nodeName, planTree))
72+
);
5673
}
5774

5875
/**
@@ -69,49 +86,63 @@ public static DriverCompletionInfo excludingProfiles(List<Driver> drivers) {
6986
valuesLoaded += o.valuesLoaded();
7087
}
7188
}
72-
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of());
89+
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of(), List.of());
7390
}
7491

75-
public DriverCompletionInfo(StreamInput in) throws IOException {
76-
this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::new));
92+
public static DriverCompletionInfo readFrom(StreamInput in) throws IOException {
93+
return new DriverCompletionInfo(
94+
in.readVLong(),
95+
in.readVLong(),
96+
in.readCollectionAsImmutableList(DriverProfile::readFrom),
97+
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
98+
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
99+
: List.of()
100+
);
77101
}
78102

79103
@Override
80104
public void writeTo(StreamOutput out) throws IOException {
81105
out.writeVLong(documentsFound);
82106
out.writeVLong(valuesLoaded);
83-
out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o));
107+
out.writeCollection(driverProfiles);
108+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
109+
out.writeCollection(planProfiles);
110+
}
84111
}
85112

86113
public static class Accumulator {
87114
private long documentsFound;
88115
private long valuesLoaded;
89-
private final List<DriverProfile> collectedProfiles = new ArrayList<>();
116+
private final List<DriverProfile> driverProfiles = new ArrayList<>();
117+
private final List<PlanProfile> planProfiles = new ArrayList<>();
90118

91119
public void accumulate(DriverCompletionInfo info) {
92120
this.documentsFound += info.documentsFound;
93121
this.valuesLoaded += info.valuesLoaded;
94-
this.collectedProfiles.addAll(info.collectedProfiles);
122+
this.driverProfiles.addAll(info.driverProfiles);
123+
this.planProfiles.addAll(info.planProfiles);
95124
}
96125

97126
public DriverCompletionInfo finish() {
98-
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
127+
return new DriverCompletionInfo(documentsFound, valuesLoaded, driverProfiles, planProfiles);
99128
}
100129
}
101130

102131
public static class AtomicAccumulator {
103132
private final AtomicLong documentsFound = new AtomicLong();
104133
private final AtomicLong valuesLoaded = new AtomicLong();
105134
private final List<DriverProfile> collectedProfiles = Collections.synchronizedList(new ArrayList<>());
135+
private final List<PlanProfile> planProfiles = Collections.synchronizedList(new ArrayList<>());
106136

107137
public void accumulate(DriverCompletionInfo info) {
108138
this.documentsFound.addAndGet(info.documentsFound);
109139
this.valuesLoaded.addAndGet(info.valuesLoaded);
110-
this.collectedProfiles.addAll(info.collectedProfiles);
140+
this.collectedProfiles.addAll(info.driverProfiles);
141+
this.planProfiles.addAll(info.planProfiles);
111142
}
112143

113144
public DriverCompletionInfo finish() {
114-
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles);
145+
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles, planProfiles);
115146
}
116147
}
117148
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
import org.elasticsearch.xcontent.ToXContentObject;
14+
import org.elasticsearch.xcontent.XContentBuilder;
15+
16+
import java.io.IOException;
17+
18+
public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject {
19+
20+
public static PlanProfile readFrom(StreamInput in) throws IOException {
21+
return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString());
22+
}
23+
24+
@Override
25+
public void writeTo(StreamOutput out) throws IOException {
26+
out.writeString(description);
27+
out.writeString(clusterName);
28+
out.writeString(nodeName);
29+
out.writeString(planTree);
30+
}
31+
32+
@Override
33+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
34+
return builder.startObject()
35+
.field("description", description)
36+
.field("cluster_name", clusterName)
37+
.field("node_name", nodeName)
38+
.field("plan", planTree)
39+
.endObject();
40+
}
41+
}

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,9 @@ private void testPushQuery(
317317
result,
318318
getResultMatcher(result).entry(
319319
"profile",
320-
matchesMap().entry("drivers", instanceOf(List.class))
320+
matchesMap() //
321+
.entry("drivers", instanceOf(List.class))
322+
.entry("plans", instanceOf(List.class))
321323
.entry("planning", matchesMap().extraOk())
322324
.entry("query", matchesMap().extraOk())
323325
),

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,9 @@ private void testQuery(Double percent, String query, int documentsFound, boolean
111111
matchesMap().entry("documents_found", documentsFound)
112112
.entry(
113113
"profile",
114-
matchesMap().entry("drivers", instanceOf(List.class))
114+
matchesMap() //
115+
.entry("drivers", instanceOf(List.class))
116+
.entry("plans", instanceOf(List.class))
115117
.entry("planning", matchesMap().extraOk())
116118
.entry("query", matchesMap().extraOk())
117119
)

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.compute.data.BlockStreamInput;
2121
import org.elasticsearch.compute.data.Page;
2222
import org.elasticsearch.compute.operator.DriverProfile;
23+
import org.elasticsearch.compute.operator.PlanProfile;
2324
import org.elasticsearch.core.AbstractRefCounted;
2425
import org.elasticsearch.core.Nullable;
2526
import org.elasticsearch.core.Releasable;
@@ -372,20 +373,23 @@ public EsqlResponse responseInternal() {
372373
return esqlResponse;
373374
}
374375

375-
public static class Profile implements Writeable {
376-
private final List<DriverProfile> drivers;
376+
public record Profile(List<DriverProfile> drivers, List<PlanProfile> plans) implements Writeable {
377377

378-
public Profile(List<DriverProfile> drivers) {
379-
this.drivers = drivers;
380-
}
381-
382-
public Profile(StreamInput in) throws IOException {
383-
this.drivers = in.readCollectionAsImmutableList(DriverProfile::new);
378+
public static Profile readFrom(StreamInput in) throws IOException {
379+
return new Profile(
380+
in.readCollectionAsImmutableList(DriverProfile::readFrom),
381+
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)
382+
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
383+
: List.of()
384+
);
384385
}
385386

386387
@Override
387388
public void writeTo(StreamOutput out) throws IOException {
388389
out.writeCollection(drivers);
390+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN)) {
391+
out.writeCollection(plans);
392+
}
389393
}
390394

391395
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void writeTo(StreamOutput out) throws IOException {
9696
completionInfo.writeTo(out);
9797
} else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
9898
out.writeBoolean(true);
99-
out.writeCollection(completionInfo.collectedProfiles());
99+
out.writeCollection(completionInfo.driverProfiles());
100100
}
101101
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
102102
out.writeOptionalTimeValue(took);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.compute.data.BlockFactory;
1919
import org.elasticsearch.compute.data.Page;
2020
import org.elasticsearch.compute.lucene.DataPartitioning;
21-
import org.elasticsearch.compute.operator.Driver;
2221
import org.elasticsearch.compute.operator.DriverCompletionInfo;
2322
import org.elasticsearch.compute.operator.DriverTaskRunner;
2423
import org.elasticsearch.compute.operator.FailureCollector;
@@ -392,9 +391,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan,
392391

393392
@Override
394393
public SourceProvider createSourceProvider() {
395-
final Supplier<SourceProvider> supplier = () -> super.createSourceProvider();
396-
return new ReinitializingSourceProvider(supplier);
397-
394+
return new ReinitializingSourceProvider(super::createSourceProvider);
398395
}
399396
};
400397
contexts.add(
@@ -407,7 +404,6 @@ public SourceProvider createSourceProvider() {
407404
searchService.getIndicesService().getAnalysis(),
408405
defaultDataPartitioning
409406
);
410-
final List<Driver> drivers;
411407
try {
412408
LocalExecutionPlanner planner = new LocalExecutionPlanner(
413409
context.sessionId(),
@@ -428,37 +424,40 @@ public SourceProvider createSourceProvider() {
428424

429425
LOGGER.debug("Received physical plan:\n{}", plan);
430426

431-
plan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan);
427+
var localPlan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan);
432428
// the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below)
433429
// it's doing this in the planning of EsQueryExec (the source of the data)
434430
// see also EsPhysicalOperationProviders.sourcePhysicalOperation
435431
LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.taskDescription(), context.foldCtx(), plan);
436432
if (LOGGER.isDebugEnabled()) {
437433
LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe());
438434
}
439-
drivers = localExecutionPlan.createDrivers(context.sessionId());
435+
var drivers = localExecutionPlan.createDrivers(context.sessionId());
440436
if (drivers.isEmpty()) {
441437
throw new IllegalStateException("no drivers created");
442438
}
443439
LOGGER.debug("using {} drivers", drivers.size());
440+
driverRunner.executeDrivers(
441+
task,
442+
drivers,
443+
transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
444+
ActionListener.releaseAfter(listener.map(ignored -> {
445+
if (context.configuration().profile()) {
446+
return DriverCompletionInfo.includingProfiles(
447+
drivers,
448+
context.description(),
449+
clusterService.getClusterName().value(),
450+
transportService.getLocalNode().getName(),
451+
localPlan.toString()
452+
);
453+
} else {
454+
return DriverCompletionInfo.excludingProfiles(drivers);
455+
}
456+
}), () -> Releasables.close(drivers))
457+
);
444458
} catch (Exception e) {
445459
listener.onFailure(e);
446-
return;
447460
}
448-
ActionListener<Void> listenerCollectingStatus = listener.map(ignored -> {
449-
if (context.configuration().profile()) {
450-
return DriverCompletionInfo.includingProfiles(drivers);
451-
} else {
452-
return DriverCompletionInfo.excludingProfiles(drivers);
453-
}
454-
});
455-
listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers));
456-
driverRunner.executeDrivers(
457-
task,
458-
drivers,
459-
transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME),
460-
listenerCollectingStatus
461-
);
462461
}
463462

464463
static PhysicalPlan reductionPlan(ExchangeSinkExec plan, boolean enable) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.transport.TransportResponse;
1616

1717
import java.io.IOException;
18+
import java.util.List;
1819
import java.util.Map;
1920

2021
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
@@ -54,7 +55,7 @@ public void writeTo(StreamOutput out) throws IOException {
5455
return;
5556
}
5657
if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) {
57-
out.writeCollection(completionInfo.collectedProfiles(), (o, v) -> v.writeTo(o));
58+
out.writeCollection(completionInfo.driverProfiles());
5859
out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
5960
return;
6061
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config
334334
return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes);
335335
}).toList();
336336
EsqlQueryResponse.Profile profile = configuration.profile()
337-
? new EsqlQueryResponse.Profile(result.completionInfo().collectedProfiles())
337+
? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
338338
: null;
339339
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
340340
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {

0 commit comments

Comments
 (0)