Skip to content

Commit 92311cd

Browse files
committed
Reapply "WIP: TopNAggregate nodes, exec not fully implemented yet"
This reverts commit ab0452b.
1 parent 531711b commit 92311cd

File tree

14 files changed

+468
-102
lines changed

14 files changed

+468
-102
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferNonNullAggConstraint;
1515
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.LocalPropagateEmptyRelation;
1616
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceFieldWithConstantOrNull;
17+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNAggregateWithTopNAndAggregate;
1718
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceTopNWithLimitAndSort;
1819
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1920
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
@@ -38,6 +39,7 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<Logical
3839
new Batch<>(
3940
"Local rewrite",
4041
Limiter.ONCE,
42+
new ReplaceTopNAggregateWithTopNAndAggregate(),
4143
new ReplaceTopNWithLimitAndSort(),
4244
new ReplaceFieldWithConstantOrNull(),
4345
new InferIsNotNull(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEvalFoldables;
2929
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateInlineEvals;
3030
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateNullable;
31-
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateTopNToAggregates;
31+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceTopNAndAggregateWithTopNAggregate;
3232
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropgateUnmappedFields;
3333
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns;
3434
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans;
@@ -209,6 +209,6 @@ protected static Batch<LogicalPlan> operators() {
209209
}
210210

211211
protected static Batch<LogicalPlan> cleanup() {
212-
return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN(), new PropagateTopNToAggregates(), new ReplaceRowAsLocalRelation(), new PropgateUnmappedFields());
212+
return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN(), new ReplaceTopNAndAggregateWithTopNAggregate(), new ReplaceRowAsLocalRelation(), new PropgateUnmappedFields());
213213
}
214214
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.xpack.esql.VerificationException;
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
13-
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateTopNToAggregates;
1413
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
1514
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1615
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PropagateTopNToAggregates.java renamed to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/ReplaceTopNAndAggregateWithTopNAggregate.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@
1010
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1111
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1212
import org.elasticsearch.xpack.esql.plan.logical.TopN;
13+
import org.elasticsearch.xpack.esql.plan.logical.TopNAggregate;
1314
import org.elasticsearch.xpack.esql.rule.Rule;
1415

1516
/**
1617
* Looks for the structure:
1718
* <pre>
18-
* TopN
19-
* \_Aggregate
19+
* {@link TopN}
20+
* \_{@link Aggregate}
2021
* </pre>
21-
* And replaces the Aggregate with an Aggregate with the TopN data.
22-
* (TODO: Create a new TopNAggregate node instead)
22+
* And replaces it with {@link TopNAggregate}.
2323
*/
24-
public class PropagateTopNToAggregates extends Rule<TopN, LogicalPlan> {
24+
public class ReplaceTopNAndAggregateWithTopNAggregate extends Rule<TopN, LogicalPlan> {
2525

2626
@Override
2727
public LogicalPlan apply(LogicalPlan plan) {
@@ -31,18 +31,16 @@ public LogicalPlan apply(LogicalPlan plan) {
3131
);
3232
}
3333

34-
private TopN applyRule(TopN topN) {
34+
private LogicalPlan applyRule(TopN topN) {
3535
// TODO: Handle TimeSeriesAggregate
3636
if (topN.child() instanceof Aggregate aggregate) {
37-
return topN.replaceChild(
38-
new Aggregate(
39-
aggregate.source(),
40-
aggregate.child(),
41-
aggregate.groupings(),
42-
aggregate.aggregates(),
43-
topN.order(),
44-
topN.limit()
45-
)
37+
return new TopNAggregate(
38+
aggregate.source(),
39+
aggregate.child(),
40+
aggregate.groupings(),
41+
aggregate.aggregates(),
42+
topN.order(),
43+
topN.limit()
4644
);
4745
}
4846
return topN;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;
9+
10+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
11+
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
12+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
13+
import org.elasticsearch.xpack.esql.plan.logical.TopN;
14+
import org.elasticsearch.xpack.esql.plan.logical.TopNAggregate;
15+
16+
import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.UP;
17+
18+
/**
19+
* Break TopNAggregate back into TopN + Aggregate to allow the order rules to kick in.
20+
*/
21+
public class ReplaceTopNAggregateWithTopNAndAggregate extends OptimizerRules.OptimizerRule<TopNAggregate> {
22+
public ReplaceTopNAggregateWithTopNAndAggregate() {
23+
super(UP);
24+
}
25+
26+
@Override
27+
protected LogicalPlan rule(TopNAggregate plan) {
28+
return new TopN(
29+
plan.source(),
30+
new Aggregate(plan.source(), plan.child(), plan.groupings(), plan.aggregates()),
31+
plan.order(),
32+
plan.limit()
33+
);
34+
}
35+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.xpack.esql.plan.logical.Sample;
2525
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
2626
import org.elasticsearch.xpack.esql.plan.logical.TopN;
27+
import org.elasticsearch.xpack.esql.plan.logical.TopNAggregate;
2728
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
2829
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
2930
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
@@ -52,6 +53,7 @@
5253
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
5354
import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
5455
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
56+
import org.elasticsearch.xpack.esql.plan.physical.TopNAggregateExec;
5557
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
5658
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
5759
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
@@ -91,7 +93,8 @@ public static List<NamedWriteableRegistry.Entry> logical() {
9193
Rerank.ENTRY,
9294
Sample.ENTRY,
9395
TimeSeriesAggregate.ENTRY,
94-
TopN.ENTRY
96+
TopN.ENTRY,
97+
TopNAggregate.ENTRY
9598
);
9699
}
97100

@@ -121,7 +124,8 @@ public static List<NamedWriteableRegistry.Entry> physical() {
121124
ShowExec.ENTRY,
122125
SubqueryExec.ENTRY,
123126
TimeSeriesAggregateExec.ENTRY,
124-
TopNExec.ENTRY
127+
TopNExec.ENTRY,
128+
TopNAggregateExec.ENTRY
125129
);
126130
}
127131
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/QueryPlan.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public AttributeSet references() {
8989
/**
9090
* This very likely needs to be overridden for {@link QueryPlan#references} to be correct when inheriting.
9191
* This can be called on unresolved plans and therefore must not rely on calls to {@link QueryPlan#output()}.
92+
* @see #references()
9293
*/
9394
protected AttributeSet computeReferences() {
9495
return Expressions.references(expressions());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13-
import org.elasticsearch.core.Nullable;
1413
import org.elasticsearch.index.IndexMode;
1514
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
1615
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
@@ -28,7 +27,6 @@
2827
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2928
import org.elasticsearch.xpack.esql.core.tree.Source;
3029
import org.elasticsearch.xpack.esql.core.util.Holder;
31-
import org.elasticsearch.xpack.esql.expression.Order;
3230
import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
3331
import org.elasticsearch.xpack.esql.expression.function.aggregate.FilteredExpression;
3432
import org.elasticsearch.xpack.esql.expression.function.aggregate.Rate;
@@ -57,32 +55,12 @@ public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAwar
5755
protected final List<Expression> groupings;
5856
protected final List<? extends NamedExpression> aggregates;
5957

60-
private final List<Order> order;
61-
@Nullable
62-
private final Expression limit;
63-
6458
protected List<Attribute> lazyOutput;
6559

6660
public Aggregate(Source source, LogicalPlan child, List<Expression> groupings, List<? extends NamedExpression> aggregates) {
6761
super(source, child);
6862
this.groupings = groupings;
6963
this.aggregates = aggregates;
70-
this.order = List.of();
71-
this.limit = null;
72-
}
73-
74-
public Aggregate(
75-
Source source,
76-
LogicalPlan child,
77-
List<Expression> groupings, List<? extends NamedExpression> aggregates,
78-
List<Order> order,
79-
@Nullable Expression limit
80-
) {
81-
super(source, child);
82-
this.groupings = groupings;
83-
this.aggregates = aggregates;
84-
this.order = order;
85-
this.limit = limit;
8664
}
8765

8866
public Aggregate(StreamInput in) throws IOException {
@@ -93,13 +71,6 @@ public Aggregate(StreamInput in) throws IOException {
9371
}
9472
this.groupings = in.readNamedWriteableCollectionAsList(Expression.class);
9573
this.aggregates = in.readNamedWriteableCollectionAsList(NamedExpression.class);
96-
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOP_N_AGGREGATES)) {
97-
this.order = in.readCollectionAsList(Order::new);
98-
this.limit = in.readOptionalNamedWriteable(Expression.class);
99-
} else {
100-
this.order = emptyList();
101-
this.limit = null;
102-
}
10374
}
10475

10576
@Override
@@ -112,10 +83,6 @@ public void writeTo(StreamOutput out) throws IOException {
11283
}
11384
out.writeNamedWriteableCollection(groupings);
11485
out.writeNamedWriteableCollection(aggregates());
115-
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_TOP_N_AGGREGATES)) {
116-
out.writeCollection(order);
117-
out.writeOptionalNamedWriteable(limit);
118-
}
11986
}
12087

12188
@Override
@@ -130,15 +97,15 @@ protected NodeInfo<? extends Aggregate> info() {
13097

13198
@Override
13299
public Aggregate replaceChild(LogicalPlan newChild) {
133-
return new Aggregate(source(), newChild, groupings, aggregates, order, limit);
100+
return new Aggregate(source(), newChild, groupings, aggregates);
134101
}
135102

136103
public Aggregate with(List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
137104
return with(child(), newGroupings, newAggregates);
138105
}
139106

140107
public Aggregate with(LogicalPlan child, List<Expression> newGroupings, List<? extends NamedExpression> newAggregates) {
141-
return new Aggregate(source(), child, newGroupings, newAggregates, order, limit);
108+
return new Aggregate(source(), child, newGroupings, newAggregates);
142109
}
143110

144111
public List<Expression> groupings() {
@@ -149,14 +116,6 @@ public List<? extends NamedExpression> aggregates() {
149116
return aggregates;
150117
}
151118

152-
public List<Order> order() {
153-
return order;
154-
}
155-
156-
public Expression limit() {
157-
return limit;
158-
}
159-
160119
@Override
161120
public String telemetryLabel() {
162121
return "STATS";

0 commit comments

Comments
 (0)