Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/127583.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 127583
summary: Add optimization to purge join on null merge key
area: ES|QL
type: enhancement
issues:
- 125577
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public String toString() {

@Override
public String nodeString() {
return child.nodeString() + " AS " + name();
return child.nodeString() + " AS " + name() + "#" + id();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strictly related, but I'm not sure why we wouldn't include the id; it makes it easier to track exactly which reference points to an alias.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1650,3 +1650,22 @@ event_duration:long
2764889
3450233
;

nullifiedJoinKeyToPurgeTheJoin
required_capability: join_lookup_v12

FROM employees
| RENAME languages AS language_code
| SORT emp_no, language_code
| LIMIT 4
| EVAL language_code = TO_INTEGER(NULL)
| LOOKUP JOIN languages_lookup ON language_code
| KEEP emp_no, language_code, language_name
;

emp_no:integer | language_code:integer | language_name:keyword
10001 |null |null
10002 |null |null
10003 |null |null
10004 |null |null
;
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogateExpressions;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogatePlans;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateTimeSeriesAggregate;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.PruneJoinOnNullMatchingField;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
Expand Down Expand Up @@ -201,7 +202,8 @@ protected static Batch<LogicalPlan> operators() {
new PushDownEnrich(),
new PushDownAndCombineOrderBy(),
new PruneRedundantOrderBy(),
new PruneRedundantSortClauses()
new PruneRedundantSortClauses(),
new PruneJoinOnNullMatchingField()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.optimizer.rules.logical.local;

import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.AttributeMap;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;

import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.xpack.esql.core.expression.Expressions.isGuaranteedNull;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.INNER;
import static org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.LEFT;

/**
* The rule matches a plan pattern having a Join on top of a Project and/or Eval. It then checks if the join's performed on a field which
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking for this pattern is restrictive IMO, but it's also not what the rule actually does. Leftover?

Also, this is repeating some of the logic of PropagateEvalFoldables. Could they share code? That one collects aliases from the plan when they point to literals (via potentially several indirections), which this rule also does. But PropagateEvalFoldables does only 1 pass to collect all aliases, while this rule descends back into the children whenever it finds a join, forgetting about the previous resolutions.

PropagateEvalFoldables has a boolean called shouldCollect. That looks like it could become a more general predicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking for this pattern is restrictive IMO

It is more restrictive than I'd wish for it to be more effective, indeed, but I found no other way to detect null join keys (in stabilised plans, after all transformations). Any other command doing data transformations would need executing or "invasive" analysis to determine if a null just passes through. But happy to apply modifications if I overlooked solutions.
Non-data-transforming commands are pushed out of the way (a.t.p.).

but it's also not what the rule actually does. Leftover?

No, I think that's actually what the rule does; but maybe I misunderstood the question? :)

this is repeating some of the logic of PropagateEvalFoldables. Could they share code?

The refs collection in this rule is more restrictive than that in PropagateEvalFoldables and operates on few node types. I guess there might be a way to share code, but not sure it'll make it more legible.

But PropagateEvalFoldables does only 1 pass to collect all aliases, while this rule descends back into the children whenever it finds a join, forgetting about the previous resolutions.

Right; that's because it can only apply this rule on a specific tree pattern; if there were more nodes in-between (of different types), the rule wouldn't work.

PropagateEvalFoldables has a boolean called shouldCollect. That looks like it could become a more general predicate?

The tree traversing would be different, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking for this pattern is restrictive IMO, but it's also not what the rule actually does. Leftover?

Sorry, you are correct; I didn't notice the break in the switch below and thought we'd just go over all children but looking only for evals and projections in it.

The refs collection in this rule is more restrictive than that in PropagateEvalFoldables and operates on few node types. I guess there might be a way to share code, but not sure it'll make it more legible.

My point is that both rules look for chains of aliases and propagate the result into a command that (transitively) depends on a literal. I think we should have only 1 way to do this; if the approach is correct for PropagateEvalFoldables, it should also be correct here - and if it's not, then PropagateEvalFoldables probably has a bug and we need to find a different solution.

Conceptually, the difference is that PropagateEvalFoldables just places a literal in the downstream command - while we rather wish to prune it - but that could be solved by placing a literal into the join config (which I think is interesting anyway because it allows even more optimizations).

* is aliased to null (in type or value); if that's the case, it prunes the join, replacing it with an Eval - returning aliases to null
* for all the fields added in by the right side of the Join - plus a Project on top of it.
* The rule can apply on the coordinator already, but it's more likely to be effective on the data nodes, where null aliasing is inserted
* due to locally missing fields. This rule relies on that behavior -- see {@link ReplaceMissingFieldWithNull}.
*/
public class PruneJoinOnNullMatchingField extends OptimizerRules.OptimizerRule<Join> {

@Override
protected LogicalPlan rule(Join join) {
LogicalPlan plan = join;
var joinType = join.config().type();
if (joinType == INNER || joinType == LEFT) { // other types will have different replacement logic
AttributeMap.Builder<Expression> attributeMapBuilder = AttributeMap.builder();
loop: for (var child = join.left();; child = ((UnaryPlan) child).child()) { // cast is safe as both plans are UnaryPlans
Copy link
Contributor

@astefan astefan May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this loop you are collecting all the aliases that are contained in every Project and Eval on the left hand side of the LOOKUP until a LogicalPlan of a different type is found. Are there cases where evals and projects were not merged together already by CombineEvals and CombineProjections, respectively?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there cases where evals and projects were not merged together already by CombineEvals and CombineProjections, respectively?

Yes; but:

  • if the plan goes through multiple transformations, that's OK, this rule will apply as soon as the pattern Join - Project/Eval is detected; and the plan will converge to a stable state eventually.
  • the plan is most useful on the data node, where the plan has stabilised already (before taking into account the local conditions, that is).

switch (child) {
case Project project -> project.projections().forEach(projection -> {
if (projection instanceof Alias alias) {
attributeMapBuilder.put(alias.toAttribute(), alias.child());
}
});
case Eval eval -> eval.fields().forEach(alias -> attributeMapBuilder.put(alias.toAttribute(), alias.child()));
default -> {
break loop;
}
}
}
for (var attr : AttributeSet.of(join.config().matchFields())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this shouldn't go over the match fields, but over the left + right fields. I don't trust the match fields right now, as their contract is never enforced and they only exist because our Join modeling got wonky.

That said, for the current implementation it's probably correct: only for the left fields can we ever know that they will be null before running the execution.

var resolved = attributeMapBuilder.build().resolve(attr);
if (resolved != null && isGuaranteedNull(resolved)) {
plan = replaceJoin(join);
break;
}
}
}
return plan;
}

/**
 * Builds the plan that stands in for the given join.
 * <p>
 * If the join reports no right-side output fields, its left child is returned unchanged. Otherwise every right-side output field is
 * re-declared as an {@link Alias} - built from the field's source, name and id - over a literal created with a {@code null} value;
 * these aliases go into an {@link Eval} placed over the join's left child, and a {@link Project} on top of it exposes the output
 * computed by {@code join.computeOutput} from the left child's output and the new aliases' attributes.
 *
 * @param join the join to replace
 * @return the join's left child, or a {@code Project} over an {@code Eval} holding the null aliases
 */
private static LogicalPlan replaceJoin(Join join) {
    var rightFields = join.rightOutputFields();
    if (rightFields.isEmpty()) { // can be empty when the join key is null and the other right side entries pruned (by an agg)
        return join.left();
    }

    // TODO: cache aliases by type, à la ReplaceMissingFieldWithNull#missingToNull (tho lookup indices won't have Ks of fields)
    List<Alias> nullAliases = new ArrayList<>(rightFields.size());
    for (var field : rightFields) {
        nullAliases.add(new Alias(field.source(), field.name(), Literal.of(field, null), field.id()));
    }

    var nullEval = new Eval(join.source(), join.left(), nullAliases);
    var projectedOutput = join.computeOutput(join.left().output(), Expressions.asAttributes(nullAliases));
    return new Project(join.source(), nullEval, projectedOutput);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
Expand Down Expand Up @@ -111,6 +112,7 @@
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.unboundLogicalOptimizerContext;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution;
import static org.elasticsearch.xpack.esql.core.querydsl.query.Query.unscore;
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -204,7 +206,14 @@ private Analyzer makeAnalyzer(String mappingFileName, EnrichResolution enrichRes
IndexResolution getIndexResult = IndexResolution.valid(test);

return new Analyzer(
new AnalyzerContext(config, new EsqlFunctionRegistry(), getIndexResult, enrichResolution, emptyInferenceResolution()),
new AnalyzerContext(
config,
new EsqlFunctionRegistry(),
getIndexResult,
defaultLookupResolution(),
enrichResolution,
emptyInferenceResolution()
),
new Verifier(new Metrics(new EsqlFunctionRegistry()), new XPackLicenseState(() -> 0L))
);
}
Expand Down Expand Up @@ -1377,6 +1386,82 @@ public void testMissingFieldsDoNotGetExtracted() {
assertThat(Expressions.names(fields), contains("_meta_field", "gender", "hire_date", "job", "job.raw", "languages", "long_noidx"));
}

/*
* LimitExec[1000[INTEGER]]
* \_AggregateExec[[language_code{r}#6],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, language_code{r}#6],FINAL,[language_code{r}#6, $
* $c$count{r}#25, $$c$seen{r}#26],12]
* \_ExchangeExec[[language_code{r}#6, $$c$count{r}#25, $$c$seen{r}#26],true]
* \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#11, languages{r}#15 AS language_code#6],INITIAL,[langua
* ges{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
* \_FieldExtractExec[emp_no{f}#12]<[],[]>
* \_EvalExec[[null[INTEGER] AS languages#15]]
* \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
*/
public void testMissingFieldsPurgesTheJoinLocally() {
    var searchStats = EsqlTestUtils.statsForMissingField("languages");
    var query = """
        from test
        | keep emp_no, languages
        | rename languages AS language_code
        | lookup join languages_lookup ON language_code
        | stats c = count(emp_no) by language_code
        """;

    var physicalPlan = plannerOptimizer.plan(query, searchStats);

    // above the exchange: an aggregation sits directly under the limit
    var topLimit = as(physicalPlan, LimitExec.class);
    var upperAgg = as(topLimit.child(), AggregateExec.class);
    assertThat(Expressions.names(upperAgg.output()), contains("c", "language_code"));

    // below the exchange: aggregation, field extraction, eval and the source query, with no join node in the chain
    var exchange = as(upperAgg.child(), ExchangeExec.class);
    var lowerAgg = as(exchange.child(), AggregateExec.class);
    var fieldExtract = as(lowerAgg.child(), FieldExtractExec.class);
    var evalExec = as(fieldExtract.child(), EvalExec.class);
    as(evalExec.child(), EsQueryExec.class);
}

/*
* LimitExec[1000[INTEGER]]
* \_LookupJoinExec[[language_code{r}#6],[language_code{f}#23],[language_name{f}#24]]
* |_LimitExec[1000[INTEGER]]
* | \_AggregateExec[[languages{f}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{f}#15 AS language_code#6],FINAL,[language
* s{f}#15, $$c$count{r}#25, $$c$seen{r}#26],62]
* | \_ExchangeExec[[languages{f}#15, $$c$count{r}#25, $$c$seen{r}#26],true]
* | \_AggregateExec[[languages{r}#15],[COUNT(emp_no{f}#12,true[BOOLEAN]) AS c#10, languages{r}#15 AS language_code#6],INITIAL,
* [languages{r}#15, $$c$count{r}#27, $$c$seen{r}#28],12]
* | \_FieldExtractExec[emp_no{f}#12]<[],[]>
* | \_EvalExec[[null[INTEGER] AS languages#15]]
* | \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#29], limit[], sort[] estimatedRowSize[12]
* \_EsQueryExec[languages_lookup], indexMode[lookup], query[][_doc{f}#30], limit[], sort[] estimatedRowSize[4]
*/
public void testMissingFieldsDoesNotPurgeTheJoinOnCoordinator() {
    var searchStats = EsqlTestUtils.statsForMissingField("languages");

    // same as the query above, but with the last two lines swapped, so that the join is no longer pushed to the data nodes
    var query = """
        from test
        | keep emp_no, languages
        | rename languages AS language_code
        | stats c = count(emp_no) by language_code
        | lookup join languages_lookup ON language_code
        """;
    var physicalPlan = plannerOptimizer.plan(query, searchStats);

    // the lookup join is still present right under the top limit
    var topLimit = as(physicalPlan, LimitExec.class);
    var lookupJoin = as(topLimit.child(), LookupJoinExec.class);

    // left side of the join: limit, aggregation, exchange, aggregation, field extraction, eval, source query
    var leftLimit = as(lookupJoin.left(), LimitExec.class);
    var upperAgg = as(leftLimit.child(), AggregateExec.class);
    var exchange = as(upperAgg.child(), ExchangeExec.class);
    var lowerAgg = as(exchange.child(), AggregateExec.class);
    var fieldExtract = as(lowerAgg.child(), FieldExtractExec.class);
    var evalExec = as(fieldExtract.child(), EvalExec.class);
    var mainSource = as(evalExec.child(), EsQueryExec.class);
    assertThat(mainSource.indexPattern(), is("test"));
    assertThat(mainSource.indexMode(), is(IndexMode.STANDARD));

    // right side of the join: the lookup index source
    var lookupSource = as(lookupJoin.right(), EsQueryExec.class);
    assertThat(lookupSource.indexPattern(), is("languages_lookup"));
    assertThat(lookupSource.indexMode(), is(IndexMode.LOOKUP));
}

/*
Checks that match filters are pushed down to Lucene when using no casting, for example:
WHERE first_name:"Anna")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2782,6 +2782,50 @@ public void testDescendantLimitLookupJoin() {
var localRelation = as(limitBefore.child(), LocalRelation.class);
}

/*
* EsqlProject[[emp_no{f}#9, first_name{f}#10, languages{f}#12, language_code{r}#3, language_name{r}#22]]
* \_Eval[[null[INTEGER] AS language_code#3, null[KEYWORD] AS language_name#22]]
* \_Limit[1000[INTEGER],false]
* \_EsRelation[test][_meta_field{f}#15, emp_no{f}#9, first_name{f}#10, g..]
*/
public void testPruneJoinOnNullMatchingField() {
    var query = """
        from test
        | eval language_code = null::integer
        | keep emp_no, first_name, languages, language_code
        | lookup join languages_lookup on language_code
        """;
    var optimized = optimizedPlan(query);

    // expected shape: Project over Eval over Limit over EsRelation, i.e. no Join node in the chain
    var topProject = as(optimized, Project.class);
    assertThat(Expressions.names(topProject.output()), contains("emp_no", "first_name", "languages", "language_code", "language_name"));
    var nullEval = as(topProject.child(), Eval.class);
    var defaultLimit = asLimit(nullEval.child(), 1000, false);
    as(defaultLimit.child(), EsRelation.class);
}

/*
* EsqlProject[[emp_no{f}#15, first_name{f}#16, my_null{r}#3 AS language_code#9, language_name{r}#27]]
* \_Eval[[null[INTEGER] AS my_null#3, null[KEYWORD] AS language_name#27]]
* \_Limit[1000[INTEGER],false]
* \_EsRelation[test][_meta_field{f}#21, emp_no{f}#15, first_name{f}#16, ..]
*/
public void testPruneJoinOnNullAssignedMatchingField() {
    var query = """
        from test
        | eval my_null = null::integer
        | rename languages as language_code
        | eval language_code = my_null
        | lookup join languages_lookup on language_code
        | keep emp_no, first_name, language_code, language_name
        """;
    var optimized = optimizedPlan(query);

    // expected shape: EsqlProject over Eval over Limit over EsRelation, i.e. no Join node in the chain
    var topProject = as(optimized, EsqlProject.class);
    assertThat(Expressions.names(topProject.output()), contains("emp_no", "first_name", "language_code", "language_name"));
    var nullEval = as(topProject.child(), Eval.class);
    var defaultLimit = asLimit(nullEval.child(), 1000, false);
    as(defaultLimit.child(), EsRelation.class);
}

/**
 * Returns the names of the expressions the given TopN orders by, in the order they are declared.
 * Each order's child is checked to be a {@link NamedExpression} via {@code as(...)}.
 */
private static List<String> orderNames(TopN topN) {
    return topN.order()
        .stream()
        .map(order -> as(order.child(), NamedExpression.class))
        .map(named -> named.name())
        .toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7666,7 +7666,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP
// The TopN needs an estimated row size for the planner to work
var plans = PlannerUtils.breakPlanBetweenCoordinatorAndDataNode(EstimatesRowSize.estimateRowSize(0, plan), config);
plan = useDataNodePlan ? plans.v2() : plans.v1();
plan = PlannerUtils.localPlan(List.of(), config, FoldContext.small(), plan);
plan = PlannerUtils.localPlan(config, FoldContext.small(), plan, TEST_SEARCH_STATS);
ExchangeSinkHandler exchangeSinkHandler = new ExchangeSinkHandler(null, 10, () -> 10);
LocalExecutionPlanner planner = new LocalExecutionPlanner(
"test",
Expand Down
Loading