Skip to content

Commit 816884a

Browse files
committed
Introduce CHANGE_POINT ... BY ... syntax
1 parent 9ae7759 commit 816884a

File tree

34 files changed

+2758
-1987
lines changed

34 files changed

+2758
-1987
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ChangePointOperator.java

Lines changed: 134 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import java.util.ArrayDeque;
2323
import java.util.ArrayList;
2424
import java.util.Deque;
25+
import java.util.Iterator;
2526
import java.util.List;
27+
import java.util.Objects;
28+
import java.util.Optional;
2629

2730
/**
2831
* Find spikes, dips and change points in a list of values.
@@ -35,20 +38,21 @@ public class ChangePointOperator implements Operator {
3538

3639
public static final int INPUT_VALUE_COUNT_LIMIT = 1000;
3740

38-
public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
41+
public record Factory(int metricChannel, Optional<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
3942
@Override
4043
public Operator get(DriverContext driverContext) {
41-
return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
44+
return new ChangePointOperator(driverContext, metricChannel, partitionChannel, sourceText, sourceLine, sourceColumn);
4245
}
4346

4447
@Override
4548
public String describe() {
46-
return "ChangePointOperator[channel=" + channel + "]";
49+
return ChangePointOperator.describe(metricChannel, partitionChannel);
4750
}
4851
}
4952

5053
private final DriverContext driverContext;
51-
private final int channel;
54+
private final int metricChannel;
55+
private final Optional<Integer> partitionChannel;
5256
private final String sourceText;
5357
private final int sourceLine;
5458
private final int sourceColumn;
@@ -60,9 +64,10 @@ public String describe() {
6064

6165
// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
6266
// (by modularizing esql-core) and use that instead of the individual fields.
63-
public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
67+
public ChangePointOperator(DriverContext driverContext, int metricChannel, Optional<Integer> partitionChannel, String sourceText, int sourceLine, int sourceColumn) {
6468
this.driverContext = driverContext;
65-
this.channel = channel;
69+
this.metricChannel = metricChannel;
70+
this.partitionChannel = partitionChannel;
6671
this.sourceText = sourceText;
6772
this.sourceLine = sourceLine;
6873
this.sourceColumn = sourceColumn;
@@ -105,61 +110,140 @@ public Page getOutput() {
105110
}
106111

107112
private void createOutputPages() {
108-
int valuesCount = 0;
109-
for (Page page : inputPages) {
110-
valuesCount += page.getPositionCount();
111-
}
112-
boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
113-
if (tooManyValues) {
114-
valuesCount = INPUT_VALUE_COUNT_LIMIT;
113+
int maxValuesCount = 0;
114+
{
115+
int valuesCount = 0;
116+
String lastPartitionFieldValue = null;
117+
for (Page inputPage : inputPages) {
118+
String currentPartitionFieldValue = partitionChannel.isPresent() ? getCurrentPartitionFieldValue(inputPage) : null;
119+
if (lastPartitionFieldValue != null) {
120+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
121+
valuesCount = 0;
122+
}
123+
}
124+
lastPartitionFieldValue = currentPartitionFieldValue;
125+
valuesCount += inputPage.getPositionCount();
126+
maxValuesCount = Math.max(maxValuesCount, valuesCount);
127+
}
115128
}
129+
boolean tooManyValues = maxValuesCount > INPUT_VALUE_COUNT_LIMIT;
116130

117-
List<Double> values = new ArrayList<>(valuesCount);
118-
List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
119-
int valuesIndex = 0;
131+
132+
List<MlAggsHelper.DoubleBucketValues> bucketValuesPerPartition = new ArrayList<>();
120133
boolean hasNulls = false;
121134
boolean hasMultivalued = false;
122-
for (Page inputPage : inputPages) {
123-
Block inputBlock = inputPage.getBlock(channel);
124-
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
125-
Object value = BlockUtils.toJavaObject(inputBlock, i);
126-
if (value == null) {
127-
hasNulls = true;
128-
valuesIndex++;
129-
} else if (value instanceof List<?>) {
130-
hasMultivalued = true;
131-
valuesIndex++;
132-
} else {
133-
values.add(((Number) value).doubleValue());
134-
bucketIndexes.add(valuesIndex++);
135+
{
136+
List<Double> values = new ArrayList<>(maxValuesCount);
137+
List<Integer> bucketIndexes = new ArrayList<>(maxValuesCount);
138+
int valuesIndex = 0;
139+
String lastPartitionFieldValue = null;
140+
for (Page inputPage : inputPages) {
141+
String currentPartitionFieldValue = partitionChannel.isPresent() ? getCurrentPartitionFieldValue(inputPage) : null;
142+
if (lastPartitionFieldValue != null) {
143+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
144+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
145+
null,
146+
values.stream().mapToDouble(Double::doubleValue).toArray(),
147+
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
148+
);
149+
bucketValuesPerPartition.add(bucketValues);
150+
151+
values = new ArrayList<>(maxValuesCount);
152+
bucketIndexes = new ArrayList<>(maxValuesCount);
153+
valuesIndex = 0;
154+
}
155+
}
156+
lastPartitionFieldValue = currentPartitionFieldValue;
157+
Block inputBlock = inputPage.getBlock(metricChannel);
158+
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < maxValuesCount; i++, valuesIndex++) {
159+
Object value = BlockUtils.toJavaObject(inputBlock, i);
160+
if (value == null) {
161+
hasNulls = true;
162+
} else if (value instanceof List<?>) {
163+
hasMultivalued = true;
164+
} else {
165+
values.add(((Number) value).doubleValue());
166+
bucketIndexes.add(valuesIndex);
167+
}
135168
}
136169
}
170+
// Handle last partition separately
171+
// if (lastPartitionFieldValue != null) {
172+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
173+
null,
174+
values.stream().mapToDouble(Double::doubleValue).toArray(),
175+
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
176+
);
177+
bucketValuesPerPartition.add(bucketValues);
178+
// }
137179
}
138180

139-
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
140-
null,
141-
values.stream().mapToDouble(Double::doubleValue).toArray(),
142-
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
143-
);
144-
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
145-
int changePointIndex = changeType.changePoint();
181+
List<ChangeType> changeTypes = new ArrayList<>();
182+
{
183+
for (MlAggsHelper.DoubleBucketValues bucketValues : bucketValuesPerPartition) {
184+
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
185+
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
186+
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
187+
}
188+
changeTypes.add(changeType);
189+
}
190+
}
191+
192+
insertChangePoints(changeTypes);
193+
194+
if (tooManyValues) {
195+
warnings(true).registerException(
196+
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
197+
);
198+
}
199+
if (hasNulls) {
200+
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
201+
}
202+
if (hasMultivalued) {
203+
warnings(true).registerException(
204+
new IllegalArgumentException(
205+
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206+
)
207+
);
208+
}
209+
}
146210

211+
private void insertChangePoints(Iterable<ChangeType> changeTypes) {
212+
Iterator<ChangeType> changeTypesIterator = changeTypes.iterator();
213+
ChangeType changeType = null;
214+
if (changeTypesIterator.hasNext()) {
215+
changeType = changeTypesIterator.next();
216+
}
147217
BlockFactory blockFactory = driverContext.blockFactory();
148218
int pageStartIndex = 0;
219+
String lastPartitionFieldValue = null;
149220
while (inputPages.isEmpty() == false) {
150221
Page inputPage = inputPages.peek();
151222
Page outputPage;
152223
Block changeTypeBlock = null;
153224
Block changePvalueBlock = null;
154225
boolean success = false;
226+
227+
String currentPartitionFieldValue = partitionChannel.isPresent() ? getCurrentPartitionFieldValue(inputPage) : null;
228+
if (lastPartitionFieldValue != null) {
229+
if (Objects.equals(currentPartitionFieldValue, lastPartitionFieldValue) == false) {
230+
pageStartIndex = 0;
231+
if (changeTypesIterator.hasNext()) {
232+
changeType = changeTypesIterator.next();
233+
}
234+
}
235+
}
236+
lastPartitionFieldValue = currentPartitionFieldValue;
237+
155238
try {
156-
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
239+
// TODO: How to handle case when there are no change points
240+
if (changeType != null && pageStartIndex <= changeType.changePoint() && changeType.changePoint() < pageStartIndex + inputPage.getPositionCount()) {
157241
try (
158242
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
159243
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
160244
) {
161245
for (int i = 0; i < inputPage.getPositionCount(); i++) {
162-
if (pageStartIndex + i == changePointIndex) {
246+
if (pageStartIndex + i == changeType.changePoint()) {
163247
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
164248
pvalueBlockBuilder.appendDouble(changeType.pValue());
165249
} else {
@@ -187,24 +271,14 @@ private void createOutputPages() {
187271
outputPages.add(outputPage);
188272
pageStartIndex += inputPage.getPositionCount();
189273
}
274+
}
190275

191-
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
192-
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
193-
}
194-
if (tooManyValues) {
195-
warnings(true).registerException(
196-
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
197-
);
198-
}
199-
if (hasNulls) {
200-
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
201-
}
202-
if (hasMultivalued) {
203-
warnings(true).registerException(
204-
new IllegalArgumentException(
205-
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206-
)
207-
);
276+
private String getCurrentPartitionFieldValue(Page inputPage) {
277+
assert partitionChannel.isPresent();
278+
assert inputPage.getPositionCount() > 0;
279+
try (var block = inputPage.getBlock(partitionChannel.get()).filter(0)) {
280+
BytesRef partition = ((BytesRefBlock) block).getBytesRef(0, new BytesRef());
281+
return partition.utf8ToString();
208282
}
209283
}
210284

@@ -220,7 +294,11 @@ public void close() {
220294

221295
@Override
222296
public String toString() {
223-
return "ChangePointOperator[channel=" + channel + "]";
297+
return describe(metricChannel, partitionChannel);
298+
}
299+
300+
private static String describe(int metricChannel, Optional<Integer> partitionChannel) {
301+
return "ChangePointOperator[metricChannel=" + metricChannel + (partitionChannel.isPresent() ? ", partitionChannel=" + partitionChannel.get() : "") + "]";
224302
}
225303

226304
private Warnings warnings(boolean onlyWarnings) {

0 commit comments

Comments
 (0)