Skip to content

Commit d99bd56

Browse files
committed
refactor mapper
1 parent 0a8f716 commit d99bd56

File tree

2 files changed

+35
-13
lines changed

2 files changed

+35
-13
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/LookupJoin.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1717
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1818
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
19+
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
1920
import org.elasticsearch.xpack.esql.plan.logical.SurrogateLogicalPlan;
2021
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2122
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes.UsingJoinType;
@@ -97,22 +98,26 @@ public void postAnalysisVerification(Failures failures) {
9798
private void checkRemoteJoin(Failures failures) {
9899
boolean[] agg = { false };
99100
boolean[] enrichCoord = { false };
101+
boolean[] sort = { false };
100102

101103
this.forEachUp(UnaryPlan.class, u -> {
102104
if (u instanceof Aggregate) {
103105
agg[0] = true;
104106
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
105107
enrichCoord[0] = true;
106-
}
107-
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
108-
if (agg[0]) {
109-
failures.add(fail(enrich, "LOOKUP JOIN with remote indices can't be executed after STATS"));
110-
}
111-
if (enrichCoord[0]) {
112-
failures.add(fail(enrich, "LOOKUP JOIN with remote indices can't be executed after ENRICH with coordinator policy"));
113-
}
108+
} else if (u instanceof OrderBy) {
109+
sort[0] = true;
114110
}
115111
});
112+
if (agg[0]) {
113+
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after STATS"));
114+
}
115+
if (enrichCoord[0]) {
116+
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after ENRICH with coordinator policy"));
117+
}
118+
if (sort[0]) {
119+
failures.add(fail(this, "LOOKUP JOIN with remote indices can't be executed after SORT"));
120+
}
116121
}
117122

118123
public boolean isRemote() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
3030
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
3131
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
32+
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
3233
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
3334
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
3435
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
@@ -202,8 +203,12 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
202203

203204
private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child) {
204205
Holder<Boolean> hasFragment = new Holder<>(false);
206+
Holder<Boolean> forceLocal = new Holder<>(false);
205207

206208
var childTransformed = child.transformUp(f -> {
209+
if (forceLocal.get()) {
210+
return f;
211+
}
207212
// Once we reached FragmentExec, we stuff our Enrich under it
208213
if (f instanceof FragmentExec) {
209214
hasFragment.set(true);
@@ -216,12 +221,21 @@ private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child)
216221
}));
217222
}
218223
if (f instanceof EnrichExec enrichExec) {
219-
// It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
220-
assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
221-
return enrichExec.child();
224+
assert enrichExec.mode() != Enrich.Mode.REMOTE : "Unexpected remote ENRICH when looking for fragment";
225+
if (enrichExec.mode() == Enrich.Mode.ANY) {
226+
return enrichExec.child();
227+
} else {
228+
forceLocal.set(true);
229+
return f;
230+
}
222231
}
223232
if (f instanceof UnaryExec unaryExec) {
224-
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof TopNExec) {
233+
if (f instanceof AggregateExec || f instanceof TopNExec) {
234+
// We can't make a fragment here...
235+
forceLocal.set(true);
236+
return f;
237+
}
238+
if (f instanceof LimitExec || f instanceof ExchangeExec) {
225239
return f;
226240
} else {
227241
return unaryExec.child();
@@ -230,10 +244,13 @@ private PhysicalPlan mapToFragmentExec(LogicalPlan logical, PhysicalPlan child)
230244
if (f instanceof LookupJoinExec lj) {
231245
return lj.right();
232246
}
233-
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
234247
return f;
235248
});
236249

250+
if (forceLocal.get()) {
251+
return null;
252+
}
253+
237254
if (hasFragment.get()) {
238255
return childTransformed;
239256
}

0 commit comments

Comments
 (0)