Skip to content

Commit 48e3f22

Browse files
Get basic case with translatable filters to work
1 parent 6d71404 commit 48e3f22

File tree

9 files changed

+243
-79
lines changed

9 files changed

+243
-79
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5050,3 +5050,65 @@ id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:int
50505050
15 | null | bar2 | null | null
50515051
null | null | plugh | null | null
50525052
;
5053+
5054+
5055+
lookupJoinWithPushableFilterOnLeft
5056+
required_capability: join_lookup_v12
5057+
required_capability: lookup_join_on_multiple_fields
5058+
5059+
FROM multi_column_joinable
5060+
| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
5061+
| WHERE other2 > 5000
5062+
| KEEP id_int, name_str, extra1, other1, other2
5063+
| SORT id_int, name_str, extra1, other1, other2
5064+
| LIMIT 20
5065+
;
5066+
5067+
id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
5068+
4 | David | qux | zeta | 6000
5069+
5 | Eve | quux | eta | 7000
5070+
5 | Eve | quux | theta | 8000
5071+
6 | null | corge | iota | 9000
5072+
7 | Grace | grault | kappa | 10000
5073+
8 | Hank | garply | lambda | 11000
5074+
12 | Liam | xyzzy | nu | 13000
5075+
13 | Mia | thud | xi | 14000
5076+
14 | Nina | foo2 | omicron | 15000
5077+
;
5078+
5079+
lookupJoinWithTwoPushableFiltersOnLeft
5080+
required_capability: join_lookup_v12
5081+
required_capability: lookup_join_on_multiple_fields
5082+
5083+
FROM multi_column_joinable
5084+
| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
5085+
| WHERE other2 > 5000
5086+
| WHERE other1 like "*ta"
5087+
| KEEP id_int, name_str, extra1, other1, other2
5088+
| SORT id_int, name_str, extra1, other1, other2
5089+
| LIMIT 20
5090+
;
5091+
5092+
id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
5093+
4 | David | qux | zeta | 6000
5094+
5 | Eve | quux | eta | 7000
5095+
5 | Eve | quux | theta | 8000
5096+
6 | null | corge | iota | 9000
5097+
;
5098+
5099+
lookupJoinWithMixLeftAndRightFilters
5100+
required_capability: join_lookup_v12
5101+
required_capability: lookup_join_on_multiple_fields
5102+
5103+
FROM multi_column_joinable
5104+
| LOOKUP JOIN multi_column_joinable_lookup ON id_int, is_active_bool
5105+
| WHERE other2 > 5000 AND (extra1 == "qux" OR extra1 == "foo2") AND other1 like ("*ta", "*ron")
5106+
| KEEP id_int, name_str, extra1, other1, other2
5107+
| SORT id_int, name_str, extra1, other1, other2
5108+
| LIMIT 20
5109+
;
5110+
5111+
id_int:integer | name_str:keyword | extra1:keyword | other1:keyword | other2:integer
5112+
4 | David | qux | zeta | 6000
5113+
14 | Nina | foo2 | omicron | 15000
5114+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
3939
import org.elasticsearch.compute.operator.Driver;
4040
import org.elasticsearch.compute.operator.DriverContext;
41+
import org.elasticsearch.compute.operator.FilterOperator;
4142
import org.elasticsearch.compute.operator.Operator;
4243
import org.elasticsearch.compute.operator.OutputOperator;
4344
import org.elasticsearch.compute.operator.ProjectOperator;
@@ -73,10 +74,14 @@
7374
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
7475
import org.elasticsearch.xpack.esql.core.expression.Alias;
7576
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
77+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
7678
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
7779
import org.elasticsearch.xpack.esql.core.tree.Source;
7880
import org.elasticsearch.xpack.esql.core.type.DataType;
81+
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
82+
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
7983
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
84+
import org.elasticsearch.xpack.esql.planner.Layout;
8085
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
8186
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
8287

@@ -355,13 +360,29 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
355360
warnings
356361
);
357362
releasables.add(queryOperator);
358-
363+
Layout.Builder builder = new Layout.Builder();
364+
builder.append(request.extractFields);
359365
List<Operator> operators = new ArrayList<>();
360366
if (request.extractFields.isEmpty() == false) {
361367
var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
368+
builder.append(request.extractFields);
362369
releasables.add(extractFieldsOperator);
363370
operators.add(extractFieldsOperator);
364371
}
372+
if (queryList instanceof PostJoinFilterable postJoinFilterable) {
373+
FilterExec filterExec = postJoinFilterable.getPostJoinFilter();
374+
Operator inputOperator;
375+
if (operators.isEmpty() == false) {
376+
inputOperator = operators.getLast();
377+
} else {
378+
inputOperator = queryOperator;
379+
}
380+
Operator postJoinFilter = filterExecOperator(filterExec, inputOperator, shardContext.context, driverContext, builder);
381+
if (postJoinFilter != null) {
382+
releasables.add(postJoinFilter);
383+
operators.add(postJoinFilter);
384+
}
385+
}
365386
operators.add(finishPages);
366387

367388
/*
@@ -423,6 +444,27 @@ public void onFailure(Exception e) {
423444
}
424445
}
425446

447+
private Operator filterExecOperator(
448+
FilterExec filterExec,
449+
Operator inputOperator, // not needed?
450+
EsPhysicalOperationProviders.ShardContext shardContext,
451+
DriverContext driverContext,
452+
Layout.Builder builder
453+
) {
454+
if (filterExec == null) {
455+
return null;
456+
}
457+
458+
var evaluatorFactory = EvalMapper.toEvaluator(
459+
FoldContext.small()/*is this correct*/,
460+
filterExec.condition(),
461+
builder.build(),
462+
List.of(shardContext)
463+
);
464+
var filterOperatorFactory = new FilterOperator.FilterOperatorFactory(evaluatorFactory);
465+
return filterOperatorFactory.get(driverContext);
466+
}
467+
426468
private static Operator extractFieldsOperator(
427469
EsPhysicalOperationProviders.ShardContext shardContext,
428470
DriverContext driverContext,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 57 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,76 @@
1010
import org.apache.lucene.search.BooleanClause;
1111
import org.apache.lucene.search.BooleanQuery;
1212
import org.apache.lucene.search.Query;
13+
import org.elasticsearch.cluster.service.ClusterService;
1314
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
1415
import org.elasticsearch.compute.operator.lookup.QueryList;
15-
import org.elasticsearch.index.query.QueryBuilder;
1616
import org.elasticsearch.index.query.SearchExecutionContext;
17-
import org.elasticsearch.xpack.esql.core.expression.Literal;
17+
import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
18+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
19+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
20+
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
21+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
22+
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
23+
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
1824

1925
import java.io.IOException;
2026
import java.util.List;
2127

28+
import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER;
29+
2230
/**
2331
* A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query.
2432
* Each query in the resulting query will be a conjunction of all queries from the input lists at the same position.
2533
* In the future we can extend this to support more complex expressions, such as disjunctions or negations.
2634
*/
27-
public class ExpressionQueryList implements LookupEnrichQueryGenerator {
35+
public class ExpressionQueryList implements LookupEnrichQueryGenerator, PostJoinFilterable {
2836
private final List<QueryList> queryLists;
29-
private final QueryBuilder preJoinFilter;
37+
private Query preJoinFilter;
38+
private FilterExec postJoinFilter;
3039
private final SearchExecutionContext context;
3140

32-
public ExpressionQueryList(List<QueryList> queryLists, SearchExecutionContext context, QueryBuilder preJoinFilter) {
33-
if (queryLists.size() < 2 && Literal.TRUE.equals(preJoinFilter)) {
41+
public ExpressionQueryList(
42+
List<QueryList> queryLists,
43+
SearchExecutionContext context,
44+
PhysicalPlan rightPreJoinPlan,
45+
ClusterService clusterService
46+
) {
47+
if (queryLists.size() < 2 && rightPreJoinPlan == null) {
3448
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists");
3549
}
3650
this.queryLists = queryLists;
37-
this.preJoinFilter = preJoinFilter;
3851
this.context = context;
52+
buildPrePostJoinFilter(rightPreJoinPlan, clusterService);
53+
}
54+
55+
private void buildPrePostJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) {
56+
// we support a FilterExec as the pre-join filter
57+
// if we filter Exec is not translatable to a QueryBuilder, we will apply it after the join
58+
if (rightPreJoinPlan instanceof FilterExec filterExec) {
59+
try {
60+
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
61+
SearchContextStats.from(List.of(context)),
62+
new EsqlFlags(clusterService.getClusterSettings())
63+
);
64+
// If the pre-join filter is a FilterExec, we can convert it to a QueryBuilder
65+
// try to convert it to a QueryBuilder, if not possible apply it after the join
66+
if (filterExec instanceof TranslationAware translationAware) {
67+
preJoinFilter = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER)
68+
.toQueryBuilder()
69+
.toQuery(context);
70+
} else {
71+
// if the filter is not translatable, we will apply it after the join
72+
postJoinFilter = filterExec;
73+
}
74+
} catch (IOException e) {
75+
throw new IllegalArgumentException("Failed to translate pre-join filter: " + filterExec, e);
76+
}
77+
78+
} else if (rightPreJoinPlan instanceof EsQueryExec) {
79+
// we are good, nothing to do here
80+
} else {
81+
throw new IllegalArgumentException("Unsupported pre-join filter type: " + preJoinFilter.getClass().getName());
82+
}
3983
}
4084

4185
@Override
@@ -51,42 +95,12 @@ public Query getQuery(int position) throws IOException {
5195
builder.add(q, BooleanClause.Occur.FILTER);
5296
}
5397
// also attach the pre-join filter if it exists
54-
/*if (Literal.TRUE.equals(preJoinFilter) == false) {
55-
if (preJoinFilter instanceof TranslationAware translationAware) {
56-
Query preJoinQuery = tryToGetAsLuceneQuery(translationAware);
57-
if (preJoinQuery == null) {
58-
preJoinQuery = tryToGetThroughQueryBuilder(translationAware);
59-
}
60-
if (preJoinQuery == null) {
61-
throw new UnsupportedOperationException("Cannot translate pre-join filter to Lucene query: " + preJoinFilter);
62-
}
63-
builder.add(preJoinQuery, BooleanClause.Occur.FILTER);
64-
}
65-
}*/
6698
if (preJoinFilter != null) {
67-
// JULIAN TO DO: Can we precompile the query? I don't want to call toQuery for every row
68-
builder.add(preJoinFilter.toQuery(context), BooleanClause.Occur.FILTER);
99+
builder.add(preJoinFilter, BooleanClause.Occur.FILTER);
69100
}
70101
return builder.build();
71102
}
72103

73-
/*private Query tryToGetThroughQueryBuilder(TranslationAware translationAware) {
74-
// it seems I might need to pass a QueryBuilder, instead of Expression directly????
75-
// can a QueryBuilder support nested complex expressions with AND, OR, NOT?
76-
return translationAware.asQuery(WHAT_GOES_HERE, WHAT_GOES_HERE).toQueryBuilder().toQuery(queryLists.get(0).searchExecutionContext);
77-
}
78-
79-
private Query tryToGetAsLuceneQuery(TranslationAware translationAware) {
80-
// attempt to translate directly to a Lucene Query
81-
// not sure how to get the field name from the expression
82-
MappedFieldType fieldType = context.getFieldType(WHAT_GOES_HERE.fieldName().string());
83-
try {
84-
return translationAware.asLuceneQuery(fieldType, CONSTANT_SCORE_REWRITE, context);
85-
} catch (Exception e) {}
86-
// only a few expression types support asLuceneQuery, it is OK to fail here and we will try a different approach
87-
return null;
88-
}
89-
*/
90104
@Override
91105
public int getPositionCount() {
92106
int positionCount = queryLists.get(0).getPositionCount();
@@ -102,4 +116,9 @@ public int getPositionCount() {
102116
}
103117
return positionCount;
104118
}
119+
120+
@Override
121+
public FilterExec getPostJoinFilter() {
122+
return postJoinFilter;
123+
}
105124
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin;
2323
import org.elasticsearch.core.Releasable;
2424
import org.elasticsearch.core.Releasables;
25-
import org.elasticsearch.index.query.QueryBuilder;
2625
import org.elasticsearch.tasks.CancellableTask;
2726
import org.elasticsearch.xcontent.XContentBuilder;
2827
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
2928
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
3029
import org.elasticsearch.xpack.esql.core.tree.Source;
3130
import org.elasticsearch.xpack.esql.core.type.DataType;
31+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
3232
import org.elasticsearch.xpack.esql.planner.Layout;
3333

3434
import java.io.IOException;
@@ -70,7 +70,7 @@ public record Factory(
7070
String lookupIndex,
7171
List<NamedExpression> loadFields,
7272
Source source,
73-
QueryBuilder preJoinFilter
73+
PhysicalPlan rightPreJoinPlan
7474
) implements OperatorFactory {
7575
@Override
7676
public String describe() {
@@ -84,7 +84,7 @@ public String describe() {
8484
.append(" inputChannel=")
8585
.append(matchField.channel);
8686
}
87-
stringBuilder.append(" pre_join_filter=").append(preJoinFilter);
87+
stringBuilder.append(" rightPreJoinPlan=").append(rightPreJoinPlan);
8888
stringBuilder.append("]");
8989
return stringBuilder.toString();
9090
}
@@ -102,7 +102,7 @@ public Operator get(DriverContext driverContext) {
102102
lookupIndex,
103103
loadFields,
104104
source,
105-
preJoinFilter
105+
rightPreJoinPlan
106106
);
107107
}
108108
}
@@ -115,8 +115,8 @@ public Operator get(DriverContext driverContext) {
115115
private final List<NamedExpression> loadFields;
116116
private final Source source;
117117
private long totalRows = 0L;
118-
private List<MatchConfig> matchFields;
119-
private QueryBuilder preJoinFilter;
118+
private final List<MatchConfig> matchFields;
119+
private final PhysicalPlan rightPreJoinPlan;
120120
/**
121121
* Total number of pages emitted by this {@link Operator}.
122122
*/
@@ -137,7 +137,7 @@ public LookupFromIndexOperator(
137137
String lookupIndex,
138138
List<NamedExpression> loadFields,
139139
Source source,
140-
QueryBuilder preJoinFilter
140+
PhysicalPlan rightPreJoinPlan
141141
) {
142142
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
143143
this.matchFields = matchFields;
@@ -148,7 +148,7 @@ public LookupFromIndexOperator(
148148
this.lookupIndex = lookupIndex;
149149
this.loadFields = loadFields;
150150
this.source = source;
151-
this.preJoinFilter = preJoinFilter;
151+
this.rightPreJoinPlan = rightPreJoinPlan;
152152
}
153153

154154
@Override
@@ -172,7 +172,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
172172
new Page(inputBlockArray),
173173
loadFields,
174174
source,
175-
preJoinFilter
175+
rightPreJoinPlan
176176
);
177177
lookupService.lookupAsync(
178178
request,
@@ -229,7 +229,7 @@ public String toString() {
229229
.append(" inputChannel=")
230230
.append(matchField.channel);
231231
}
232-
stringBuilder.append(" pre_join_filter=").append(preJoinFilter);
232+
stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan);
233233
stringBuilder.append("]");
234234
return stringBuilder.toString();
235235
}

0 commit comments

Comments
 (0)