diff --git a/docs/changelog/129370.yaml b/docs/changelog/129370.yaml new file mode 100644 index 0000000000000..73d1c25f4b34c --- /dev/null +++ b/docs/changelog/129370.yaml @@ -0,0 +1,7 @@ +pr: 129370 +summary: Avoid dropping aggregate groupings in local plans +area: ES|QL +type: bug +issues: + - 129811 + - 128054 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index de9eb166688f9..c9700f9a6979e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -1454,6 +1454,39 @@ public void testQueryOnEmptyDataIndex() { } } + public void testGroupingStatsOnMissingFields() { + assertAcked(client().admin().indices().prepareCreate("missing_field_index").setMapping("data", "type=long")); + long oneValue = between(1, 1000); + indexDoc("missing_field_index", "1", "data", oneValue); + refresh("missing_field_index"); + QueryPragmas pragmas = randomPragmas(); + pragmas = new QueryPragmas( + Settings.builder().put(pragmas.getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build() + ); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM missing_field_index,test | STATS s = sum(data) BY color, tag | SORT color"); + request.pragmas(pragmas); + try (var r = run(request)) { + var rows = getValuesList(r); + assertThat(rows, hasSize(4)); + for (List row : rows) { + assertThat(row, hasSize(3)); + } + assertThat(rows.get(0).get(0), equalTo(20L)); + assertThat(rows.get(0).get(1), equalTo("blue")); + assertNull(rows.get(0).get(2)); + assertThat(rows.get(1).get(0), equalTo(10L)); + assertThat(rows.get(1).get(1), equalTo("green")); + assertNull(rows.get(1).get(2)); + assertThat(rows.get(2).get(0), equalTo(30L)); + assertThat(rows.get(2).get(1), equalTo("red")); + assertNull(rows.get(2).get(2)); + assertThat(rows.get(3).get(0), equalTo(oneValue)); + assertNull(rows.get(3).get(1)); + assertNull(rows.get(3).get(2)); + } + } + private void assertEmptyIndexQueries(String from) { try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) { assertFalse(resp.values().hasNext()); @@ -1588,6 +1621,8 @@ private void createAndPopulateIndex(String indexName, Settings additionalSetting "time", "type=long", "color", + "type=keyword", + "tag", "type=keyword" ) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java index 3da07e9485af7..526370322ad60 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java @@ -29,7 +29,8 @@ * This class is part of the planner. Data node level logical optimizations. At this point we have access to * {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index. * - *

NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} + *

NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)} + * and {@link LogicalPlanOptimizer#cleanup()} */ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor { @@ -51,7 +52,7 @@ protected List> batches() { var rules = new ArrayList>(); rules.add(local); // TODO: if the local rules haven't touched the tree, the rest of the rules can be skipped - rules.addAll(asList(operators(), cleanup())); + rules.addAll(asList(operators(true), cleanup())); return replaceRules(rules); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index 5007b011092f0..50b23a7c2e23a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -77,7 +77,7 @@ *

  • The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example, * a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase * also applies surrogates, such as replacing an average with a sum divided by a count.
  • - *
  • {@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and referrers to many different + *
  • {@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different * things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse * time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever * possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off @@ -85,7 +85,7 @@ *
  • {@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN
  • * * - *

    Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the + *

    Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the * {@link LocalLogicalPlanOptimizer} layer.

    */ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor { @@ -117,7 +117,7 @@ protected static List> rules() { var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN()); var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized()); - return asList(substitutions(), operators(), skip, cleanup(), defaultTopN, label); + return asList(substitutions(), operators(false), skip, cleanup(), defaultTopN, label); } protected static Batch substitutions() { @@ -150,10 +150,10 @@ protected static Batch substitutions() { ); } - protected static Batch operators() { + protected static Batch operators(boolean local) { return new Batch<>( "Operator Optimization", - new CombineProjections(), + new CombineProjections(local), new CombineEvals(), new PruneEmptyPlans(), new PropagateEmptyRelation(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java index 1c256012baeb0..57c33d1fb2ddc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java @@ -15,18 +15,26 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; +import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; public final class CombineProjections extends OptimizerRules.OptimizerRule { + // don't drop groupings from a local plan, as the layout has already been agreed upon + private final boolean local; - public CombineProjections() { + public CombineProjections(boolean local) { super(OptimizerRules.TransformDirection.UP); + this.local = local; } @Override @@ -57,26 +65,89 @@ protected LogicalPlan rule(UnaryPlan plan) { return plan; } - // Agg with underlying Project (group by on sub-queries) - if (plan instanceof Aggregate a) { - if (child instanceof Project p) { - var groupings = a.groupings(); - List groupingAttrs = new ArrayList<>(a.groupings().size()); - for (Expression grouping : groupings) { - if (grouping instanceof Attribute attribute) { - groupingAttrs.add(attribute); + if (plan instanceof Aggregate a && child instanceof Project p) { + var groupings = a.groupings(); + + // sanity checks + for (Expression grouping : groupings) { + if ((grouping instanceof Attribute || grouping instanceof Alias as && as.child() instanceof Categorize) == false) { + // After applying ReplaceAggregateNestedExpressionWithEval, + // evaluatable groupings can only contain attributes. + throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping); + } + } + assert groupings.size() <= 1 + || groupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false + : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; + + // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) + AttributeMap.Builder aliasesBuilder = AttributeMap.builder(); + for (NamedExpression ne : p.projections()) { + // Record the aliases. + // Projections are just aliases for attributes, so casting is safe. + aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); + } + var aliases = aliasesBuilder.build(); + + // Propagate any renames from the lower projection into the upper groupings. + List resolvedGroupings = new ArrayList<>(); + for (Expression grouping : groupings) { + Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as)); + resolvedGroupings.add(transformed); + } + + // This can lead to duplicates in the groupings: e.g. + // | EVAL x = y | STATS ... BY x, y + if (local) { + // On the data node, the groupings must be preserved because they affect the physical output (see + // AbstractPhysicalOperationProviders#intermediateAttributes). + // In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place + // of the original projection to create new attributes for the duplicate groups. + Set seenResolvedGroupings = new HashSet<>(resolvedGroupings.size()); + List newGroupings = new ArrayList<>(); + List aliasesAgainstDuplication = new ArrayList<>(); + + for (int i = 0; i < groupings.size(); i++) { + Expression resolvedGrouping = resolvedGroupings.get(i); + if (seenResolvedGroupings.add(resolvedGrouping)) { + newGroupings.add(resolvedGrouping); } else { - // After applying ReplaceAggregateNestedExpressionWithEval, groupings can only contain attributes. - throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping); + // resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to. + // should really only be 1 attribute, anyway, but going via .references() includes the case of a + // GroupingFunction.NonEvaluatableGroupingFunction. + Attribute coreAttribute = resolvedGrouping.references().iterator().next(); + + Alias renameAgainstDuplication = new Alias( + coreAttribute.source(), + TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name(), "temp_name"), + coreAttribute + ); + aliasesAgainstDuplication.add(renameAgainstDuplication); + + // propagate the new alias into the new grouping + AttributeMap.Builder resolverBuilder = AttributeMap.builder(); + resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute()); + AttributeMap resolver = resolverBuilder.build(); + + newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr))); } } - plan = new Aggregate( - a.source(), - p.child(), - a.aggregateType(), - combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()), - combineProjections(a.aggregates(), p.projections()) - ); + + LogicalPlan newChild = aliasesAgainstDuplication.isEmpty() + ? p.child() + : new Eval(p.source(), p.child(), aliasesAgainstDuplication); + plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections())); + } else { + // On the coordinator, we can just discard the duplicates. + // All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which + // will be an alias like `c = CATEGORIZE(attribute)`. + // Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on + // regular equality (i.e. based on names) instead of name ids. + // TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. + // for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also + // applies in the local case below. + List newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings)); + plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections())); } } @@ -136,26 +207,6 @@ private static List combineProjections(List combineUpperGroupingsAndLowerProjections( - List upperGroupings, - List lowerProjections - ) { - // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) - AttributeMap aliases = new AttributeMap<>(); - for (NamedExpression ne : lowerProjections) { - // Projections are just aliases for attributes, so casting is safe. - aliases.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); - } - - // Replace any matching attribute directly with the aliased attribute from the projection. - AttributeSet replaced = new AttributeSet(); - for (Attribute attr : upperGroupings) { - // All substitutions happen before; groupings must be attributes at this point. - replaced.add(aliases.resolve(attr, attr)); - } - return new ArrayList<>(replaced); - } - /** * Replace grouping alias previously contained in the aggregations that might have been projected away. */ diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java index a16da54354637..c3a74926b279a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.core.expression.predicate.logical.And; import org.elasticsearch.xpack.esql.core.expression.predicate.nulls.IsNotNull; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; @@ -36,6 +37,7 @@ import org.elasticsearch.xpack.esql.index.IndexResolution; import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull; import org.elasticsearch.xpack.esql.parser.EsqlParser; +import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.Filter; @@ -73,6 +75,7 @@ import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning; import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -587,6 +590,36 @@ public void testIsNotNullOnCase_With_IS_NULL() { var source = as(filter.child(), EsRelation.class); } + /** + * \_Aggregate[[first_name{r}#7, $$first_name$temp_name$17{r}#18],[SUM(salary{f}#11,true[BOOLEAN]) AS SUM(salary)#5, first_nam + * e{r}#7, first_name{r}#7 AS last_name#10]] + * \_Eval[[null[KEYWORD] AS first_name#7, null[KEYWORD] AS $$first_name$temp_name$17#18]] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testGroupingByMissingFields() { + var plan = plan("FROM test | STATS SUM(salary) BY first_name, last_name"); + var testStats = statsForMissingField("first_name", "last_name"); + var localPlan = localPlan(plan, testStats); + Limit limit = as(localPlan, Limit.class); + Aggregate aggregate = as(limit.child(), Aggregate.class); + assertThat(aggregate.groupings(), hasSize(2)); + ReferenceAttribute grouping1 = as(aggregate.groupings().get(0), ReferenceAttribute.class); + ReferenceAttribute grouping2 = as(aggregate.groupings().get(1), ReferenceAttribute.class); + Eval eval = as(aggregate.child(), Eval.class); + assertThat(eval.fields(), hasSize(2)); + Alias eval1 = eval.fields().get(0); + Literal literal1 = as(eval1.child(), Literal.class); + assertNull(literal1.value()); + assertThat(literal1.dataType(), is(DataType.KEYWORD)); + Alias eval2 = eval.fields().get(1); + Literal literal2 = as(eval2.child(), Literal.class); + assertNull(literal2.value()); + assertThat(literal2.dataType(), is(DataType.KEYWORD)); + assertThat(grouping1.id(), equalTo(eval1.id())); + assertThat(grouping2.id(), equalTo(eval2.id())); + as(eval.child(), EsRelation.class); + } + private IsNotNull isNotNull(Expression field) { return new IsNotNull(EMPTY, field); }