|
9 | 9 | import org.elasticsearch.common.io.stream.NamedWriteableRegistry; |
10 | 10 | import org.elasticsearch.common.io.stream.StreamInput; |
11 | 11 | import org.elasticsearch.common.io.stream.StreamOutput; |
| 12 | +import org.elasticsearch.compute.operator.ChangePointOperator; |
12 | 13 | import org.elasticsearch.xpack.esql.core.expression.Attribute; |
13 | 14 | import org.elasticsearch.xpack.esql.core.expression.AttributeSet; |
14 | 15 | import org.elasticsearch.xpack.esql.core.expression.Expressions; |
| 16 | +import org.elasticsearch.xpack.esql.core.expression.Literal; |
15 | 17 | import org.elasticsearch.xpack.esql.core.expression.NameId; |
16 | 18 | import org.elasticsearch.xpack.esql.core.expression.NamedExpression; |
17 | 19 | import org.elasticsearch.xpack.esql.core.tree.NodeInfo; |
18 | 20 | import org.elasticsearch.xpack.esql.core.tree.Source; |
| 21 | +import org.elasticsearch.xpack.esql.core.type.DataType; |
19 | 22 | import org.elasticsearch.xpack.esql.expression.NamedExpressions; |
| 23 | +import org.elasticsearch.xpack.esql.expression.Order; |
20 | 24 | import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; |
21 | 25 | import org.elasticsearch.xpack.esql.plan.GeneratingPlan; |
22 | 26 |
|
23 | 27 | import java.io.IOException; |
24 | 28 | import java.util.List; |
25 | 29 | import java.util.Objects; |
26 | 30 |
|
27 | | -public class ChangePoint extends UnaryPlan implements GeneratingPlan<ChangePoint> { |
| 31 | +public class ChangePoint extends UnaryPlan implements GeneratingPlan<ChangePoint>, SurrogateLogicalPlan { |
| 32 | + |
28 | 33 | public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( |
29 | 34 | LogicalPlan.class, |
30 | 35 | "ChangePoint", |
@@ -161,4 +166,22 @@ public boolean equals(Object obj) { |
161 | 166 | && Objects.equals(targetType, other.targetType) |
162 | 167 | && Objects.equals(targetPvalue, other.targetPvalue); |
163 | 168 | } |
| 169 | + |
| 170 | + @Override |
| 171 | + public LogicalPlan surrogate() { |
| 172 | + // ChangePoint should always run on the coordinating node after the data is collected |
| 173 | + // in sorted order. This is enforced by adding OrderBy here. |
| 174 | + // Furthermore, ChangePoint should be called with at most 1000 data points. That's |
| 175 | + // enforced by the Limits here. The first Limit of N+1 data points is necessary to |
| 176 | + // generate a possible warning, the second Limit of N is to truncate the output. |
| 177 | + Order order = new Order(source(), key, Order.OrderDirection.ASC, Order.NullsPosition.ANY); |
| 178 | + OrderBy orderBy = new OrderBy(source(), child(), List.of(order)); |
| 179 | + Limit limit = new Limit( |
| 180 | + source(), |
| 181 | + new Literal(Source.EMPTY, ChangePointOperator.INPUT_VALUE_COUNT_LIMIT + 1, DataType.INTEGER), |
| 182 | + orderBy |
| 183 | + ); |
| 184 | + ChangePoint changePoint = new ChangePoint(source(), limit, value, key, targetType, targetPvalue); |
| 185 | + return new Limit(source(), new Literal(Source.EMPTY, ChangePointOperator.INPUT_VALUE_COUNT_LIMIT, DataType.INTEGER), changePoint); |
| 186 | + } |
164 | 187 | } |
0 commit comments