Skip to content

Commit abfe672

Browse files
Limit the change to pushable filters only, make the filter optional
1 parent 2ee02a1 commit abfe672

File tree

7 files changed

+50
-120
lines changed

7 files changed

+50
-120
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
7979
import org.elasticsearch.xpack.esql.core.tree.Source;
8080
import org.elasticsearch.xpack.esql.core.type.DataType;
81-
import org.elasticsearch.xpack.esql.core.type.EsField;
8281
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
8382
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
8483
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
@@ -352,36 +351,12 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
352351
warnings
353352
);
354353
releasables.add(queryOperator);
355-
Layout.Builder builder = new Layout.Builder();
356-
// append the docsIds and positions to the layout
357-
builder.append(
358-
// this looks wrong, what is the datatype for the Docs? It says DocVector but it is not a DataType
359-
new FieldAttribute(Source.EMPTY, "Docs", new EsField("Docs", DataType.DOC_DATA_TYPE, Collections.emptyMap(), false))
360-
);
361-
builder.append(
362-
new FieldAttribute(Source.EMPTY, "Positions", new EsField("Positions", DataType.INTEGER, Collections.emptyMap(), false))
363-
);
364354
List<Operator> operators = new ArrayList<>();
365355
if (request.extractFields.isEmpty() == false) {
366356
var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
367-
builder.append(request.extractFields);
368357
releasables.add(extractFieldsOperator);
369358
operators.add(extractFieldsOperator);
370359
}
371-
if (queryList instanceof PostJoinFilterable postJoinFilterable) {
372-
FilterExec filterExec = postJoinFilterable.getPostJoinFilter();
373-
Operator inputOperator;
374-
if (operators.isEmpty() == false) {
375-
inputOperator = operators.getLast();
376-
} else {
377-
inputOperator = queryOperator;
378-
}
379-
Operator postJoinFilter = filterExecOperator(filterExec, inputOperator, shardContext.context, driverContext, builder);
380-
if (postJoinFilter != null) {
381-
releasables.add(postJoinFilter);
382-
operators.add(postJoinFilter);
383-
}
384-
}
385360
operators.add(finishPages);
386361

387362
/*

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 22 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.enrich;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.apache.lucene.search.BooleanClause;
1113
import org.apache.lucene.search.BooleanQuery;
1214
import org.apache.lucene.search.Query;
@@ -15,10 +17,8 @@
1517
import org.elasticsearch.compute.operator.lookup.QueryList;
1618
import org.elasticsearch.index.query.SearchExecutionContext;
1719
import org.elasticsearch.xpack.esql.capabilities.TranslationAware;
20+
import org.elasticsearch.xpack.esql.core.expression.Expression;
1821
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
19-
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
20-
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
21-
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2222
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
2323
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
2424

@@ -33,66 +33,55 @@
3333
* Each query in the resulting query will be a conjunction of all queries from the input lists at the same position.
3434
* In the future we can extend this to support more complex expressions, such as disjunctions or negations.
3535
*/
36-
public class ExpressionQueryList implements LookupEnrichQueryGenerator, PostJoinFilterable {
36+
public class ExpressionQueryList implements LookupEnrichQueryGenerator {
37+
private static final Logger logger = LogManager.getLogger(ExpressionQueryList.class);
3738
private final List<QueryList> queryLists;
3839
private final List<Query> preJoinFilters = new ArrayList<>();
39-
private FilterExec postJoinFilter;
4040
private final SearchExecutionContext context;
4141

4242
public ExpressionQueryList(
4343
List<QueryList> queryLists,
4444
SearchExecutionContext context,
45-
PhysicalPlan rightPreJoinPlan,
45+
Expression optionalFilter,
4646
ClusterService clusterService
4747
) {
48-
if (queryLists.size() < 2 && rightPreJoinPlan == null) {
48+
if (queryLists.size() < 2 && optionalFilter == null) {
4949
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists");
5050
}
5151
this.queryLists = queryLists;
5252
this.context = context;
53-
buildPrePostJoinFilter(rightPreJoinPlan, clusterService);
53+
buildPrePostJoinFilter(optionalFilter, clusterService);
5454
}
5555

56-
private void buildPrePostJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) {
56+
private void buildPrePostJoinFilter(Expression optionalFilter, ClusterService clusterService) {
5757
// we support a FilterExec as the pre-join filter
5858
// if the filter Exec is not translatable to a QueryBuilder, we will apply it after the join
59-
if (rightPreJoinPlan instanceof FilterExec filterExec) {
60-
try {
59+
try {
60+
// If the pre-join filter is a FilterExec, we can convert it to a QueryBuilder
61+
// try to convert it to a QueryBuilder, if not possible apply it after the join
62+
if (optionalFilter instanceof TranslationAware translationAware) {
6163
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
6264
SearchContextStats.from(List.of(context)),
6365
new EsqlFlags(clusterService.getClusterSettings())
6466
);
65-
// If the pre-join filter is a FilterExec, we can convert it to a QueryBuilder
66-
// try to convert it to a QueryBuilder, if not possible apply it after the join
67-
if (filterExec.condition() instanceof TranslationAware translationAware
68-
&& TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
67+
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
6968
preJoinFilters.add(
7069
translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder().toQuery(context)
7170
);
72-
} else {
73-
// if the filter is not translatable, we will apply it after the join
74-
postJoinFilter = filterExec;
7571
}
76-
} catch (IOException e) {
77-
throw new IllegalArgumentException("Failed to translate pre-join filter: " + filterExec, e);
7872
}
79-
80-
} else if (rightPreJoinPlan instanceof EsQueryExec esQueryExec) {
81-
try {
82-
// check the EsQueryExec for a pre-join filter
83-
if (esQueryExec.query() != null) {
84-
preJoinFilters.add(esQueryExec.query().toQuery(context));
85-
}
86-
} catch (IOException e) {
87-
throw new IllegalArgumentException("Failed to translate pre-join filter: " + esQueryExec, e);
88-
}
89-
} else if (rightPreJoinPlan != null) {
90-
throw new IllegalArgumentException("Unsupported pre-join filter type: " + rightPreJoinPlan.getClass().getName());
73+
// If the filter is not translatable we will not apply it for now
74+
// as performance testing showed no significant difference in performance.
75+
// We can revisit this in the future if needed.
76+
// The filter is optional, so that is OK
77+
} catch (IOException e) {
78+
// as the filter is optional an error in its application will be ignored
79+
logger.error(() -> "Failed to translate optional pre-join filter: [" + optionalFilter + "]", e);
9180
}
9281
}
9382

9483
@Override
95-
public Query getQuery(int position) throws IOException {
84+
public Query getQuery(int position) {
9685
BooleanQuery.Builder builder = new BooleanQuery.Builder();
9786
for (QueryList queryList : queryLists) {
9887
Query q = queryList.getQuery(position);
@@ -125,9 +114,4 @@ public int getPositionCount() {
125114
}
126115
return positionCount;
127116
}
128-
129-
@Override
130-
public FilterExec getPostJoinFilter() {
131-
return postJoinFilter;
132-
}
133117
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import org.elasticsearch.core.Releasables;
2424
import org.elasticsearch.tasks.CancellableTask;
2525
import org.elasticsearch.xcontent.XContentBuilder;
26+
import org.elasticsearch.xpack.esql.core.expression.Expression;
2627
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
2728
import org.elasticsearch.xpack.esql.core.tree.Source;
28-
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2929

3030
import java.io.IOException;
3131
import java.util.ArrayList;
@@ -48,7 +48,7 @@ public record Factory(
4848
String lookupIndex,
4949
List<NamedExpression> loadFields,
5050
Source source,
51-
PhysicalPlan rightPreJoinPlan
51+
Expression optionalFilter
5252
) implements OperatorFactory {
5353
@Override
5454
public String describe() {
@@ -62,7 +62,7 @@ public String describe() {
6262
.append(" inputChannel=")
6363
.append(matchField.channel());
6464
}
65-
stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan);
65+
stringBuilder.append(" optional_filter=").append(optionalFilter);
6666
stringBuilder.append("]");
6767
return stringBuilder.toString();
6868
}
@@ -80,7 +80,7 @@ public Operator get(DriverContext driverContext) {
8080
lookupIndex,
8181
loadFields,
8282
source,
83-
rightPreJoinPlan
83+
optionalFilter
8484
);
8585
}
8686
}
@@ -94,7 +94,7 @@ public Operator get(DriverContext driverContext) {
9494
private final Source source;
9595
private long totalRows = 0L;
9696
private final List<MatchConfig> matchFields;
97-
private final PhysicalPlan rightPreJoinPlan;
97+
private final Expression optionalFilter;
9898
/**
9999
* Total number of pages emitted by this {@link Operator}.
100100
*/
@@ -115,7 +115,7 @@ public LookupFromIndexOperator(
115115
String lookupIndex,
116116
List<NamedExpression> loadFields,
117117
Source source,
118-
PhysicalPlan rightPreJoinPlan
118+
Expression optionalFilter
119119
) {
120120
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
121121
this.matchFields = matchFields;
@@ -126,7 +126,7 @@ public LookupFromIndexOperator(
126126
this.lookupIndex = lookupIndex;
127127
this.loadFields = loadFields;
128128
this.source = source;
129-
this.rightPreJoinPlan = rightPreJoinPlan;
129+
this.optionalFilter = optionalFilter;
130130
}
131131

132132
@Override
@@ -154,7 +154,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
154154
new Page(inputBlockArray),
155155
loadFields,
156156
source,
157-
rightPreJoinPlan
157+
optionalFilter
158158
);
159159
lookupService.lookupAsync(
160160
request,
@@ -211,7 +211,7 @@ public String toString() {
211211
.append(" inputChannel=")
212212
.append(matchField.channel());
213213
}
214-
stringBuilder.append(" right_pre_join_plan=").append(rightPreJoinPlan);
214+
stringBuilder.append(" optional_filter=").append(optionalFilter);
215215
stringBuilder.append("]");
216216
return stringBuilder.toString();
217217
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,13 @@
3131
import org.elasticsearch.transport.TransportService;
3232
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
3333
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
34+
import org.elasticsearch.xpack.esql.core.expression.Expression;
3435
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
3536
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
3637
import org.elasticsearch.xpack.esql.core.tree.Source;
3738
import org.elasticsearch.xpack.esql.core.type.DataType;
3839
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3940
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
40-
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
41-
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
4241

4342
import java.io.IOException;
4443
import java.util.ArrayList;
@@ -90,7 +89,7 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque
9089
request.extractFields,
9190
request.matchFields,
9291
request.source,
93-
request.rightPreJoinPlan
92+
request.optionalFilter
9493
);
9594
}
9695

@@ -115,12 +114,10 @@ protected LookupEnrichQueryGenerator queryList(
115114
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
116115
queryLists.add(q);
117116
}
118-
if (queryLists.size() == 1
119-
&& (request.rightPreJoinPlan == null
120-
|| request.rightPreJoinPlan instanceof EsQueryExec esQueryExec && esQueryExec.query() == null)) {
117+
if (queryLists.size() == 1 && request.optionalFilter == null) {
121118
return queryLists.getFirst();
122119
}
123-
return new ExpressionQueryList(queryLists, context, request.rightPreJoinPlan, clusterService);
120+
return new ExpressionQueryList(queryLists, context, request.optionalFilter, clusterService);
124121

125122
}
126123

@@ -136,7 +133,7 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in
136133

137134
public static class Request extends AbstractLookupService.Request {
138135
private final List<MatchConfig> matchFields;
139-
private final PhysicalPlan rightPreJoinPlan;
136+
private final Expression optionalFilter;
140137

141138
Request(
142139
String sessionId,
@@ -146,17 +143,17 @@ public static class Request extends AbstractLookupService.Request {
146143
Page inputPage,
147144
List<NamedExpression> extractFields,
148145
Source source,
149-
PhysicalPlan rightPreJoinPlan
146+
Expression optionalFilter
150147
) {
151148
super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source);
152149
this.matchFields = matchFields;
153-
this.rightPreJoinPlan = rightPreJoinPlan;
150+
this.optionalFilter = optionalFilter;
154151
}
155152
}
156153

157154
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
158155
private final List<MatchConfig> matchFields;
159-
private final PhysicalPlan rightPreJoinPlan;
156+
private final Expression optionalFilter;
160157

161158
// Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order
162159
// The channel information inside the MatchConfig, should say the same thing
@@ -169,11 +166,11 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
169166
List<NamedExpression> extractFields,
170167
List<MatchConfig> matchFields,
171168
Source source,
172-
PhysicalPlan rightPreJoinPlan
169+
Expression optionalFilter
173170
) {
174171
super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source);
175172
this.matchFields = matchFields;
176-
this.rightPreJoinPlan = rightPreJoinPlan;
173+
this.optionalFilter = optionalFilter;
177174
}
178175

179176
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
@@ -219,9 +216,9 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
219216
String sourceText = in.readString();
220217
source = new Source(source.source(), sourceText);
221218
}
222-
PhysicalPlan rightPreJoinPlan = null;
219+
Expression optionalFilter = null;
223220
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
224-
rightPreJoinPlan = planIn.readOptionalNamedWriteable(PhysicalPlan.class);
221+
optionalFilter = planIn.readOptionalNamedWriteable(Expression.class);
225222
}
226223
TransportRequest result = new TransportRequest(
227224
sessionId,
@@ -232,7 +229,7 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
232229
extractFields,
233230
matchFields,
234231
source,
235-
rightPreJoinPlan
232+
optionalFilter
236233
);
237234
result.setParentTask(parentTaskId);
238235
return result;
@@ -276,20 +273,17 @@ public void writeTo(StreamOutput out) throws IOException {
276273
out.writeString(source.text());
277274
}
278275
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
279-
planOut.writeOptionalNamedWriteable(rightPreJoinPlan);
276+
planOut.writeOptionalNamedWriteable(optionalFilter);
280277
}
281-
// JULIAN TODO: need a better way to indicate that the filter does not need to be applied here
282-
/*else if (rightPreJoinPlan != null) {
283-
throw new EsqlIllegalArgumentException("LOOKUP JOIN with pre-join filter is not supported on remote node");
284-
}*/
278+
// otherwise we will not send the optionalFilter, as it is optional that is OK
285279
}
286280

287281
@Override
288282
protected String extraDescription() {
289283
return " ,match_fields="
290284
+ matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", "))
291-
+ ", rightPreJoinPlan="
292-
+ rightPreJoinPlan;
285+
+ ", optional_filter="
286+
+ optionalFilter;
293287
}
294288
}
295289

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/PostJoinFilterable.java

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)