Skip to content

Commit a2d4638

Browse files
committed
Replaces missing cases of AggregateExec to AbstractAggregateExec and added serialization tests
1 parent 4ce6465 commit a2d4638

File tree

6 files changed

+175
-14
lines changed

6 files changed

+175
-14
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_top_n.csv-spec

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,12 @@ c:long | m:double | languages.long:long
3535
21 | 2.1 | 5
3636
10 | 2.1 | null
3737
;
38+
39+
keepGroupAfterStats
40+
from employees | stats MAX(height) BY height, languages.long | sort height desc | limit 3 | KEEP height;
41+
42+
height:double
43+
2.1
44+
2.1
45+
2.1
46+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SpatialDocValuesExtraction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.SpatialRelatesFunction;
2121
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
2222
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
23-
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
23+
import org.elasticsearch.xpack.esql.plan.physical.AbstractAggregateExec;
2424
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
2525
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
2626
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
@@ -69,14 +69,14 @@
6969
* to be serialized between nodes, and is only used locally.
7070
*/
7171
public class SpatialDocValuesExtraction extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
72-
AggregateExec,
72+
AbstractAggregateExec,
7373
LocalPhysicalOptimizerContext> {
7474
@Override
75-
protected PhysicalPlan rule(AggregateExec aggregate, LocalPhysicalOptimizerContext ctx) {
75+
protected PhysicalPlan rule(AbstractAggregateExec aggregate, LocalPhysicalOptimizerContext ctx) {
7676
var foundAttributes = new HashSet<FieldAttribute>();
7777

7878
PhysicalPlan plan = aggregate.transformDown(UnaryExec.class, exec -> {
79-
if (exec instanceof AggregateExec agg) {
79+
if (exec instanceof AbstractAggregateExec agg) {
8080
var orderedAggregates = new ArrayList<NamedExpression>();
8181
var changedAggregates = false;
8282
for (NamedExpression aggExpr : agg.aggregates()) {
@@ -171,7 +171,7 @@ private boolean foundField(Expression expression, Set<FieldAttribute> foundAttri
171171
private boolean allowedForDocValues(
172172
FieldAttribute fieldAttribute,
173173
SearchStats stats,
174-
AggregateExec agg,
174+
AbstractAggregateExec agg,
175175
Set<FieldAttribute> foundAttributes
176176
) {
177177
if (stats.hasDocValues(fieldAttribute.fieldName()) == false) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/SpatialShapeBoundsExtraction.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialExtent;
2020
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
2121
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules.ParameterizedOptimizerRule;
22-
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
22+
import org.elasticsearch.xpack.esql.plan.physical.AbstractAggregateExec;
2323
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
2424
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
2525
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
@@ -47,25 +47,25 @@
4747
* not a check like {@code isNotNull}.</li>
4848
* </ul>
4949
*/
50-
public class SpatialShapeBoundsExtraction extends ParameterizedOptimizerRule<AggregateExec, LocalPhysicalOptimizerContext> {
50+
public class SpatialShapeBoundsExtraction extends ParameterizedOptimizerRule<AbstractAggregateExec, LocalPhysicalOptimizerContext> {
5151
@Override
52-
protected PhysicalPlan rule(AggregateExec aggregate, LocalPhysicalOptimizerContext ctx) {
52+
protected PhysicalPlan rule(AbstractAggregateExec aggregate, LocalPhysicalOptimizerContext ctx) {
5353
Set<Attribute> foundAttributes = findSpatialShapeBoundsAttributes(aggregate, ctx);
5454
if (foundAttributes.isEmpty()) {
5555
return aggregate;
5656
}
5757
return aggregate.transformDown(PhysicalPlan.class, exec -> switch (exec) {
58-
case AggregateExec agg -> transformAggregateExec(agg, foundAttributes);
58+
case AbstractAggregateExec agg -> transformAggregateExec(agg, foundAttributes);
5959
case FieldExtractExec fieldExtractExec -> transformFieldExtractExec(fieldExtractExec, foundAttributes);
6060
default -> exec;
6161
});
6262
}
6363

64-
private static Set<Attribute> findSpatialShapeBoundsAttributes(AggregateExec aggregate, LocalPhysicalOptimizerContext ctx) {
64+
private static Set<Attribute> findSpatialShapeBoundsAttributes(AbstractAggregateExec aggregate, LocalPhysicalOptimizerContext ctx) {
6565
var foundAttributes = new HashSet<Attribute>();
6666
aggregate.transformDown(UnaryExec.class, exec -> {
6767
switch (exec) {
68-
case AggregateExec agg -> {
68+
case AbstractAggregateExec agg -> {
6969
List<AggregateFunction> aggregateFunctions = agg.aggregates()
7070
.stream()
7171
.flatMap(e -> SpatialShapeBoundsExtraction.extractAggregateFunction(e).stream())
@@ -110,7 +110,7 @@ private static PhysicalPlan transformFieldExtractExec(FieldExtractExec fieldExtr
110110
return fieldExtractExec.withBoundsAttributes(boundsAttributes);
111111
}
112112

113-
private static PhysicalPlan transformAggregateExec(AggregateExec agg, Set<Attribute> foundAttributes) {
113+
private static PhysicalPlan transformAggregateExec(AbstractAggregateExec agg, Set<Attribute> foundAttributes) {
114114
return agg.transformExpressionsDown(
115115
SpatialExtent.class,
116116
spatialExtent -> foundAttributes.contains(spatialExtent.field())

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090
import org.elasticsearch.xpack.esql.inference.rerank.RerankOperator;
9191
import org.elasticsearch.xpack.esql.plan.logical.Fork;
9292
import org.elasticsearch.xpack.esql.plan.physical.AbstractAggregateExec;
93-
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
9493
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
9594
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
9695
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
@@ -213,7 +212,7 @@ public LocalExecutionPlan plan(String description, FoldContext foldCtx, Physical
213212

214213
// workaround for https://github.com/elastic/elasticsearch/issues/99782
215214
localPhysicalPlan = localPhysicalPlan.transformUp(
216-
AggregateExec.class,
215+
AbstractAggregateExec.class,
217216
a -> a.getMode() == AggregatorMode.FINAL ? new ProjectExec(a.source(), a, Expressions.asAttributes(a.aggregates())) : a
218217
);
219218
PhysicalOperation physicalOperation = plan(localPhysicalPlan, context);
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.logical;
9+
10+
import org.elasticsearch.xpack.esql.core.expression.Expression;
11+
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
12+
import org.elasticsearch.xpack.esql.core.tree.Source;
13+
import org.elasticsearch.xpack.esql.expression.Order;
14+
import org.elasticsearch.xpack.esql.expression.function.FieldAttributeTests;
15+
16+
import java.io.IOException;
17+
import java.util.ArrayList;
18+
import java.util.List;
19+
20+
public class TopNAggregateSerializationTests extends AbstractLogicalPlanSerializationTests<TopNAggregate> {
21+
@Override
22+
protected TopNAggregate createTestInstance() {
23+
Source source = randomSource();
24+
LogicalPlan child = randomChild(0);
25+
List<Expression> groupings = randomFieldAttributes(0, 5, false).stream().map(a -> (Expression) a).toList();
26+
List<? extends NamedExpression> aggregates = AggregateSerializationTests.randomAggregates();
27+
List<Order> order = randomOrder();
28+
Expression limit = FieldAttributeTests.createFieldAttribute(1, true);
29+
30+
return new TopNAggregate(source, child, groupings, aggregates, order, limit);
31+
}
32+
33+
public static List<Order> randomOrder() {
34+
int size = between(1, 5);
35+
List<Order> result = new ArrayList<>(size);
36+
for (int i = 0; i < size; i++) {
37+
Expression field = FieldAttributeTests.createFieldAttribute(1, true);
38+
Order.OrderDirection direction = randomFrom(Order.OrderDirection.values());
39+
Order.NullsPosition nullsPosition = randomFrom(Order.NullsPosition.values());
40+
result.add(new Order(randomSource(), field, direction, nullsPosition));
41+
}
42+
return result;
43+
}
44+
45+
@Override
46+
protected TopNAggregate mutateInstance(TopNAggregate instance) throws IOException {
47+
LogicalPlan child = instance.child();
48+
List<Expression> groupings = instance.groupings();
49+
List<? extends NamedExpression> aggregates = instance.aggregates();
50+
List<Order> order = instance.order();
51+
Expression limit = instance.limit();
52+
switch (between(0, 4)) {
53+
case 0 -> child = randomValueOtherThan(child, () -> randomChild(0));
54+
case 1 -> groupings = randomValueOtherThan(
55+
groupings,
56+
() -> randomFieldAttributes(0, 5, false).stream().map(a -> (Expression) a).toList()
57+
);
58+
case 2 -> aggregates = randomValueOtherThan(aggregates, AggregateSerializationTests::randomAggregates);
59+
case 3 -> order = randomValueOtherThan(order, TopNAggregateSerializationTests::randomOrder);
60+
case 4 -> limit = randomValueOtherThan(limit, () -> FieldAttributeTests.createFieldAttribute(1, true));
61+
}
62+
return new TopNAggregate(instance.source(), child, groupings, aggregates, order, limit);
63+
}
64+
65+
@Override
66+
protected boolean alwaysEmptySource() {
67+
return true;
68+
}
69+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.physical;
9+
10+
import org.elasticsearch.compute.aggregation.AggregatorMode;
11+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
12+
import org.elasticsearch.xpack.esql.core.expression.Expression;
13+
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
14+
import org.elasticsearch.xpack.esql.core.tree.Source;
15+
import org.elasticsearch.xpack.esql.expression.Order;
16+
import org.elasticsearch.xpack.esql.expression.function.FieldAttributeTests;
17+
import org.elasticsearch.xpack.esql.plan.logical.AggregateSerializationTests;
18+
import org.elasticsearch.xpack.esql.plan.logical.TopNAggregateSerializationTests;
19+
20+
import java.io.IOException;
21+
import java.util.List;
22+
23+
public class TopNAggregateExecSerializationTests extends AbstractPhysicalPlanSerializationTests<TopNAggregateExec> {
24+
@Override
25+
protected TopNAggregateExec createTestInstance() {
26+
Source source = randomSource();
27+
PhysicalPlan child = randomChild(0);
28+
List<Expression> groupings = randomFieldAttributes(0, 5, false).stream().map(a -> (Expression) a).toList();
29+
List<? extends NamedExpression> aggregates = AggregateSerializationTests.randomAggregates();
30+
AggregatorMode mode = randomFrom(AggregatorMode.values());
31+
List<Attribute> intermediateAttributes = randomFieldAttributes(0, 5, false);
32+
Integer estimatedRowSize = randomEstimatedRowSize();
33+
List<Order> order = TopNAggregateSerializationTests.randomOrder();
34+
Expression limit = FieldAttributeTests.createFieldAttribute(1, true);
35+
36+
return new TopNAggregateExec(source, child, groupings, aggregates, mode, intermediateAttributes, estimatedRowSize, order, limit);
37+
}
38+
39+
@Override
40+
protected TopNAggregateExec mutateInstance(TopNAggregateExec instance) throws IOException {
41+
PhysicalPlan child = instance.child();
42+
List<? extends Expression> groupings = instance.groupings();
43+
List<? extends NamedExpression> aggregates = instance.aggregates();
44+
List<Attribute> intermediateAttributes = instance.intermediateAttributes();
45+
AggregatorMode mode = instance.getMode();
46+
Integer estimatedRowSize = instance.estimatedRowSize();
47+
List<Order> order = instance.order();
48+
Expression limit = instance.limit();
49+
switch (between(0, 7)) {
50+
case 0 -> child = randomValueOtherThan(child, () -> randomChild(0));
51+
case 1 -> groupings = randomValueOtherThan(groupings, () -> randomFieldAttributes(0, 5, false));
52+
case 2 -> aggregates = randomValueOtherThan(aggregates, AggregateSerializationTests::randomAggregates);
53+
case 3 -> mode = randomValueOtherThan(mode, () -> randomFrom(AggregatorMode.values()));
54+
case 4 -> intermediateAttributes = randomValueOtherThan(intermediateAttributes, () -> randomFieldAttributes(0, 5, false));
55+
case 5 -> estimatedRowSize = randomValueOtherThan(
56+
estimatedRowSize,
57+
AbstractPhysicalPlanSerializationTests::randomEstimatedRowSize
58+
);
59+
case 6 -> {
60+
order = randomValueOtherThan(order, TopNAggregateSerializationTests::randomOrder);
61+
}
62+
case 7 -> {
63+
limit = FieldAttributeTests.createFieldAttribute(1, true);
64+
}
65+
default -> throw new IllegalStateException();
66+
}
67+
return new TopNAggregateExec(
68+
instance.source(),
69+
child,
70+
groupings,
71+
aggregates,
72+
mode,
73+
intermediateAttributes,
74+
estimatedRowSize,
75+
order,
76+
limit
77+
);
78+
}
79+
80+
@Override
81+
protected boolean alwaysEmptySource() {
82+
return true;
83+
}
84+
}

0 commit comments

Comments
 (0)