2121
2222import java .util .ArrayList ;
2323import java .util .Deque ;
24+ import java .util .Iterator ;
2425import java .util .LinkedList ;
2526import java .util .List ;
2627
@@ -35,20 +36,21 @@ public class ChangePointOperator implements Operator {
3536
3637 public static final int INPUT_VALUE_COUNT_LIMIT = 1000 ;
3738
38- public record Factory (int channel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
39+ public record Factory (int metricChannel , int partitionChannel , String sourceText , int sourceLine , int sourceColumn ) implements OperatorFactory {
3940 @ Override
4041 public Operator get (DriverContext driverContext ) {
41- return new ChangePointOperator (driverContext , channel , sourceText , sourceLine , sourceColumn );
42+ return new ChangePointOperator (driverContext , metricChannel , partitionChannel , sourceText , sourceLine , sourceColumn );
4243 }
4344
4445 @ Override
4546 public String describe () {
46- return "ChangePointOperator[channel =" + channel + "]" ;
47+ return "ChangePointOperator[metricChannel =" + metricChannel + "]" ;
4748 }
4849 }
4950
5051 private final DriverContext driverContext ;
51- private final int channel ;
52+ private final int metricChannel ;
53+ private final int partitionChannel ;
5254 private final String sourceText ;
5355 private final int sourceLine ;
5456 private final int sourceColumn ;
@@ -60,9 +62,10 @@ public String describe() {
6062
6163 // TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
6264 // (by modularizing esql-core) and use that instead of the individual fields.
63- public ChangePointOperator (DriverContext driverContext , int channel , String sourceText , int sourceLine , int sourceColumn ) {
65+ public ChangePointOperator (DriverContext driverContext , int metricChannel , int partitionChannel , String sourceText , int sourceLine , int sourceColumn ) {
6466 this .driverContext = driverContext ;
65- this .channel = channel ;
67+ this .metricChannel = metricChannel ;
68+ this .partitionChannel = partitionChannel ;
6669 this .sourceText = sourceText ;
6770 this .sourceLine = sourceLine ;
6871 this .sourceColumn = sourceColumn ;
@@ -105,61 +108,134 @@ public Page getOutput() {
105108 }
106109
107110 private void createOutputPages () {
111+ int maxValuesCount = checkValueCounts ();
112+ List <MlAggsHelper .DoubleBucketValues > bucketValuesPerPartition = checkNullAndMultivalued (maxValuesCount );
113+
114+ List <ChangeType > changeTypes = new ArrayList <>();
115+ for (MlAggsHelper .DoubleBucketValues bucketValues : bucketValuesPerPartition ) {
116+ ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
117+ if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
118+ warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
119+ }
120+ changeTypes .add (changeType );
121+ }
122+
123+ insertChangePoints (changeTypes );
124+ }
125+
126+ private int checkValueCounts () {
127+ int maxValuesCount = 0 ;
108128 int valuesCount = 0 ;
109- for (Page page : inputPages ) {
110- valuesCount += page .getPositionCount ();
129+ Block lastPartitionFieldValue = null ;
130+ for (Page inputPage : inputPages ) {
131+ Block currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
132+ if (lastPartitionFieldValue != null ) {
133+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
134+ valuesCount = 0 ;
135+ }
136+ }
137+ lastPartitionFieldValue = currentPartitionFieldValue ;
138+ valuesCount += inputPage .getPositionCount ();
139+ maxValuesCount = Math .max (maxValuesCount , valuesCount );
111140 }
112- boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT ;
141+ boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT ;
113142 if (tooManyValues ) {
114- valuesCount = INPUT_VALUE_COUNT_LIMIT ;
143+ warnings (true ).registerException (
144+ new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
145+ );
146+ maxValuesCount = INPUT_VALUE_COUNT_LIMIT ;
115147 }
148+ return maxValuesCount ;
149+ }
116150
117- List <Double > values = new ArrayList <>(valuesCount );
118- List <Integer > bucketIndexes = new ArrayList <>(valuesCount );
119- int valuesIndex = 0 ;
151+ private List <MlAggsHelper .DoubleBucketValues > checkNullAndMultivalued (int maxValuesCount ) {
152+ List <MlAggsHelper .DoubleBucketValues > result = new ArrayList <>();
153+ List <Double > values = new ArrayList <>(maxValuesCount );
154+ List <Integer > bucketIndexes = new ArrayList <>(maxValuesCount );
120155 boolean hasNulls = false ;
121156 boolean hasMultivalued = false ;
157+ Block lastPartitionFieldValue = null ;
122158 for (Page inputPage : inputPages ) {
123- Block inputBlock = inputPage .getBlock (channel );
124- for (int i = 0 ; i < inputBlock .getPositionCount () && valuesIndex < valuesCount ; i ++) {
159+ Block currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
160+ if (lastPartitionFieldValue != null ) {
161+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
162+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
163+ null ,
164+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
165+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
166+ );
167+ result .add (bucketValues );
168+
169+ values = new ArrayList <>(maxValuesCount );
170+ bucketIndexes = new ArrayList <>(maxValuesCount );
171+ }
172+ }
173+ lastPartitionFieldValue = currentPartitionFieldValue ;
174+ Block inputBlock = inputPage .getBlock (metricChannel );
175+ for (int i = 0 , valuesIndex = 0 ; i < inputBlock .getPositionCount () && valuesIndex < maxValuesCount ; i ++, valuesIndex ++) {
125176 Object value = BlockUtils .toJavaObject (inputBlock , i );
126177 if (value == null ) {
127178 hasNulls = true ;
128- valuesIndex ++;
129179 } else if (value instanceof List <?>) {
130180 hasMultivalued = true ;
131- valuesIndex ++;
132181 } else {
133182 values .add (((Number ) value ).doubleValue ());
134- bucketIndexes .add (valuesIndex ++ );
183+ bucketIndexes .add (valuesIndex );
135184 }
136185 }
137186 }
187+ // Handle last partition separately
188+ if (lastPartitionFieldValue != null ) {
189+ MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
190+ null ,
191+ values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
192+ bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
193+ );
194+ result .add (bucketValues );
195+ }
138196
139- MlAggsHelper .DoubleBucketValues bucketValues = new MlAggsHelper .DoubleBucketValues (
140- null ,
141- values .stream ().mapToDouble (Double ::doubleValue ).toArray (),
142- bucketIndexes .stream ().mapToInt (Integer ::intValue ).toArray ()
143- );
144- ChangeType changeType = ChangePointDetector .getChangeType (bucketValues );
145- int changePointIndex = changeType .changePoint ();
197+ if (hasNulls ) {
198+ warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
199+ }
200+ if (hasMultivalued ) {
201+ warnings (true ).registerException (
202+ new IllegalArgumentException (
203+ "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
204+ )
205+ );
206+ }
207+ return result ;
208+ }
146209
210+ private void insertChangePoints (Iterable <ChangeType > changeTypes ) {
211+ Iterator <ChangeType > changeTypesIterator = changeTypes .iterator ();
212+ ChangeType changeType = changeTypesIterator .next ();
147213 BlockFactory blockFactory = driverContext .blockFactory ();
148214 int pageStartIndex = 0 ;
215+ Block lastPartitionFieldValue = null ;
149216 while (inputPages .isEmpty () == false ) {
150217 Page inputPage = inputPages .peek ();
151218 Page outputPage ;
152219 Block changeTypeBlock = null ;
153220 Block changePvalueBlock = null ;
154221 boolean success = false ;
222+
223+ Block currentPartitionFieldValue = getCurrentPartitionFieldValue (inputPage );
224+ if (lastPartitionFieldValue != null ) {
225+ if (currentPartitionFieldValue .equals (lastPartitionFieldValue ) == false ) {
226+ pageStartIndex = 0 ;
227+ }
228+ }
229+ lastPartitionFieldValue = currentPartitionFieldValue ;
230+
155231 try {
156- if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage .getPositionCount ()) {
232+ if (pageStartIndex <= changeType . changePoint () && changeType . changePoint () < pageStartIndex + inputPage .getPositionCount ()) {
157233 try (
158234 BytesRefBlock .Builder changeTypeBlockBuilder = blockFactory .newBytesRefBlockBuilder (inputPage .getPositionCount ());
159235 DoubleBlock .Builder pvalueBlockBuilder = blockFactory .newDoubleBlockBuilder (inputPage .getPositionCount ())
160236 ) {
161237 for (int i = 0 ; i < inputPage .getPositionCount (); i ++) {
162- if (pageStartIndex + i == changePointIndex ) {
238+ if (pageStartIndex + i == changeType . changePoint () ) {
163239 changeTypeBlockBuilder .appendBytesRef (new BytesRef (changeType .getWriteableName ()));
164240 pvalueBlockBuilder .appendDouble (changeType .pValue ());
165241 } else {
@@ -170,6 +246,9 @@ private void createOutputPages() {
170246 changeTypeBlock = changeTypeBlockBuilder .build ();
171247 changePvalueBlock = pvalueBlockBuilder .build ();
172248 }
249+ if (changeTypesIterator .hasNext ()) {
250+ changeType = changeTypesIterator .next ();
251+ }
173252 } else {
174253 changeTypeBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
175254 changePvalueBlock = blockFactory .newConstantNullBlock (inputPage .getPositionCount ());
@@ -187,25 +266,10 @@ private void createOutputPages() {
187266 outputPages .add (outputPage );
188267 pageStartIndex += inputPage .getPositionCount ();
189268 }
269+ }
190270
191- if (changeType instanceof ChangeType .Indeterminable indeterminable ) {
192- warnings (false ).registerException (new IllegalArgumentException (indeterminable .getReason ()));
193- }
194- if (tooManyValues ) {
195- warnings (true ).registerException (
196- new IllegalArgumentException ("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values" )
197- );
198- }
199- if (hasNulls ) {
200- warnings (true ).registerException (new IllegalArgumentException ("values contain nulls; skipping them" ));
201- }
202- if (hasMultivalued ) {
203- warnings (true ).registerException (
204- new IllegalArgumentException (
205- "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206- )
207- );
208- }
271+ private Block getCurrentPartitionFieldValue (Page inputPage ) {
272+ return inputPage .getBlock (partitionChannel ).filter (0 );
209273 }
210274
211275 @ Override
@@ -220,7 +284,7 @@ public void close() {
220284
221285 @ Override
222286 public String toString () {
223- return "ChangePointOperator[channel =" + channel + "]" ;
287+ return "ChangePointOperator[metricChannel =" + metricChannel + ", partitionChannel=" + partitionChannel + "]" ;
224288 }
225289
226290 private Warnings warnings (boolean onlyWarnings ) {
0 commit comments