Skip to content

Commit c2c5911

Browse files
committed
Move OrderBy/Limit to the logical plan
1 parent 97f16e3 commit c2c5911

File tree

8 files changed

+24
-57
lines changed

8 files changed

+24
-57
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,13 @@ public PlanFactory visitChangePointCommand(EsqlBaseParser.ChangePointCommandCont
457457
NamedExpression key = visitQualifiedName(ctx.key);
458458
Attribute targetType = new ReferenceAttribute(src, visitQualifiedName(ctx.targetType).name(), DataType.TEXT);
459459
Attribute targetPvalue = new ReferenceAttribute(src, visitQualifiedName(ctx.targetPvalue).name(), DataType.DOUBLE);
460-
return child -> new ChangePoint(src, child, value, key, targetType, targetPvalue);
460+
return child -> {
461+
// ChangePoint should always run on the coordinating node after all data is collected
462+
// in sorted order. This is enforced by adding OrderBy and Limit here.
463+
OrderBy orderBy = new OrderBy(src, child, List.of(new Order(src, key, Order.OrderDirection.ASC, Order.NullsPosition.ANY)));
464+
Limit limit = new Limit(src, new Literal(Source.EMPTY, 1000, DataType.INTEGER), orderBy);
465+
return new ChangePoint(src, limit, value, key, targetType, targetPvalue);
466+
};
461467
}
462468

463469
private static Tuple<Mode, String> parsePolicyName(Token policyToken) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ChangePoint.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,7 @@ private ChangePoint(StreamInput in) throws IOException {
6666

6767
@Override
6868
public void writeTo(StreamOutput out) throws IOException {
69-
Source.EMPTY.writeTo(out);
70-
out.writeNamedWriteable(child());
71-
out.writeNamedWriteable(value);
72-
out.writeNamedWriteable(key);
73-
out.writeNamedWriteable(targetType);
74-
out.writeNamedWriteable(targetPvalue);
69+
throw new UnsupportedOperationException("ChangePoint should run on the coordinating node, so never be serialized.");
7570
}
7671

7772
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ChangePointExec.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,7 @@ private ChangePointExec(StreamInput in) throws IOException {
6464

6565
@Override
6666
public void writeTo(StreamOutput out) throws IOException {
67-
Source.EMPTY.writeTo(out);
68-
out.writeNamedWriteable(child());
69-
out.writeNamedWriteable(value);
70-
out.writeNamedWriteable(key);
71-
out.writeNamedWriteable(targetType);
72-
out.writeNamedWriteable(targetPvalue);
67+
throw new UnsupportedOperationException("ChangePoint should run on the coordinating node, so never be serialized.");
7368
}
7469

7570
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,6 @@ private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecution
703703
}
704704

705705
private PhysicalOperation planChangePoint(ChangePointExec changePoint, LocalExecutionPlannerContext context) {
706-
// TODO: should this be planned locally?
707706
PhysicalOperation source = plan(changePoint.child(), context);
708707
Layout layout = source.layout.builder().append(changePoint.targetType()).append(changePoint.targetPvalue()).build();
709708
return source.with(new ChangePointOperator.Factory(layout.get(changePoint.value().id()).channel()), layout);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -95,26 +95,6 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
9595
return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
9696
}
9797

98-
if (unary instanceof ChangePoint changePoint) {
99-
// TODO: ChangePoint shouldn't run on the local node
100-
// TODO: fix hardcoded 1000
101-
mappedChild = new TopNExec(
102-
changePoint.source(),
103-
mappedChild,
104-
List.of(new Order(changePoint.source(), changePoint.key(), Order.OrderDirection.ASC, Order.NullsPosition.ANY)),
105-
new Literal(Source.EMPTY, 1000, DataType.INTEGER),
106-
null
107-
);
108-
return new ChangePointExec(
109-
changePoint.source(),
110-
mappedChild,
111-
changePoint.value(),
112-
changePoint.key(),
113-
changePoint.targetType(),
114-
changePoint.targetPvalue()
115-
);
116-
}
117-
11898
//
11999
// Pipeline operators
120100
//

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -177,25 +177,6 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
177177
return new TopNExec(topN.source(), mappedChild, topN.order(), topN.limit(), null);
178178
}
179179

180-
if (unary instanceof ChangePoint changePoint) {
181-
mappedChild = addExchangeForFragment(changePoint, mappedChild);
182-
mappedChild = new TopNExec(
183-
changePoint.source(),
184-
mappedChild,
185-
List.of(new Order(changePoint.source(), changePoint.key(), Order.OrderDirection.ASC, Order.NullsPosition.ANY)),
186-
new Literal(Source.EMPTY, 1000, DataType.INTEGER),
187-
null
188-
);
189-
return new ChangePointExec(
190-
changePoint.source(),
191-
mappedChild,
192-
changePoint.value(),
193-
changePoint.key(),
194-
changePoint.targetType(),
195-
changePoint.targetPvalue()
196-
);
197-
}
198-
199180
//
200181
// Pipeline operators
201182
//

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1414
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1515
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
16+
import org.elasticsearch.xpack.esql.plan.logical.ChangePoint;
1617
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
1718
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1819
import org.elasticsearch.xpack.esql.plan.logical.Eval;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
2728
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
2829
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
30+
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
2931
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
3032
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
3133
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
@@ -98,6 +100,17 @@ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) {
98100
return new MvExpandExec(mvExpand.source(), child, mvExpand.target(), mvExpand.expanded());
99101
}
100102

103+
if (p instanceof ChangePoint changePoint) {
104+
return new ChangePointExec(
105+
changePoint.source(),
106+
child,
107+
changePoint.value(),
108+
changePoint.key(),
109+
changePoint.targetType(),
110+
changePoint.targetPvalue()
111+
);
112+
}
113+
101114
return unsupported(p);
102115
}
103116

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.elasticsearch.xpack.esql.parser.EsqlParser;
7575
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
7676
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
77+
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
7778
import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec;
7879
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
7980
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
@@ -567,12 +568,9 @@ private Throwable reworkException(Throwable th) {
567568

568569
// Asserts that the serialization and deserialization of the plan creates an equivalent plan.
569570
private void opportunisticallyAssertPlanSerialization(PhysicalPlan plan) {
570-
571-
// skip plans with localSourceExec
572-
if (plan.anyMatch(p -> p instanceof LocalSourceExec || p instanceof HashJoinExec)) {
571+
if (plan.anyMatch(p -> p instanceof LocalSourceExec || p instanceof HashJoinExec || p instanceof ChangePointExec)) {
573572
return;
574573
}
575-
576574
SerializationTestUtils.assertSerialization(plan, configuration);
577575
}
578576

0 commit comments

Comments
 (0)