Skip to content

Commit 6df9352

Browse files
committed
Feedback, pass 1
1 parent 00bc8b7 commit 6df9352

File tree

14 files changed

+43
-52
lines changed

14 files changed

+43
-52
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineLimitTopN.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,30 @@
77

88
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
99

10-
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
10+
import org.elasticsearch.xpack.esql.expression.Foldables;
1111
import org.elasticsearch.xpack.esql.plan.logical.Limit;
1212
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1313
import org.elasticsearch.xpack.esql.plan.logical.TopN;
1414

1515
/**
16-
* Combines a Limit followed by a TopN into a single TopN.
16+
* Combines a Limit immediately followed by a TopN into a single TopN.
17+
* This is needed because {@link HoistRemoteEnrichTopN} can create new limits that are not covered by the previous rules.
1718
*/
18-
public final class CombineLimitTopN extends OptimizerRules.ParameterizedOptimizerRule<Limit, LogicalOptimizerContext> {
19+
public final class CombineLimitTopN extends OptimizerRules.OptimizerRule<Limit> {
1920

2021
public CombineLimitTopN() {
2122
super(OptimizerRules.TransformDirection.DOWN);
2223
}
2324

2425
@Override
25-
public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
26+
public LogicalPlan rule(Limit limit) {
2627
if (limit.child() instanceof TopN topn) {
27-
int thisLimitValue = (int) limit.limit().fold(ctx.foldCtx());
28-
int topNValue = (int) topn.limit().fold(ctx.foldCtx());
28+
int thisLimitValue = Foldables.limitValue(limit.limit(), limit.sourceText());
29+
int topNValue = Foldables.limitValue(topn.limit(), topn.sourceText());
2930
if (topNValue <= thisLimitValue) {
3031
return topn;
3132
} else {
32-
return new TopN(topn.source(), topn.child(), topn.order(), limit.limit(), topn.isLocal());
33+
return new TopN(topn.source(), topn.child(), topn.order(), limit.limit(), topn.local());
3334
}
3435
}
3536
return limit;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichLimit.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ protected LogicalPlan rule(Enrich en, LogicalOptimizerContext ctx) {
4141
// Enrich.
4242
Set<Limit> seenLimits = Collections.newSetFromMap(new IdentityHashMap<>());
4343
en.child().forEachDownMayReturnEarly((p, stop) -> {
44-
if (p instanceof Limit l && l.isLocal() == false) {
44+
if (p instanceof Limit l && l.local() == false) {
4545
seenLimits.add(l);
4646
return;
4747
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/HoistRemoteEnrichTopN.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
99

1010
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
11-
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
1211
import org.elasticsearch.xpack.esql.plan.logical.CardinalityPreserving;
1312
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1413
import org.elasticsearch.xpack.esql.plan.logical.Eval;
@@ -29,20 +28,18 @@
2928
* This is the same idea as {@link HoistRemoteEnrichLimit} but for TopN instead of Limit.
3029
* This must happen after {@link ReplaceLimitAndSortAsTopN}.
3130
*/
32-
public final class HoistRemoteEnrichTopN extends OptimizerRules.ParameterizedOptimizerRule<Enrich, LogicalOptimizerContext>
33-
implements
34-
OptimizerRules.CoordinatorOnly {
31+
public final class HoistRemoteEnrichTopN extends OptimizerRules.OptimizerRule<Enrich> implements OptimizerRules.CoordinatorOnly {
3532
public HoistRemoteEnrichTopN() {
3633
super(OptimizerRules.TransformDirection.UP);
3734
}
3835

3936
@Override
40-
protected LogicalPlan rule(Enrich en, LogicalOptimizerContext ctx) {
37+
protected LogicalPlan rule(Enrich en) {
4138
if (en.mode() == Enrich.Mode.REMOTE) {
4239
LogicalPlan plan = en.child();
4340
// This loop only takes care of one TopN, repeated application will stack them in correct order.
4441
while (true) {
45-
if (plan instanceof TopN top && top.isLocal() == false) {
42+
if (plan instanceof TopN top && top.local() == false) {
4643
// Create a fake OrderBy and "push" Enrich through it to generate aliases
4744
Enrich topWithEnrich = (Enrich) en.replaceChild(new OrderBy(top.source(), en.child(), top.order()));
4845
LogicalPlan pushPlan = PushDownUtils.pushGeneratingPlanPastProjectAndOrderBy(topWithEnrich);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/OptimizerRules.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public final LogicalPlan apply(LogicalPlan plan, P context) {
110110
protected abstract LogicalPlan rule(SubPlan plan, P context);
111111
}
112112

113-
public interface CoordinatorOnly {
114-
// This rule should only be applied on the coordinator plan, not for local plan
115-
}
113+
/**
114+
* Marker interface: This rule should only be applied on the coordinator plan, not for a local plan.
115+
*/
116+
public interface CoordinatorOnly {}
116117
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
99

10+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1011
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
1112
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1213
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -34,7 +35,7 @@ public PushDownAndCombineLimits() {
3435
@Override
3536
public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
3637
if (limit.child() instanceof Limit childLimit) {
37-
return limit.combine(childLimit, ctx.foldCtx());
38+
return combineLimits(limit, childLimit, ctx.foldCtx());
3839
} else if (limit.child() instanceof UnaryPlan unary) {
3940
if (unary instanceof Eval || unary instanceof Project || unary instanceof RegexExtract || unary instanceof InferencePlan<?>) {
4041
// Push the limit under unary
@@ -79,6 +80,18 @@ public LogicalPlan rule(Limit limit, LogicalOptimizerContext ctx) {
7980
return limit;
8081
}
8182

83+
private static Limit combineLimits(Limit upper, Limit lower, FoldContext ctx) {
84+
// Keep the smallest limit
85+
var thisLimitValue = (int) upper.limit().fold(ctx);
86+
var otherLimitValue = (int) lower.limit().fold(ctx);
87+
// We want to preserve the duplicated() value of the smaller limit.
88+
if (otherLimitValue <= thisLimitValue) {
89+
return lower.withLocal(upper.local() || lower.local());
90+
} else {
91+
return new Limit(upper.source(), upper.limit(), lower.child(), upper.duplicated(), upper.local() || lower.local());
92+
}
93+
}
94+
8295
/**
8396
* Checks the existence of another 'visible' Limit, that exists behind an operation that doesn't produce output more data than
8497
* its input (that is not a relation/source nor aggregation).

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
public class ChangePoint extends UnaryPlan
4848
implements
4949
SurrogateLogicalPlan,
50-
CardinalityPreserving,
5150
PostAnalysisVerificationAware,
5251
LicenseAware,
5352
ExecutesOn.Coordinator {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/InlineStats.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,7 @@
3838
* underlying aggregate.
3939
* </p>
4040
*/
41-
public class InlineStats extends UnaryPlan
42-
implements
43-
NamedWriteable,
44-
SurrogateLogicalPlan,
45-
TelemetryAware,
46-
CardinalityPreserving,
47-
SortAgnostic {
41+
public class InlineStats extends UnaryPlan implements NamedWriteable, SurrogateLogicalPlan, TelemetryAware, SortAgnostic {
4842
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
4943
LogicalPlan.class,
5044
"InlineStats",

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
1313
import org.elasticsearch.xpack.esql.core.expression.Expression;
14-
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1514
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1615
import org.elasticsearch.xpack.esql.core.tree.Source;
1716
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
@@ -91,18 +90,6 @@ public Limit replaceChild(LogicalPlan newChild) {
9190
return new Limit(source(), limit, newChild, duplicated, local);
9291
}
9392

94-
public Limit combine(Limit other, FoldContext ctx) {
95-
// Keep the smallest limit
96-
var thisLimitValue = (int) limit.fold(ctx);
97-
var otherLimitValue = (int) other.limit.fold(ctx);
98-
// We want to preserve the duplicated() value of the smaller limit.
99-
if (otherLimitValue <= thisLimitValue) {
100-
return other.withLocal(local || other.local);
101-
} else {
102-
return new Limit(source(), limit, other.child(), duplicated, local || other.local);
103-
}
104-
}
105-
10693
public Expression limit() {
10794
return limit;
10895
}
@@ -115,7 +102,7 @@ public boolean duplicated() {
115102
return duplicated;
116103
}
117104

118-
public boolean isLocal() {
105+
public boolean local() {
119106
return local;
120107
}
121108

@@ -157,6 +144,6 @@ public boolean equals(Object obj) {
157144
@Override
158145
public ExecuteLocation executesOn() {
159146
// Global limit always needs to be on the coordinator
160-
return isLocal() ? ExecuteLocation.ANY : ExecuteLocation.COORDINATOR;
147+
return local ? ExecuteLocation.ANY : ExecuteLocation.COORDINATOR;
161148
}
162149
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Lookup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* Looks up values from the associated {@code tables}.
3333
* The class is supposed to be substituted by a {@link Join}.
3434
*/
35-
public class Lookup extends UnaryPlan implements SurrogateLogicalPlan, TelemetryAware, CardinalityPreserving, SortAgnostic {
35+
public class Lookup extends UnaryPlan implements SurrogateLogicalPlan, TelemetryAware, SortAgnostic {
3636
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Lookup", Lookup::new);
3737

3838
private final Expression tableName;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ public class OrderBy extends UnaryPlan
3333
PostAnalysisVerificationAware,
3434
PostOptimizationPlanVerificationAware,
3535
TelemetryAware,
36-
CardinalityPreserving,
3736
SortAgnostic,
3837
PipelineBreaker {
3938
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "OrderBy", OrderBy::new);

0 commit comments

Comments
 (0)