2222import java .util .ArrayDeque ;
2323import java .util .ArrayList ;
2424import java .util .Deque ;
25+ import java .util .Iterator ;
2526import java .util .List ;
27+ import java .util .Objects ;
28+ import java .util .stream .Collectors ;
2629
2730/**
2831 * Find spikes, dips and change point in a list of values.
@@ -35,20 +38,21 @@ public class ChangePointOperator implements Operator {
3538
3639 public static final int INPUT_VALUE_COUNT_LIMIT = 1000 ;
3740
38- public record Factory (int channel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
41+ public record Factory (int metricChannel , List < Integer > partitionChannel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
3942 @ Override
4043 public Operator get (DriverContext driverContext ) {
41- return new ChangePointOperator (driverContext , channel , sourceText , sourceLine , sourceColumn );
44+ return new ChangePointOperator (driverContext , metricChannel , partitionChannel , sourceText , sourceLine , sourceColumn );
4245 }
4346
4447 @ Override
4548 public String describe () {
46- return " ChangePointOperator[channel=" + channel + "]" ;
49+ return ChangePointOperator . describe ( metricChannel , partitionChannel ) ;
4750 }
4851 }
4952
5053 private final DriverContext driverContext ;
51- private final int channel ;
54+ private final int metricChannel ;
55+ private final List <Integer > partitionChannel ;
5256 private final String sourceText ;
5357 private final int sourceLine ;
5458 private final int sourceColumn ;
@@ -60,9 +64,10 @@ public String describe() {
6064
6165 // TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
6266 // (by modularizing esql-core) and use that instead of the individual fields.
63- public ChangePointOperator (DriverContext driverContext , int channel , String sourceText , int sourceLine , int sourceColumn ) {
67+ public ChangePointOperator (DriverContext driverContext , int metricChannel , List < Integer > partitionChannel , String sourceText , int sourceLine , int sourceColumn ) {
6468 this .driverContext = driverContext ;
65- this .channel = channel ;
69+ this .metricChannel = metricChannel ;
70+ this .partitionChannel = partitionChannel ;
6671 this .sourceText = sourceText ;
6772 this .sourceLine = sourceLine ;
6873 this .sourceColumn = sourceColumn ;
@@ -105,61 +110,140 @@ public Page getOutput() {
105110 }
106111
107112 private void createOutputPages () {
108- int valuesCount = 0 ;
109- for (Page page : inputPages ) {
110- valuesCount += page .getPositionCount ();
111- }
112- boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT ;
113- if (tooManyValues ) {
114- valuesCount = INPUT_VALUE_COUNT_LIMIT ;
113+ int maxValuesCount = 0 ;
114+ {
115+ int valuesCount = 0 ;
116+ String lastPartitionFieldValue = null ;
117+ for (Page inputPage : inputPages ) {
118+ String currentPartitionFieldValue = getCurrentPartitionKey (inputPage , 0 );
119+ if (lastPartitionFieldValue != null ) {
120+ if (Objects .equals (currentPartitionFieldValue , lastPartitionFieldValue ) == false ) {
121+ valuesCount = 0 ;
122+ }
123+ }
124+ lastPartitionFieldValue = currentPartitionFieldValue ;
125+ valuesCount += inputPage .getPositionCount ();
126+ maxValuesCount = Math .max (maxValuesCount , valuesCount );
127+ }
115128 }
129+ boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT ;
116130
117- List <Double > values = new ArrayList <>(valuesCount );
118- List <Integer > bucketIndexes = new ArrayList <>(valuesCount );
119- int valuesIndex = 0 ;
131+
132+ List <MlAggsHelper .DoubleBucketValues > bucketValuesPerPartition = new ArrayList <>();
120133 boolean hasNulls = false ;
121134 boolean hasMultivalued = false ;
122- for (Page inputPage : inputPages ) {
123- Block inputBlock = inputPage .getBlock (channel );
124- for (int i = 0 ; i < inputBlock .getPositionCount () && valuesIndex < valuesCount ; i ++) {
125- Object value = BlockUtils .toJavaObject (inputBlock , i );
126- if (value == null ) {
127- hasNulls = true ;
128- valuesIndex ++;
129- } else if (value instanceof List <?>) {
130- hasMultivalued = true ;
131- valuesIndex ++;
132- } else {
133- values .add (((Number ) value ).doubleValue ());
134- bucketIndexes .add (valuesIndex ++);
135+ {
136+ List <Double > values = new ArrayList <>(maxValuesCount );
137+ List <Integer > bucketIndexes = new ArrayList <>(maxValuesCount );
138+ int valuesIndex = 0 ;
139+ String lastPartitionFieldValue = null ;
140+ for (Page inputPage : inputPages ) {
141+ String currentPartitionFieldValue = getCurrentPartitionKey (inputPage , 0 );
142+ if (lastPartitionFieldValue != null ) {
143+ if (Objects .equals (currentPartitionFieldValue , lastPartitionFieldValue ) == false ) {
144+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
145+ null ,
146+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
147+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
148+ );
149+ bucketValuesPerPartition .add (bucketValues );
150+
151+ values = new ArrayList <>(maxValuesCount );
152+ bucketIndexes = new ArrayList <>(maxValuesCount );
153+ valuesIndex = 0 ;
154+ }
155+ }
156+ lastPartitionFieldValue = currentPartitionFieldValue ;
157+ Block inputBlock = inputPage .getBlock (metricChannel );
158+ for (int i = 0 ; i < inputBlock .getPositionCount () && valuesIndex < maxValuesCount ; i ++, valuesIndex ++) {
159+ Object value = BlockUtils .toJavaObject (inputBlock , i );
160+ if (value == null ) {
161+ hasNulls = true ;
162+ } else if (value instanceof List <?>) {
163+ hasMultivalued = true ;
164+ } else {
165+ values .add (((Number ) value ).doubleValue ());
166+ bucketIndexes .add (valuesIndex );
167+ }
168+ }
169+ }
170+ // Handle last partition separately
171+ // if (lastPartitionFieldValue != null) {
172+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
173+ null ,
174+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
175+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
176+ );
177+ bucketValuesPerPartition .add (bucketValues );
178+ // }
179+ }
180+
181+ List <ChangeType > changeTypes = new ArrayList <>();
182+ {
183+ for (MlAggsHelper .DoubleBucketValues bucketValues : bucketValuesPerPartition ) {
184+ ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
185+ if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
186+ warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
135187 }
188+ changeTypes .add (changeType );
136189 }
137190 }
138191
139- MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
140- null ,
141- values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
142- bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
143- );
144- ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
145- int changePointIndex = changeType .changePoint ();
192+ insertChangePoints (changeTypes );
193+
194+ if (tooManyValues ) {
195+ warnings (true ).registerException (
196+ new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
197+ );
198+ }
199+ if (hasNulls ) {
200+ warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
201+ }
202+ if (hasMultivalued ) {
203+ warnings (true ).registerException (
204+ new IllegalArgumentException (
205+ "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206+ )
207+ );
208+ }
209+ }
146210
211+ private void insertChangePoints (Iterable <ChangeType > changeTypes ) {
212+ Iterator <ChangeType > changeTypesIterator = changeTypes .iterator ();
213+ ChangeType changeType = null ;
214+ if (changeTypesIterator .hasNext ()) {
215+ changeType = changeTypesIterator .next ();
216+ }
147217 BlockFactory blockFactory = driverContext .blockFactory ();
148218 int pageStartIndex = 0 ;
219+ String lastPartitionFieldValue = null ;
149220 while (inputPages .isEmpty () == false ) {
150221 Page inputPage = inputPages .peek ();
151222 Page outputPage ;
152223 Block changeTypeBlock = null ;
153224 Block changePvalueBlock = null ;
154225 boolean success = false ;
226+
227+ String currentPartitionFieldValue = getCurrentPartitionKey (inputPage , 0 );
228+ if (lastPartitionFieldValue != null ) {
229+ if (Objects .equals (currentPartitionFieldValue , lastPartitionFieldValue ) == false ) {
230+ pageStartIndex = 0 ;
231+ if (changeTypesIterator .hasNext ()) {
232+ changeType = changeTypesIterator .next ();
233+ }
234+ }
235+ }
236+ lastPartitionFieldValue = currentPartitionFieldValue ;
237+
155238 try {
156- if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage .getPositionCount ()) {
239+ // TODO: How to handle case when there are no change points
240+ if (changeType != null && pageStartIndex <= changeType .changePoint () && changeType .changePoint () < pageStartIndex + inputPage .getPositionCount ()) {
157241 try (
158242 BytesRefBlock .Builder changeTypeBlockBuilder = blockFactory .newBytesRefBlockBuilder (inputPage .getPositionCount ());
159243 DoubleBlock .Builder pvalueBlockBuilder = blockFactory .newDoubleBlockBuilder (inputPage .getPositionCount ())
160244 ) {
161245 for (int i = 0 ; i < inputPage .getPositionCount (); i ++) {
162- if (pageStartIndex + i == changePointIndex ) {
246+ if (pageStartIndex + i == changeType . changePoint () ) {
163247 changeTypeBlockBuilder .appendBytesRef (new BytesRef (changeType .getWriteableName ()));
164248 pvalueBlockBuilder .appendDouble (changeType .pValue ());
165249 } else {
@@ -174,8 +258,10 @@ private void createOutputPages() {
174258 changeTypeBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
175259 changePvalueBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
176260 }
177-
178- outputPage = inputPage .appendBlocks (new Block [] { changeTypeBlock , changePvalueBlock });
261+ outputPage = inputPage .appendBlocks (new Block []{changeTypeBlock , changePvalueBlock });
262+ if (pageStartIndex + inputPage .getPositionCount () > INPUT_VALUE_COUNT_LIMIT ) {
263+ outputPage = outputPage .subPage (0 , INPUT_VALUE_COUNT_LIMIT - pageStartIndex );
264+ }
179265 success = true ;
180266 } finally {
181267 if (success == false ) {
@@ -187,25 +273,21 @@ private void createOutputPages() {
187273 outputPages .add (outputPage );
188274 pageStartIndex += inputPage .getPositionCount ();
189275 }
276+ }
190277
191- if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
192- warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
193- }
194- if (tooManyValues ) {
195- warnings (true ).registerException (
196- new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
197- );
198- }
199- if (hasNulls ) {
200- warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
278+ private String getCurrentPartitionKey (Page page , int i ) {
279+ if (partitionChannel .isEmpty ()) {
280+ return "-default-partition-" ;
201281 }
202- if (hasMultivalued ) {
203- warnings (true ).registerException (
204- new IllegalArgumentException (
205- "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206- )
207- );
282+ assert page .getPositionCount () > 0 ;
283+ StringBuilder builder = new StringBuilder ();
284+ for (Integer partitionChannel : partitionChannel ) {
285+ try (var block = page .getBlock (partitionChannel ).filter (i )) {
286+ BytesRef partitionFieldValue = ((BytesRefBlock ) block ).getBytesRef (i , new BytesRef ());
287+ builder .append (partitionFieldValue .utf8ToString ());
288+ }
208289 }
290+ return builder .toString ();
209291 }
210292
211293 @ Override
@@ -220,7 +302,15 @@ public void close() {
220302
221303 @ Override
222304 public String toString () {
223- return "ChangePointOperator[channel=" + channel + "]" ;
305+ return describe (metricChannel , partitionChannel );
306+ }
307+
308+ private static String describe (int metricChannel , List <Integer > partitionChannel ) {
309+ return "ChangePointOperator[metricChannel="
310+ + metricChannel
311+ + ", partitionChannels="
312+ + partitionChannel .stream ().map (c -> c .toString ()).collect (Collectors .joining ("," , "[" , "]" ))
313+ + "]" ;
224314 }
225315
226316 private Warnings warnings (boolean onlyWarnings ) {
0 commit comments