Skip to content

Commit 66c126f

Browse files
Fix a bug where a mix of pushable and non-pushable filters resulted in optimization not applied
1 parent 0e7b3ed commit 66c126f

File tree

14 files changed

+134
-117
lines changed

14 files changed

+134
-117
lines changed

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,11 @@ private Map<String, Object> lookupExplosion(int sensorDataCount, int lookupEntri
773773
query.append("id").append(i);
774774
}
775775
if (lookupEntries != lookupEntriesToKeep) {
776-
query.append(" | WHERE filter_key < ").append(lookupEntriesToKeep);
776+
// add a filter to reduce the number of matches
777+
// we add both a Lucine pushable filter and a non-pushable filter
778+
// this is to make sure that even if there are non-pushable filters the pushable filters is still applied
779+
query.append(" | WHERE ABS(filter_key) > -1 AND filter_key < ").append(lookupEntriesToKeep);
780+
777781
}
778782
query.append(" | STATS COUNT(location) | LIMIT 100\"}");
779783
return responseAsMap(query(query.toString(), null));

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -229,16 +229,17 @@ public void populate(int docCount, List<String> expected, Predicate<Integer> fil
229229
}
230230
}
231231

232-
private Expression buildGreaterThanFilter(long value) {
232+
private List<Expression> buildGreaterThanFilter(long value) {
233233
FieldAttribute filterAttribute = new FieldAttribute(
234234
Source.EMPTY,
235235
"l",
236236
new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE)
237237
);
238-
return new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
238+
Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG));
239+
return List.of(greaterThan);
239240
}
240241

241-
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, Expression filter) throws IOException {
242+
private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices, List<Expression> filters) throws IOException {
242243
String[] fieldMappers = new String[keyTypes.size() * 2];
243244
for (int i = 0; i < keyTypes.size(); i++) {
244245
fieldMappers[2 * i] = "key" + i;
@@ -268,15 +269,16 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
268269
client().admin().cluster().prepareHealth(TEST_REQUEST_TIMEOUT).setWaitForGreenStatus().get();
269270

270271
Predicate<Integer> filterPredicate = l -> true;
271-
if (filter != null) {
272-
if (filter instanceof GreaterThan gt
272+
if (filters != null) {
273+
if (filters.size() == 1
274+
&& filters.get(0) instanceof GreaterThan gt
273275
&& gt.left() instanceof FieldAttribute fa
274276
&& fa.name().equals("l")
275277
&& gt.right() instanceof Literal lit) {
276278
long value = ((Number) lit.value()).longValue();
277279
filterPredicate = l -> l > value;
278280
} else {
279-
fail("Unsupported filter type in test baseline generation: " + filter);
281+
fail("Unsupported filter type in test baseline generation: " + filters);
280282
}
281283
}
282284

@@ -375,7 +377,7 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
375377
"lookup",
376378
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
377379
Source.EMPTY,
378-
filter
380+
filters
379381
);
380382
DriverContext driverContext = driverContext();
381383
try (

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -42,39 +42,42 @@ public class ExpressionQueryList implements LookupEnrichQueryGenerator {
4242
public ExpressionQueryList(
4343
List<QueryList> queryLists,
4444
SearchExecutionContext context,
45-
Expression optionalFilter,
45+
List<Expression> candidateRightHandFilters,
4646
ClusterService clusterService
4747
) {
48-
if (queryLists.size() < 2 && optionalFilter == null) {
48+
if (queryLists.size() < 2 && (candidateRightHandFilters == null || candidateRightHandFilters.isEmpty())) {
4949
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists");
5050
}
5151
this.queryLists = queryLists;
5252
this.context = context;
53-
buildPrePostJoinFilter(optionalFilter, clusterService);
53+
buildPrePostJoinFilter(candidateRightHandFilters, clusterService);
5454
}
5555

56-
private void buildPrePostJoinFilter(Expression optionalFilter, ClusterService clusterService) {
57-
try {
58-
// If the pre-join filter is a FilterExec, we can convert it to a QueryBuilder
59-
// try to convert it to a QueryBuilder, if not possible apply it after the join
60-
if (optionalFilter instanceof TranslationAware translationAware) {
61-
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
62-
SearchContextStats.from(List.of(context)),
63-
new EsqlFlags(clusterService.getClusterSettings())
64-
);
65-
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
66-
preJoinFilters.add(
67-
translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder().toQuery(context)
56+
private void buildPrePostJoinFilter(List<Expression> candidateRightHandFilters, ClusterService clusterService) {
57+
if (candidateRightHandFilters == null || candidateRightHandFilters.isEmpty()) {
58+
return; // no filters to apply
59+
}
60+
for (Expression filter : candidateRightHandFilters) {
61+
try {
62+
if (filter instanceof TranslationAware translationAware) {
63+
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
64+
SearchContextStats.from(List.of(context)),
65+
new EsqlFlags(clusterService.getClusterSettings())
6866
);
67+
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
68+
preJoinFilters.add(
69+
translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder().toQuery(context)
70+
);
71+
}
6972
}
73+
// If the filter is not translatable we will not apply it for now
74+
// as performance testing showed no performance improvement.
75+
// We can revisit this in the future if needed, once we have more optimized workflow in place.
76+
// The filter is optional, so it is OK to ignore it if it cannot be translated.
77+
} catch (IOException e) {
78+
// as the filter is optional an error in its application will be ignored
79+
logger.error(() -> "Failed to translate optional pre-join filter: [" + filter + "]", e);
7080
}
71-
// If the filter is not translatable we will not apply it for now
72-
// as performance testing showed no performance improvement.
73-
// We can revisit this in the future if needed, once we have more optimized workflow in place.
74-
// The filter is optional, so it is OK to ignore it if it cannot be translated.
75-
} catch (IOException e) {
76-
// as the filter is optional an error in its application will be ignored
77-
logger.error(() -> "Failed to translate optional pre-join filter: [" + optionalFilter + "]", e);
7881
}
7982
}
8083

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Objects;
3535
import java.util.Optional;
3636
import java.util.function.Function;
37+
import java.util.stream.Collectors;
3738

3839
// TODO rename package
3940
public final class LookupFromIndexOperator extends AsyncOperator<LookupFromIndexOperator.OngoingJoin> {
@@ -48,7 +49,7 @@ public record Factory(
4849
String lookupIndex,
4950
List<NamedExpression> loadFields,
5051
Source source,
51-
Expression optionalFilter
52+
List<Expression> candidateRightHandFilters
5253
) implements OperatorFactory {
5354
@Override
5455
public String describe() {
@@ -62,7 +63,8 @@ public String describe() {
6263
.append(" inputChannel=")
6364
.append(matchField.channel());
6465
}
65-
stringBuilder.append(" optional_filter=").append(optionalFilter);
66+
stringBuilder.append(" optional_filter=")
67+
.append(candidateRightHandFilters.stream().map(Expression::toString).collect(Collectors.joining(", ")));
6668
stringBuilder.append("]");
6769
return stringBuilder.toString();
6870
}
@@ -80,7 +82,7 @@ public Operator get(DriverContext driverContext) {
8082
lookupIndex,
8183
loadFields,
8284
source,
83-
optionalFilter
85+
candidateRightHandFilters
8486
);
8587
}
8688
}
@@ -94,7 +96,7 @@ public Operator get(DriverContext driverContext) {
9496
private final Source source;
9597
private long totalRows = 0L;
9698
private final List<MatchConfig> matchFields;
97-
private final Expression optionalFilter;
99+
private final List<Expression> candidateRightHandFilters;
98100
/**
99101
* Total number of pages emitted by this {@link Operator}.
100102
*/
@@ -115,7 +117,7 @@ public LookupFromIndexOperator(
115117
String lookupIndex,
116118
List<NamedExpression> loadFields,
117119
Source source,
118-
Expression optionalFilter
120+
List<Expression> candidateRightHandFilters
119121
) {
120122
super(driverContext, lookupService.getThreadContext(), maxOutstandingRequests);
121123
this.matchFields = matchFields;
@@ -126,7 +128,7 @@ public LookupFromIndexOperator(
126128
this.lookupIndex = lookupIndex;
127129
this.loadFields = loadFields;
128130
this.source = source;
129-
this.optionalFilter = optionalFilter;
131+
this.candidateRightHandFilters = candidateRightHandFilters != null ? candidateRightHandFilters : new ArrayList<>();
130132
}
131133

132134
@Override
@@ -154,7 +156,7 @@ protected void performAsync(Page inputPage, ActionListener<OngoingJoin> listener
154156
new Page(inputBlockArray),
155157
loadFields,
156158
source,
157-
optionalFilter
159+
candidateRightHandFilters
158160
);
159161
lookupService.lookupAsync(
160162
request,
@@ -211,7 +213,8 @@ public String toString() {
211213
.append(" inputChannel=")
212214
.append(matchField.channel());
213215
}
214-
stringBuilder.append(" optional_filter=").append(optionalFilter);
216+
stringBuilder.append(" optional_filter=")
217+
.append(candidateRightHandFilters.stream().map(Expression::toString).collect(Collectors.joining(", ")));
215218
stringBuilder.append("]");
216219
return stringBuilder.toString();
217220
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque
8989
request.extractFields,
9090
request.matchFields,
9191
request.source,
92-
request.optionalFilter
92+
request.candidateRightHandFilters
9393
);
9494
}
9595

@@ -113,10 +113,10 @@ protected LookupEnrichQueryGenerator queryList(
113113
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
114114
queryLists.add(q);
115115
}
116-
if (queryLists.size() == 1 && request.optionalFilter == null) {
116+
if (queryLists.size() == 1 && (request.candidateRightHandFilters == null || request.candidateRightHandFilters.isEmpty())) {
117117
return queryLists.getFirst();
118118
}
119-
return new ExpressionQueryList(queryLists, context, request.optionalFilter, clusterService);
119+
return new ExpressionQueryList(queryLists, context, request.candidateRightHandFilters, clusterService);
120120

121121
}
122122

@@ -132,7 +132,7 @@ protected AbstractLookupService.LookupResponse readLookupResponse(StreamInput in
132132

133133
public static class Request extends AbstractLookupService.Request {
134134
private final List<MatchConfig> matchFields;
135-
private final Expression optionalFilter;
135+
private final List<Expression> candidateRightHandFilters;
136136

137137
Request(
138138
String sessionId,
@@ -142,17 +142,18 @@ public static class Request extends AbstractLookupService.Request {
142142
Page inputPage,
143143
List<NamedExpression> extractFields,
144144
Source source,
145-
Expression optionalFilter
145+
List<Expression> candidateRightHandFilters
146146
) {
147147
super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source);
148148
this.matchFields = matchFields;
149-
this.optionalFilter = optionalFilter;
149+
this.candidateRightHandFilters = candidateRightHandFilters != null ? candidateRightHandFilters : new ArrayList<>();
150+
;
150151
}
151152
}
152153

153154
protected static class TransportRequest extends AbstractLookupService.TransportRequest {
154155
private final List<MatchConfig> matchFields;
155-
private final Expression optionalFilter;
156+
private final List<Expression> candidateRightHandFilters;
156157

157158
// Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order
158159
// The channel information inside the MatchConfig, should say the same thing
@@ -165,11 +166,11 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
165166
List<NamedExpression> extractFields,
166167
List<MatchConfig> matchFields,
167168
Source source,
168-
Expression optionalFilter
169+
List<Expression> candidateRightHandFilters
169170
) {
170171
super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source);
171172
this.matchFields = matchFields;
172-
this.optionalFilter = optionalFilter;
173+
this.candidateRightHandFilters = candidateRightHandFilters != null ? candidateRightHandFilters : new ArrayList<>();
173174
}
174175

175176
static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException {
@@ -215,9 +216,11 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
215216
String sourceText = in.readString();
216217
source = new Source(source.source(), sourceText);
217218
}
218-
Expression optionalFilter = null;
219+
List<Expression> candidateRightHandFilters = new ArrayList<>();
219220
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
220-
optionalFilter = planIn.readOptionalNamedWriteable(Expression.class);
221+
candidateRightHandFilters = planIn.readNamedWriteableCollectionAsList(Expression.class);
222+
} else {
223+
candidateRightHandFilters = new ArrayList<>();
221224
}
222225
TransportRequest result = new TransportRequest(
223226
sessionId,
@@ -228,7 +231,7 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
228231
extractFields,
229232
matchFields,
230233
source,
231-
optionalFilter
234+
candidateRightHandFilters
232235
);
233236
result.setParentTask(parentTaskId);
234237
return result;
@@ -272,17 +275,17 @@ public void writeTo(StreamOutput out) throws IOException {
272275
out.writeString(source.text());
273276
}
274277
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
275-
planOut.writeOptionalNamedWriteable(optionalFilter);
278+
planOut.writeNamedWriteableCollection(candidateRightHandFilters);
276279
}
277-
// otherwise we will not send the optionalFilter, as it is optional that is OK
280+
// otherwise we will not send the candidateRightHandFilters, as it is optional that is OK
278281
}
279282

280283
@Override
281284
protected String extraDescription() {
282285
return " ,match_fields="
283286
+ matchFields.stream().map(x -> x.fieldName().string()).collect(Collectors.joining(", "))
284287
+ ", optional_filter="
285-
+ optionalFilter;
288+
+ candidateRightHandFilters.stream().map(Expression::toString).collect(Collectors.joining(", "));
286289
}
287290
}
288291

0 commit comments

Comments
 (0)