-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ES|QL change_point processing command #120998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 24 commits
Commits
Show all changes
56 commits
Select commit
Hold shift + click to select a range
6ea45d9
Grammar for ES|QL change point (with dummy impl)
jan-elastic eadffb9
pipeline breaker
jan-elastic f1eb1b2
ChangePointOperator
jan-elastic 7333fcf
Add sorting
jan-elastic 10032c2
basic csv test
jan-elastic 6a62802
conflict
jan-elastic 49600b6
Update docs/changelog/120998.yaml
jan-elastic 77572e9
[CI] Auto commit changes from spotless
0886289
polish
jan-elastic 97f16e3
Non-long data type
jan-elastic c2c5911
Move OrderBy/Limit to the logical plan
jan-elastic cddcfab
fix mem.leak
jan-elastic 14072dc
csv test for reusing column names
jan-elastic c5fd3cb
Warning indeterminable
jan-elastic e905a79
capability
jan-elastic 995baad
handle null values
jan-elastic cd223c3
too much data
jan-elastic 8d0f691
type text->keyword
jan-elastic ea24893
default timestamp and output columns
jan-elastic 37c3be2
spotless
jan-elastic 1088a09
ChangePointOperatorTests + fix memory leaks
jan-elastic 7f01d41
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic eabbaa1
[CI] Auto commit changes from spotless
7d30d44
improve test
jan-elastic ac48a09
add comments/todos
jan-elastic 906ca1a
handle multivalued columns
jan-elastic 33ef895
don't register unserialiazable
jan-elastic 98fa981
surrogate
jan-elastic 535101b
make "too much data" tests readable
jan-elastic 8692df6
more tests
jan-elastic 5cab77b
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic 61e24dd
Error handling
jan-elastic 976450e
fix multivalued test
jan-elastic 4885af7
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic 047feb7
more name conflict tests
jan-elastic 9278422
[CI] Auto commit changes from spotless
41057f7
more tests
jan-elastic f9464de
improve code
jan-elastic f50c2aa
CSV test for various input key/value types
jan-elastic 8e05959
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic a7c76dd
one more csv test
jan-elastic bed70ab
Check sortable/numeric for all types
jan-elastic e81e47e
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic 688567b
add null type to testChangePoint_valueNumeric
jan-elastic 22acc9b
more CSV tests
jan-elastic 2b137ca
skip nulls instead of zeroing them
jan-elastic 96e6d6a
error on MV
jan-elastic f00522a
Test+todo for nicer error message
jan-elastic 12876e0
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic 21ec77c
better error msg
jan-elastic 1d3e362
Revert "better error msg"
jan-elastic dabf260
fix
jan-elastic 459d408
make csv test deterministic
jan-elastic dbd5838
replace NamedExpression -> Attribute
jan-elastic 4ced5c2
skip MVs + warning
jan-elastic 654f78d
Merge branch 'main' of github.com:elastic/elasticsearch into esql-cha…
jan-elastic File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 120998 | ||
| summary: ES|QL `change_point` processing command | ||
| area: Machine Learning | ||
| type: feature | ||
| issues: [] |
207 changes: 207 additions & 0 deletions
207
...in/esql/compute/src/main/java/org/elasticsearch/compute/operator/ChangePointOperator.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,207 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.compute.operator; | ||
|
|
||
| import org.apache.lucene.util.BytesRef; | ||
| import org.elasticsearch.compute.data.Block; | ||
| import org.elasticsearch.compute.data.BlockFactory; | ||
| import org.elasticsearch.compute.data.BlockUtils; | ||
| import org.elasticsearch.compute.data.BytesRefBlock; | ||
| import org.elasticsearch.compute.data.DoubleBlock; | ||
| import org.elasticsearch.compute.data.Page; | ||
| import org.elasticsearch.core.Releasables; | ||
| import org.elasticsearch.xpack.ml.aggs.MlAggsHelper; | ||
| import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointDetector; | ||
| import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType; | ||
|
|
||
| import java.util.Deque; | ||
| import java.util.LinkedList; | ||
|
|
||
| public class ChangePointOperator implements Operator { | ||
|
|
||
| public static final int INPUT_VALUE_COUNT_LIMIT = 1000; | ||
|
|
||
| public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory { | ||
jan-elastic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| @Override | ||
| public Operator get(DriverContext driverContext) { | ||
| return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn); | ||
| } | ||
|
|
||
| @Override | ||
| public String describe() { | ||
| return "ChangePointOperator[channel=" + channel + "]"; | ||
| } | ||
| } | ||
|
|
||
| private final DriverContext driverContext; | ||
| private final int channel; | ||
| private final String sourceText; | ||
| private final int sourceLine; | ||
| private final int sourceColumn; | ||
|
|
||
| private final Deque<Page> inputPages; | ||
| private final Deque<Page> outputPages; | ||
| private boolean finished; | ||
| private Warnings warnings; | ||
|
|
||
| public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) { | ||
| this.driverContext = driverContext; | ||
| this.channel = channel; | ||
| this.sourceText = sourceText; | ||
| this.sourceLine = sourceLine; | ||
| this.sourceColumn = sourceColumn; | ||
|
|
||
| finished = false; | ||
| inputPages = new LinkedList<>(); | ||
| outputPages = new LinkedList<>(); | ||
| warnings = null; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean needsInput() { | ||
| return finished == false; | ||
| } | ||
|
|
||
| @Override | ||
| public void addInput(Page page) { | ||
| inputPages.add(page); | ||
| } | ||
|
|
||
| @Override | ||
| public void finish() { | ||
| if (finished == false) { | ||
| finished = true; | ||
| createOutputPages(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isFinished() { | ||
| return finished && outputPages.isEmpty(); | ||
| } | ||
|
|
||
| @Override | ||
| public Page getOutput() { | ||
| if (finished == false || outputPages.isEmpty()) { | ||
| return null; | ||
| } | ||
| return outputPages.removeFirst(); | ||
| } | ||
|
|
||
| private void createOutputPages() { | ||
| int valuesCount = 0; | ||
| for (Page page : inputPages) { | ||
| valuesCount += page.getPositionCount(); | ||
| } | ||
| boolean tooManyValues = valuesCount > INPUT_VALUE_COUNT_LIMIT; | ||
| if (tooManyValues) { | ||
| valuesCount = INPUT_VALUE_COUNT_LIMIT; | ||
| } | ||
|
|
||
| double[] values = new double[valuesCount]; | ||
| int valuesIndex = 0; | ||
| boolean hasNulls = false; | ||
| for (Page inputPage : inputPages) { | ||
| Block inputBlock = inputPage.getBlock(channel); | ||
| for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) { | ||
| Object value = BlockUtils.toJavaObject(inputBlock, i); | ||
| if (value == null) { | ||
| hasNulls = true; | ||
| values[valuesIndex++] = 0; | ||
| } else { | ||
| values[valuesIndex++] = ((Number) value).doubleValue(); | ||
| } | ||
jan-elastic marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| ChangeType changeType = ChangePointDetector.getChangeType(new MlAggsHelper.DoubleBucketValues(null, values)); | ||
| int changePointIndex = changeType.changePoint(); | ||
|
|
||
| BlockFactory blockFactory = driverContext.blockFactory(); | ||
| int pageStartIndex = 0; | ||
| while (inputPages.isEmpty() == false) { | ||
| Page inputPage = inputPages.peek(); | ||
| Page outputPage; | ||
| Block changeTypeBlock = null; | ||
| Block changePvalueBlock = null; | ||
| boolean success = false; | ||
| try { | ||
| if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + inputPage.getPositionCount()) { | ||
| try ( | ||
| BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(inputPage.getPositionCount()); | ||
| DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(inputPage.getPositionCount()) | ||
| ) { | ||
| for (int i = 0; i < inputPage.getPositionCount(); i++) { | ||
| if (pageStartIndex + i == changePointIndex) { | ||
| changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName())); | ||
| pvalueBlockBuilder.appendDouble(changeType.pValue()); | ||
| } else { | ||
| changeTypeBlockBuilder.appendNull(); | ||
| pvalueBlockBuilder.appendNull(); | ||
| } | ||
| } | ||
| changeTypeBlock = changeTypeBlockBuilder.build(); | ||
| changePvalueBlock = pvalueBlockBuilder.build(); | ||
| } | ||
| } else { | ||
| changeTypeBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount()); | ||
| changePvalueBlock = blockFactory.newConstantNullBlock(inputPage.getPositionCount()); | ||
| } | ||
|
|
||
| outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock }); | ||
| success = true; | ||
| } finally { | ||
| if (success == false) { | ||
| Releasables.closeExpectNoException(changeTypeBlock, changePvalueBlock); | ||
| } | ||
| } | ||
|
|
||
| inputPages.removeFirst(); | ||
| outputPages.add(outputPage); | ||
| pageStartIndex += inputPage.getPositionCount(); | ||
| } | ||
|
|
||
| if (changeType instanceof ChangeType.Indeterminable indeterminable) { | ||
| warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason())); | ||
| } | ||
| if (tooManyValues) { | ||
| warnings(true).registerException( | ||
| new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values") | ||
| ); | ||
| } | ||
| if (hasNulls) { | ||
| warnings(true).registerException(new IllegalArgumentException("values contain nulls; treating them as zeroes")); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| for (Page page : inputPages) { | ||
| page.releaseBlocks(); | ||
| } | ||
| for (Page page : outputPages) { | ||
| page.releaseBlocks(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "ChangePointOperator[channel=" + channel + "]"; | ||
| } | ||
|
|
||
| private Warnings warnings(boolean onlyWarnings) { | ||
| if (warnings == null) { | ||
| if (onlyWarnings) { | ||
| this.warnings = Warnings.createOnlyWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText); | ||
| } else { | ||
| this.warnings = Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText); | ||
| } | ||
| } | ||
| return warnings; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
86 changes: 86 additions & 0 deletions
86
...ql/compute/src/test/java/org/elasticsearch/compute/operator/ChangePointOperatorTests.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
|
|
||
| package org.elasticsearch.compute.operator; | ||
|
|
||
| import org.apache.lucene.util.BytesRef; | ||
| import org.elasticsearch.compute.data.BlockFactory; | ||
| import org.elasticsearch.compute.data.BytesRefBlock; | ||
| import org.elasticsearch.compute.data.DoubleBlock; | ||
| import org.elasticsearch.compute.data.LongBlock; | ||
| import org.elasticsearch.compute.data.Page; | ||
| import org.elasticsearch.compute.test.OperatorTestCase; | ||
| import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator; | ||
| import org.hamcrest.Matcher; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
|
|
||
| import static org.hamcrest.Matchers.equalTo; | ||
| import static org.hamcrest.Matchers.hasSize; | ||
|
|
||
| public class ChangePointOperatorTests extends OperatorTestCase { | ||
|
|
||
| @Override | ||
| protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { | ||
| // size must be in [25, 1000] for ChangePoint to function correctly | ||
| // and detect the step change. | ||
| size = Math.clamp(size, 25, 1000); | ||
| List<Long> data = new ArrayList<>(size); | ||
| for (int i = 0; i < size; i++) { | ||
| if (i <= size / 2) { | ||
| data.add(0L); | ||
| } else { | ||
| data.add(1L); | ||
| } | ||
| } | ||
| return new SequenceLongBlockSourceOperator(blockFactory, data); | ||
| } | ||
|
|
||
| @Override | ||
| protected void assertSimpleOutput(List<Page> input, List<Page> results) { | ||
| boolean seenOne = false; | ||
| assertThat(results, hasSize(input.size())); | ||
| for (int i = 0; i < results.size(); i++) { | ||
| Page inputPage = input.get(i); | ||
| Page resultPage = results.get(i); | ||
| assertThat(resultPage.getPositionCount(), equalTo(inputPage.getPositionCount())); | ||
| assertThat(resultPage.getBlockCount(), equalTo(3)); | ||
| for (int j = 0; j < resultPage.getPositionCount(); j++) { | ||
| long inputValue = ((LongBlock) resultPage.getBlock(0)).getLong(j); | ||
| long resultValue = ((LongBlock) resultPage.getBlock(0)).getLong(j); | ||
| assertThat(resultValue, equalTo(inputValue)); | ||
| if (seenOne == false && resultValue == 1L) { | ||
| BytesRef type = ((BytesRefBlock) resultPage.getBlock(1)).getBytesRef(j, new BytesRef()); | ||
| double pvalue = ((DoubleBlock) resultPage.getBlock(2)).getDouble(j); | ||
| assertThat(type.utf8ToString(), equalTo("step_change")); | ||
| assertThat(pvalue, equalTo(0.0)); | ||
| seenOne = true; | ||
| } else { | ||
| assertThat(resultPage.getBlock(1).isNull(j), equalTo(true)); | ||
| assertThat(resultPage.getBlock(2).isNull(j), equalTo(true)); | ||
| } | ||
| } | ||
| } | ||
| assertThat(seenOne, equalTo(true)); | ||
| } | ||
|
|
||
| @Override | ||
| protected Operator.OperatorFactory simple() { | ||
| return new ChangePointOperator.Factory(0, null, 0, 0); | ||
| } | ||
|
|
||
| @Override | ||
| protected Matcher<String> expectedDescriptionOfSimple() { | ||
| return equalTo("ChangePointOperator[channel=0]"); | ||
| } | ||
|
|
||
| @Override | ||
| protected Matcher<String> expectedToStringOfSimple() { | ||
| return equalTo("ChangePointOperator[channel=0]"); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.