Skip to content

Commit 2cdefe0

Browse files
authored
ESQL: Push down filter passed lookup join (#118410) (#118702)
Improve the planner to detect filters that can be pushed down 'through' a LOOKUP JOIN by determining the conditions scoped to the left/main side and moving them closer to the source. Relates #118305
1 parent bbcdc81 commit 2cdefe0

File tree

5 files changed

+369
-3
lines changed

5 files changed

+369
-3
lines changed

docs/changelog/118410.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118410
2+
summary: Push down filter passed lookup join
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/predicate/Predicates.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static Expression combineAnd(List<Expression> exps) {
6161
*
6262
* using the given combiner.
6363
*
64-
* While a bit longer, this method creates a balanced tree as oppose to a plain
64+
* While a bit longer, this method creates a balanced tree as opposed to a plain
6565
* recursive approach which creates an unbalanced one (either to the left or right).
6666
*/
6767
private static Expression combine(List<Expression> exps, BiFunction<Expression, Expression, Expression> combiner) {

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,3 +342,68 @@ count:long | type:keyword
342342
3 | Success
343343
1 | Disconnected
344344
;
345+
346+
//
347+
// Filtering tests
348+
//
349+
350+
lookupWithFilterOnLeftSideField
351+
required_capability: join_lookup_v5
352+
353+
FROM employees
354+
| EVAL language_code = languages
355+
| LOOKUP JOIN languages_lookup ON language_code
356+
| SORT emp_no
357+
| KEEP emp_no, language_code, language_name
358+
| WHERE emp_no >= 10091 AND emp_no < 10094
359+
;
360+
361+
emp_no:integer | language_code:integer | language_name:keyword
362+
10091 | 3 | Spanish
363+
10092 | 1 | English
364+
10093 | 3 | Spanish
365+
;
366+
367+
lookupMessageWithFilterOnRightSideField-Ignore
368+
required_capability: join_lookup_v5
369+
370+
FROM sample_data
371+
| LOOKUP JOIN message_types_lookup ON message
372+
| WHERE type == "Error"
373+
| KEEP @timestamp, client_ip, event_duration, message, type
374+
| SORT @timestamp DESC
375+
;
376+
377+
@timestamp:date | client_ip:ip | event_duration:long | message:keyword | type:keyword
378+
2023-10-23T13:53:55.832Z | 172.21.3.15 | 5033755 | Connection error | Error
379+
2023-10-23T13:52:55.015Z | 172.21.3.15 | 8268153 | Connection error | Error
380+
2023-10-23T13:51:54.732Z | 172.21.3.15 | 725448 | Connection error | Error
381+
;
382+
383+
lookupWithFieldAndRightSideAfterStats
384+
required_capability: join_lookup_v5
385+
386+
FROM sample_data
387+
| LOOKUP JOIN message_types_lookup ON message
388+
| STATS count = count(message) BY type
389+
| WHERE type == "Error"
390+
;
391+
392+
count:long | type:keyword
393+
3 | Error
394+
;
395+
396+
lookupWithFieldOnJoinKey-Ignore
397+
required_capability: join_lookup_v5
398+
399+
FROM employees
400+
| EVAL language_code = languages
401+
| LOOKUP JOIN languages_lookup ON language_code
402+
| WHERE language_code > 1 AND language_name IS NOT NULL
403+
| KEEP emp_no, language_code, language_name
404+
;
405+
406+
emp_no:integer | language_code:integer | language_name:keyword
407+
10001 | 2 | French
408+
10003 | 4 | German
409+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1616
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1717
import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
18+
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
1819
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1920
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2021
import org.elasticsearch.xpack.esql.plan.logical.Filter;
@@ -23,6 +24,8 @@
2324
import org.elasticsearch.xpack.esql.plan.logical.Project;
2425
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
2526
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
27+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
28+
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
2629

2730
import java.util.ArrayList;
2831
import java.util.List;
@@ -76,11 +79,63 @@ protected LogicalPlan rule(Filter filter) {
7679
} else if (child instanceof OrderBy orderBy) {
7780
// swap the filter with its child
7881
plan = orderBy.replaceChild(filter.with(orderBy.child(), condition));
82+
} else if (child instanceof Join join) {
83+
return pushDownPastJoin(filter, join);
7984
}
8085
// cannot push past a Limit, this could change the tailing result set returned
8186
return plan;
8287
}
8388

89+
private record ScopedFilter(List<Expression> commonFilters, List<Expression> leftFilters, List<Expression> rightFilters) {}
90+
91+
// split the filter condition in 3 parts:
92+
// 1. filter scoped to the left
93+
// 2. filter scoped to the right
94+
// 3. filter that requires both sides to be evaluated
95+
private static ScopedFilter scopeFilter(List<Expression> filters, LogicalPlan left, LogicalPlan right) {
96+
List<Expression> rest = new ArrayList<>(filters);
97+
List<Expression> leftFilters = new ArrayList<>();
98+
List<Expression> rightFilters = new ArrayList<>();
99+
100+
AttributeSet leftOutput = left.outputSet();
101+
AttributeSet rightOutput = right.outputSet();
102+
103+
// first remove things that are left scoped only
104+
rest.removeIf(f -> f.references().subsetOf(leftOutput) && leftFilters.add(f));
105+
// followed by right scoped only
106+
rest.removeIf(f -> f.references().subsetOf(rightOutput) && rightFilters.add(f));
107+
return new ScopedFilter(rest, leftFilters, rightFilters);
108+
}
109+
110+
private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
111+
LogicalPlan plan = filter;
112+
// pushdown only through LEFT joins
113+
// TODO: generalize this for other join types
114+
if (join.config().type() == JoinTypes.LEFT) {
115+
LogicalPlan left = join.left();
116+
LogicalPlan right = join.right();
117+
118+
// split the filter condition in 3 parts:
119+
// 1. filter scoped to the left
120+
// 2. filter scoped to the right
121+
// 3. filter that requires both sides to be evaluated
122+
ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right);
123+
// push the left scoped filter down to the left child, keep the rest intact
124+
if (scoped.leftFilters.size() > 0) {
125+
// push the filter down to the left child
126+
left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters));
127+
// update the join with the new left child
128+
join = (Join) join.replaceLeft(left);
129+
130+
// keep the remaining filters in place, otherwise return the new join;
131+
Expression remainingFilter = Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters));
132+
plan = remainingFilter != null ? filter.with(join, remainingFilter) : join;
133+
}
134+
}
135+
// ignore the rest of the join
136+
return plan;
137+
}
138+
84139
private static Function<Expression, Expression> NO_OP = expression -> expression;
85140

86141
private static LogicalPlan maybePushDownPastUnary(

0 commit comments

Comments
 (0)