Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/changelog/129370.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 129370
summary: Avoid dropping aggregate groupings in local plans
area: ES|QL
type: bug
issues:
- 129811
- 128054
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,39 @@ public void testQueryOnEmptyDataIndex() {
}
}

public void testGroupingStatsOnMissingFields() {
assertAcked(client().admin().indices().prepareCreate("missing_field_index").setMapping("data", "type=long"));
long oneValue = between(1, 1000);
indexDoc("missing_field_index", "1", "data", oneValue);
refresh("missing_field_index");
QueryPragmas pragmas = randomPragmas();
pragmas = new QueryPragmas(
Settings.builder().put(pragmas.getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
);
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM missing_field_index,test | STATS s = sum(data) BY color, tag | SORT color");
request.pragmas(pragmas);
try (var r = run(request)) {
var rows = getValuesList(r);
assertThat(rows, hasSize(4));
for (List<Object> row : rows) {
assertThat(row, hasSize(3));
}
assertThat(rows.get(0).get(0), equalTo(20L));
assertThat(rows.get(0).get(1), equalTo("blue"));
assertNull(rows.get(0).get(2));
assertThat(rows.get(1).get(0), equalTo(10L));
assertThat(rows.get(1).get(1), equalTo("green"));
assertNull(rows.get(1).get(2));
assertThat(rows.get(2).get(0), equalTo(30L));
assertThat(rows.get(2).get(1), equalTo("red"));
assertNull(rows.get(2).get(2));
assertThat(rows.get(3).get(0), equalTo(oneValue));
assertNull(rows.get(3).get(1));
assertNull(rows.get(3).get(2));
}
}

private void assertEmptyIndexQueries(String from) {
try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) {
assertFalse(resp.values().hasNext());
Expand Down Expand Up @@ -1588,6 +1621,8 @@ private void createAndPopulateIndex(String indexName, Settings additionalSetting
"time",
"type=long",
"color",
"type=keyword",
"tag",
"type=keyword"
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
* This class is part of the planner. Data node level logical optimizations. At this point we have access to
* {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index.
*
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()}
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)}
* and {@link LogicalPlanOptimizer#cleanup()}
*/
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {

Expand All @@ -51,7 +52,7 @@ protected List<Batch<LogicalPlan>> batches() {
var rules = new ArrayList<Batch<LogicalPlan>>();
rules.add(local);
// TODO: if the local rules haven't touched the tree, the rest of the rules can be skipped
rules.addAll(asList(operators(), cleanup()));
rules.addAll(asList(operators(true), cleanup()));
return replaceRules(rules);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@
* <li>The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example,
* a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase
* also applies surrogates, such as replacing an average with a sum divided by a count.</li>
* <li>{@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and referrers to many different
* <li>{@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different
* things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse
* time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever
* possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off
* after many iterations, although hitting that is considered a bug)</li>
* <li>{@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN</li>
* </ul>
*
* <p>Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
* <p>Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
* {@link LocalLogicalPlanOptimizer} layer.</p>
*/
public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LogicalOptimizerContext> {
Expand Down Expand Up @@ -117,7 +117,7 @@ protected static List<Batch<LogicalPlan>> rules() {
var defaultTopN = new Batch<>("Add default TopN", new AddDefaultTopN());
var label = new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized());

return asList(substitutions(), operators(), skip, cleanup(), defaultTopN, label);
return asList(substitutions(), operators(false), skip, cleanup(), defaultTopN, label);
}

protected static Batch<LogicalPlan> substitutions() {
Expand Down Expand Up @@ -150,10 +150,10 @@ protected static Batch<LogicalPlan> substitutions() {
);
}

protected static Batch<LogicalPlan> operators() {
protected static Batch<LogicalPlan> operators(boolean local) {
return new Batch<>(
"Operator Optimization",
new CombineProjections(),
new CombineProjections(local),
new CombineEvals(),
new PruneEmptyPlans(),
new PropagateEmptyRelation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,26 @@
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> {
// don't drop groupings from a local plan, as the layout has already been agreed upon
private final boolean local;

public CombineProjections() {
public CombineProjections(boolean local) {
super(OptimizerRules.TransformDirection.UP);
this.local = local;
}

@Override
Expand Down Expand Up @@ -57,26 +65,89 @@ protected LogicalPlan rule(UnaryPlan plan) {
return plan;
}

// Agg with underlying Project (group by on sub-queries)
if (plan instanceof Aggregate a) {
if (child instanceof Project p) {
var groupings = a.groupings();
List<Attribute> groupingAttrs = new ArrayList<>(a.groupings().size());
for (Expression grouping : groupings) {
if (grouping instanceof Attribute attribute) {
groupingAttrs.add(attribute);
if (plan instanceof Aggregate a && child instanceof Project p) {
var groupings = a.groupings();

// sanity checks
for (Expression grouping : groupings) {
if ((grouping instanceof Attribute || grouping instanceof Alias as && as.child() instanceof Categorize) == false) {
// After applying ReplaceAggregateNestedExpressionWithEval,
// evaluatable groupings can only contain attributes.
throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping);
}
}
assert groupings.size() <= 1
|| groupings.stream().anyMatch(group -> group.anyMatch(expr -> expr instanceof Categorize)) == false
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";

// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
for (NamedExpression ne : p.projections()) {
// Record the aliases.
// Projections are just aliases for attributes, so casting is safe.
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
}
var aliases = aliasesBuilder.build();

// Propagate any renames from the lower projection into the upper groupings.
List<Expression> resolvedGroupings = new ArrayList<>();
for (Expression grouping : groupings) {
Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as));
resolvedGroupings.add(transformed);
}

// This can lead to duplicates in the groupings: e.g.
// | EVAL x = y | STATS ... BY x, y
if (local) {
// On the data node, the groupings must be preserved because they affect the physical output (see
// AbstractPhysicalOperationProviders#intermediateAttributes).
// In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place
// of the original projection to create new attributes for the duplicate groups.
Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size());
List<Expression> newGroupings = new ArrayList<>();
List<Alias> aliasesAgainstDuplication = new ArrayList<>();

for (int i = 0; i < groupings.size(); i++) {
Expression resolvedGrouping = resolvedGroupings.get(i);
if (seenResolvedGroupings.add(resolvedGrouping)) {
newGroupings.add(resolvedGrouping);
} else {
// After applying ReplaceAggregateNestedExpressionWithEval, groupings can only contain attributes.
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
// resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to.
// should really only be 1 attribute, anyway, but going via .references() includes the case of a
// GroupingFunction.NonEvaluatableGroupingFunction.
Attribute coreAttribute = resolvedGrouping.references().iterator().next();

Alias renameAgainstDuplication = new Alias(
coreAttribute.source(),
TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name(), "temp_name"),
coreAttribute
);
aliasesAgainstDuplication.add(renameAgainstDuplication);

// propagate the new alias into the new grouping
AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder();
resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute());
AttributeMap<Attribute> resolver = resolverBuilder.build();

newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr)));
}
}
plan = new Aggregate(
a.source(),
p.child(),
a.aggregateType(),
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
combineProjections(a.aggregates(), p.projections())
);

LogicalPlan newChild = aliasesAgainstDuplication.isEmpty()
? p.child()
: new Eval(p.source(), p.child(), aliasesAgainstDuplication);
plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections()));
} else {
// On the coordinator, we can just discard the duplicates.
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which
// will be an alias like `c = CATEGORIZE(attribute)`.
// Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on
// regular equality (i.e. based on names) instead of name ids.
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g.
// for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also
// applies in the local case below.
List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings));
plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections()));
}
}

Expand Down Expand Up @@ -136,26 +207,6 @@ private static List<NamedExpression> combineProjections(List<? extends NamedExpr
return replaced;
}

private static List<Expression> combineUpperGroupingsAndLowerProjections(
List<? extends Attribute> upperGroupings,
List<? extends NamedExpression> lowerProjections
) {
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
AttributeMap<Attribute> aliases = new AttributeMap<>();
for (NamedExpression ne : lowerProjections) {
// Projections are just aliases for attributes, so casting is safe.
aliases.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
}

// Replace any matching attribute directly with the aliased attribute from the projection.
AttributeSet replaced = new AttributeSet();
for (Attribute attr : upperGroupings) {
// All substitutions happen before; groupings must be attributes at this point.
replaced.add(aliases.resolve(attr, attr));
}
return new ArrayList<>(replaced);
}

/**
* Replace grouping alias previously contained in the aggregations that might have been projected away.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.expression.predicate.logical.And;
import org.elasticsearch.xpack.esql.core.expression.predicate.nulls.IsNotNull;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
Expand All @@ -36,6 +37,7 @@
import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull;
import org.elasticsearch.xpack.esql.parser.EsqlParser;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
Expand Down Expand Up @@ -73,6 +75,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -587,6 +590,36 @@ public void testIsNotNullOnCase_With_IS_NULL() {
var source = as(filter.child(), EsRelation.class);
}

/**
* \_Aggregate[[first_name{r}#7, $$first_name$temp_name$17{r}#18],[SUM(salary{f}#11,true[BOOLEAN]) AS SUM(salary)#5, first_nam
* e{r}#7, first_name{r}#7 AS last_name#10]]
* \_Eval[[null[KEYWORD] AS first_name#7, null[KEYWORD] AS $$first_name$temp_name$17#18]]
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
*/
public void testGroupingByMissingFields() {
var plan = plan("FROM test | STATS SUM(salary) BY first_name, last_name");
var testStats = statsForMissingField("first_name", "last_name");
var localPlan = localPlan(plan, testStats);
Limit limit = as(localPlan, Limit.class);
Aggregate aggregate = as(limit.child(), Aggregate.class);
assertThat(aggregate.groupings(), hasSize(2));
ReferenceAttribute grouping1 = as(aggregate.groupings().get(0), ReferenceAttribute.class);
ReferenceAttribute grouping2 = as(aggregate.groupings().get(1), ReferenceAttribute.class);
Eval eval = as(aggregate.child(), Eval.class);
assertThat(eval.fields(), hasSize(2));
Alias eval1 = eval.fields().get(0);
Literal literal1 = as(eval1.child(), Literal.class);
assertNull(literal1.value());
assertThat(literal1.dataType(), is(DataType.KEYWORD));
Alias eval2 = eval.fields().get(1);
Literal literal2 = as(eval2.child(), Literal.class);
assertNull(literal2.value());
assertThat(literal2.dataType(), is(DataType.KEYWORD));
assertThat(grouping1.id(), equalTo(eval1.id()));
assertThat(grouping2.id(), equalTo(eval2.id()));
as(eval.child(), EsRelation.class);
}

private IsNotNull isNotNull(Expression field) {
return new IsNotNull(EMPTY, field);
}
Expand Down