Skip to content

Commit cb90a8b

Browse files
alex-spies authored and smalyshev committed
Sketch out another approach to planning remote join
1 parent b7dd7d4 commit cb90a8b

File tree

10 files changed

+65
-75
lines changed

10 files changed

+65
-75
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,11 @@ public MultiClusterSpecIT(
119119
"StatsAndLookupMessageFromIndex",
120120
"MvJoinKeyOnTheLookupIndexAfterStats",
121121
"MvJoinKeyOnFromAfterStats",
122-
// Lookup join after SORT+LIMIT is not supported in CCS yet
123-
"NullifiedJoinKeyToPurgeTheJoin"
124-
// "SortBeforeAndAfterJoin",
125-
// "SortEvalBeforeLookup",
126-
// "SortBeforeAndAfterMultipleJoinAndMvExpand"
122+
// Lookup join after SORT is not supported in CCS yet
123+
"NullifiedJoinKeyToPurgeTheJoin",
124+
"SortBeforeAndAfterJoin",
125+
"SortEvalBeforeLookup",
126+
"SortBeforeAndAfterMultipleJoinAndMvExpand"
127127
);
128128

129129
@Override

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ emp_no:integer
331331
10001
332332
;
333333

334-
dropAllFieldsUsedInLookupOnMaybeTheCoordinator
334+
dropAllFieldsUsedInLookupOnTheCoordinator
335335
required_capability: join_lookup_v12
336336

337337
FROM employees
@@ -367,7 +367,7 @@ language_code:integer | language_name:keyword | country:text
367367
1 | English | null
368368
;
369369

370-
nonUniqueRightKeyOnMaybeTheCoordinatorLateLimit
370+
nonUniqueRightKeyOnTheCoordinatorLateLimit
371371
required_capability: join_lookup_v12
372372
required_capability: join_lookup_fix_limit_pushdown
373373

@@ -714,7 +714,7 @@ emp_no:integer | language_code:integer | language_name:keyword
714714
10003 | 4 | German
715715
;
716716

717-
filterOnTheDataNodeThenFilterOnMaybeTheCoordinator
717+
filterOnTheDataNodeThenFilterOnTheCoordinator
718718
required_capability: join_lookup_v12
719719

720720
FROM employees

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private static Limit duplicateLimitAsFirstGrandchild(Limit limit) {
108108

109109
List<LogicalPlan> grandChildren = limit.child().children();
110110
LogicalPlan firstGrandChild = grandChildren.getFirst();
111-
LogicalPlan newFirstGrandChild = limit.replaceChild(firstGrandChild).withImplied(true);
111+
LogicalPlan newFirstGrandChild = limit.replaceChild(firstGrandChild);
112112

113113
List<LogicalPlan> newGrandChildren = new ArrayList<>();
114114
newGrandChildren.add(newFirstGrandChild);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Aggregate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
4646
import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType;
4747

48-
public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic {
48+
public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, PipelineBreaker {
4949
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
5050
LogicalPlan.class,
5151
"Aggregate",

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.io.IOException;
1919
import java.util.Objects;
2020

21-
public class Limit extends UnaryPlan implements TelemetryAware {
21+
public class Limit extends UnaryPlan implements TelemetryAware, PipelineBreaker {
2222
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Limit", Limit::new);
2323

2424
private final Expression limit;
@@ -29,21 +29,19 @@ public class Limit extends UnaryPlan implements TelemetryAware {
2929
* infinite loops from adding a duplicate of the limit past the child over and over again.
3030
*/
3131
private final transient boolean duplicated;
32-
private final transient boolean implied;
3332

3433
/**
3534
* Default way to create a new instance. Do not use this to copy an existing instance, as this sets {@link Limit#duplicated} to
3635
* {@code false}.
3736
*/
3837
public Limit(Source source, Expression limit, LogicalPlan child) {
39-
this(source, limit, child, false, false);
38+
this(source, limit, child, false);
4039
}
4140

42-
public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated, boolean implied) {
41+
public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated) {
4342
super(source, child);
4443
this.limit = limit;
4544
this.duplicated = duplicated;
46-
this.implied = implied;
4745
}
4846

4947
/**
@@ -54,24 +52,20 @@ private Limit(StreamInput in) throws IOException {
5452
Source.readFrom((PlanStreamInput) in),
5553
in.readNamedWriteable(Expression.class),
5654
in.readNamedWriteable(LogicalPlan.class),
57-
false,
5855
false
5956
);
6057
}
6158

6259
/**
63-
* Omits serializing {@link Limit#duplicated} because when sent to a data node, this should always be {@code false}.
64-
* That's because if it's true, this means a copy of this limit was pushed down below an MvExpand or Join, and thus there's
65-
* another pipeline breaker further upstream - we're already on the coordinator node.
60+
* Omits serializing {@link Limit#duplicated} because this is only required to avoid duplicating a limit past
61+
* {@link org.elasticsearch.xpack.esql.plan.logical.join.Join} or {@link MvExpand} in an infinite loop, see
62+
* {@link org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownAndCombineLimits}.
6663
*/
6764
@Override
6865
public void writeTo(StreamOutput out) throws IOException {
6966
Source.EMPTY.writeTo(out);
7067
out.writeNamedWriteable(limit());
7168
out.writeNamedWriteable(child());
72-
// Let's make sure we notice during tests if we ever serialize a duplicated Limit.
73-
// FIXME: verify that this condition is correct and fix the comments
74-
assert implied || duplicated == false;
7569
}
7670

7771
@Override
@@ -81,36 +75,28 @@ public String getWriteableName() {
8175

8276
@Override
8377
protected NodeInfo<Limit> info() {
84-
return NodeInfo.create(this, Limit::new, limit, child(), duplicated, implied);
78+
return NodeInfo.create(this, Limit::new, limit, child(), duplicated);
8579
}
8680

8781
@Override
8882
public Limit replaceChild(LogicalPlan newChild) {
89-
return new Limit(source(), limit, newChild, duplicated, implied);
83+
return new Limit(source(), limit, newChild, duplicated);
9084
}
9185

9286
public Expression limit() {
9387
return limit;
9488
}
9589

9690
public Limit withLimit(Expression limit) {
97-
return new Limit(source(), limit, child(), duplicated, implied);
91+
return new Limit(source(), limit, child(), duplicated);
9892
}
9993

10094
public boolean duplicated() {
10195
return duplicated;
10296
}
10397

104-
public boolean implied() {
105-
return implied;
106-
}
107-
10898
public Limit withDuplicated(boolean duplicated) {
109-
return new Limit(source(), limit, child(), duplicated, implied);
110-
}
111-
112-
public Limit withImplied(boolean implied) {
113-
return new Limit(source(), limit, child(), duplicated, implied);
99+
return new Limit(source(), limit, child(), duplicated);
114100
}
115101

116102
@Override
@@ -120,7 +106,7 @@ public boolean expressionsResolved() {
120106

121107
@Override
122108
public int hashCode() {
123-
return Objects.hash(limit, child(), duplicated, implied);
109+
return Objects.hash(limit, child(), duplicated);
124110
}
125111

126112
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ public class OrderBy extends UnaryPlan
3131
PostAnalysisVerificationAware,
3232
PostOptimizationVerificationAware,
3333
TelemetryAware,
34-
SortAgnostic {
34+
SortAgnostic,
35+
PipelineBreaker {
3536
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "OrderBy", OrderBy::new);
3637

3738
private final List<Order> order;
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.logical;
9+
10+
/**
 * Marker interface for logical plan nodes that are treated as pipeline breakers.
 * <p>
 * It declares no methods; callers test for it with {@code instanceof}. The planner's
 * {@code Mapper.isPipelineBreaker} is defined purely in terms of this interface, and
 * {@code LookupJoin}'s remote-join verification reports a failure for every unary node
 * in its subtree that implements it.
 * <p>
 * NOTE(review): the precise planning semantics (presumably "requires all input rows before
 * producing output, so it forces a data-node/coordinator boundary") are not stated anywhere
 * in this change - confirm before relying on them.
 */
public interface PipelineBreaker {}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/TopN.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.List;
2222
import java.util.Objects;
2323

24-
public class TopN extends UnaryPlan {
24+
public class TopN extends UnaryPlan implements PipelineBreaker {
2525
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "TopN", TopN::new);
2626

2727
private final List<Order> order;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1414
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1515
import org.elasticsearch.xpack.esql.core.tree.Source;
16-
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1716
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
18-
import org.elasticsearch.xpack.esql.plan.logical.Limit;
1917
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
20-
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
18+
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
2119
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
2220
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2321
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType;
2422

23+
import java.util.HashSet;
2524
import java.util.List;
25+
import java.util.Set;
2626

2727
import static java.util.Collections.emptyList;
2828
import static org.elasticsearch.xpack.esql.common.Failure.fail;
@@ -97,28 +97,18 @@ public void postAnalysisVerification(Failures failures) {
9797
}
9898

9999
private void checkRemoteJoin(Failures failures) {
100-
boolean[] agg = { false };
101-
boolean[] enrichCoord = { false };
102-
boolean[] limit = { false };
100+
Set<String> fails = new HashSet<>();
103101

104102
this.forEachUp(UnaryPlan.class, u -> {
105-
if (u instanceof Aggregate) {
106-
agg[0] = true;
107-
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
108-
enrichCoord[0] = true;
109-
} else if (u instanceof Limit) {
110-
limit[0] = true;
103+
if (u instanceof PipelineBreaker) {
104+
fails.add(u.nodeName());
105+
}
106+
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
107+
fails.add("ENRICH with coordinator policy");
111108
}
112109
});
113-
if (agg[0]) {
114-
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after STATS"));
115-
}
116-
if (enrichCoord[0]) {
117-
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after ENRICH with coordinator policy"));
118-
}
119-
if (limit[0]) {
120-
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after LIMIT"));
121-
}
110+
111+
fails.forEach(f -> failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after " + f)));
122112
}
123113

124114
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
2121
import org.elasticsearch.xpack.esql.plan.logical.Limit;
2222
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
23-
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
23+
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
2424
import org.elasticsearch.xpack.esql.plan.logical.Sample;
2525
import org.elasticsearch.xpack.esql.plan.logical.TopN;
2626
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
@@ -205,11 +205,6 @@ private PhysicalPlan mapToFragmentExec(Join logical, PhysicalPlan child) {
205205
Holder<Boolean> hasFragment = new Holder<>(false);
206206
Holder<Boolean> forceLocal = new Holder<>(false);
207207

208-
// If we have any forced limits inside, we can't execute this as a remote join
209-
if (logical.left().anyMatch(pl -> pl instanceof Limit limit && limit.implied() == false)) {
210-
return null;
211-
}
212-
213208
var childTransformed = child.transformUp(f -> {
214209
if (forceLocal.get()) {
215210
return f;
@@ -276,23 +271,31 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
276271
return new FragmentExec(bp);
277272
}
278273

274+
if (join.isRemote()) {
275+
// This is generally wrong in case of pipeline breakers upstream from the join, but we validate against these.
276+
// The only potential pipeline breakers upstream should be limits duplicated past the join from PushDownAndCombineLimits,
277+
// but they are okay to perform on the data nodes because they only serve to reduce the number of rows processed and
278+
// don't affect correctness due to another limit being downstream.
279+
return new FragmentExec(bp);
280+
}
281+
279282
PhysicalPlan left = map(bp.left());
280283

281284
// only broadcast joins supported for now - hence push down as a streaming operator
282285
if (left instanceof FragmentExec fragment) {
283286
return new FragmentExec(bp);
284287
}
285288

286-
if (FRAGMENT_EXEC_HACK_ENABLED) {
287-
var leftPlan = mapToFragmentExec(join, left);
288-
if (leftPlan == null) {
289-
if (join.isRemote()) {
290-
throw new EsqlIllegalArgumentException("Remote joins are not supported in this context: [" + join + "]");
291-
}
292-
} else {
293-
return leftPlan;
294-
}
295-
}
289+
// if (FRAGMENT_EXEC_HACK_ENABLED) {
290+
// var leftPlan = mapToFragmentExec(join, left);
291+
// if (leftPlan == null) {
292+
// if (join.isRemote()) {
293+
// throw new EsqlIllegalArgumentException("Remote joins are not supported in this context: [" + join + "]");
294+
// }
295+
// } else {
296+
// return leftPlan;
297+
// }
298+
// }
296299

297300
PhysicalPlan right = map(bp.right());
298301
// if the right is data we can use a hash join directly
@@ -322,7 +325,7 @@ private PhysicalPlan mapFork(Fork fork) {
322325
}
323326

324327
public static boolean isPipelineBreaker(LogicalPlan p) {
325-
return p instanceof Aggregate || p instanceof TopN || p instanceof Limit || p instanceof OrderBy;
328+
return p instanceof PipelineBreaker;
326329
}
327330

328331
private PhysicalPlan addExchangeForFragment(LogicalPlan logical, PhysicalPlan child) {

0 commit comments

Comments
 (0)