Skip to content

Commit 63018a7

Browse files
Switch to passing local logical plan to lookup node
1 parent c15df0a commit 63018a7

File tree

7 files changed

+127
-64
lines changed

7 files changed

+127
-64
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,7 @@ public ExpressionQueryList(
5151
PhysicalPlan rightPreJoinPlan,
5252
ClusterService clusterService
5353
) {
54-
if (queryLists.size() < 2
55-
&& (rightPreJoinPlan == null || rightPreJoinPlan instanceof EsQueryExec esQueryExec && esQueryExec.query() == null)) {
54+
if (queryLists.size() < 2 && (rightPreJoinPlan instanceof FilterExec == false)) {
5655
throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists or a pre-join filter");
5756
}
5857
this.queryLists = queryLists;
@@ -73,6 +72,8 @@ private void addToPreJoinFilters(org.elasticsearch.index.query.QueryBuilder quer
7372

7473
private void buildPrePostJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) {
7574
if (rightPreJoinPlan instanceof EsQueryExec esQueryExec) {
75+
// this does not happen right now, as we only do local mapping on the lookup node
76+
// so we have EsSourceExec, not esQueryExec
7677
if (esQueryExec.query() != null) {
7778
addToPreJoinFilters(esQueryExec.query());
7879
}
@@ -97,7 +98,7 @@ private void buildPrePostJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterServic
9798
// either in another FilterExec or in an EsQueryExec
9899
buildPrePostJoinFilter(filterExec.child(), clusterService);
99100
} else if (rightPreJoinPlan instanceof UnaryExec unaryExec) {
100-
// there can be other nodes in the plan such as FieldExtractExec
101+
// there can be other nodes in the plan such as FieldExtractExec in the future
101102
buildPrePostJoinFilter(unaryExec.child(), clusterService);
102103
}
103104
// else we do nothing, as the filters are optional and we don't want to fail the query if there are any errors

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.enrich;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.TransportVersions;
1113
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1214
import org.elasticsearch.cluster.project.ProjectResolver;
@@ -37,8 +39,10 @@
3739
import org.elasticsearch.xpack.esql.core.type.DataType;
3840
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3941
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
40-
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
42+
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
43+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
4144
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
45+
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
4246

4347
import java.io.IOException;
4448
import java.util.ArrayList;
@@ -53,6 +57,7 @@
5357
*/
5458
public class LookupFromIndexService extends AbstractLookupService<LookupFromIndexService.Request, LookupFromIndexService.TransportRequest> {
5559
public static final String LOOKUP_ACTION_NAME = EsqlQueryAction.NAME + "/lookup_from_index";
60+
private static final Logger logger = LogManager.getLogger(LookupFromIndexService.class);
5661

5762
public LookupFromIndexService(
5863
ClusterService clusterService,
@@ -114,13 +119,32 @@ protected LookupEnrichQueryGenerator queryList(
114119
).onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value");
115120
queryLists.add(q);
116121
}
117-
if (queryLists.size() == 1
118-
&& (request.rightPreJoinPlan == null
119-
|| request.rightPreJoinPlan instanceof EsQueryExec esQueryExec && esQueryExec.query() == null)) {
122+
123+
PhysicalPlan physicalPlan = request.rightPreJoinPlan;
124+
physicalPlan = localLookupNodePlanning(physicalPlan);
125+
if (queryLists.size() == 1 && (physicalPlan instanceof FilterExec == false)) {
120126
return queryLists.getFirst();
121127
}
122-
return new ExpressionQueryList(queryLists, context, request.rightPreJoinPlan, clusterService);
128+
return new ExpressionQueryList(queryLists, context, physicalPlan, clusterService);
129+
130+
}
123131

132+
/**
133+
* This function will perform any planning needed on the local node
134+
* For now, we will just do mapping of the logical plan to physical plan
135+
* In the future we can also do local physical and logical optimizations
136+
*/
137+
private PhysicalPlan localLookupNodePlanning(PhysicalPlan physicalPlan) {
138+
if (physicalPlan instanceof FragmentExec fragmentExec) {
139+
try {
140+
LocalMapper localMapper = new LocalMapper();
141+
return localMapper.map(fragmentExec.fragment());
142+
} catch (Exception e) {
143+
logger.error(() -> "Failed to perform local mapping on the lookup node for plan: [" + physicalPlan + "]", e);
144+
return null;
145+
}
146+
}
147+
return null;
124148
}
125149

126150
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@
9292
import org.elasticsearch.xpack.esql.inference.XContentRowEncoder;
9393
import org.elasticsearch.xpack.esql.inference.completion.CompletionOperator;
9494
import org.elasticsearch.xpack.esql.inference.rerank.RerankOperator;
95+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
9596
import org.elasticsearch.xpack.esql.plan.logical.Fork;
97+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
98+
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
9699
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
97100
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
98101
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
@@ -105,6 +108,7 @@
105108
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
106109
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
107110
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
111+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
108112
import org.elasticsearch.xpack.esql.plan.physical.GrokExec;
109113
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
110114
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
@@ -121,7 +125,6 @@
121125
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
122126
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
123127
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
124-
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
125128
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
126129
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
127130
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext;
@@ -741,19 +744,19 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
741744
}
742745
Layout layout = layoutBuilder.build();
743746

744-
EsQueryExec localSourceExec = fildEsQueryExec(join.lookup());
745-
if (localSourceExec == null || localSourceExec.indexMode() != IndexMode.LOOKUP) {
747+
EsRelation esRelation = fildEsRelation(join.lookup());
748+
if (esRelation == null || esRelation.indexMode() != IndexMode.LOOKUP) {
746749
throw new IllegalArgumentException("can't plan [" + join + "]");
747750
}
748751

749752
// After enabling remote joins, we can have one of the two situations here:
750753
// 1. We've just got one entry - this should be the one relevant to the join, and it should be for this cluster
751754
// 2. We have got multiple entries - this means each cluster has its own one, and we should extract one relevant for this cluster
752755
Map.Entry<String, IndexMode> entry;
753-
if (localSourceExec.indexNameWithModes().size() == 1) {
754-
entry = localSourceExec.indexNameWithModes().entrySet().iterator().next();
756+
if (esRelation.indexNameWithModes().size() == 1) {
757+
entry = esRelation.indexNameWithModes().entrySet().iterator().next();
755758
} else {
756-
var maybeEntry = localSourceExec.indexNameWithModes()
759+
var maybeEntry = esRelation.indexNameWithModes()
757760
.entrySet()
758761
.stream()
759762
.filter(e -> RemoteClusterAware.parseClusterAlias(e.getKey()).equals(clusterAlias))
@@ -796,7 +799,7 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
796799
parentTask,
797800
context.queryPragmas().enrichMaxWorkers(),
798801
ctx -> lookupFromIndexService,
799-
localSourceExec.indexPattern(),
802+
esRelation.indexPattern(),
800803
indexName,
801804
join.addedFields().stream().map(f -> (NamedExpression) f).toList(),
802805
join.source(),
@@ -806,11 +809,18 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
806809
);
807810
}
808811

809-
private EsQueryExec fildEsQueryExec(PhysicalPlan lookup) {
810-
if (lookup instanceof EsQueryExec esQueryExec) {
811-
return esQueryExec;
812-
} else if (lookup instanceof UnaryExec unaryExec) {
813-
return fildEsQueryExec(unaryExec.child());
812+
private EsRelation fildEsRelation(PhysicalPlan node) {
813+
if (node instanceof FragmentExec fragmentExec) {
814+
return fildEsRelation(fragmentExec.fragment());
815+
}
816+
return null;
817+
}
818+
819+
private EsRelation fildEsRelation(LogicalPlan node) {
820+
if (node instanceof EsRelation esRelation) {
821+
return esRelation;
822+
} else if (node instanceof UnaryPlan unaryPlan) {
823+
return fildEsRelation(unaryPlan.child());
814824
}
815825
return null;
816826
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
5050
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
5151
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
52+
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
5253
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
5354
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
5455
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
@@ -208,8 +209,16 @@ public static PhysicalPlan localPlan(
208209
) {
209210
final LocalMapper localMapper = new LocalMapper();
210211
var isCoordPlan = new Holder<>(Boolean.TRUE);
212+
List<PhysicalPlan> lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance)
213+
.stream()
214+
.map(x -> ((LookupJoinExec) x).right())
215+
.toList();
211216

212217
var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
218+
if (lookupJoinExecRightChildren.contains(f)) {
219+
// do not optimize the right child of a lookup join exec
220+
return f;
221+
}
213222
isCoordPlan.set(Boolean.FALSE);
214223
var optimizedFragment = logicalOptimizer.localOptimize(f.fragment());
215224
var physicalFragment = localMapper.map(optimizedFragment);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
2525
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
2626
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
27+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
2728
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
2829
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
2930
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
@@ -112,16 +113,24 @@ private PhysicalPlan mapBinary(BinaryPlan binary) {
112113
join.rightOutputFields()
113114
);
114115
}
115-
if (right instanceof FilterExec filterExec) {
116-
LookupJoinExec lookupJoinExec = getLookupJoinExec(join, filterExec.child(), left, config);
117-
if (lookupJoinExec != null) {
118-
// build the right child as a FilterExec with the original lookupJoinExec.right() as the child
119-
FilterExec newRightChild = filterExec.replaceChild(lookupJoinExec.right());
120-
return lookupJoinExec.replaceChildren(lookupJoinExec.left(), newRightChild);
121-
}
122-
}
123116
LookupJoinExec lookupJoinExec = getLookupJoinExec(join, right, left, config);
124-
if (lookupJoinExec != null) return lookupJoinExec;
117+
if (lookupJoinExec == null && right instanceof FilterExec filterExec) {
118+
lookupJoinExec = getLookupJoinExec(join, filterExec.child(), left, config);
119+
}
120+
if (lookupJoinExec != null) {
121+
// we want to do local physical planning on the lookup node eventually for the right side of the lookup join
122+
// so here we will wrap the logical plan with a FragmentExec and keep it as is
123+
FragmentExec fragmentExec = new FragmentExec(binary.right());
124+
return new LookupJoinExec(
125+
join.source(),
126+
left,
127+
fragmentExec,
128+
config.leftFields(),
129+
config.rightFields(),
130+
join.rightOutputFields()
131+
);
132+
}
133+
return MapperUtils.unsupported(binary);
125134
}
126135

127136
return MapperUtils.unsupported(binary);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@
7171
import org.elasticsearch.xpack.esql.core.type.DataType;
7272
import org.elasticsearch.xpack.esql.core.type.EsField;
7373
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan;
74-
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
75-
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
74+
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
75+
import org.elasticsearch.xpack.esql.plan.logical.Filter;
76+
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
7677
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
7778
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
7879
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
@@ -192,7 +193,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options) {
192193
);
193194
}
194195

195-
private FilterExec buildLessThanFilter(int value) {
196+
private FragmentExec buildLessThanFilter(int value) {
196197
FieldAttribute filterAttribute = new FieldAttribute(
197198
Source.EMPTY,
198199
"lint",
@@ -203,19 +204,9 @@ private FilterExec buildLessThanFilter(int value) {
203204
filterAttribute,
204205
new Literal(Source.EMPTY, value, DataType.INTEGER)
205206
);
206-
EsQueryExec queryExec = new EsQueryExec(
207-
Source.EMPTY,
208-
"test",
209-
IndexMode.LOOKUP,
210-
Map.of(),
211-
List.of(),
212-
null,
213-
List.of(),
214-
null,
215-
List.of()
216-
);
217-
FilterExec filterExec = new FilterExec(Source.EMPTY, queryExec, lessThan);
218-
return filterExec;
207+
EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), List.of());
208+
Filter filter = new Filter(Source.EMPTY, esRelation, lessThan);
209+
return new FragmentExec(filter);
219210
}
220211

221212
@Override
@@ -242,12 +233,22 @@ protected Matcher<String> expectedToStringOfSimple() {
242233
for (int i = 0; i < numberOfJoinColumns; i++) {
243234
sb.append("input_type=LONG match_field=match").append(i).append(" inputChannel=").append(i).append(" ");
244235
}
245-
sb.append("right_pre_join_plan=FilterExec\\[lint\\{f}#\\d+ < ")
236+
// Accept either the legacy physical plan rendering (FilterExec/EsQueryExec) or the new FragmentExec rendering
237+
sb.append("right_pre_join_plan=(?:");
238+
// Legacy pattern
239+
sb.append("FilterExec\\[lint\\{f}#\\d+ < ")
246240
.append(LESS_THAN_VALUE)
247241
.append(
248-
"\\[INTEGER]]\\n\\\\_EsQueryExec\\[test], indexMode\\[lookup],\\s*(?:query\\[\\]|\\[\\])?,?\\s*limit\\[\\],"
249-
+ "?\\s*sort\\[(?:\\[\\])?\\]\\s*estimatedRowSize\\[null\\]\\s*queryBuilderAndTags \\[\\[\\]\\]\\]"
242+
"\\[INTEGER]]\\n\\\\_EsQueryExec\\[test], indexMode\\[lookup],\\s*(?:query\\[\\]|\\[\\])?,?\\s*limit\\[\\],?\\s*sort\\[(?:\\[\\])?\\]\\s*estimatedRowSize\\[null\\]\\s*queryBuilderAndTags \\[(?:\\[\\]\\])\\]"
250243
);
244+
sb.append("|");
245+
// New FragmentExec pattern
246+
sb.append("FragmentExec\\[filter=null, estimatedRowSize=\\d+, reducer=\\[\\], fragment=\\[<>\\n")
247+
.append("Filter\\[lint\\{f}#\\d+ < ")
248+
.append(LESS_THAN_VALUE)
249+
.append("\\[INTEGER]]\\n")
250+
.append("\\\\_EsRelation\\[test]\\[LOOKUP]\\[\\]<>\\]\\]\\]");
251+
sb.append(")");
251252
return matchesPattern(sb.toString());
252253
}
253254

0 commit comments

Comments
 (0)