Skip to content

Commit aaa5ce8

Browse files
jan-elastic and elasticsearchmachine authored
ES|QL change_point processing command (#120998)
* Grammar for ES|QL change point (with dummy impl) * pipeline breaker * ChangePointOperator * Add sorting * basic csv test * conflict * Update docs/changelog/120998.yaml * [CI] Auto commit changes from spotless * polish * Non-long data type * Move OrderBy/Limit to the logical plan * fix mem.leak * csv test for reusing column names * Warning indeterminable * capability * handle null values * too much data * type text->keyword * default timestamp and output columns * spotless * ChangePointOperatorTests + fix memory leaks * [CI] Auto commit changes from spotless * improve test * add comments/todos * handle multivalued columns * don't register unserialiazable * surrogate * make "too much data" tests readable * more tests * Error handling * fix multivalued test * more name conflict tests * [CI] Auto commit changes from spotless * more tests * improve code * CSV test for various input key/value types * one more csv test * Check sortable/numeric for all types * add null type to testChangePoint_valueNumeric * more CSV tests * skip nulls instead of zeroing them * error on MV * Test+todo for nicer error message * better error msg * Revert "better error msg" This reverts commit 21ec77c. * fix * make csv test deterministic * replace NamedExpression -> Attribute * skip MVs + warning --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 49352fd commit aaa5ce8

File tree

29 files changed

+4449
-2256
lines changed

29 files changed

+4449
-2256
lines changed

docs/changelog/120998.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 120998
2+
summary: ES|QL `change_point` processing command
3+
area: Machine Learning
4+
type: feature
5+
issues: []
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.Block;
12+
import org.elasticsearch.compute.data.BlockFactory;
13+
import org.elasticsearch.compute.data.BlockUtils;
14+
import org.elasticsearch.compute.data.BytesRefBlock;
15+
import org.elasticsearch.compute.data.DoubleBlock;
16+
import org.elasticsearch.compute.data.Page;
17+
import org.elasticsearch.core.Releasables;
18+
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
19+
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointDetector;
20+
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType;
21+
22+
import java.util.ArrayList;
23+
import java.util.Deque;
24+
import java.util.LinkedList;
25+
import java.util.List;
26+
27+
/**
28+
* Find spikes, dips and change point in a list of values.
29+
* <p>
30+
* Warning: this operator cannot handle large amounts of data! It buffers all
31+
* data that is passed to it, runs the change point detector on the data (which
32+
* is a compute-heavy process), and then outputs all data with the change points.
33+
*/
34+
public class ChangePointOperator implements Operator {
35+
36+
public static final int INPUT_VALUE_COUNT_LIMIT = 1000;
37+
38+
public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
39+
@Override
40+
public Operator get(DriverContext driverContext) {
41+
return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
42+
}
43+
44+
@Override
45+
public String describe() {
46+
return "ChangePointOperator[channel=" + channel + "]";
47+
}
48+
}
49+
50+
private final DriverContext driverContext;
51+
private final int channel;
52+
private final String sourceText;
53+
private final int sourceLine;
54+
private final int sourceColumn;
55+
56+
private final Deque<Page> inputPages;
57+
private final Deque<Page> outputPages;
58+
private boolean finished;
59+
private Warnings warnings;
60+
61+
// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
62+
// (by modularizing esql-core) and use that instead of the individual fields.
63+
public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
64+
this.driverContext = driverContext;
65+
this.channel = channel;
66+
this.sourceText = sourceText;
67+
this.sourceLine = sourceLine;
68+
this.sourceColumn = sourceColumn;
69+
70+
finished = false;
71+
inputPages = new LinkedList<>();
72+
outputPages = new LinkedList<>();
73+
warnings = null;
74+
}
75+
76+
@Override
77+
public boolean needsInput() {
78+
return finished == false;
79+
}
80+
81+
@Override
82+
public void addInput(Page page) {
83+
inputPages.add(page);
84+
}
85+
86+
@Override
87+
public void finish() {
88+
if (finished == false) {
89+
finished = true;
90+
createOutputPages();
91+
}
92+
}
93+
94+
@Override
95+
public boolean isFinished() {
96+
return finished && outputPages.isEmpty();
97+
}
98+
99+
@Override
100+
public Page getOutput() {
101+
if (finished == false || outputPages.isEmpty()) {
102+
return null;
103+
}
104+
return outputPages.removeFirst();
105+
}
106+
107+
private void createOutputPages() {
108+
int valuesCount = 0;
109+
for (Page page : inputPages) {
110+
valuesCount += page.getPositionCount();
111+
}
112+
boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
113+
if (tooManyValues) {
114+
valuesCount = INPUT_VALUE_COUNT_LIMIT;
115+
}
116+
117+
List<Double> values = new ArrayList<>(valuesCount);
118+
List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
119+
int valuesIndex = 0;
120+
boolean hasNulls = false;
121+
boolean hasMultivalued = false;
122+
for (Page inputPage : inputPages) {
123+
Block inputBlock = inputPage.getBlock(channel);
124+
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
125+
Object value = BlockUtils.toJavaObject(inputBlock, i);
126+
if (value == null) {
127+
hasNulls = true;
128+
valuesIndex++;
129+
} else if (value instanceof List<?>) {
130+
hasMultivalued = true;
131+
valuesIndex++;
132+
} else {
133+
values.add(((Number) value).doubleValue());
134+
bucketIndexes.add(valuesIndex++);
135+
}
136+
}
137+
}
138+
139+
MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
140+
null,
141+
values.stream().mapToDouble(Double::doubleValue).toArray(),
142+
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
143+
);
144+
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
145+
int changePointIndex = changeType.changePoint();
146+
147+
BlockFactory blockFactory = driverContext.blockFactory();
148+
int pageStartIndex = 0;
149+
while (inputPages.isEmpty() == false) {
150+
Page inputPage = inputPages.peek();
151+
Page outputPage;
152+
Block changeTypeBlock = null;
153+
Block changePvalueBlock = null;
154+
boolean success = false;
155+
try {
156+
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
157+
try (
158+
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
159+
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
160+
) {
161+
for (int i = 0; i < inputPage.getPositionCount(); i++) {
162+
if (pageStartIndex + i == changePointIndex) {
163+
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
164+
pvalueBlockBuilder.appendDouble(changeType.pValue());
165+
} else {
166+
changeTypeBlockBuilder.appendNull();
167+
pvalueBlockBuilder.appendNull();
168+
}
169+
}
170+
changeTypeBlock = changeTypeBlockBuilder.build();
171+
changePvalueBlock = pvalueBlockBuilder.build();
172+
}
173+
} else {
174+
changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
175+
changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
176+
}
177+
178+
outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
179+
success = true;
180+
} finally {
181+
if (success == false) {
182+
Releasables.closeExpectNoException(changeTypeBlock, changePvalueBlock);
183+
}
184+
}
185+
186+
inputPages.removeFirst();
187+
outputPages.add(outputPage);
188+
pageStartIndex += inputPage.getPositionCount();
189+
}
190+
191+
if (changeType instanceof ChangeType.Indeterminable indeterminable) {
192+
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
193+
}
194+
if (tooManyValues) {
195+
warnings(true).registerException(
196+
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
197+
);
198+
}
199+
if (hasNulls) {
200+
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
201+
}
202+
if (hasMultivalued) {
203+
warnings(true).registerException(
204+
new IllegalArgumentException(
205+
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
206+
)
207+
);
208+
}
209+
}
210+
211+
@Override
212+
public void close() {
213+
for (Page page : inputPages) {
214+
page.releaseBlocks();
215+
}
216+
for (Page page : outputPages) {
217+
page.releaseBlocks();
218+
}
219+
}
220+
221+
@Override
222+
public String toString() {
223+
return "ChangePointOperator[channel=" + channel + "]";
224+
}
225+
226+
private Warnings warnings(boolean onlyWarnings) {
227+
if (warnings == null) {
228+
if (onlyWarnings) {
229+
this.warnings = Warnings.createOnlyWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
230+
} else {
231+
this.warnings = Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
232+
}
233+
}
234+
return warnings;
235+
}
236+
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Warnings.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ public void registerException(Exception exception) {
3131
* @param sourceText The source text that caused the warning. Same as `source.text()`
3232
* @return A warnings collector object
3333
*/
34+
// TODO: rename to createWarningsTreatedAsNull
3435
public static Warnings createWarnings(DriverContext.WarningsMode warningsMode, int lineNumber, int columnNumber, String sourceText) {
35-
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as null");
36+
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as null");
3637
}
3738

3839
/**
@@ -50,7 +51,26 @@ public static Warnings createWarningsTreatedAsFalse(
5051
int columnNumber,
5152
String sourceText
5253
) {
53-
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as false");
54+
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as false");
55+
}
56+
57+
/**
58+
* Create a new warnings object based on the given mode which warns that
59+
* evaluation resulted in warnings.
60+
* @param warningsMode The warnings collection strategy to use
61+
* @param lineNumber The line number of the source text. Same as `source.getLineNumber()`
62+
* @param columnNumber The column number of the source text. Same as `source.getColumnNumber()`
63+
* @param sourceText The source text that caused the warning. Same as `source.text()`
64+
* @return A warnings collector object
65+
*/
66+
// TODO: rename to createWarnings
67+
public static Warnings createOnlyWarnings(
68+
DriverContext.WarningsMode warningsMode,
69+
int lineNumber,
70+
int columnNumber,
71+
String sourceText
72+
) {
73+
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "warnings during evaluation of [{}]");
5474
}
5575

5676
private static Warnings createWarnings(
@@ -78,14 +98,7 @@ private static Warnings createWarnings(
7898

7999
private Warnings(int lineNumber, int columnNumber, String sourceText, String first) {
80100
this.location = format("Line {}:{}: ", lineNumber, columnNumber);
81-
this.first = format(
82-
null,
83-
"{}evaluation of [{}] failed, {}. Only first {} failures recorded.",
84-
location,
85-
sourceText,
86-
first,
87-
MAX_ADDED_WARNINGS
88-
);
101+
this.first = format(null, "{}" + first + ". Only first {} failures recorded.", location, sourceText, MAX_ADDED_WARNINGS);
89102
}
90103

91104
public void registerException(Exception exception) {
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.data.BytesRefBlock;
13+
import org.elasticsearch.compute.data.DoubleBlock;
14+
import org.elasticsearch.compute.data.LongBlock;
15+
import org.elasticsearch.compute.data.Page;
16+
import org.elasticsearch.compute.test.OperatorTestCase;
17+
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
18+
import org.hamcrest.Matcher;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
import static org.hamcrest.Matchers.equalTo;
24+
import static org.hamcrest.Matchers.hasSize;
25+
26+
public class ChangePointOperatorTests extends OperatorTestCase {
27+
28+
@Override
29+
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
30+
// size must be in [25, 1000] for ChangePoint to function correctly
31+
// and detect the step change.
32+
size = Math.clamp(size, 25, 1000);
33+
List<Long> data = new ArrayList<>(size);
34+
for (int i = 0; i < size; i++) {
35+
if (i <= size / 2) {
36+
data.add(0L);
37+
} else {
38+
data.add(1L);
39+
}
40+
}
41+
return new SequenceLongBlockSourceOperator(blockFactory, data);
42+
}
43+
44+
@Override
45+
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
46+
boolean seenOne = false;
47+
assertThat(results, hasSize(input.size()));
48+
for (int i = 0; i < results.size(); i++) {
49+
Page inputPage = input.get(i);
50+
Page resultPage = results.get(i);
51+
assertThat(resultPage.getPositionCount(), equalTo(inputPage.getPositionCount()));
52+
assertThat(resultPage.getBlockCount(), equalTo(3));
53+
for (int j = 0; j < resultPage.getPositionCount(); j++) {
54+
long inputValue = ((LongBlock) resultPage.getBlock(0)).getLong(j);
55+
long resultValue = ((LongBlock) resultPage.getBlock(0)).getLong(j);
56+
assertThat(resultValue, equalTo(inputValue));
57+
if (seenOne == false && resultValue == 1L) {
58+
BytesRef type = ((BytesRefBlock) resultPage.getBlock(1)).getBytesRef(j, new BytesRef());
59+
double pvalue = ((DoubleBlock) resultPage.getBlock(2)).getDouble(j);
60+
assertThat(type.utf8ToString(), equalTo("step_change"));
61+
assertThat(pvalue, equalTo(0.0));
62+
seenOne = true;
63+
} else {
64+
assertThat(resultPage.getBlock(1).isNull(j), equalTo(true));
65+
assertThat(resultPage.getBlock(2).isNull(j), equalTo(true));
66+
}
67+
}
68+
}
69+
assertThat(seenOne, equalTo(true));
70+
}
71+
72+
@Override
73+
protected Operator.OperatorFactory simple() {
74+
return new ChangePointOperator.Factory(0, null, 0, 0);
75+
}
76+
77+
@Override
78+
protected Matcher<String> expectedDescriptionOfSimple() {
79+
return equalTo("ChangePointOperator[channel=0]");
80+
}
81+
82+
@Override
83+
protected Matcher<String> expectedToStringOfSimple() {
84+
return equalTo("ChangePointOperator[channel=0]");
85+
}
86+
}

x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646

4747
import static org.hamcrest.Matchers.empty;
4848
import static org.hamcrest.Matchers.equalTo;
49-
import static org.hamcrest.Matchers.in;
5049

5150
/**
5251
* Base tests for {@link Operator}s that are not {@link SourceOperator} or {@link SinkOperator}.

0 commit comments

Comments
 (0)