Skip to content

Commit 70686e0

Browse files
Physical Planning on the Lookup Node
1 parent abcba1e commit 70686e0

File tree

22 files changed

+1187
-263
lines changed

22 files changed

+1187
-263
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9305000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
inference_azure_openai_task_settings_headers,9304000
1+
esql_lookup_planning,9305000

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.tasks.CancellableTask;
5353
import org.elasticsearch.test.ESTestCase;
5454
import org.elasticsearch.threadpool.ThreadPool;
55+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
5556
import org.elasticsearch.xpack.esql.core.expression.Expression;
5657
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
5758
import org.elasticsearch.xpack.esql.core.expression.Literal;
@@ -413,7 +414,8 @@ private void runLookup(List<DataType> keyTypes, PopulateIndices populateIndices,
413414
Predicates.combineAnd(joinOnConditions),
414415
true, // useStreamingOperator
415416
QueryPragmas.EXCHANGE_BUFFER_SIZE.getDefault(Settings.EMPTY),
416-
false // profile
417+
false, // profile
418+
EsqlTestUtils.TEST_CFG
417419
);
418420
DriverContext driverContext = driverContext();
419421
try (

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.compute.operator.Warnings;
1717
import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator;
1818
import org.elasticsearch.compute.operator.lookup.QueryList;
19+
import org.elasticsearch.core.Nullable;
1920
import org.elasticsearch.index.mapper.MappedFieldType;
2021
import org.elasticsearch.index.query.QueryBuilder;
2122
import org.elasticsearch.index.query.Rewriteable;
@@ -29,9 +30,6 @@
2930
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
3031
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison;
3132
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
32-
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
33-
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
34-
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
3533
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
3634
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
3735
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
@@ -54,7 +52,7 @@
5452
* The query is then used to fetch the matching rows from the right dataset.
5553
* The class supports two types of joins:
5654
* 1. Field-based join: The join conditions are based on the equality of fields from the left and right datasets.
57-
* It is used for field-based join when the join is on more than one field or there is a preJoinFilter
55+
* It is used for field-based join when the join is on more than one field or there is a rightOnlyFilter
5856
* 2. Expression-based join: The join conditions are based on a complex expression that can involve multiple fields and operators.
5957
*/
6058
public class ExpressionQueryList implements LookupEnrichQueryGenerator {
@@ -66,18 +64,22 @@ public class ExpressionQueryList implements LookupEnrichQueryGenerator {
6664
private ExpressionQueryList(
6765
List<QueryList> queryLists,
6866
SearchExecutionContext context,
69-
PhysicalPlan rightPreJoinPlan,
67+
@Nullable Expression rightOnlyFilter,
68+
@Nullable QueryBuilder pushedQuery,
7069
ClusterService clusterService,
7170
AliasFilter aliasFilter
7271
) {
7372
this.queryLists = new ArrayList<>(queryLists);
7473
this.aliasFilter = aliasFilter;
7574
this.clusterService = clusterService;
75+
if (pushedQuery != null) {
76+
lucenePushableFilterBuilders.add(pushedQuery);
77+
}
7678
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
7779
SearchContextStats.from(List.of(context)),
7880
new EsqlFlags(clusterService.getClusterSettings())
7981
);
80-
buildPreJoinFilter(rightPreJoinPlan, context, lucenePushdownPredicates);
82+
buildPreJoinFilter(rightOnlyFilter, context, lucenePushdownPredicates);
8183
}
8284

8385
/**
@@ -90,14 +92,15 @@ private ExpressionQueryList(
9092
public static ExpressionQueryList fieldBasedJoin(
9193
List<QueryList> queryLists,
9294
SearchExecutionContext context,
93-
PhysicalPlan rightPreJoinPlan,
95+
@Nullable Expression rightOnlyFilter,
96+
@Nullable QueryBuilder pushedQuery,
9497
ClusterService clusterService,
9598
AliasFilter aliasFilter
9699
) {
97-
if (queryLists.size() < 2 && (rightPreJoinPlan instanceof FilterExec == false)) {
100+
if (queryLists.size() < 2 && rightOnlyFilter == null && pushedQuery == null) {
98101
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists or a pre-join filter");
99102
}
100-
return new ExpressionQueryList(queryLists, context, rightPreJoinPlan, clusterService, aliasFilter);
103+
return new ExpressionQueryList(queryLists, context, rightOnlyFilter, pushedQuery, clusterService, aliasFilter);
101104
}
102105

103106
/**
@@ -110,37 +113,33 @@ public static ExpressionQueryList fieldBasedJoin(
110113
*/
111114
public static ExpressionQueryList expressionBasedJoin(
112115
SearchExecutionContext context,
113-
PhysicalPlan rightPreJoinPlan,
116+
@Nullable Expression rightOnlyFilter,
117+
@Nullable QueryBuilder pushedQuery,
114118
ClusterService clusterService,
115-
LookupFromIndexService.TransportRequest request,
119+
List<MatchConfig> matchFields,
120+
Expression joinOnConditions,
116121
AliasFilter aliasFilter,
117122
Warnings warnings
118123
) {
119124
if (LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() == false) {
120125
throw new UnsupportedOperationException("Lookup Join on Boolean Expression capability is not enabled");
121126
}
122-
if (request.getJoinOnConditions() == null) {
127+
if (joinOnConditions == null) {
123128
throw new IllegalStateException("expressionBasedJoin must have join conditions");
124129
}
125130
ExpressionQueryList expressionQueryList = new ExpressionQueryList(
126131
new ArrayList<>(),
127132
context,
128-
rightPreJoinPlan,
133+
rightOnlyFilter,
134+
pushedQuery,
129135
clusterService,
130136
aliasFilter
131137
);
132-
// Build join-on conditions using the context from planning (this is safe as conditions are static)
133138
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
134139
SearchContextStats.from(List.of(context)),
135140
new EsqlFlags(clusterService.getClusterSettings())
136141
);
137-
expressionQueryList.buildJoinOnForExpressionJoin(
138-
request.getJoinOnConditions(),
139-
request.getMatchFields(),
140-
context,
141-
lucenePushdownPredicates,
142-
warnings
143-
);
142+
expressionQueryList.buildJoinOnForExpressionJoin(joinOnConditions, matchFields, context, lucenePushdownPredicates, warnings);
144143
return expressionQueryList;
145144
}
146145

@@ -242,37 +241,31 @@ private void addToLucenePushableFilters(QueryBuilder queryBuilder) {
242241
}
243242

244243
private void buildPreJoinFilter(
245-
PhysicalPlan rightPreJoinPlan,
244+
@Nullable Expression rightOnlyFilter,
246245
SearchExecutionContext context,
247246
LucenePushdownPredicates lucenePushdownPredicates
248247
) {
249-
if (rightPreJoinPlan instanceof FilterExec filterExec) {
250-
List<Expression> candidateRightHandFilters = Predicates.splitAnd(filterExec.condition());
251-
for (Expression filter : candidateRightHandFilters) {
252-
if (filter instanceof TranslationAware translationAware) {
253-
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
254-
QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder();
255-
// Rewrite the query builder to ensure doIndexMetadataRewrite is called
256-
// Some functions, such as KQL require rewriting to work properly
257-
try {
258-
queryBuilder = Rewriteable.rewrite(queryBuilder, context, true);
259-
} catch (IOException e) {
260-
throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e);
261-
}
262-
// Store QueryBuilder instead of Query to avoid caching IndexReader references
263-
addToLucenePushableFilters(queryBuilder);
248+
if (rightOnlyFilter == null) {
249+
return;
250+
}
251+
List<Expression> candidateRightHandFilters = Predicates.splitAnd(rightOnlyFilter);
252+
for (Expression filter : candidateRightHandFilters) {
253+
if (filter instanceof TranslationAware translationAware) {
254+
if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) {
255+
QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder();
256+
try {
257+
queryBuilder = Rewriteable.rewrite(queryBuilder, context, true);
258+
} catch (IOException e) {
259+
throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e);
264260
}
261+
// Store QueryBuilder instead of Query to avoid caching IndexReader references
262+
addToLucenePushableFilters(queryBuilder);
265263
}
266-
// If the filter is not translatable we will not apply it for now
267-
// as performance testing showed no performance improvement.
268-
// We can revisit this in the future if needed, once we have more optimized workflow in place.
269-
// The filter is optional, so it is OK to ignore it if it cannot be translated.
270264
}
271-
} else if (rightPreJoinPlan != null && rightPreJoinPlan instanceof EsSourceExec == false) {
272-
throw new IllegalStateException(
273-
"The right side of a LookupJoinExec can only be a FilterExec on top of an EsSourceExec or an EsSourceExec, but got: "
274-
+ rightPreJoinPlan
275-
);
265+
// If the filter is not translatable we will not apply it for now
266+
// as performance testing showed no performance improvement.
267+
// We can revisit this in the future if needed, once we have more optimized workflow in place.
268+
// The filter is optional, so it is OK to ignore it if it cannot be translated.
276269
}
277270
}
278271

0 commit comments

Comments
 (0)