Skip to content

Commit 2513135

Browse files
authored
Move logic for SINGLE aggs into mapper, directly (#135780)
Follow-up to #131485. Interestingly, the physical plan optimizer tests seem to pick up more cases when we map to a SINGLE agg directly.
1 parent 2a47de4 commit 2513135

File tree

4 files changed

+24
-72
lines changed

4 files changed

+24
-72
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
14-
import org.elasticsearch.xpack.esql.optimizer.rules.physical.SinglePhaseAggregate;
1514
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
1615
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1716
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
@@ -26,8 +25,7 @@
2625
public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPlan, PhysicalOptimizerContext> {
2726

2827
private static final List<RuleExecutor.Batch<PhysicalPlan>> RULES = List.of(
29-
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()),
30-
new Batch<>("Single aggregation", Limiter.ONCE, new SinglePhaseAggregate())
28+
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns())
3129
);
3230

3331
private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/SinglePhaseAggregate.java

Lines changed: 0 additions & 36 deletions
This file was deleted.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1414
import org.elasticsearch.xpack.esql.core.util.Holder;
15+
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
1516
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1617
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
1718
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -160,12 +161,16 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
160161
if (mappedChild instanceof ExchangeExec exchange) {
161162
mappedChild = new ExchangeExec(mappedChild.source(), intermediate, true, exchange.child());
162163
}
163-
// if no exchange was added (aggregation happening on the coordinator), create the initial agg
164-
else {
165-
mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
166-
}
164+
// if no exchange was added (aggregation happening on the coordinator), try to only create a single-pass agg
165+
else if (aggregate.groupings()
166+
.stream()
167+
.noneMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction))) {
168+
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.SINGLE, intermediate);
169+
} else {
170+
mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
171+
}
167172

168-
// always add the final/reduction agg
173+
// The final/reduction agg
169174
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate);
170175
}
171176

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3798,11 +3798,7 @@ public void testSpatialTypesAndStatsUseDocValuesNestedLiteral() {
37983798

37993799
var limit = as(plan, LimitExec.class);
38003800
var agg = as(limit.child(), AggregateExec.class);
3801-
assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL));
3802-
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
3803-
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
3804-
agg = as(agg.child(), AggregateExec.class);
3805-
assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL));
3801+
assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE));
38063802
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
38073803
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
38083804
var eval = as(agg.child(), EvalExec.class);
@@ -4101,12 +4097,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGroupedAggregat
41014097

41024098
var limit = as(plan, LimitExec.class);
41034099
var agg = as(limit.child(), AggregateExec.class);
4104-
assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL));
4105-
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
4106-
assertAggregation(agg, "count", Sum.class);
4107-
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
4108-
agg = as(agg.child(), AggregateExec.class);
4109-
assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL));
4100+
assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE));
41104101
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
41114102
assertAggregation(agg, "count", Sum.class);
41124103
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
@@ -6897,11 +6888,9 @@ public void testEnrichBeforeAggregation() {
68976888
| ENRICH _coordinator:departments
68986889
| STATS size=count(*) BY department""");
68996890
var limit = as(plan, LimitExec.class);
6900-
var finalAggs = as(limit.child(), AggregateExec.class);
6901-
assertThat(finalAggs.getMode(), equalTo(FINAL));
6902-
var partialAggs = as(finalAggs.child(), AggregateExec.class);
6903-
assertThat(partialAggs.getMode(), equalTo(INITIAL));
6904-
var enrich = as(partialAggs.child(), EnrichExec.class);
6891+
var aggs = as(limit.child(), AggregateExec.class);
6892+
assertThat(aggs.getMode(), equalTo(SINGLE));
6893+
var enrich = as(aggs.child(), EnrichExec.class);
69056894
assertThat(enrich.mode(), equalTo(Enrich.Mode.COORDINATOR));
69066895
assertThat(enrich.concreteIndices(), equalTo(Map.of("", ".enrich-departments-3")));
69076896
var exchange = as(enrich.child(), ExchangeExec.class);
@@ -7228,9 +7217,8 @@ public void testManyEnrich() {
72287217
| STATS teams=count(*) BY supervisor
72297218
""");
72307219
var limit = as(plan, LimitExec.class);
7231-
var finalAgg = as(limit.child(), AggregateExec.class);
7232-
var partialAgg = as(finalAgg.child(), AggregateExec.class);
7233-
var enrich1 = as(partialAgg.child(), EnrichExec.class);
7220+
var agg = as(limit.child(), AggregateExec.class);
7221+
var enrich1 = as(agg.child(), EnrichExec.class);
72347222
assertThat(enrich1.policyName(), equalTo("supervisors"));
72357223
assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY));
72367224
var finalTopN = as(enrich1.child(), TopNExec.class);
@@ -7254,9 +7242,8 @@ public void testManyEnrich() {
72547242
| STATS teams=count(*) BY supervisor
72557243
""");
72567244
var limit = as(plan, LimitExec.class);
7257-
var finalAgg = as(limit.child(), AggregateExec.class);
7258-
var partialAgg = as(finalAgg.child(), AggregateExec.class);
7259-
var enrich1 = as(partialAgg.child(), EnrichExec.class);
7245+
var agg = as(limit.child(), AggregateExec.class);
7246+
var enrich1 = as(agg.child(), EnrichExec.class);
72607247
assertThat(enrich1.policyName(), equalTo("supervisors"));
72617248
assertThat(enrich1.mode(), equalTo(Enrich.Mode.COORDINATOR));
72627249
var finalTopN = as(enrich1.child(), TopNExec.class);
@@ -7280,9 +7267,8 @@ public void testManyEnrich() {
72807267
| STATS teams=count(*) BY supervisor
72817268
""");
72827269
var limit = as(plan, LimitExec.class);
7283-
var finalAgg = as(limit.child(), AggregateExec.class);
7284-
var partialAgg = as(finalAgg.child(), AggregateExec.class);
7285-
var enrich1 = as(partialAgg.child(), EnrichExec.class);
7270+
var agg = as(limit.child(), AggregateExec.class);
7271+
var enrich1 = as(agg.child(), EnrichExec.class);
72867272
assertThat(enrich1.policyName(), equalTo("supervisors"));
72877273
assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY));
72887274
var topN = as(enrich1.child(), TopNExec.class);
@@ -7305,9 +7291,8 @@ public void testManyEnrich() {
73057291
| STATS teams=count(*) BY supervisor
73067292
""");
73077293
var limit = as(plan, LimitExec.class);
7308-
var finalAgg = as(limit.child(), AggregateExec.class);
7309-
var partialAgg = as(finalAgg.child(), AggregateExec.class);
7310-
var enrich1 = as(partialAgg.child(), EnrichExec.class);
7294+
var agg = as(limit.child(), AggregateExec.class);
7295+
var enrich1 = as(agg.child(), EnrichExec.class);
73117296
assertThat(enrich1.policyName(), equalTo("supervisors"));
73127297
assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY));
73137298
var topN = as(enrich1.child(), TopNExec.class);

0 commit comments

Comments
 (0)