Skip to content

Commit 40e8967

Browse files
committed
One of the ways to fix the limit issue
1 parent 364e139 commit 40e8967

File tree

3 files changed

+39
-27
lines changed

3 files changed

+39
-27
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineLimits.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ private static Limit duplicateLimitAsFirstGrandchild(Limit limit) {
108108

109109
List<LogicalPlan> grandChildren = limit.child().children();
110110
LogicalPlan firstGrandChild = grandChildren.getFirst();
111-
LogicalPlan newFirstGrandChild = limit.replaceChild(firstGrandChild);
111+
LogicalPlan newFirstGrandChild = limit.replaceChild(firstGrandChild).withImplied(true);
112112

113113
List<LogicalPlan> newGrandChildren = new ArrayList<>();
114114
newGrandChildren.add(newFirstGrandChild);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,21 @@ public class Limit extends UnaryPlan implements TelemetryAware {
2929
* infinite loops from adding a duplicate of the limit past the child over and over again.
3030
*/
3131
private final transient boolean duplicated;
32+
private final transient boolean implied;
3233

3334
/**
3435
* Default way to create a new instance. Do not use this to copy an existing instance, as this sets {@link Limit#duplicated} to
3536
* {@code false}.
3637
*/
3738
public Limit(Source source, Expression limit, LogicalPlan child) {
38-
this(source, limit, child, false);
39+
this(source, limit, child, false, false);
3940
}
4041

41-
public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated) {
42+
public Limit(Source source, Expression limit, LogicalPlan child, boolean duplicated, boolean implied) {
4243
super(source, child);
4344
this.limit = limit;
4445
this.duplicated = duplicated;
46+
this.implied = implied;
4547
}
4648

4749
/**
@@ -52,6 +54,7 @@ private Limit(StreamInput in) throws IOException {
5254
Source.readFrom((PlanStreamInput) in),
5355
in.readNamedWriteable(Expression.class),
5456
in.readNamedWriteable(LogicalPlan.class),
57+
false,
5558
false
5659
);
5760
}
@@ -77,28 +80,36 @@ public String getWriteableName() {
7780

7881
@Override
7982
protected NodeInfo<Limit> info() {
80-
return NodeInfo.create(this, Limit::new, limit, child(), duplicated);
83+
return NodeInfo.create(this, Limit::new, limit, child(), duplicated, implied);
8184
}
8285

8386
@Override
8487
public Limit replaceChild(LogicalPlan newChild) {
85-
return new Limit(source(), limit, newChild, duplicated);
88+
return new Limit(source(), limit, newChild, duplicated, implied);
8689
}
8790

8891
public Expression limit() {
8992
return limit;
9093
}
9194

9295
public Limit withLimit(Expression limit) {
93-
return new Limit(source(), limit, child(), duplicated);
96+
return new Limit(source(), limit, child(), duplicated, implied);
9497
}
9598

9699
public boolean duplicated() {
97100
return duplicated;
98101
}
99102

103+
public boolean implied() {
104+
return implied;
105+
}
106+
100107
public Limit withDuplicated(boolean duplicated) {
101-
return new Limit(source(), limit, child(), duplicated);
108+
return new Limit(source(), limit, child(), duplicated, implied);
109+
}
110+
111+
public Limit withImplied(boolean implied) {
112+
return new Limit(source(), limit, child(), duplicated, implied);
102113
}
103114

104115
@Override
@@ -108,7 +119,7 @@ public boolean expressionsResolved() {
108119

109120
@Override
110121
public int hashCode() {
111-
return Objects.hash(limit, child(), duplicated);
122+
return Objects.hash(limit, child(), duplicated, implied);
112123
}
113124

114125
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -205,20 +205,19 @@ private PhysicalPlan mapToFragmentExec(Join logical, PhysicalPlan child) {
205205
Holder<Boolean> hasFragment = new Holder<>(false);
206206
Holder<Boolean> forceLocal = new Holder<>(false);
207207

208+
// If we have any forced limits inside, we can't execute this as a remote join
209+
if (logical.left().anyMatch(pl -> pl instanceof Limit limit && limit.implied() == false)) {
210+
return null;
211+
}
212+
208213
var childTransformed = child.transformUp(f -> {
209214
if (forceLocal.get()) {
210215
return f;
211216
}
212217
// Once we reached FragmentExec, we stuff our Enrich under it
213218
if (f instanceof FragmentExec) {
214219
hasFragment.set(true);
215-
// FIXME: hack to remove duplicate limits. This is probably not the right way to do it.
216-
return new FragmentExec(logical.transformUp(Limit.class, l -> {
217-
if (l.duplicated()) {
218-
return l.child();
219-
}
220-
return l;
221-
}));
220+
return new FragmentExec(logical);
222221
}
223222
if (f instanceof EnrichExec enrichExec) {
224223
assert enrichExec.mode() != Enrich.Mode.REMOTE : "Unexpected remote ENRICH when looking for fragment";
@@ -236,14 +235,19 @@ private PhysicalPlan mapToFragmentExec(Join logical, PhysicalPlan child) {
236235
forceLocal.set(true);
237236
return f;
238237
}
239-
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof TopNExec) {
238+
// No need to include LimitExec because:
239+
// 1. If it's an implied limit, we have a copy of it already on the top
240+
// 2. If it's a forced limit, we can't execute this as a remote join anyway.
241+
if (f instanceof ExchangeExec || f instanceof TopNExec) {
240242
return f;
241243
} else {
242244
return unaryExec.child();
243245
}
244246
}
245-
if (f instanceof LookupJoinExec lj) {
246-
return lj.right();
247+
if (f instanceof LookupJoinExec) {
248+
// We shouldn't be meeting LookupJoinExec unless we can't execute this as a remote join
249+
forceLocal.set(true);
250+
return f;
247251
}
248252
if (f instanceof MergeExec) {
249253
forceLocal.set(true);
@@ -252,14 +256,7 @@ private PhysicalPlan mapToFragmentExec(Join logical, PhysicalPlan child) {
252256
return f;
253257
});
254258

255-
if (forceLocal.get()) {
256-
if (logical.isRemote()) {
257-
throw new EsqlIllegalArgumentException("Remote joins are not supported in this context");
258-
}
259-
return null;
260-
}
261-
262-
if (hasFragment.get()) {
259+
if (forceLocal.get() == false && hasFragment.get()) {
263260
return childTransformed;
264261
}
265262
return null;
@@ -288,7 +285,11 @@ private PhysicalPlan mapBinary(BinaryPlan bp) {
288285

289286
if (FRAGMENT_EXEC_HACK_ENABLED) {
290287
var leftPlan = mapToFragmentExec(join, left);
291-
if (leftPlan != null) {
288+
if (leftPlan == null) {
289+
if (join.isRemote()) {
290+
throw new EsqlIllegalArgumentException("Remote joins are not supported in this context: [" + join + "]");
291+
}
292+
} else {
292293
return leftPlan;
293294
}
294295
}

0 commit comments

Comments
 (0)