Skip to content

Commit c0b09df

Browse files
authored
ESQL - Add planning detailed timing to profile information (elastic#138564)
1 parent d6f64a6 commit c0b09df

File tree

20 files changed

+377
-84
lines changed

20 files changed

+377
-84
lines changed

docs/changelog/138564.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138564
2+
summary: ESQL - Add planning detailed timing to profile information
3+
area: "ES|QL"
4+
type: enhancement
5+
issues: []
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9236000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
inference_ccm_enablement_service,9235000
1+
plan_profile_version,9236000

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ public static DriverCompletionInfo includingProfiles(
5151
String description,
5252
String clusterName,
5353
String nodeName,
54-
String planTree
54+
String planTree,
55+
PlanTimeProfile planTimeProfile
5556
) {
5657
long documentsFound = 0;
5758
long valuesLoaded = 0;
@@ -68,7 +69,7 @@ public static DriverCompletionInfo includingProfiles(
6869
documentsFound,
6970
valuesLoaded,
7071
collectedProfiles,
71-
List.of(new PlanProfile(description, clusterName, nodeName, planTree))
72+
List.of(new PlanProfile(description, clusterName, nodeName, planTree, planTimeProfile))
7273
);
7374
}
7475

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.compute.operator;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.common.io.stream.StreamInput;
1112
import org.elasticsearch.common.io.stream.StreamOutput;
1213
import org.elasticsearch.common.io.stream.Writeable;
@@ -15,10 +16,24 @@
1516

1617
import java.io.IOException;
1718

18-
public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject {
19+
public record PlanProfile(String description, String clusterName, String nodeName, String planTree, PlanTimeProfile planTimeProfile)
20+
implements
21+
Writeable,
22+
ToXContentObject {
23+
24+
private static final TransportVersion PLAN_PROFILE_VERSION = TransportVersion.fromName("plan_profile_version");
1925

2026
public static PlanProfile readFrom(StreamInput in) throws IOException {
21-
return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString());
27+
String description = in.readString();
28+
String clusterName = in.readString();
29+
String nodeName = in.readString();
30+
String planTree = in.readString();
31+
PlanTimeProfile profile = null;
32+
if (in.getTransportVersion().supports(PLAN_PROFILE_VERSION)) {
33+
profile = in.readOptionalWriteable(PlanTimeProfile::new);
34+
}
35+
36+
return new PlanProfile(description, clusterName, nodeName, planTree, profile);
2237
}
2338

2439
@Override
@@ -27,15 +42,22 @@ public void writeTo(StreamOutput out) throws IOException {
2742
out.writeString(clusterName);
2843
out.writeString(nodeName);
2944
out.writeString(planTree);
45+
if (out.getTransportVersion().supports(PLAN_PROFILE_VERSION)) {
46+
out.writeOptionalWriteable(planTimeProfile);
47+
}
3048
}
3149

3250
@Override
3351
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
34-
return builder.startObject()
35-
.field("description", description)
36-
.field("cluster_name", clusterName)
37-
.field("node_name", nodeName)
38-
.field("plan", planTree)
39-
.endObject();
52+
builder.startObject();
53+
builder.field("description", description);
54+
builder.field("cluster_name", clusterName);
55+
builder.field("node_name", nodeName);
56+
builder.field("plan", planTree);
57+
if (planTimeProfile != null) {
58+
planTimeProfile.toXContent(builder, params);
59+
}
60+
61+
return builder.endObject();
4062
}
4163
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.common.io.stream.StreamInput;
11+
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.common.io.stream.Writeable;
13+
import org.elasticsearch.xcontent.ToXContentObject;
14+
import org.elasticsearch.xcontent.XContentBuilder;
15+
16+
import java.io.IOException;
17+
import java.util.Objects;
18+
19+
/**
20+
* Profile information for plan optimization phases.
21+
* Captures timing information for logical and physical optimization steps.
22+
*
23+
*/
24+
public final class PlanTimeProfile implements Writeable, ToXContentObject {
25+
private long reductionPlanNanos;
26+
private long logicalOptimizationNanos;
27+
private long physicalOptimizationNanos;
28+
29+
/**
30+
* @param logicalOptimizationNanos Time spent on local logical plan optimization (in nanoseconds)
31+
* @param physicalOptimizationNanos Time spent on local physical plan optimization (in nanoseconds)
32+
* @param reductionPlanNanos Time spent on reduction plan for node_reduce phase (in nanoseconds)
33+
*/
34+
public PlanTimeProfile(long logicalOptimizationNanos, long physicalOptimizationNanos, long reductionPlanNanos) {
35+
this.logicalOptimizationNanos = logicalOptimizationNanos;
36+
this.physicalOptimizationNanos = physicalOptimizationNanos;
37+
this.reductionPlanNanos = reductionPlanNanos;
38+
}
39+
40+
public PlanTimeProfile() {
41+
this.logicalOptimizationNanos = 0L;
42+
this.physicalOptimizationNanos = 0L;
43+
this.reductionPlanNanos = 0L;
44+
}
45+
46+
public PlanTimeProfile(StreamInput in) throws IOException {
47+
this(in.readVLong(), in.readVLong(), in.readVLong());
48+
}
49+
50+
@Override
51+
public void writeTo(StreamOutput out) throws IOException {
52+
out.writeVLong(logicalOptimizationNanos);
53+
out.writeVLong(physicalOptimizationNanos);
54+
out.writeVLong(reductionPlanNanos);
55+
}
56+
57+
@Override
58+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
59+
if (logicalOptimizationNanos > 0) {
60+
builder.field("logical_optimization_nanos", logicalOptimizationNanos);
61+
}
62+
if (physicalOptimizationNanos > 0) {
63+
builder.field("physical_optimization_nanos", physicalOptimizationNanos);
64+
}
65+
if (reductionPlanNanos > 0) {
66+
builder.field("reduction_nanos", physicalOptimizationNanos);
67+
}
68+
return builder;
69+
}
70+
71+
public void addLogicalOptimizationPlanTime(long logicalOptimizationPlanTime) {
72+
this.logicalOptimizationNanos = this.logicalOptimizationNanos + logicalOptimizationPlanTime;
73+
}
74+
75+
public void addPhysicalOptimizationPlanTime(long physicalOptimizationPlanTime) {
76+
this.physicalOptimizationNanos = this.physicalOptimizationNanos + physicalOptimizationPlanTime;
77+
}
78+
79+
public void addReductionPlanNanos(long reductionPlanNanos) {
80+
this.reductionPlanNanos = this.reductionPlanNanos + reductionPlanNanos;
81+
}
82+
83+
@Override
84+
public boolean equals(Object obj) {
85+
if (obj == this) return true;
86+
if (obj == null || obj.getClass() != this.getClass()) return false;
87+
var that = (PlanTimeProfile) obj;
88+
return this.logicalOptimizationNanos == that.logicalOptimizationNanos
89+
&& this.physicalOptimizationNanos == that.physicalOptimizationNanos
90+
&& this.reductionPlanNanos == that.reductionPlanNanos;
91+
}
92+
93+
@Override
94+
public int hashCode() {
95+
return Objects.hash(logicalOptimizationNanos, physicalOptimizationNanos, reductionPlanNanos);
96+
}
97+
98+
@Override
99+
public String toString() {
100+
return "PlanTimeProfile["
101+
+ "logicalOptimizationNanos="
102+
+ logicalOptimizationNanos
103+
+ ", "
104+
+ "physicalOptimizationNanos="
105+
+ physicalOptimizationNanos
106+
+ ", "
107+
+ "reductionPlanNanos="
108+
+ reductionPlanNanos
109+
+ ']';
110+
}
111+
112+
}

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
7474
import static org.hamcrest.Matchers.instanceOf;
7575
import static org.hamcrest.Matchers.not;
76+
import static org.hamcrest.Matchers.notNullValue;
7677
import static org.hamcrest.Matchers.oneOf;
7778
import static org.hamcrest.Matchers.startsWith;
7879
import static org.hamcrest.core.Is.is;
@@ -365,6 +366,29 @@ public void testProfile() throws IOException {
365366
default -> throw new IllegalArgumentException("can't match " + description);
366367
}
367368
}
369+
@SuppressWarnings("unchecked")
370+
List<Map<String, Object>> plans = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("plans");
371+
for (Map<String, Object> plan : plans) {
372+
assertThat(plan.get("cluster_name"), equalTo("test-cluster"));
373+
assertThat(plan.get("node_name"), notNullValue());
374+
assertThat(plan.get("plan"), notNullValue());
375+
String description = (String) plan.get("description");
376+
assertTrue("Unexpected plan description " + description, Set.of("final", "node_reduce", "data").contains(description));
377+
switch (description) {
378+
case "final", "data" -> {
379+
assertThat((int) plan.get("logical_optimization_nanos"), greaterThanOrEqualTo(0));
380+
assertThat((int) plan.get("physical_optimization_nanos"), greaterThanOrEqualTo(0));
381+
assertFalse(plan.containsKey("reduction_nanos"));
382+
}
383+
case "node_reduce" -> {
384+
assertThat((int) plan.get("reduction_nanos"), greaterThanOrEqualTo(0));
385+
assertFalse(plan.containsKey("logical_optimization_nanos"));
386+
assertFalse(plan.containsKey("physical_optimization_nanos"));
387+
}
388+
default -> {
389+
}
390+
}
391+
}
368392
}
369393

370394
private final String PROCESS_NAME = "process_name";
@@ -703,7 +727,7 @@ public void testForceSleepsProfile() throws IOException {
703727
String operators = p.get("operators").toString();
704728
MapMatcher sleepMatcher = matchesMap().entry("reason", "exchange empty")
705729
.entry("sleep_millis", greaterThan(0L))
706-
.entry("thread_name", Matchers.containsString("[esql_worker]")) // NB: this doesn't run in the test thread
730+
.entry("thread_name", containsString("[esql_worker]")) // NB: this doesn't run in the test thread
707731
.entry("wake_millis", greaterThan(0L));
708732
String description = p.get("description").toString();
709733
switch (description) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.compute.aggregation.AggregatorMode;
1414
import org.elasticsearch.compute.data.BlockFactory;
1515
import org.elasticsearch.compute.data.ElementType;
16+
import org.elasticsearch.compute.operator.PlanTimeProfile;
1617
import org.elasticsearch.core.Nullable;
1718
import org.elasticsearch.core.Tuple;
1819
import org.elasticsearch.index.IndexMode;
@@ -179,9 +180,10 @@ public static PhysicalPlan localPlan(
179180
List<SearchExecutionContext> searchContexts,
180181
Configuration configuration,
181182
FoldContext foldCtx,
182-
PhysicalPlan plan
183+
PhysicalPlan plan,
184+
PlanTimeProfile planTimeProfile
183185
) {
184-
return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts));
186+
return localPlan(plannerSettings, flags, configuration, foldCtx, plan, SearchContextStats.from(searchContexts), planTimeProfile);
185187
}
186188

187189
public static PhysicalPlan localPlan(
@@ -190,14 +192,15 @@ public static PhysicalPlan localPlan(
190192
Configuration configuration,
191193
FoldContext foldCtx,
192194
PhysicalPlan plan,
193-
SearchStats searchStats
195+
SearchStats searchStats,
196+
PlanTimeProfile planTimeProfile
194197
) {
195198
final var logicalOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats));
196199
var physicalOptimizer = new LocalPhysicalPlanOptimizer(
197200
new LocalPhysicalOptimizerContext(plannerSettings, flags, configuration, foldCtx, searchStats)
198201
);
199202

200-
return localPlan(plan, logicalOptimizer, physicalOptimizer);
203+
return localPlan(plan, logicalOptimizer, physicalOptimizer, planTimeProfile);
201204
}
202205

203206
public static PhysicalPlan integrateEsFilterIntoFragment(PhysicalPlan plan, @Nullable QueryBuilder esFilter) {
@@ -215,7 +218,8 @@ public static PhysicalPlan integrateEsFilterIntoFragment(PhysicalPlan plan, @Nul
215218
public static PhysicalPlan localPlan(
216219
PhysicalPlan plan,
217220
LocalLogicalPlanOptimizer logicalOptimizer,
218-
LocalPhysicalPlanOptimizer physicalOptimizer
221+
LocalPhysicalPlanOptimizer physicalOptimizer,
222+
PlanTimeProfile planTimeProfile
219223
) {
220224
var isCoordPlan = new Holder<>(Boolean.TRUE);
221225
Set<PhysicalPlan> lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance)
@@ -231,19 +235,36 @@ public static PhysicalPlan localPlan(
231235
return f;
232236
}
233237
isCoordPlan.set(Boolean.FALSE);
238+
239+
// Logical optimization
240+
boolean profilingEnabled = planTimeProfile != null;
241+
long logicalStartNanos = profilingEnabled ? System.nanoTime() : 0;
234242
LogicalPlan optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
235243
PhysicalPlan physicalFragment = LocalMapper.INSTANCE.map(optimizedFragment);
244+
if (profilingEnabled) {
245+
planTimeProfile.addLogicalOptimizationPlanTime(System.nanoTime() - logicalStartNanos);
246+
}
236247
QueryBuilder filter = f.esFilter();
237248
if (filter != null) {
238249
physicalFragment = physicalFragment.transformUp(
239250
EsSourceExec.class,
240251
query -> new EsSourceExec(Source.EMPTY, query.indexPattern(), query.indexMode(), query.output(), filter)
241252
);
242253
}
254+
255+
// Physical optimization
256+
long physicalStartNanos = profilingEnabled ? System.nanoTime() : 0;
243257
var localOptimized = physicalOptimizer.localOptimize(physicalFragment);
258+
if (profilingEnabled) {
259+
planTimeProfile.addPhysicalOptimizationPlanTime(System.nanoTime() - physicalStartNanos);
260+
}
261+
244262
return EstimatesRowSize.estimateRowSize(f.estimatedRowSize(), localOptimized);
245263
});
246-
return isCoordPlan.get() ? plan : localPhysicalPlan;
264+
265+
PhysicalPlan resultPlan = isCoordPlan.get() ? plan : localPhysicalPlan;
266+
267+
return resultPlan;
247268
}
248269

249270
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.action.support.ChannelActionListener;
1414
import org.elasticsearch.compute.lucene.EmptyIndexedByShardId;
1515
import org.elasticsearch.compute.operator.DriverCompletionInfo;
16+
import org.elasticsearch.compute.operator.PlanTimeProfile;
1617
import org.elasticsearch.compute.operator.exchange.ExchangeService;
1718
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
1819
import org.elasticsearch.core.Releasable;
@@ -267,7 +268,8 @@ void runComputeOnRemoteCluster(
267268
configuration.newFoldContext(),
268269
plan,
269270
true,
270-
false
271+
false,
272+
configuration.profile() ? new PlanTimeProfile() : null
271273
);
272274
PhysicalPlan coordinatorPlan = reductionPlan.nodeReducePlan();
273275
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
@@ -299,6 +301,7 @@ void runComputeOnRemoteCluster(
299301
() -> exchangeSink.createExchangeSink(() -> {})
300302
),
301303
coordinatorPlan,
304+
configuration.profile() ? new PlanTimeProfile() : null,
302305
computeListener.acquireCompute()
303306
);
304307
dataNodeComputeHandler.startComputeOnDataNodes(

0 commit comments

Comments
 (0)