Skip to content

Commit 8aa8c76

Browse files
committed
Add reduction plan
1 parent 6f917f9 commit 8aa8c76

File tree

7 files changed

+49
-17
lines changed

7 files changed

+49
-17
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanTimeProfile.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,36 @@
2222
*
2323
*/
2424
public final class PlanTimeProfile implements Writeable, ToXContentObject {
25+
private long reductionPlanNanos;
2526
private long logicalOptimizationNanos;
2627
private long physicalOptimizationNanos;
2728

2829
/**
2930
* @param logicalOptimizationNanos Time spent on local logical plan optimization (in nanoseconds)
3031
* @param physicalOptimizationNanos Time spent on local physical plan optimization (in nanoseconds)
32+
* @param reductionPlanNanos Time spent on reduction plan for node_reduce phase (in nanoseconds)
3133
*/
32-
public PlanTimeProfile(long logicalOptimizationNanos, long physicalOptimizationNanos) {
34+
public PlanTimeProfile(long logicalOptimizationNanos, long physicalOptimizationNanos, long reductionPlanNanos) {
3335
this.logicalOptimizationNanos = logicalOptimizationNanos;
3436
this.physicalOptimizationNanos = physicalOptimizationNanos;
37+
this.reductionPlanNanos = reductionPlanNanos;
3538
}
3639

3740
public PlanTimeProfile() {
3841
this.logicalOptimizationNanos = 0L;
3942
this.physicalOptimizationNanos = 0L;
43+
this.reductionPlanNanos = 0L;
4044
}
4145

4246
public PlanTimeProfile(StreamInput in) throws IOException {
43-
this(in.readVLong(), in.readVLong());
47+
this(in.readVLong(), in.readVLong(), in.readVLong());
4448
}
4549

4650
@Override
4751
public void writeTo(StreamOutput out) throws IOException {
4852
out.writeVLong(logicalOptimizationNanos);
4953
out.writeVLong(physicalOptimizationNanos);
54+
out.writeVLong(reductionPlanNanos);
5055
}
5156

5257
@Override
@@ -57,6 +62,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
5762
if (physicalOptimizationNanos > 0) {
5863
builder.field("physical_optimization_nanos", physicalOptimizationNanos);
5964
}
65+
if (reductionPlanNanos > 0) {
66+
builder.field("reduction_nanos", physicalOptimizationNanos);
67+
}
6068
return builder;
6169
}
6270

@@ -68,18 +76,23 @@ public void addPhysicalOptimizationPlanTime(long physicalOptimizationPlanTime) {
6876
this.physicalOptimizationNanos = this.physicalOptimizationNanos + physicalOptimizationPlanTime;
6977
}
7078

79+
public void addReductionPlanNanos(long reductionPlanNanos) {
80+
this.reductionPlanNanos = this.reductionPlanNanos + reductionPlanNanos;
81+
}
82+
7183
@Override
7284
public boolean equals(Object obj) {
7385
if (obj == this) return true;
7486
if (obj == null || obj.getClass() != this.getClass()) return false;
7587
var that = (PlanTimeProfile) obj;
7688
return this.logicalOptimizationNanos == that.logicalOptimizationNanos
77-
&& this.physicalOptimizationNanos == that.physicalOptimizationNanos;
89+
&& this.physicalOptimizationNanos == that.physicalOptimizationNanos
90+
&& this.reductionPlanNanos == that.reductionPlanNanos;
7891
}
7992

8093
@Override
8194
public int hashCode() {
82-
return Objects.hash(logicalOptimizationNanos, physicalOptimizationNanos);
95+
return Objects.hash(logicalOptimizationNanos, physicalOptimizationNanos, reductionPlanNanos);
8396
}
8497

8598
@Override
@@ -90,6 +103,9 @@ public String toString() {
90103
+ ", "
91104
+ "physicalOptimizationNanos="
92105
+ physicalOptimizationNanos
106+
+ ", "
107+
+ "reductionPlanNanos="
108+
+ reductionPlanNanos
93109
+ ']';
94110
}
95111

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -372,12 +372,20 @@ public void testProfile() throws IOException {
372372
assertThat(plan.get("cluster_name"), equalTo("test-cluster"));
373373
assertThat(plan.get("node_name"), equalTo("test-cluster-0"));
374374
assertThat(plan.get("plan"), notNullValue());
375-
assertThat((int) plan.get("plan_nanos"), greaterThanOrEqualTo(0));
376375
String description = (String) plan.get("description");
377376
assertTrue("Unexpected plan description " + description, Set.of("final", "node_reduce", "data").contains(description));
378-
if ("final".equals(description) || "data".equals(description)) {
379-
assertThat((int) plan.get("logical_optimization_nanos"), greaterThanOrEqualTo(0));
380-
assertThat((int) plan.get("physical_optimization_nanos"), greaterThanOrEqualTo(0));
377+
switch(description) {
378+
case "final", "data" -> {
379+
assertThat((int) plan.get("logical_optimization_nanos"), greaterThanOrEqualTo(0));
380+
assertThat((int) plan.get("physical_optimization_nanos"), greaterThanOrEqualTo(0));
381+
assertFalse(plan.containsKey("reduction_nanos"));
382+
}
383+
case "node_reduce" -> {
384+
assertThat((int) plan.get("reduction_nanos"), greaterThanOrEqualTo(0));
385+
assertFalse(plan.containsKey("logical_optimization_nanos"));
386+
assertFalse(plan.containsKey("physical_optimization_nanos"));
387+
}
388+
default -> {}
381389
}
382390
}
383391
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ void runComputeOnRemoteCluster(
268268
configuration.newFoldContext(),
269269
plan,
270270
true,
271-
false
271+
false,
272+
configuration.profile() ? new PlanTimeProfile() : null
272273
);
273274
PhysicalPlan coordinatorPlan = reductionPlan.nodeReducePlan();
274275
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -742,8 +742,10 @@ static ReductionPlan reductionPlan(
742742
FoldContext foldCtx,
743743
ExchangeSinkExec originalPlan,
744744
boolean runNodeLevelReduction,
745-
boolean reduceNodeLateMaterialization
745+
boolean reduceNodeLateMaterialization,
746+
PlanTimeProfile planTimeProfile
746747
) {
748+
long startTime = planTimeProfile == null ? 0 : System.nanoTime();
747749
PhysicalPlan source = new ExchangeSourceExec(originalPlan.source(), originalPlan.output(), originalPlan.isIntermediateAgg());
748750
ReductionPlan defaultResult = new ReductionPlan(originalPlan.replaceChild(source), originalPlan);
749751
if (reduceNodeLateMaterialization == false && runNodeLevelReduction == false) {
@@ -755,7 +757,7 @@ static ReductionPlan reductionPlan(
755757
originalPlan
756758
);
757759
// The default plan is just the exchange source piped directly into the exchange sink.
758-
return switch (PlannerUtils.reductionPlan(originalPlan)) {
760+
ReductionPlan reductionPlan = switch (PlannerUtils.reductionPlan(originalPlan)) {
759761
case PlannerUtils.TopNReduction topN when reduceNodeLateMaterialization ->
760762
// In the case of TopN, the source output type is replaced since we're pulling the FieldExtractExec to the reduction node,
761763
// so essential we are splitting the TopNExec into two parts, similar to other aggregations, but unlike other aggregations,
@@ -770,6 +772,10 @@ static ReductionPlan reductionPlan(
770772
case PlannerUtils.ReducedPlan rp when runNodeLevelReduction -> placePlanBetweenExchanges.apply(rp.plan());
771773
default -> defaultResult;
772774
};
775+
if (planTimeProfile != null) {
776+
planTimeProfile.addReductionPlanNanos(System.nanoTime() - startTime);
777+
}
778+
return reductionPlan;
773779
}
774780

775781
String newChildSession(String session) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,9 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
569569
// We can avoid synchronization (for the most part) since the array elements are never modified, and the array is only added to,
570570
// with its size being known before we start the computation.
571571
PlanTimeProfile planTimeProfile = null;
572+
if (configuration.profile()) {
573+
planTimeProfile = new PlanTimeProfile();
574+
}
572575
if (request.plan() instanceof ExchangeSinkExec plan) {
573576
reductionPlan = ComputeService.reductionPlan(
574577
computeService.plannerSettings(),
@@ -577,11 +580,9 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
577580
configuration.newFoldContext(),
578581
plan,
579582
request.runNodeLevelReduction(),
580-
request.reductionLateMaterialization()
583+
request.reductionLateMaterialization(),
584+
planTimeProfile
581585
);
582-
if (configuration.profile()) {
583-
planTimeProfile = new PlanTimeProfile();
584-
}
585586
} else {
586587
listener.onFailure(new IllegalStateException("expected exchange sink for a remote compute; got " + request.plan()));
587588
return;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private List<PlanProfile> randomPlanProfiles() {
7474
}
7575

7676
private PlanTimeProfile randomPlanTimeProfile() {
77-
return randomBoolean() ? null : new PlanTimeProfile(randomNonNegativeLong(), randomNonNegativeLong());
77+
return randomBoolean() ? null : new PlanTimeProfile(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
7878
}
7979

8080
private OperatorStatus randomOperatorStatus() {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private DriverCompletionInfo randomCompletionInfo() {
9595
}
9696

9797
private PlanTimeProfile randomPlanTimeProfile() {
98-
return randomBoolean() ? null : new PlanTimeProfile(randomNonNegativeLong(), randomNonNegativeLong());
98+
return randomBoolean() ? null : new PlanTimeProfile(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
9999
}
100100

101101
public void testEmpty() {

0 commit comments

Comments
 (0)