Skip to content

Commit f7ff90e

Browse files
Address code review comments
1 parent f5cb543 commit f7ff90e

File tree

8 files changed

+91
-70
lines changed

8 files changed

+91
-70
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020
import org.elasticsearch.xpack.esql.core.expression.Expression;
2121
import org.elasticsearch.xpack.esql.expression.predicate.Predicates;
2222
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates;
23-
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
23+
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
2424
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
2525
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
26-
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
2726
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
2827
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
2928

@@ -34,7 +33,7 @@
3433
import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER;
3534

3635
/**
37-
* A {@link LookupEnrichQueryGenerator} that combines one or more {@link QueryList}s into a single query.
36+
* A {@link LookupEnrichQueryGenerator} that combines multiple conditions into a single query list.
3837
* Each query in the resulting query will be a conjunction of all queries from the input lists at the same position.
3938
* In addition, we support an optional pre-join filter that will be applied to all queries if it is pushable.
4039
* If the pre-join filter cannot be pushed down to Lucene, it will be ignored.
@@ -71,13 +70,7 @@ private void addToPreJoinFilters(org.elasticsearch.index.query.QueryBuilder quer
7170
}
7271

7372
private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService clusterService) {
74-
if (rightPreJoinPlan instanceof EsQueryExec esQueryExec) {
75-
// this does not happen right now, as we only do local mapping on the lookup node
76-
// so we have EsSourceExec, not esQueryExec
77-
if (esQueryExec.query() != null) {
78-
addToPreJoinFilters(esQueryExec.query());
79-
}
80-
} else if (rightPreJoinPlan instanceof FilterExec filterExec) {
73+
if (rightPreJoinPlan instanceof FilterExec filterExec) {
8174
List<Expression> candidateRightHandFilters = Predicates.splitAnd(filterExec.condition());
8275
LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from(
8376
SearchContextStats.from(List.of(context)),
@@ -94,15 +87,12 @@ private void buildPreJoinFilter(PhysicalPlan rightPreJoinPlan, ClusterService cl
9487
// We can revisit this in the future if needed, once we have more optimized workflow in place.
9588
// The filter is optional, so it is OK to ignore it if it cannot be translated.
9689
}
97-
// call recursively to find other filters that might be present
98-
// either in another FilterExec or in an EsQueryExec
99-
buildPreJoinFilter(filterExec.child(), clusterService);
100-
} else if (rightPreJoinPlan instanceof UnaryExec unaryExec) {
101-
// there can be other nodes in the plan such as FieldExtractExec in the future
102-
buildPreJoinFilter(unaryExec.child(), clusterService);
90+
} else if (rightPreJoinPlan instanceof EsSourceExec == false) {
91+
throw new IllegalStateException(
92+
"The right side of a LookupJoinExec can only be a FilterExec on top of an EsSourceExec or an EsSourceExec, but got: "
93+
+ rightPreJoinPlan.toString()
94+
);
10395
}
104-
// else we do nothing, as the filters are optional and we don't want to fail the query if there are any errors
105-
// this also covers the case of rightPreJoinPlan being null
10696
}
10797

10898
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/InlineJoin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1111
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
1213
import org.elasticsearch.compute.data.Block;
1314
import org.elasticsearch.compute.data.BlockUtils;
1415
import org.elasticsearch.xpack.esql.core.expression.Alias;
@@ -75,6 +76,11 @@ public static LogicalPlan inlineData(InlineJoin target, LocalRelation data) {
7576
}
7677
}
7778

79+
@Override
80+
protected LogicalPlan getRightToSerialize(StreamOutput out) {
81+
return right();
82+
}
83+
7884
/**
7985
* Replaces the stubbed source with the actual source.
8086
* NOTE: this will replace the first {@link StubRelation}s found with the source and the method is meant to be used to replace one node

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ public Join(StreamInput in) throws IOException {
132132
public void writeTo(StreamOutput out) throws IOException {
133133
source().writeTo(out);
134134
out.writeNamedWriteable(left());
135+
out.writeNamedWriteable(getRightToSerialize(out));
136+
config.writeTo(out);
137+
}
138+
139+
protected LogicalPlan getRightToSerialize(StreamOutput out) {
135140
LogicalPlan rightToSerialize = right();
136141
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER) == false) {
137142
// Prior to TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER
@@ -141,8 +146,7 @@ public void writeTo(StreamOutput out) throws IOException {
141146
rightToSerialize = filter.child();
142147
}
143148
}
144-
out.writeNamedWriteable(rightToSerialize);
145-
config.writeTo(out);
149+
return rightToSerialize;
146150
}
147151

148152
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/BinaryExec.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10-
import org.elasticsearch.TransportVersions;
1110
import org.elasticsearch.common.io.stream.StreamOutput;
1211
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1312
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -50,17 +49,11 @@ public PhysicalPlan right() {
5049
public void writeTo(StreamOutput out) throws IOException {
5150
Source.EMPTY.writeTo(out);
5251
out.writeNamedWriteable(left);
53-
PhysicalPlan rightToSerialize = right;
54-
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER) == false
55-
&& this instanceof LookupJoinExec) {
56-
// Prior to TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER
57-
// we do not support a filter on top of the right side of the join
58-
// As we consider the filters optional, we remove them here
59-
while (rightToSerialize instanceof FilterExec filterExec) {
60-
rightToSerialize = filterExec.child();
61-
}
62-
}
63-
out.writeNamedWriteable(rightToSerialize);
52+
out.writeNamedWriteable(getRightToSerialize(out));
53+
}
54+
55+
protected PhysicalPlan getRightToSerialize(StreamOutput out) {
56+
return right;
6457
}
6558

6659
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.TransportVersions;
1011
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -68,6 +69,20 @@ public void writeTo(StreamOutput out) throws IOException {
6869
out.writeNamedWriteableCollection(addedFields);
6970
}
7071

72+
@Override
73+
protected PhysicalPlan getRightToSerialize(StreamOutput out) {
74+
PhysicalPlan rightToSerialize = right();
75+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER) == false) {
76+
// Prior to TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER
77+
// we do not support a filter on top of the right side of the join
78+
// As we consider the filters optional, we remove them here
79+
while (rightToSerialize instanceof FilterExec filterExec) {
80+
rightToSerialize = filterExec.child();
81+
}
82+
}
83+
return rightToSerialize;
84+
}
85+
7186
@Override
7287
public String getWriteableName() {
7388
return ENTRY.name;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import java.util.Set;
6565
import java.util.function.Consumer;
6666
import java.util.function.Predicate;
67+
import java.util.stream.Collectors;
6768

6869
import static java.util.Arrays.asList;
6970
import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -209,10 +210,10 @@ public static PhysicalPlan localPlan(
209210
) {
210211
final LocalMapper localMapper = new LocalMapper();
211212
var isCoordPlan = new Holder<>(Boolean.TRUE);
212-
List<PhysicalPlan> lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance)
213+
Set<PhysicalPlan> lookupJoinExecRightChildren = plan.collect(LookupJoinExec.class::isInstance)
213214
.stream()
214215
.map(x -> ((LookupJoinExec) x).right())
215-
.toList();
216+
.collect(Collectors.toSet());
216217

217218
var localPhysicalPlan = plan.transformUp(FragmentExec.class, f -> {
218219
if (lookupJoinExecRightChildren.contains(f)) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1515
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
1616
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
17+
import org.elasticsearch.xpack.esql.plan.logical.Filter;
1718
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
1819
import org.elasticsearch.xpack.esql.plan.logical.Limit;
1920
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
@@ -22,8 +23,8 @@
2223
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
2324
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
2425
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
26+
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
2527
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
26-
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
2728
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
2829
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
2930
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
@@ -99,47 +100,49 @@ private PhysicalPlan mapBinary(BinaryPlan binary) {
99100
}
100101

101102
PhysicalPlan left = map(binary.left());
102-
PhysicalPlan right = map(binary.right());
103-
104103
// if the right is data we can use a hash join directly
105-
if (right instanceof LocalSourceExec localData) {
106-
return new HashJoinExec(
107-
join.source(),
108-
left,
109-
localData,
110-
config.matchFields(),
111-
config.leftFields(),
112-
config.rightFields(),
113-
join.rightOutputFields()
114-
);
104+
if (binary.right() instanceof LocalRelation) {
105+
PhysicalPlan right = map(binary.right());
106+
if (right instanceof LocalSourceExec localData) {
107+
return new HashJoinExec(
108+
join.source(),
109+
left,
110+
localData,
111+
config.matchFields(),
112+
config.leftFields(),
113+
config.rightFields(),
114+
join.rightOutputFields()
115+
);
116+
} else {
117+
throw new EsqlIllegalArgumentException("Unsupported right plan for join [" + binary.right().nodeName() + "]");
118+
}
115119
}
116-
LookupJoinExec lookupJoinExec = getLookupJoinExec(join, right, left, config);
117-
if (lookupJoinExec == null && right instanceof FilterExec filterExec) {
118-
lookupJoinExec = getLookupJoinExec(join, filterExec.child(), left, config);
120+
EsRelation rightRelation = null;
121+
if (binary.right() instanceof EsRelation esRelation) {
122+
rightRelation = esRelation;
123+
} else if (binary.right() instanceof Filter filter && filter.child() instanceof EsRelation esRelation) {
124+
rightRelation = esRelation;
119125
}
120-
if (lookupJoinExec != null) {
121-
// we want to do local physical planning on the lookup node eventually for the right side of the lookup join
122-
// so here we will wrap the logical plan with a FragmentExec and keep it as is
123-
FragmentExec fragmentExec = new FragmentExec(binary.right());
124-
return new LookupJoinExec(
125-
join.source(),
126-
left,
127-
fragmentExec,
128-
config.leftFields(),
129-
config.rightFields(),
130-
join.rightOutputFields()
126+
if (rightRelation == null) {
127+
throw new EsqlIllegalArgumentException("Unsupported right plan for lookup join [" + binary.right().nodeName() + "]");
128+
}
129+
if (rightRelation.indexMode() != IndexMode.LOOKUP) {
130+
throw new EsqlIllegalArgumentException(
131+
"To perform a lookup join with index [" + rightRelation.indexPattern() + "], it must be a in lookup index mode"
131132
);
132133
}
133-
return MapperUtils.unsupported(binary);
134+
// we want to do local physical planning on the lookup node eventually for the right side of the lookup join
135+
// so here we will wrap the logical plan with a FragmentExec and keep it as is
136+
FragmentExec fragmentExec = new FragmentExec(binary.right());
137+
return new LookupJoinExec(
138+
join.source(),
139+
left,
140+
fragmentExec,
141+
config.leftFields(),
142+
config.rightFields(),
143+
join.rightOutputFields()
144+
);
134145
}
135-
136146
return MapperUtils.unsupported(binary);
137147
}
138-
139-
private static LookupJoinExec getLookupJoinExec(Join join, PhysicalPlan right, PhysicalPlan left, JoinConfig config) {
140-
if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) {
141-
return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields());
142-
}
143-
return null;
144-
}
145148
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313

1414
import java.util.List;
1515

16+
/**
17+
* Class holding all the flags that can be used to change behavior for certain features in ESQL.
18+
* The flags are backed by {@link Setting}s so they can be dynamically changed.
19+
* When adding a new flag, make sure to add it to {@link #ALL_ESQL_FLAGS_SETTINGS}
20+
* so it gets registered and unit tests can pass.
21+
*/
1622
public class EsqlFlags {
1723
public static final Setting<Boolean> ESQL_STRING_LIKE_ON_INDEX = Setting.boolSetting(
1824
"esql.query.string_like_on_index",
@@ -41,7 +47,10 @@ public class EsqlFlags {
4147
Setting.Property.NodeScope,
4248
Setting.Property.Dynamic
4349
);
50+
51+
// this is only used for testing purposes right now
4452
public static List<Setting<?>> ALL_ESQL_FLAGS_SETTINGS = List.of(ESQL_STRING_LIKE_ON_INDEX, ESQL_ROUNDTO_PUSHDOWN_THRESHOLD);
53+
4554
private final boolean stringLikeOnIndex;
4655

4756
private final int roundToPushdownThreshold;

0 commit comments

Comments
 (0)