1919import org .elasticsearch .xpack .ml .aggs .changepoint .ChangeType ;
2020
2121import java .util .ArrayList ;
22- import java .util .Arrays ;
2322import java .util .List ;
2423
2524public class ChangePointOperator implements Operator {
2625
2726 // TODO: close upon failure / interrupt
2827
28+ public static final int INPUT_VALUE_COUNT_LIMIT = 1000 ;
29+
2930 public record Factory (int inputChannel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
3031 @ Override
3132 public Operator get (DriverContext driverContext ) {
@@ -103,14 +104,18 @@ private void createOutputPages() {
103104 for (Page page : inputPages ) {
104105 valuesCount += page .getPositionCount ();
105106 }
107+ boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT ;
108+ if (tooManyValues ) {
109+ valuesCount = INPUT_VALUE_COUNT_LIMIT ;
110+ }
106111
107112 // TODO: account for this memory?
108113 double [] values = new double [valuesCount ];
109114 int valuesIndex = 0 ;
110115 boolean hasNulls = false ;
111116 for (Page inputPage : inputPages ) {
112117 Block inputBlock = inputPage .getBlock (inputChannel );
113- for (int i = 0 ; i < inputBlock .getPositionCount (); i ++) {
118+ for (int i = 0 ; i < inputBlock .getPositionCount () && valuesIndex < valuesCount ; i ++) {
114119 Object value = BlockUtils .toJavaObject (inputBlock , i );
115120 if (value == null ) {
116121 hasNulls = true ;
@@ -124,13 +129,6 @@ private void createOutputPages() {
124129 ChangeType changeType = ChangePointDetector .getChangeType (new MlAggsHelper .DoubleBucketValues (null , values ));
125130 int changePointIndex = changeType .changePoint ();
126131
127- if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
128- warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
129- }
130- if (hasNulls ) {
131- warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; treating them as zeroes" ));
132- }
133-
134132 BlockFactory blockFactory = driverContext .blockFactory ();
135133 int pageStartIndex = 0 ;
136134 for (Page inputPage : inputPages ) {
@@ -164,6 +162,16 @@ private void createOutputPages() {
164162 }
165163
166164 inputPages .clear ();
165+
166+ if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
167+ warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
168+ }
169+ if (tooManyValues ) {
170+ warnings (true ).registerException (new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" ));
171+ }
172+ if (hasNulls ) {
173+ warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; treating them as zeroes" ));
174+ }
167175 }
168176
169177 @ Override
0 commit comments