Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
6ea45d9
Grammar for ES|QL change point (with dummy impl)
jan-elastic Jan 24, 2025
eadffb9
pipeline breaker
jan-elastic Jan 24, 2025
f1eb1b2
ChangePointOperator
jan-elastic Jan 27, 2025
7333fcf
Add sorting
jan-elastic Jan 27, 2025
10032c2
basic csv test
jan-elastic Jan 27, 2025
6a62802
conflict
jan-elastic Jan 28, 2025
49600b6
Update docs/changelog/120998.yaml
jan-elastic Jan 28, 2025
77572e9
[CI] Auto commit changes from spotless
Jan 28, 2025
0886289
polish
jan-elastic Jan 28, 2025
97f16e3
Non-long data type
jan-elastic Jan 28, 2025
c2c5911
Move OrderBy/Limit to the logical plan
jan-elastic Jan 29, 2025
cddcfab
fix mem.leak
jan-elastic Jan 30, 2025
14072dc
csv test for reusing column names
jan-elastic Jan 30, 2025
c5fd3cb
Warning indeterminable
jan-elastic Jan 30, 2025
e905a79
capability
jan-elastic Jan 30, 2025
995baad
handle null values
jan-elastic Jan 30, 2025
cd223c3
too much data
jan-elastic Jan 30, 2025
8d0f691
type text->keyword
jan-elastic Jan 31, 2025
ea24893
default timestamp and output columns
jan-elastic Jan 31, 2025
37c3be2
spotless
jan-elastic Jan 31, 2025
1088a09
ChangePointOperatorTests + fix memory leaks
jan-elastic Jan 31, 2025
7f01d41
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic Jan 31, 2025
eabbaa1
[CI] Auto commit changes from spotless
Jan 31, 2025
7d30d44
improve test
jan-elastic Jan 31, 2025
ac48a09
add comments/todos
jan-elastic Feb 3, 2025
906ca1a
handle multivalued columns
jan-elastic Feb 3, 2025
33ef895
don't register unserialiazable
jan-elastic Feb 3, 2025
98fa981
surrogate
jan-elastic Feb 3, 2025
535101b
make "too much data" tests readable
jan-elastic Feb 3, 2025
8692df6
more tests
jan-elastic Feb 3, 2025
5cab77b
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic Feb 3, 2025
61e24dd
Error handling
jan-elastic Feb 3, 2025
976450e
fix multivalued test
jan-elastic Feb 4, 2025
4885af7
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic Feb 4, 2025
047feb7
more name conflict tests
jan-elastic Feb 4, 2025
9278422
[CI] Auto commit changes from spotless
Feb 4, 2025
41057f7
more tests
jan-elastic Feb 4, 2025
f9464de
improve code
jan-elastic Feb 5, 2025
f50c2aa
CSV test for various input key/value types
jan-elastic Feb 5, 2025
8e05959
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic Feb 5, 2025
a7c76dd
one more csv test
jan-elastic Feb 5, 2025
bed70ab
Check sortable/numeric for all types
jan-elastic Feb 5, 2025
e81e47e
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic Feb 5, 2025
688567b
add null type to testChangePoint_valueNumeric
jan-elastic Feb 6, 2025
22acc9b
more CSV tests
jan-elastic Feb 6, 2025
2b137ca
skip nulls instead of zeroing them
jan-elastic Feb 6, 2025
96e6d6a
error on MV
jan-elastic Feb 6, 2025
f00522a
Test+todo for nicer error message
jan-elastic Feb 6, 2025
12876e0
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic Feb 6, 2025
21ec77c
better error msg
jan-elastic Feb 6, 2025
1d3e362
Revert "better error msg"
jan-elastic Feb 6, 2025
dabf260
fix
jan-elastic Feb 6, 2025
459d408
make csv test deterministic
jan-elastic Feb 6, 2025
dbd5838
replace NamedExpression -> Attribute
jan-elastic Feb 10, 2025
4ced5c2
skip MVs + warning
jan-elastic Feb 10, 2025
654f78d
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic Feb 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120998.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120998
summary: ES|QL `change_point` processing command
area: Machine Learning
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointDetector;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType;

import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

/**
* Find spikes, dips and change point in a list of values.
* <p>
* Warning: this operator cannot handle large amounts of data! It buffers all
* data that is passed to it, runs the change point detector on the data (which
* is a compute-heavy process), and then outputs all data with the change points.
*/
public class ChangePointOperator implements Operator {

public static final int INPUT_VALUE_COUNT_LIMIT = 1000;

public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
}

@Override
public String describe() {
return "ChangePointOperator[channel=" + channel + "]";
}
}

private final DriverContext driverContext;
private final int channel;
private final String sourceText;
private final int sourceLine;
private final int sourceColumn;

private final Deque<Page> inputPages;
private final Deque<Page> outputPages;
private boolean finished;
private Warnings warnings;

// TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
// (by modularizing esql-core) and use that instead of the individual fields.
public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
this.driverContext = driverContext;
this.channel = channel;
this.sourceText = sourceText;
this.sourceLine = sourceLine;
this.sourceColumn = sourceColumn;

finished = false;
inputPages = new LinkedList<>();
outputPages = new LinkedList<>();
warnings = null;
}

@Override
public boolean needsInput() {
return finished == false;
}

@Override
public void addInput(Page page) {
inputPages.add(page);
}

@Override
public void finish() {
if (finished == false) {
finished = true;
createOutputPages();
}
}

@Override
public boolean isFinished() {
return finished && outputPages.isEmpty();
}

@Override
public Page getOutput() {
if (finished == false || outputPages.isEmpty()) {
return null;
}
return outputPages.removeFirst();
}

private void createOutputPages() {
int valuesCount = 0;
for (Page page : inputPages) {
valuesCount += page.getPositionCount();
}
boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT;
if (tooManyValues) {
valuesCount = INPUT_VALUE_COUNT_LIMIT;
}

List<Double> values = new ArrayList<>(valuesCount);
List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
int valuesIndex = 0;
boolean hasNulls = false;
boolean hasMultivalued = false;
for (Page inputPage : inputPages) {
Block inputBlock = inputPage.getBlock(channel);
for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
Object value = BlockUtils.toJavaObject(inputBlock, i);
if (value == null) {
hasNulls = true;
valuesIndex++;
} else if (value instanceof List<?>) {
hasMultivalued = true;
valuesIndex++;
} else {
values.add(((Number) value).doubleValue());
bucketIndexes.add(valuesIndex++);
}
}
}

MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
null,
values.stream().mapToDouble(Double::doubleValue).toArray(),
bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
);
ChangeType changeType = ChangePointDetector.getChangeType(bucketValues);
int changePointIndex = changeType.changePoint();

BlockFactory blockFactory = driverContext.blockFactory();
int pageStartIndex = 0;
while (inputPages.isEmpty() == false) {
Page inputPage = inputPages.peek();
Page outputPage;
Block changeTypeBlock = null;
Block changePvalueBlock = null;
boolean success = false;
try {
if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) {
try (
BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount());
DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount())
) {
for (int i = 0; i < inputPage.getPositionCount(); i++) {
if (pageStartIndex + i == changePointIndex) {
changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
pvalueBlockBuilder.appendDouble(changeType.pValue());
} else {
changeTypeBlockBuilder.appendNull();
pvalueBlockBuilder.appendNull();
}
}
changeTypeBlock = changeTypeBlockBuilder.build();
changePvalueBlock = pvalueBlockBuilder.build();
}
} else {
changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount());
}

outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
success = true;
} finally {
if (success == false) {
Releasables.closeExpectNoException(changeTypeBlock, changePvalueBlock);
}
}

inputPages.removeFirst();
outputPages.add(outputPage);
pageStartIndex += inputPage.getPositionCount();
}

if (changeType instanceof ChangeType.Indeterminable indeterminable) {
warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
}
if (tooManyValues) {
warnings(true).registerException(
new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
);
}
if (hasNulls) {
warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
}
if (hasMultivalued) {
warnings(true).registerException(
new IllegalArgumentException(
"values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
)
);
}
}

@Override
public void close() {
for (Page page : inputPages) {
page.releaseBlocks();
}
for (Page page : outputPages) {
page.releaseBlocks();
}
}

@Override
public String toString() {
return "ChangePointOperator[channel=" + channel + "]";
}

private Warnings warnings(boolean onlyWarnings) {
if (warnings == null) {
if (onlyWarnings) {
this.warnings = Warnings.createOnlyWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
} else {
this.warnings = Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
}
}
return warnings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public void registerException(Exception exception) {
* @param sourceText The source text that caused the warning. Same as `source.text()`
* @return A warnings collector object
*/
// TODO: rename to createWarningsTreatedAsNull
public static Warnings createWarnings(DriverContext.WarningsMode warningsMode, int lineNumber, int columnNumber, String sourceText) {
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as null");
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as null");
}

/**
Expand All @@ -50,7 +51,26 @@ public static Warnings createWarningsTreatedAsFalse(
int columnNumber,
String sourceText
) {
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as false");
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as false");
}

/**
* Create a new warnings object based on the given mode which warns that
* evaluation resulted in warnings.
* @param warningsMode The warnings collection strategy to use
* @param lineNumber The line number of the source text. Same as `source.getLineNumber()`
* @param columnNumber The column number of the source text. Same as `source.getColumnNumber()`
* @param sourceText The source text that caused the warning. Same as `source.text()`
* @return A warnings collector object
*/
// TODO: rename to createWarnings
public static Warnings createOnlyWarnings(
DriverContext.WarningsMode warningsMode,
int lineNumber,
int columnNumber,
String sourceText
) {
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "warnings during evaluation of [{}]");
}

private static Warnings createWarnings(
Expand Down Expand Up @@ -78,14 +98,7 @@ private static Warnings createWarnings(

private Warnings(int lineNumber, int columnNumber, String sourceText, String first) {
this.location = format("Line {}:{}: ", lineNumber, columnNumber);
this.first = format(
null,
"{}evaluation of [{}] failed, {}. Only first {} failures recorded.",
location,
sourceText,
first,
MAX_ADDED_WARNINGS
);
this.first = format(null, "{}" + first + ". Only first {} failures recorded.", location, sourceText, MAX_ADDED_WARNINGS);
}

public void registerException(Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.OperatorTestCase;
import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
import org.hamcrest.Matcher;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class ChangePointOperatorTests extends OperatorTestCase {

@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
// size must be in [25, 1000] for ChangePoint to function correctly
// and detect the step change.
size = Math.clamp(size, 25, 1000);
List<Long> data = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
if (i <= size / 2) {
data.add(0L);
} else {
data.add(1L);
}
}
return new SequenceLongBlockSourceOperator(blockFactory, data);
}

@Override
protected void assertSimpleOutput(List<Page> input, List<Page> results) {
boolean seenOne = false;
assertThat(results, hasSize(input.size()));
for (int i = 0; i < results.size(); i++) {
Page inputPage = input.get(i);
Page resultPage = results.get(i);
assertThat(resultPage.getPositionCount(), equalTo(inputPage.getPositionCount()));
assertThat(resultPage.getBlockCount(), equalTo(3));
for (int j = 0; j < resultPage.getPositionCount(); j++) {
long inputValue = ((LongBlock) resultPage.getBlock(0)).getLong(j);
long resultValue = ((LongBlock) resultPage.getBlock(0)).getLong(j);
assertThat(resultValue, equalTo(inputValue));
if (seenOne == false && resultValue == 1L) {
BytesRef type = ((BytesRefBlock) resultPage.getBlock(1)).getBytesRef(j, new BytesRef());
double pvalue = ((DoubleBlock) resultPage.getBlock(2)).getDouble(j);
assertThat(type.utf8ToString(), equalTo("step_change"));
assertThat(pvalue, equalTo(0.0));
seenOne = true;
} else {
assertThat(resultPage.getBlock(1).isNull(j), equalTo(true));
assertThat(resultPage.getBlock(2).isNull(j), equalTo(true));
}
}
}
assertThat(seenOne, equalTo(true));
}

@Override
protected Operator.OperatorFactory simple() {
return new ChangePointOperator.Factory(0, null, 0, 0);
}

@Override
protected Matcher<String> expectedDescriptionOfSimple() {
return equalTo("ChangePointOperator[channel=0]");
}

@Override
protected Matcher<String> expectedToStringOfSimple() {
return equalTo("ChangePointOperator[channel=0]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;

/**
* Base tests for {@link Operator}s that are not {@link SourceOperator} or {@link SinkOperator}.
Expand Down
Loading