2222import java .util .ArrayDeque ;
2323import java .util .ArrayList ;
2424import java .util .Deque ;
25+ import java .util .Iterator ;
2526import java .util .List ;
27+ import java .util .Optional ;
2628
2729/**
2830 * Find spikes, dips and change point in a list of values.
@@ -35,20 +37,21 @@ public class ChangePointOperator implements Operator {
3537
3638 public static final int INPUT_VALUE_COUNT_LIMIT = 1000 ;
3739
38- public record Factory (int channel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
40+ public record Factory (int metricChannel , Optional < Integer > partitionChannel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
3941 @ Override
4042 public Operator get (DriverContext driverContext ) {
41- return new ChangePointOperator (driverContext , channel , sourceText , sourceLine , sourceColumn );
43+ return new ChangePointOperator (driverContext , metricChannel , partitionChannel , sourceText , sourceLine , sourceColumn );
4244 }
4345
4446 @ Override
4547 public String describe () {
46- return " ChangePointOperator[channel=" + channel + "]" ;
48+ return ChangePointOperator . describe ( metricChannel , partitionChannel ) ;
4749 }
4850 }
4951
5052 private final DriverContext driverContext ;
51- private final int channel ;
53+ private final int metricChannel ;
54+ private final Optional <Integer > partitionChannel ;
5255 private final String sourceText ;
5356 private final int sourceLine ;
5457 private final int sourceColumn ;
@@ -60,9 +63,10 @@ public String describe() {
6063
6164 // TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
6265 // (by modularizing esql-core) and use that instead of the individual fields.
63- public ChangePointOperator (DriverContext driverContext , int channel , String sourceText , int sourceLine , int sourceColumn ) {
66+ public ChangePointOperator (DriverContext driverContext , int metricChannel , Optional < Integer > partitionChannel , String sourceText , int sourceLine , int sourceColumn ) {
6467 this .driverContext = driverContext ;
65- this .channel = channel ;
68+ this .metricChannel = metricChannel ;
69+ this .partitionChannel = partitionChannel ;
6670 this .sourceText = sourceText ;
6771 this .sourceLine = sourceLine ;
6872 this .sourceColumn = sourceColumn ;
@@ -105,61 +109,134 @@ public Page getOutput() {
105109 }
106110
107111 private void createOutputPages () {
112+ int maxValuesCount = checkValueCounts ();
113+ List <MlAggsHelper .DoubleBucketValues > bucketValuesPerPartition = checkNullAndMultivalued (maxValuesCount );
114+
115+ List <ChangeType > changeTypes = new ArrayList <>();
116+ for (MlAggsHelper .DoubleBucketValues bucketValues : bucketValuesPerPartition ) {
117+ ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
118+ if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
119+ warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
120+ }
121+ changeTypes .add (changeType );
122+ }
123+
124+ insertChangePoints (changeTypes );
125+ }
126+
127+ private int checkValueCounts () {
128+ int maxValuesCount = 0 ;
108129 int valuesCount = 0 ;
109- for (Page page : inputPages ) {
110- valuesCount += page .getPositionCount ();
130+ String lastPartitionFieldValue = null ;
131+ for (Page inputPage : inputPages ) {
132+ String currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
133+ if (lastPartitionFieldValue != null ) {
134+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
135+ valuesCount = 0 ;
136+ }
137+ }
138+ lastPartitionFieldValue = currentPartitionFieldValue ;
139+ valuesCount += inputPage .getPositionCount ();
140+ maxValuesCount = Math .max (maxValuesCount , valuesCount );
111141 }
112- boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT ;
142+ boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT ;
113143 if (tooManyValues ) {
114- valuesCount = INPUT_VALUE_COUNT_LIMIT ;
144+ warnings (true ).registerException (
145+ new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
146+ );
147+ maxValuesCount = INPUT_VALUE_COUNT_LIMIT ;
115148 }
149+ return maxValuesCount ;
150+ }
116151
117- List <Double > values = new ArrayList <>(valuesCount );
118- List <Integer > bucketIndexes = new ArrayList <>(valuesCount );
119- int valuesIndex = 0 ;
152+ private List <MlAggsHelper .DoubleBucketValues > checkNullAndMultivalued (int maxValuesCount ) {
153+ List <MlAggsHelper .DoubleBucketValues > result = new ArrayList <>();
154+ List <Double > values = new ArrayList <>(maxValuesCount );
155+ List <Integer > bucketIndexes = new ArrayList <>(maxValuesCount );
120156 boolean hasNulls = false ;
121157 boolean hasMultivalued = false ;
158+ String lastPartitionFieldValue = null ;
122159 for (Page inputPage : inputPages ) {
123- Block inputBlock = inputPage .getBlock (channel );
124- for (int i = 0 ; i < inputBlock .getPositionCount () && valuesIndex < valuesCount ; i ++) {
160+ String currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
161+ if (lastPartitionFieldValue != null ) {
162+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
163+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
164+ null ,
165+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
166+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
167+ );
168+ result .add (bucketValues );
169+
170+ values = new ArrayList <>(maxValuesCount );
171+ bucketIndexes = new ArrayList <>(maxValuesCount );
172+ }
173+ }
174+ lastPartitionFieldValue = currentPartitionFieldValue ;
175+ Block inputBlock = inputPage .getBlock (metricChannel );
176+ for (int i = 0 , valuesIndex = 0 ; i < inputBlock .getPositionCount () && valuesIndex < maxValuesCount ; i ++, valuesIndex ++) {
125177 Object value = BlockUtils .toJavaObject (inputBlock , i );
126178 if (value == null ) {
127179 hasNulls = true ;
128- valuesIndex ++;
129180 } else if (value instanceof List <?>) {
130181 hasMultivalued = true ;
131- valuesIndex ++;
132182 } else {
133183 values .add (((Number ) value ).doubleValue ());
134- bucketIndexes .add (valuesIndex ++ );
184+ bucketIndexes .add (valuesIndex );
135185 }
136186 }
137187 }
188+ // Handle last partition separately
189+ if (lastPartitionFieldValue != null ) {
190+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
191+ null ,
192+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
193+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
194+ );
195+ result .add (bucketValues );
196+ }
138197
139- MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
140- null ,
141- values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
142- bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
143- );
144- ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
145- int changePointIndex = changeType .changePoint ();
198+ if (hasNulls ) {
199+ warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
200+ }
201+ if (hasMultivalued ) {
202+ warnings (true ).registerException (
203+ new IllegalArgumentException (
204+ "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
205+ )
206+ );
207+ }
208+ return result ;
209+ }
146210
211+ private void insertChangePoints (Iterable <ChangeType > changeTypes ) {
212+ Iterator <ChangeType > changeTypesIterator = changeTypes .iterator ();
213+ ChangeType changeType = changeTypesIterator .next ();
147214 BlockFactory blockFactory = driverContext .blockFactory ();
148215 int pageStartIndex = 0 ;
216+ String lastPartitionFieldValue = null ;
149217 while (inputPages .isEmpty () == false ) {
150218 Page inputPage = inputPages .peek ();
151219 Page outputPage ;
152220 Block changeTypeBlock = null ;
153221 Block changePvalueBlock = null ;
154222 boolean success = false ;
223+
224+ String currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
225+ if (lastPartitionFieldValue != null ) {
226+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
227+ pageStartIndex = 0 ;
228+ }
229+ }
230+ lastPartitionFieldValue = currentPartitionFieldValue ;
231+
155232 try {
156- if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage .getPositionCount ()) {
233+ if (pageStartIndex <= changeType . changePoint () && changeType . changePoint () < pageStartIndex + inputPage .getPositionCount ()) {
157234 try (
158235 BytesRefBlock .Builder changeTypeBlockBuilder = blockFactory .newBytesRefBlockBuilder (inputPage .getPositionCount ());
159236 DoubleBlock .Builder pvalueBlockBuilder = blockFactory .newDoubleBlockBuilder (inputPage .getPositionCount ())
160237 ) {
161238 for (int i = 0 ; i < inputPage .getPositionCount (); i ++) {
162- if (pageStartIndex + i == changePointIndex ) {
239+ if (pageStartIndex + i == changeType . changePoint () ) {
163240 changeTypeBlockBuilder .appendBytesRef (new BytesRef (changeType .getWriteableName ()));
164241 pvalueBlockBuilder .appendDouble (changeType .pValue ());
165242 } else {
@@ -170,6 +247,9 @@ private void createOutputPages() {
170247 changeTypeBlock = changeTypeBlockBuilder .build ();
171248 changePvalueBlock = pvalueBlockBuilder .build ();
172249 }
250+ if (changeTypesIterator .hasNext ()) {
251+ changeType = changeTypesIterator .next ();
252+ }
173253 } else {
174254 changeTypeBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
175255 changePvalueBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
@@ -187,24 +267,14 @@ private void createOutputPages() {
187267 outputPages .add (outputPage );
188268 pageStartIndex += inputPage .getPositionCount ();
189269 }
270+ }
190271
191- if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
192- warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
193- }
194- if (tooManyValues ) {
195- warnings (true ).registerException (
196- new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
197- );
198- }
199- if (hasNulls ) {
200- warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
201- }
202- if (hasMultivalued ) {
203- warnings (true ).registerException (
204- new IllegalArgumentException (
205- "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206- )
207- );
272+ private String getCurrentPartitionFieldValue (Page inputPage ) {
273+ assert partitionChannel .isPresent ();
274+ assert inputPage .getPositionCount () > 0 ;
275+ try (var block = inputPage .getBlock (partitionChannel .get ()).filter (0 )) {
276+ BytesRef partition = ((BytesRefBlock ) block ).getBytesRef (0 , new BytesRef ());
277+ return partition .utf8ToString ();
208278 }
209279 }
210280
@@ -220,7 +290,11 @@ public void close() {
220290
221291 @ Override
222292 public String toString () {
223- return "ChangePointOperator[channel=" + channel + "]" ;
293+ return describe (metricChannel , partitionChannel );
294+ }
295+
296+ private static String describe (int metricChannel , Optional <Integer > partitionChannel ) {
297+ return "ChangePointOperator[metricChannel=" + metricChannel + (partitionChannel .isPresent () ? ", partitionChannel=" + partitionChannel .get () : "" ) + "]" ;
224298 }
225299
226300 private Warnings warnings (boolean onlyWarnings ) {
0 commit comments