Skip to content

Commit ac0ba24

Browse files
authored
ESQL: Push down filter passed lookup join (#118410)
Improve the planner to detect filters that can be pushed down 'through' a LOOKUP JOIN by determining the conditions scoped to the left/main side and moving them closer to the source. Relates #118305
1 parent e454d74 commit ac0ba24

File tree

5 files changed

+369
-3
lines changed

5 files changed

+369
-3
lines changed

docs/changelog/118410.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118410
2+
summary: Push down filter passed lookup join
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/predicate/Predicates.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static Expression combineAnd(List<Expression> exps) {
6161
*
6262
* using the given combiner.
6363
*
64-
* While a bit longer, this method creates a balanced tree as oppose to a plain
64+
* While a bit longer, this method creates a balanced tree as opposed to a plain
6565
* recursive approach which creates an unbalanced one (either to the left or right).
6666
*/
6767
private static Expression combine(List<Expression> exps, BiFunction<Expression, Expression, Expression> combiner) {

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,3 +418,68 @@ count:long | type:keyword
418418
3 | Success
419419
1 | Disconnected
420420
;
421+
422+
//
423+
// Filtering tests
424+
//
425+
426+
lookupWithFilterOnLeftSideField
427+
required_capability: join_lookup_v5
428+
429+
FROM employees
430+
| EVAL language_code = languages
431+
| LOOKUP JOIN languages_lookup ON language_code
432+
| SORT emp_no
433+
| KEEP emp_no, language_code, language_name
434+
| WHERE emp_no >= 10091 AND emp_no < 10094
435+
;
436+
437+
emp_no:integer | language_code:integer | language_name:keyword
438+
10091 | 3 | Spanish
439+
10092 | 1 | English
440+
10093 | 3 | Spanish
441+
;
442+
443+
lookupMessageWithFilterOnRightSideField-Ignore
444+
required_capability: join_lookup_v5
445+
446+
FROM sample_data
447+
| LOOKUP JOIN message_types_lookup ON message
448+
| WHERE type == "Error"
449+
| KEEP @timestamp, client_ip, event_duration, message, type
450+
| SORT @timestamp DESC
451+
;
452+
453+
@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword
454+
2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error
455+
2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error
456+
2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error
457+
;
458+
459+
lookupWithFieldAndRightSideAfterStats
460+
required_capability: join_lookup_v5
461+
462+
FROM sample_data
463+
| LOOKUP JOIN message_types_lookup ON message
464+
| STATS count = count(message) BY type
465+
| WHERE type == "Error"
466+
;
467+
468+
count:long | type:keyword
469+
3 | Error
470+
;
471+
472+
lookupWithFieldOnJoinKey-Ignore
473+
required_capability: join_lookup_v5
474+
475+
FROM employees
476+
| EVAL language_code = languages
477+
| LOOKUP JOIN languages_lookup ON language_code
478+
| WHERE language_code > 1 AND language_name IS NOT NULL
479+
| KEEP emp_no, language_code, language_name
480+
;
481+
482+
emp_no:integer | language_code:integer | language_name:keyword
483+
10001 | 2 | French
484+
10003 | 4 | German
485+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1616
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1717
import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
18+
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
1819
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1920
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2021
import org.elasticsearch.xpack.esql.plan.logical.Filter;
@@ -23,6 +24,8 @@
2324
import org.elasticsearch.xpack.esql.plan.logical.Project;
2425
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
2526
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
27+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
28+
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
2629

2730
import java.util.ArrayList;
2831
import java.util.List;
@@ -76,11 +79,63 @@ protected LogicalPlan rule(Filter filter) {
7679
} else if (child instanceof OrderBy orderBy) {
7780
// swap the filter with its child
7881
plan = orderBy.replaceChild(filter.with(orderBy.child(), condition));
82+
} else if (child instanceof Join join) {
83+
return pushDownPastJoin(filter, join);
7984
}
8085
// cannot push past a Limit, this could change the tailing result set returned
8186
return plan;
8287
}
8388

89+
private record ScopedFilter(List<Expression> commonFilters, List<Expression> leftFilters, List<Expression> rightFilters) {}
90+
91+
// split the filter condition in 3 parts:
92+
// 1. filter scoped to the left
93+
// 2. filter scoped to the right
94+
// 3. filter that requires both sides to be evaluated
95+
private static ScopedFilter scopeFilter(List<Expression> filters, LogicalPlan left, LogicalPlan right) {
96+
List<Expression> rest = new ArrayList<>(filters);
97+
List<Expression> leftFilters = new ArrayList<>();
98+
List<Expression> rightFilters = new ArrayList<>();
99+
100+
AttributeSet leftOutput = left.outputSet();
101+
AttributeSet rightOutput = right.outputSet();
102+
103+
// first remove things that are left scoped only
104+
rest.removeIf(f -> f.references().subsetOf(leftOutput) && leftFilters.add(f));
105+
// followed by right scoped only
106+
rest.removeIf(f -> f.references().subsetOf(rightOutput) && rightFilters.add(f));
107+
return new ScopedFilter(rest, leftFilters, rightFilters);
108+
}
109+
110+
private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
111+
LogicalPlan plan = filter;
112+
// pushdown only through LEFT joins
113+
// TODO: generalize this for other join types
114+
if (join.config().type() == JoinTypes.LEFT) {
115+
LogicalPlan left = join.left();
116+
LogicalPlan right = join.right();
117+
118+
// split the filter condition in 3 parts:
119+
// 1. filter scoped to the left
120+
// 2. filter scoped to the right
121+
// 3. filter that requires both sides to be evaluated
122+
ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right);
123+
// push the left scoped filter down to the left child, keep the rest intact
124+
if (scoped.leftFilters.size() > 0) {
125+
// push the filter down to the left child
126+
left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters));
127+
// update the join with the new left child
128+
join = (Join) join.replaceLeft(left);
129+
130+
// keep the remaining filters in place, otherwise return the new join;
131+
Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters));
132+
plan = remainingFilter != null ? filter.with(join, remainingFilter) : join;
133+
}
134+
}
135+
// ignore the rest of the join
136+
return plan;
137+
}
138+
84139
private static Function<Expression, Expression> NO_OP = expression -> expression;
85140

86141
private static LogicalPlan maybePushDownPastUnary(

0 commit comments

Comments
 (0)