Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.SinglePhaseAggregate;
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
Expand All @@ -26,8 +25,7 @@
public class PhysicalPlanOptimizer extends ParameterizedRuleExecutor<PhysicalPlan, PhysicalOptimizerContext> {

private static final List<RuleExecutor.Batch<PhysicalPlan>> RULES = List.of(
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns()),
new Batch<>("Single aggregation", Limiter.ONCE, new SinglePhaseAggregate())
new Batch<>("Plan Boundary", Limiter.ONCE, new ProjectAwayColumns())
);

private final PhysicalVerifier verifier = PhysicalVerifier.INSTANCE;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
Expand Down Expand Up @@ -160,12 +161,16 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
if (mappedChild instanceof ExchangeExec exchange) {
mappedChild = new ExchangeExec(mappedChild.source(), intermediate, true, exchange.child());
}
// if no exchange was added (aggregation happening on the coordinator), create the initial agg
else {
mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
}
// if no exchange was added (aggregation happening on the coordinator), try to only create a single-pass agg
else if (aggregate.groupings()
.stream()
.noneMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction))) {
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.SINGLE, intermediate);
} else {
mappedChild = MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.INITIAL, intermediate);
}

// always add the final/reduction agg
// The final/reduction agg
return MapperUtils.aggExec(aggregate, mappedChild, AggregatorMode.FINAL, intermediate);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3798,11 +3798,7 @@ public void testSpatialTypesAndStatsUseDocValuesNestedLiteral() {

var limit = as(plan, LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
agg = as(agg.child(), AggregateExec.class);
assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL));
assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
var eval = as(agg.child(), EvalExec.class);
Expand Down Expand Up @@ -4101,12 +4097,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGroupedAggregat

var limit = as(plan, LimitExec.class);
var agg = as(limit.child(), AggregateExec.class);
assertThat("Aggregation is FINAL", agg.getMode(), equalTo(FINAL));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "count", Sum.class);
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
agg = as(agg.child(), AggregateExec.class);
assertThat("Aggregation is PARTIAL", agg.getMode(), equalTo(INITIAL));
assertThat("Aggregation is SINGLE", agg.getMode(), equalTo(SINGLE));
assertThat("No groupings in aggregation", agg.groupings().size(), equalTo(0));
assertAggregation(agg, "count", Sum.class);
assertAggregation(agg, "centroid", SpatialCentroid.class, GEO_POINT, FieldExtractPreference.NONE);
Expand Down Expand Up @@ -6897,11 +6888,9 @@ public void testEnrichBeforeAggregation() {
| ENRICH _coordinator:departments
| STATS size=count(*) BY department""");
var limit = as(plan, LimitExec.class);
var finalAggs = as(limit.child(), AggregateExec.class);
assertThat(finalAggs.getMode(), equalTo(FINAL));
var partialAggs = as(finalAggs.child(), AggregateExec.class);
assertThat(partialAggs.getMode(), equalTo(INITIAL));
var enrich = as(partialAggs.child(), EnrichExec.class);
var aggs = as(limit.child(), AggregateExec.class);
assertThat(aggs.getMode(), equalTo(SINGLE));
var enrich = as(aggs.child(), EnrichExec.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.COORDINATOR));
assertThat(enrich.concreteIndices(), equalTo(Map.of("", ".enrich-departments-3")));
var exchange = as(enrich.child(), ExchangeExec.class);
Expand Down Expand Up @@ -7228,9 +7217,8 @@ public void testManyEnrich() {
| STATS teams=count(*) BY supervisor
""");
var limit = as(plan, LimitExec.class);
var finalAgg = as(limit.child(), AggregateExec.class);
var partialAgg = as(finalAgg.child(), AggregateExec.class);
var enrich1 = as(partialAgg.child(), EnrichExec.class);
var agg = as(limit.child(), AggregateExec.class);
var enrich1 = as(agg.child(), EnrichExec.class);
assertThat(enrich1.policyName(), equalTo("supervisors"));
assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY));
var finalTopN = as(enrich1.child(), TopNExec.class);
Expand All @@ -7254,9 +7242,8 @@ public void testManyEnrich() {
| STATS teams=count(*) BY supervisor
""");
var limit = as(plan, LimitExec.class);
var finalAgg = as(limit.child(), AggregateExec.class);
var partialAgg = as(finalAgg.child(), AggregateExec.class);
var enrich1 = as(partialAgg.child(), EnrichExec.class);
var agg = as(limit.child(), AggregateExec.class);
var enrich1 = as(agg.child(), EnrichExec.class);
assertThat(enrich1.policyName(), equalTo("supervisors"));
assertThat(enrich1.mode(), equalTo(Enrich.Mode.COORDINATOR));
var finalTopN = as(enrich1.child(), TopNExec.class);
Expand All @@ -7280,9 +7267,8 @@ public void testManyEnrich() {
| STATS teams=count(*) BY supervisor
""");
var limit = as(plan, LimitExec.class);
var finalAgg = as(limit.child(), AggregateExec.class);
var partialAgg = as(finalAgg.child(), AggregateExec.class);
var enrich1 = as(partialAgg.child(), EnrichExec.class);
var agg = as(limit.child(), AggregateExec.class);
var enrich1 = as(agg.child(), EnrichExec.class);
assertThat(enrich1.policyName(), equalTo("supervisors"));
assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY));
var topN = as(enrich1.child(), TopNExec.class);
Expand All @@ -7305,9 +7291,8 @@ public void testManyEnrich() {
| STATS teams=count(*) BY supervisor
""");
var limit = as(plan, LimitExec.class);
var finalAgg = as(limit.child(), AggregateExec.class);
var partialAgg = as(finalAgg.child(), AggregateExec.class);
var enrich1 = as(partialAgg.child(), EnrichExec.class);
var agg = as(limit.child(), AggregateExec.class);
var enrich1 = as(agg.child(), EnrichExec.class);
assertThat(enrich1.policyName(), equalTo("supervisors"));
assertThat(enrich1.mode(), equalTo(Enrich.Mode.ANY));
var topN = as(enrich1.child(), TopNExec.class);
Expand Down