Skip to content

Commit 6e6e28e

Browse files
Switch to storing the filter in Join
1 parent ec9817d commit 6e6e28e

File tree

7 files changed

+88
-31
lines changed

7 files changed

+88
-31
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,11 @@ private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
159159
// push the filter down to the right child
160160
List<Expression> rightPushableFilters = buildRightPushableFilters(scoped.rightFilters());
161161
if (rightPushableFilters.isEmpty() == false) {
162-
right = new Filter(right.source(), right, Predicates.combineAnd(rightPushableFilters));
162+
// right = new Filter(right.source(), right, Predicates.combineAnd(rightPushableFilters));
163163
// update the join with the new right child
164-
join = (Join) join.replaceRight(right);
164+
// join = (Join) join.replaceRight(right);
165+
Expression optionalRightHandSideFilters = Predicates.combineAnd(rightPushableFilters);
166+
join = join.withOptionalRightHandFilters(optionalRightHandSideFilters);
165167
optimizationApplied = true;
166168
}
167169
// We still want to reapply the filters that we just applied to the right child,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plan.logical.join;
99

10+
import org.elasticsearch.TransportVersions;
1011
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -15,6 +16,7 @@
1516
import org.elasticsearch.xpack.esql.common.Failures;
1617
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1718
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
19+
import org.elasticsearch.xpack.esql.core.expression.Expression;
1820
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1921
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
2022
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
@@ -91,15 +93,24 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S
9193
private List<Attribute> lazyOutput;
9294
// Does this join involve remote indices? This is relevant only on the coordinating node, thus transient.
9395
private transient boolean isRemote = false;
96+
private Expression optionalRightHandFilters = null;
9497

9598
public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config) {
96-
this(source, left, right, config, false);
99+
this(source, left, right, config, false, null);
97100
}
98101

99-
public Join(Source source, LogicalPlan left, LogicalPlan right, JoinConfig config, boolean isRemote) {
102+
public Join(
103+
Source source,
104+
LogicalPlan left,
105+
LogicalPlan right,
106+
JoinConfig config,
107+
boolean isRemote,
108+
Expression optionalRightHandFilters
109+
) {
100110
super(source, left, right);
101111
this.config = config;
102112
this.isRemote = isRemote;
113+
this.optionalRightHandFilters = optionalRightHandFilters;
103114
}
104115

105116
public Join(
@@ -118,6 +129,11 @@ public Join(
118129
public Join(StreamInput in) throws IOException {
119130
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(LogicalPlan.class), in.readNamedWriteable(LogicalPlan.class));
120131
this.config = new JoinConfig(in);
132+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
133+
this.optionalRightHandFilters = in.readOptionalNamedWriteable(Expression.class);
134+
} else {
135+
this.optionalRightHandFilters = null;
136+
}
121137
}
122138

123139
@Override
@@ -126,6 +142,11 @@ public void writeTo(StreamOutput out) throws IOException {
126142
out.writeNamedWriteable(left());
127143
out.writeNamedWriteable(right());
128144
config.writeTo(out);
145+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
146+
out.writeOptionalNamedWriteable(optionalRightHandFilters());
147+
}
148+
// as the optionalRightHandFilters are optional it is OK to not write them if the node does not support it
149+
// it will still work, but performance might be worse
129150
}
130151

131152
@Override
@@ -247,17 +268,25 @@ public boolean resolved() {
247268
}
248269

249270
public Join withConfig(JoinConfig config) {
250-
return new Join(source(), left(), right(), config, isRemote);
271+
return new Join(source(), left(), right(), config, isRemote, optionalRightHandFilters());
272+
}
273+
274+
public Expression optionalRightHandFilters() {
275+
return this.optionalRightHandFilters;
276+
}
277+
278+
public Join withOptionalRightHandFilters(Expression optionalRightHandFilters) {
279+
return new Join(source(), left(), right(), config(), isRemote, optionalRightHandFilters);
251280
}
252281

253282
@Override
254283
public Join replaceChildren(LogicalPlan left, LogicalPlan right) {
255-
return new Join(source(), left, right, config, isRemote);
284+
return new Join(source(), left, right, config, isRemote, optionalRightHandFilters());
256285
}
257286

258287
@Override
259288
public int hashCode() {
260-
return Objects.hash(config, left(), right(), isRemote);
289+
return Objects.hash(config, left(), right(), isRemote, optionalRightHandFilters);
261290
}
262291

263292
@Override
@@ -273,7 +302,8 @@ public boolean equals(Object obj) {
273302
return config.equals(other.config)
274303
&& Objects.equals(left(), other.left())
275304
&& Objects.equals(right(), other.right())
276-
&& isRemote == other.isRemote;
305+
&& isRemote == other.isRemote
306+
&& Objects.equals(optionalRightHandFilters, other.optionalRightHandFilters);
277307
}
278308

279309
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig
5555
}
5656

5757
public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig joinConfig, boolean isRemote) {
58-
super(source, left, right, joinConfig, isRemote);
58+
super(source, left, right, joinConfig, isRemote, null);
5959
}
6060

6161
/**
@@ -64,7 +64,7 @@ public LookupJoin(Source source, LogicalPlan left, LogicalPlan right, JoinConfig
6464
@Override
6565
public LogicalPlan surrogate() {
6666
// TODO: decide whether to introduce USING or just basic ON semantics - keep the ordering out for now
67-
return new Join(source(), left(), right(), config(), isRemote());
67+
return new Join(source(), left(), right(), config(), isRemote(), optionalRightHandFilters());
6868
}
6969

7070
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LookupJoinExec.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77

88
package org.elasticsearch.xpack.esql.plan.physical;
99

10+
import org.elasticsearch.TransportVersions;
1011
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1415
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
16+
import org.elasticsearch.xpack.esql.core.expression.Expression;
1517
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1618
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1719
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -37,6 +39,7 @@ public class LookupJoinExec extends BinaryExec implements EstimatesRowSize {
3739
* the right hand side by a {@link EsQueryExec}, and thus lose the information of which fields we'll get from the lookup index.
3840
*/
3941
private final List<Attribute> addedFields;
42+
private final Expression optionalRightHandFilters;
4043
private List<Attribute> lazyOutput;
4144

4245
public LookupJoinExec(
@@ -45,19 +48,26 @@ public LookupJoinExec(
4548
PhysicalPlan lookup,
4649
List<Attribute> leftFields,
4750
List<Attribute> rightFields,
48-
List<Attribute> addedFields
51+
List<Attribute> addedFields,
52+
Expression optionalRightHandFilters
4953
) {
5054
super(source, left, lookup);
5155
this.leftFields = leftFields;
5256
this.rightFields = rightFields;
5357
this.addedFields = addedFields;
58+
this.optionalRightHandFilters = optionalRightHandFilters;
5459
}
5560

5661
private LookupJoinExec(StreamInput in) throws IOException {
5762
super(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(PhysicalPlan.class), in.readNamedWriteable(PhysicalPlan.class));
5863
this.leftFields = in.readNamedWriteableCollectionAsList(Attribute.class);
5964
this.rightFields = in.readNamedWriteableCollectionAsList(Attribute.class);
6065
this.addedFields = in.readNamedWriteableCollectionAsList(Attribute.class);
66+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
67+
this.optionalRightHandFilters = in.readOptionalNamedWriteable(Expression.class);
68+
} else {
69+
this.optionalRightHandFilters = null; // For versions before the field was added, we default to null
70+
}
6171
}
6272

6373
@Override
@@ -66,6 +76,15 @@ public void writeTo(StreamOutput out) throws IOException {
6676
out.writeNamedWriteableCollection(leftFields);
6777
out.writeNamedWriteableCollection(rightFields);
6878
out.writeNamedWriteableCollection(addedFields);
79+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_LOOKUP_JOIN_PRE_JOIN_FILTER)) {
80+
out.writeOptionalNamedWriteable(getOptionalRightHandFilters());
81+
}
82+
// as the optionalRightHandFilters are optional it is OK to not write them if the node does not support it
83+
// it will still work, but performance might be worse
84+
}
85+
86+
public Expression getOptionalRightHandFilters() {
87+
return optionalRightHandFilters;
6988
}
7089

7190
@Override
@@ -136,12 +155,12 @@ public AttributeSet rightReferences() {
136155

137156
@Override
138157
public LookupJoinExec replaceChildren(PhysicalPlan left, PhysicalPlan right) {
139-
return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields);
158+
return new LookupJoinExec(source(), left, right, leftFields, rightFields, addedFields, optionalRightHandFilters);
140159
}
141160

142161
@Override
143162
protected NodeInfo<? extends PhysicalPlan> info() {
144-
return NodeInfo.create(this, LookupJoinExec::new, left(), right(), leftFields, rightFields, addedFields);
163+
return NodeInfo.create(this, LookupJoinExec::new, left(), right(), leftFields, rightFields, addedFields, optionalRightHandFilters);
145164
}
146165

147166
@Override
@@ -156,11 +175,14 @@ public boolean equals(Object o) {
156175
return false;
157176
}
158177
LookupJoinExec other = (LookupJoinExec) o;
159-
return leftFields.equals(other.leftFields) && rightFields.equals(other.rightFields) && addedFields.equals(other.addedFields);
178+
return leftFields.equals(other.leftFields)
179+
&& rightFields.equals(other.rightFields)
180+
&& addedFields.equals(other.addedFields)
181+
&& Objects.equals(optionalRightHandFilters, other.optionalRightHandFilters);
160182
}
161183

162184
@Override
163185
public int hashCode() {
164-
return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields);
186+
return Objects.hash(super.hashCode(), leftFields, rightFields, addedFields, optionalRightHandFilters);
165187
}
166188
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,6 @@
119119
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
120120
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
121121
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
122-
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
123122
import org.elasticsearch.xpack.esql.plan.physical.inference.CompletionExec;
124123
import org.elasticsearch.xpack.esql.plan.physical.inference.RerankExec;
125124
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext;
@@ -733,8 +732,8 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
733732
}
734733
Layout layout = layoutBuilder.build();
735734

736-
EsQueryExec localSourceExec = fildEsQueryExec(join.lookup());
737-
if (localSourceExec == null || localSourceExec.indexMode() != IndexMode.LOOKUP) {
735+
EsQueryExec localSourceExec = (EsQueryExec) join.lookup();
736+
if (localSourceExec.indexMode() != IndexMode.LOOKUP) {
738737
throw new IllegalArgumentException("can't plan [" + join + "]");
739738
}
740739

@@ -781,7 +780,11 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
781780
}
782781
matchFields.add(new MatchConfig(right, input));
783782
}
784-
783+
PhysicalPlan rightPreJoinPlan = join.right();
784+
if (join.getOptionalRightHandFilters() != null) {
785+
// If there are filters on the right side, we need to apply them before the join
786+
rightPreJoinPlan = new FilterExec(Source.EMPTY, rightPreJoinPlan, join.getOptionalRightHandFilters());
787+
}
785788
return source.with(
786789
new LookupFromIndexOperator.Factory(
787790
matchFields,
@@ -793,21 +796,12 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan
793796
indexName,
794797
join.addedFields().stream().map(f -> (NamedExpression) f).toList(),
795798
join.source(),
796-
join.right()
799+
rightPreJoinPlan
797800
),
798801
layout
799802
);
800803
}
801804

802-
private EsQueryExec fildEsQueryExec(PhysicalPlan lookup) {
803-
if (lookup instanceof EsQueryExec esQueryExec) {
804-
return esQueryExec;
805-
} else if (lookup instanceof UnaryExec unaryExec) {
806-
return fildEsQueryExec(unaryExec.child());
807-
}
808-
return null;
809-
}
810-
811805
private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) {
812806
Layout.Builder layout = new Layout.Builder();
813807
layout.append(localSourceExec.output());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,15 @@ private PhysicalPlan mapBinary(BinaryPlan binary) {
129129

130130
private static LookupJoinExec getLookupJoinExec(Join join, PhysicalPlan right, PhysicalPlan left, JoinConfig config) {
131131
if (right instanceof EsSourceExec source && source.indexMode() == IndexMode.LOOKUP) {
132-
return new LookupJoinExec(join.source(), left, right, config.leftFields(), config.rightFields(), join.rightOutputFields());
132+
return new LookupJoinExec(
133+
join.source(),
134+
left,
135+
right,
136+
config.leftFields(),
137+
config.rightFields(),
138+
join.rightOutputFields(),
139+
join.optionalRightHandFilters()
140+
);
133141
}
134142
return null;
135143
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
240240
right,
241241
config.leftFields(),
242242
config.rightFields(),
243-
join.rightOutputFields()
243+
join.rightOutputFields(),
244+
join.optionalRightHandFilters()
244245
);
245246
}
246247
}

0 commit comments

Comments
 (0)