1010import org .elasticsearch .common .io .stream .StreamInput ;
1111import org .elasticsearch .common .io .stream .StreamOutput ;
1212import org .elasticsearch .compute .operator .ChangePointOperator ;
13+ import org .elasticsearch .xpack .esql .capabilities .PostAnalysisVerificationAware ;
14+ import org .elasticsearch .xpack .esql .common .Failures ;
1315import org .elasticsearch .xpack .esql .core .expression .Attribute ;
1416import org .elasticsearch .xpack .esql .core .expression .AttributeSet ;
1517import org .elasticsearch .xpack .esql .core .expression .Expressions ;
2830import java .util .List ;
2931import java .util .Objects ;
3032
31- public class ChangePoint extends UnaryPlan implements GeneratingPlan <ChangePoint >, SurrogateLogicalPlan {
33+ import static org .elasticsearch .xpack .esql .common .Failure .fail ;
34+
35+ public class ChangePoint extends UnaryPlan implements GeneratingPlan <ChangePoint >, SurrogateLogicalPlan , PostAnalysisVerificationAware {
3236
3337 public static final NamedWriteableRegistry .Entry ENTRY = new NamedWriteableRegistry .Entry (
3438 LogicalPlan .class ,
@@ -167,15 +171,18 @@ public boolean equals(Object obj) {
167171 && Objects .equals (targetPvalue , other .targetPvalue );
168172 }
169173
174+ private Order order () {
175+ return new Order (source (), key , Order .OrderDirection .ASC , Order .NullsPosition .ANY );
176+ }
177+
170178 @ Override
171179 public LogicalPlan surrogate () {
172180 // ChangePoint should always run on the coordinating node after the data is collected
173181 // in sorted order. This is enforced by adding OrderBy here.
174182 // Furthermore, ChangePoint should be called with at most 1000 data points. That's
175183 // enforced by the Limits here. The first Limit of N+1 data points is necessary to
176184 // generate a possible warning, the second Limit of N is to truncate the output.
177- Order order = new Order (source (), key , Order .OrderDirection .ASC , Order .NullsPosition .ANY );
178- OrderBy orderBy = new OrderBy (source (), child (), List .of (order ));
185+ OrderBy orderBy = new OrderBy (source (), child (), List .of (order ()));
179186 Limit limit = new Limit (
180187 source (),
181188 new Literal (Source .EMPTY , ChangePointOperator .INPUT_VALUE_COUNT_LIMIT + 1 , DataType .INTEGER ),
@@ -184,4 +191,17 @@ public LogicalPlan surrogate() {
184191 ChangePoint changePoint = new ChangePoint (source (), limit , value , key , targetType , targetPvalue );
185192 return new Limit (source (), new Literal (Source .EMPTY , ChangePointOperator .INPUT_VALUE_COUNT_LIMIT , DataType .INTEGER ), changePoint );
186193 }
194+
195+ @ Override
196+ public void postAnalysisVerification (Failures failures ) {
197+ // Key must be sortable
198+ Order order = order ();
199+ if (DataType .isSortable (order .dataType ()) == false ) {
200+ failures .add (fail (this , "change point key [" + key .name () + "] must be sortable" ));
201+ }
202+ // Value must be a number
203+ if (value .dataType ().isNumeric () == false ) {
204+ failures .add (fail (this , "change point value [" + value .name () + "] must be numeric" ));
205+ }
206+ }
187207}
0 commit comments