Skip to content

Commit 6d71404

Browse files
LookupJoin prejoin filter POC WIP
1 parent b6c615c commit 6d71404

File tree

7 files changed

+159
-76
lines changed

7 files changed

+159
-76
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/EnrichQuerySourceOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ Page buildPage(int positions, IntVector.Builder positionsBuilder, IntVector.Buil
159159
return page;
160160
}
161161

162-
private Query nextQuery() {
162+
private Query nextQuery() throws IOException {
163163
++queryPosition;
164164
while (isFinished() == false) {
165165
Query query = queryList.getQuery(queryPosition);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/ExpressionQueryList.java

Lines changed: 0 additions & 61 deletions
This file was deleted.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupEnrichQueryGenerator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import org.apache.lucene.search.Query;
1111
import org.elasticsearch.core.Nullable;
1212

13+
import java.io.IOException;
14+
1315
/**
1416
* An interface to generates queries for the lookup and enrich operators.
1517
* This interface is used to retrieve queries based on a position index.
@@ -20,7 +22,7 @@ public interface LookupEnrichQueryGenerator {
2022
* Returns the query at the given position.
2123
*/
2224
@Nullable
23-
Query getQuery(int position);
25+
Query getQuery(int position) throws IOException;
2426

2527
/**
2628
* Returns the number of queries in this generator
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.enrich;
9+
10+
import org.apache.lucene.search.BooleanClause;
11+
import org.apache.lucene.search.BooleanQuery;
12+
import org.apache.lucene.search.Query;
13+
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
14+
import org.elasticsearch.compute.operator.lookup.QueryList;
15+
import org.elasticsearch.index.query.QueryBuilder;
16+
import org.elasticsearch.index.query.SearchExecutionContext;
17+
import org.elasticsearch.xpack.esql.core.expression.Literal;
18+
19+
import java.io.IOException;
20+
import java.util.List;
21+
22+
/**
23+
* A {@link LookupEnrichQueryGenerator} that combines multiple {@link QueryList}s into a single query.
24+
* Each query in the resulting query will be a conjunction of all queries from the input lists at the same position.
25+
* In the future we can extend this to support more complex expressions, such as disjunctions or negations.
26+
*/
27+
public class ExpressionQueryList implements LookupEnrichQueryGenerator {
28+
private final List<QueryList> queryLists;
29+
private final QueryBuilder preJoinFilter;
30+
private final SearchExecutionContext context;
31+
32+
public ExpressionQueryList(List<QueryList> queryLists, SearchExecutionContext context, QueryBuilder preJoinFilter) {
33+
if (queryLists.size() < 2 && Literal.TRUE.equals(preJoinFilter)) {
34+
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists");
35+
}
36+
this.queryLists = queryLists;
37+
this.preJoinFilter = preJoinFilter;
38+
this.context = context;
39+
}
40+
41+
@Override
42+
public Query getQuery(int position) throws IOException {
43+
BooleanQuery.Builder builder = new BooleanQuery.Builder();
44+
for (QueryList queryList : queryLists) {
45+
Query q = queryList.getQuery(position);
46+
if (q == null) {
47+
// if any of the matchFields are null, it means there is no match for this position
48+
// A AND NULL is always NULL, so we can skip this position
49+
return null;
50+
}
51+
builder.add(q, BooleanClause.Occur.FILTER);
52+
}
53+
// also attach the pre-join filter if it exists
54+
/*if (Literal.TRUE.equals(preJoinFilter) == false) {
55+
if (preJoinFilter instanceof TranslationAware translationAware) {
56+
Query preJoinQuery = tryToGetAsLuceneQuery(translationAware);
57+
if (preJoinQuery == null) {
58+
preJoinQuery = tryToGetThroughQueryBuilder(translationAware);
59+
}
60+
if (preJoinQuery == null) {
61+
throw new UnsupportedOperationException("Cannot translate pre-join filter to Lucene query: " + preJoinFilter);
62+
}
63+
builder.add(preJoinQuery, BooleanClause.Occur.FILTER);
64+
}
65+
}*/
66+
if (preJoinFilter != null) {
67+
// JULIAN TO DO: Can we precompile the query? I don't want to call toQuery for every row
68+
builder.add(preJoinFilter.toQuery(context), BooleanClause.Occur.FILTER);
69+
}
70+
return builder.build();
71+
}
72+
73+
/*private Query tryToGetThroughQueryBuilder(TranslationAware translationAware) {
74+
// it seems I might need to pass a QueryBuilder, instead of Expression directly????
75+
// can a QueryBuilder support nested complex expressions with AND, OR, NOT?
76+
return translationAware.asQuery(WHAT_GOES_HERE, WHAT_GOES_HERE).toQueryBuilder().toQuery(queryLists.get(0).searchExecutionContext);
77+
}
78+
79+
private Query tryToGetAsLuceneQuery(TranslationAware translationAware) {
80+
// attempt to translate directly to a Lucene Query
81+
// not sure how to get the field name from the expression
82+
MappedFieldType fieldType = context.getFieldType(WHAT_GOES_HERE.fieldName().string());
83+
try {
84+
return translationAware.asLuceneQuery(fieldType, CONSTANT_SCORE_REWRITE, context);
85+
} catch (Exception e) {}
86+
// only a few expression types support asLuceneQuery, it is OK to fail here and we will try a different approach
87+
return null;
88+
}
89+
*/
90+
@Override
91+
public int getPositionCount() {
92+
int positionCount = queryLists.get(0).getPositionCount();
93+
for (QueryList queryList : queryLists) {
94+
if (queryList.getPositionCount() != positionCount) {
95+
throw new IllegalArgumentException(
96+
"All QueryLists must have the same position count, expected: "
97+
+ positionCount
98+
+ ", but got: "
99+
+ queryList.getPositionCount()
100+
);
101+
}
102+
}
103+
return positionCount;
104+
}
105+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin;
2323
import org.elasticsearch.core.Releasable;
2424
import org.elasticsearch.core.Releasables;
25+
import org.elasticsearch.index.query.QueryBuilder;
2526
import org.elasticsearch.tasks.CancellableTask;
2627
import org.elasticsearch.xcontent.XContentBuilder;
2728
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
@@ -68,7 +69,8 @@ public record Factory(
6869
String lookupIndexPattern,
6970
String lookupIndex,
7071
List<NamedExpression> loadFields,
71-
Source source
72+
Source source,
73+
QueryBuilder preJoinFilter
7274
) implements OperatorFactory {
7375
@Override
7476
public String describe() {
@@ -82,6 +84,7 @@ public String describe() {
8284
.append(" inputChannel=")
8385
.append(matchField.channel);
8486
}
87+
stringBuilder.append(" pre_join_filter=").append(preJoinFilter);
8588
stringBuilder.append("]");
8689
return stringBuilder.toString();
8790
}
@@ -98,7 +101,8 @@ public Operator get(DriverContext driverContext) {
98101
lookupIndexPattern,
99102
lookupIndex,
100103
loadFields,
101-
source
104+
source,
105+
preJoinFilter
102106
);
103107
}
104108
}
@@ -112,6 +116,7 @@ public Operator get(DriverContext driverContext) {
112116
private final Source source;
113117
private long totalRows = 0L;
114118
private List<MatchConfig> matchFields;
119+
private QueryBuilder preJoinFilter;
115120
/**
116121
* Total number of pages emitted by this {@link Operator}.
117122
*/
@@ -131,7 +136,8 @@ public LookupFromIndexOperator(
131136
String lookupIndexPattern,
132137
String lookupIndex,
133138
List<NamedExpression> loadFields,
134-
Source source
139+
Source source,
140+
QueryBuilder preJoinFilter
135141
) {
136142
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
137143
this.matchFields = matchFields;
@@ -142,6 +148,7 @@ public LookupFromIndexOperator(
142148
this.lookupIndex = lookupIndex;
143149
this.loadFields = loadFields;
144150
this.source = source;
151+
this.preJoinFilter = preJoinFilter;
145152
}
146153

147154
@Override
@@ -164,7 +171,8 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
164171
matchFields,
165172
new Page(inputBlockArray),
166173
loadFields,
167-
source
174+
source,
175+
preJoinFilter
168176
);
169177
lookupService.lookupAsync(
170178
request,
@@ -221,6 +229,7 @@ public String toString() {
221229
.append(" inputChannel=")
222230
.append(matchField.channel);
223231
}
232+
stringBuilder.append(" pre_join_filter=").append(preJoinFilter);
224233
stringBuilder.append("]");
225234
return stringBuilder.toString();
226235
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import org.elasticsearch.compute.data.BlockStreamInput;
2121
import org.elasticsearch.compute.data.Page;
2222
import org.elasticsearch.compute.operator.Warnings;
23-
import org.elasticsearch.compute.operator.lookup.ExpressionQueryList;
2423
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
2524
import org.elasticsearch.compute.operator.lookup.QueryList;
2625
import org.elasticsearch.core.Nullable;
2726
import org.elasticsearch.core.Releasables;
27+
import org.elasticsearch.index.query.QueryBuilder;
2828
import org.elasticsearch.index.query.SearchExecutionContext;
2929
import org.elasticsearch.index.shard.ShardId;
3030
import org.elasticsearch.indices.IndicesService;
@@ -90,7 +90,8 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque
9090
null,
9191
request.extractFields,
9292
request.matchFields,
93-
request.source
93+
request.source,
94+
request.preJoinFilter
9495
);
9596
}
9697

@@ -103,7 +104,7 @@ protected LookupEnrichQueryGenerator queryList(
103104
@Nullable DataType inputDataType,
104105
Warnings warnings
105106
) {
106-
if (request.matchFields.size() == 1) {
107+
if (request.matchFields.size() == 1 && request.preJoinFilter == null) {
107108
return termQueryList(
108109
context.getFieldType(request.matchFields.get(0).fieldName().string()),
109110
context,
@@ -124,7 +125,7 @@ protected LookupEnrichQueryGenerator queryList(
124125
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
125126
queryLists.add(q);
126127
}
127-
return new ExpressionQueryList(queryLists);
128+
return new ExpressionQueryList(queryLists, context, request.preJoinFilter);
128129
}
129130

130131
@Override
@@ -139,6 +140,7 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in
139140

140141
public static class Request extends AbstractLookupService.Request {
141142
private final List<LookupFromIndexOperator.MatchConfig> matchFields;
143+
private final QueryBuilder preJoinFilter;
142144

143145
Request(
144146
String sessionId,
@@ -147,15 +149,18 @@ public static class Request extends AbstractLookupService.Request {
147149
List<LookupFromIndexOperator.MatchConfig> matchFields,
148150
Page inputPage,
149151
List<NamedExpression> extractFields,
150-
Source source
152+
Source source,
153+
QueryBuilder preJoinFilter
151154
) {
152155
super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source);
153156
this.matchFields = matchFields;
157+
this.preJoinFilter = preJoinFilter;
154158
}
155159
}
156160

157161
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
158162
private final List<LookupFromIndexOperator.MatchConfig> matchFields;
163+
private final QueryBuilder preJoinFilter;
159164

160165
TransportRequest(
161166
String sessionId,
@@ -166,10 +171,12 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
166171
Page toRelease,
167172
List<NamedExpression> extractFields,
168173
List<LookupFromIndexOperator.MatchConfig> matchFields,
169-
Source source
174+
Source source,
175+
QueryBuilder preJoinFilter
170176
) {
171177
super(sessionId, shardId, indexPattern, inputDataType, inputPage, toRelease, extractFields, source);
172178
this.matchFields = matchFields;
179+
this.preJoinFilter = preJoinFilter;
173180
}
174181

175182
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
@@ -211,6 +218,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
211218
matchFields = new ArrayList<>(1);
212219
matchFields.add(new LookupFromIndexOperator.MatchConfig(new FieldAttribute.FieldName(matchField), 0, inputDataType));
213220
}
221+
QueryBuilder preJoinFilter = null;
222+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
223+
preJoinFilter = planIn.readOptionalNamedWriteable(QueryBuilder.class);
224+
}
214225
TransportRequest result = new TransportRequest(
215226
sessionId,
216227
shardId,
@@ -220,7 +231,8 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
220231
inputPage,
221232
extractFields,
222233
matchFields,
223-
source
234+
source,
235+
preJoinFilter
224236
);
225237
result.setParentTask(parentTaskId);
226238
return result;
@@ -254,11 +266,19 @@ public void writeTo(StreamOutput out) throws IOException {
254266
} else if (matchFields.size() > 1) {
255267
throw new EsqlIllegalArgumentException("LOOKUP JOIN on multiple fields is not supported on remote node");
256268
}
269+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) {
270+
planOut.writeOptionalNamedWriteable(preJoinFilter);
271+
} else if (preJoinFilter != null) {
272+
throw new EsqlIllegalArgumentException("LOOKUP JOIN with pre-join filter is not supported on remote node");
273+
}
257274
}
258275

259276
@Override
260277
protected String extraDescription() {
261-
return " ,match_fields=" + matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", "));
278+
return " ,match_fields="
279+
+ matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", "))
280+
+ ", pre_join_filter="
281+
+ preJoinFilter;
262282
}
263283
}
264284

0 commit comments

Comments
 (0)