Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/120998.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 120998
summary: ES|QL `change_point` processing command
area: Machine Learning
type: feature
issues: []
2 changes: 2 additions & 0 deletions docs/reference/esql/esql-commands.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ image::images/esql/processing-command.svg[A processing command changing an input

{esql} supports these processing commands:

* experimental:[] <<esql-change_point>>
* <<esql-dissect>>
* <<esql-drop>>
* <<esql-enrich>>
Expand All @@ -55,6 +56,7 @@ include::source-commands/from.asciidoc[]
include::source-commands/row.asciidoc[]
include::source-commands/show.asciidoc[]

include::processing-commands/change_point.asciidoc[]
include::processing-commands/dissect.asciidoc[]
include::processing-commands/drop.asciidoc[]
include::processing-commands/enrich.asciidoc[]
Expand Down
47 changes: 47 additions & 0 deletions docs/reference/esql/processing-commands/change_point.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[discrete]
[[esql-change_point]]
=== `CHANGE_POINT`

[NOTE]
====
The `CHANGE_POINT` command requires a https://www.elastic.co/subscriptions[platinum license].
====

preview::[]

`CHANGE_POINT` detects spikes, dips, and change points in a metric.

*Syntax*

[source,esql]
----
CHANGE_POINT value [ON key] [AS type_name, pvalue_name]
----

*Parameters*

`value`::
The column with the metric in which you want to detect a change point.

`key`::
The column with the key to order the values by. If not specified, `@timestamp` is used.

`type_name`::
The name of the output column with the change point type. If not specified, `type` is used.

`pvalue_name`::
The name of the output column with the p-value that indicates how extreme the change point is. If not specified, `pvalue` is used.

[NOTE]
====
Change point detection requires at least 22 values. Fewer than 1,000 values is preferred: if there are more, only the first 1,000 are used and a warning is emitted.
====

*Example*

[source.merge.styled,esql]
----
include::{esql-specs}/change_point.csv-spec[tag=changePointForDocs]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/change_point.csv-spec[tag=changePointForDocs-result]
|===
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xpack.ml.aggs.MlAggsHelper;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangePointDetector;
import org.elasticsearch.xpack.ml.aggs.changepoint.ChangeType;

import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

/**
 * Finds spikes, dips and change points in a list of values.
 * <p>
 * Warning: this operator cannot handle large amounts of data! It buffers all
 * data that is passed to it, runs the change point detector on the data (which
 * is a compute-heavy process), and then outputs all data with the change points.
 * <p>
 * Every input page is emitted again with two extra blocks appended: the name of
 * the detected change type and its p-value. Both are null at every position
 * except the one the detector reports as the change point.
 * <p>
 * Only the first {@link #INPUT_VALUE_COUNT_LIMIT} positions are handed to the
 * detector, and null or multivalued positions are skipped; each of these cases
 * is reported through a warning.
 */
public class ChangePointOperator implements Operator {

    /** Maximum number of input positions that are passed to the change point detector. */
    public static final int INPUT_VALUE_COUNT_LIMIT = 1000;

    /**
     * Factory for {@link ChangePointOperator}.
     *
     * @param channel      index of the block in each input page that holds the values
     * @param sourceText   source text of the command, used when reporting warnings
     * @param sourceLine   line of the command in the query, used when reporting warnings
     * @param sourceColumn column of the command in the query, used when reporting warnings
     */
    public record Factory(int channel, String sourceText, int sourceLine, int sourceColumn) implements OperatorFactory {
        @Override
        public Operator get(DriverContext driverContext) {
            return new ChangePointOperator(driverContext, channel, sourceText, sourceLine, sourceColumn);
        }

        @Override
        public String describe() {
            return "ChangePointOperator[channel=" + channel + "]";
        }
    }

    /** Values extracted from the buffered pages, plus flags for what had to be skipped. */
    private record CollectedValues(MlAggsHelper.DoubleBucketValues bucketValues, boolean hasNulls, boolean hasMultivalued) {}

    private final DriverContext driverContext;
    private final int channel;
    private final String sourceText;
    private final int sourceLine;
    private final int sourceColumn;

    /** Pages received through {@link #addInput(Page)} that have not been turned into output yet. */
    private final Deque<Page> inputPages;
    /** Pages produced by {@link #finish()} that have not been handed out yet. */
    private final Deque<Page> outputPages;
    private boolean finished;
    /** Lazily created by {@link #warnings(boolean)}; stays null while nothing has been reported. */
    private Warnings warnings;

    // TODO: make org.elasticsearch.xpack.esql.core.tree.Source available here
    // (by modularizing esql-core) and use that instead of the individual fields.
    /**
     * @param driverContext supplies the block factory and the warnings mode
     * @param channel       index of the block in each input page that holds the values
     * @param sourceText    source text of the command, used when reporting warnings
     * @param sourceLine    line of the command in the query, used when reporting warnings
     * @param sourceColumn  column of the command in the query, used when reporting warnings
     */
    public ChangePointOperator(DriverContext driverContext, int channel, String sourceText, int sourceLine, int sourceColumn) {
        this.driverContext = driverContext;
        this.channel = channel;
        this.sourceText = sourceText;
        this.sourceLine = sourceLine;
        this.sourceColumn = sourceColumn;

        finished = false;
        inputPages = new LinkedList<>();
        outputPages = new LinkedList<>();
        warnings = null;
    }

    /** Input is accepted until {@link #finish()} has been called. */
    @Override
    public boolean needsInput() {
        return finished == false;
    }

    /** Buffers the page; nothing is computed until {@link #finish()}. */
    @Override
    public void addInput(Page page) {
        inputPages.add(page);
    }

    /** Runs the detection over everything buffered so far. Calls after the first one are no-ops. */
    @Override
    public void finish() {
        if (finished == false) {
            finished = true;
            createOutputPages();
        }
    }

    /** The operator is done once it has been finished and every output page has been handed out. */
    @Override
    public boolean isFinished() {
        return finished && outputPages.isEmpty();
    }

    /**
     * @return the next output page, or {@code null} when the operator has not been
     *         finished yet or no output page is left
     */
    @Override
    public Page getOutput() {
        if (finished == false || outputPages.isEmpty()) {
            return null;
        }
        return outputPages.removeFirst();
    }

    /**
     * Turns all buffered input pages into output pages: collects the values, runs
     * the detector, appends the result blocks and finally registers the warnings.
     */
    private void createOutputPages() {
        int totalPositions = 0;
        for (Page page : inputPages) {
            totalPositions += page.getPositionCount();
        }
        boolean tooManyValues = totalPositions > INPUT_VALUE_COUNT_LIMIT;
        int valuesCount = tooManyValues ? INPUT_VALUE_COUNT_LIMIT : totalPositions;

        CollectedValues collected = collectValues(valuesCount);
        ChangeType changeType = ChangePointDetector.getChangeType(collected.bucketValues());

        buildOutputPages(changeType);

        // Warnings are registered only after all output pages were built successfully.
        registerWarnings(changeType, tooManyValues, collected.hasNulls(), collected.hasMultivalued());
    }

    /**
     * Reads at most {@code valuesCount} positions from the buffered pages. Null and
     * multivalued positions contribute no value but still advance the position
     * index, so the recorded bucket indexes refer to positions in the original input.
     */
    private CollectedValues collectValues(int valuesCount) {
        List<Double> values = new ArrayList<>(valuesCount);
        List<Integer> bucketIndexes = new ArrayList<>(valuesCount);
        int valuesIndex = 0;
        boolean hasNulls = false;
        boolean hasMultivalued = false;
        for (Page inputPage : inputPages) {
            Block inputBlock = inputPage.getBlock(channel);
            for (int i = 0; i < inputBlock.getPositionCount() && valuesIndex < valuesCount; i++) {
                Object value = BlockUtils.toJavaObject(inputBlock, i);
                if (value == null) {
                    hasNulls = true;
                } else if (value instanceof List<?>) {
                    hasMultivalued = true;
                } else {
                    values.add(((Number) value).doubleValue());
                    bucketIndexes.add(valuesIndex);
                }
                valuesIndex++;
            }
        }

        MlAggsHelper.DoubleBucketValues bucketValues = new MlAggsHelper.DoubleBucketValues(
            null,
            values.stream().mapToDouble(Double::doubleValue).toArray(),
            bucketIndexes.stream().mapToInt(Integer::intValue).toArray()
        );
        return new CollectedValues(bucketValues, hasNulls, hasMultivalued);
    }

    /**
     * Moves every buffered input page to the output queue with a change type block
     * and a p-value block appended. Only the page that contains the change point
     * index gets real builders; all other pages get constant null blocks.
     */
    private void buildOutputPages(ChangeType changeType) {
        int changePointIndex = changeType.changePoint();

        BlockFactory blockFactory = driverContext.blockFactory();
        int pageStartIndex = 0;
        while (inputPages.isEmpty() == false) {
            // Peek instead of remove: if building fails, the page stays queued and is released by close().
            Page inputPage = inputPages.peek();
            int positionCount = inputPage.getPositionCount();
            Page outputPage;
            Block changeTypeBlock = null;
            Block changePvalueBlock = null;
            boolean success = false;
            try {
                if (pageStartIndex <= changePointIndex && changePointIndex < pageStartIndex + positionCount) {
                    try (
                        BytesRefBlock.Builder changeTypeBlockBuilder = blockFactory.newBytesRefBlockBuilder(positionCount);
                        DoubleBlock.Builder pvalueBlockBuilder = blockFactory.newDoubleBlockBuilder(positionCount)
                    ) {
                        for (int i = 0; i < positionCount; i++) {
                            if (pageStartIndex + i == changePointIndex) {
                                changeTypeBlockBuilder.appendBytesRef(new BytesRef(changeType.getWriteableName()));
                                pvalueBlockBuilder.appendDouble(changeType.pValue());
                            } else {
                                changeTypeBlockBuilder.appendNull();
                                pvalueBlockBuilder.appendNull();
                            }
                        }
                        changeTypeBlock = changeTypeBlockBuilder.build();
                        changePvalueBlock = pvalueBlockBuilder.build();
                    }
                } else {
                    changeTypeBlock = blockFactory.newConstantNullBlock(positionCount);
                    changePvalueBlock = blockFactory.newConstantNullBlock(positionCount);
                }

                outputPage = inputPage.appendBlocks(new Block[] { changeTypeBlock, changePvalueBlock });
                success = true;
            } finally {
                if (success == false) {
                    // The new blocks never made it into a page, so they have to be released here.
                    Releasables.closeExpectNoException(changeTypeBlock, changePvalueBlock);
                }
            }

            inputPages.removeFirst();
            outputPages.add(outputPage);
            pageStartIndex += positionCount;
        }
    }

    /** Registers one warning for each problem that was noticed while processing the input. */
    private void registerWarnings(ChangeType changeType, boolean tooManyValues, boolean hasNulls, boolean hasMultivalued) {
        if (changeType instanceof ChangeType.Indeterminable indeterminable) {
            warnings(false).registerException(new IllegalArgumentException(indeterminable.getReason()));
        }
        if (tooManyValues) {
            warnings(true).registerException(
                new IllegalArgumentException("too many values; keeping only first " + INPUT_VALUE_COUNT_LIMIT + " values")
            );
        }
        if (hasNulls) {
            warnings(true).registerException(new IllegalArgumentException("values contain nulls; skipping them"));
        }
        if (hasMultivalued) {
            warnings(true).registerException(
                new IllegalArgumentException(
                    "values contains multivalued entries; skipping them (please consider reducing them with e.g. MV_AVG or MV_SUM)"
                )
            );
        }
    }

    /** Releases the blocks of every page that is still buffered, on the input as well as the output side. */
    @Override
    public void close() {
        for (Page page : inputPages) {
            page.releaseBlocks();
        }
        for (Page page : outputPages) {
            page.releaseBlocks();
        }
    }

    @Override
    public String toString() {
        return "ChangePointOperator[channel=" + channel + "]";
    }

    /**
     * Returns the warnings collector of this operator, creating it on first use.
     * <p>
     * NOTE(review): the collector is created once and then reused, so
     * {@code onlyWarnings} only has an effect on the first call; later calls get
     * the existing collector whatever they pass. Confirm that this is intended.
     *
     * @param onlyWarnings on first use, selects {@code Warnings.createOnlyWarnings}
     *                     instead of {@code Warnings.createWarnings}
     */
    private Warnings warnings(boolean onlyWarnings) {
        if (warnings == null) {
            if (onlyWarnings) {
                this.warnings = Warnings.createOnlyWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
            } else {
                this.warnings = Warnings.createWarnings(driverContext.warningsMode(), sourceLine, sourceColumn, sourceText);
            }
        }
        return warnings;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public void registerException(Exception exception) {
* @param sourceText The source text that caused the warning. Same as `source.text()`
* @return A warnings collector object
*/
// TODO: rename to createWarningsTreatedAsNull
public static Warnings createWarnings(DriverContext.WarningsMode warningsMode, int lineNumber, int columnNumber, String sourceText) {
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as null");
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as null");
}

/**
Expand All @@ -50,7 +51,26 @@ public static Warnings createWarningsTreatedAsFalse(
int columnNumber,
String sourceText
) {
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "treating result as false");
return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, "evaluation of [{}] failed, treating result as false");
}

/**
 * Build a warnings collector for the given mode whose header only states that
 * warnings occurred during evaluation, without claiming that the evaluation failed.
 * @param warningsMode The warnings collection strategy to use
 * @param lineNumber The line number of the source text. Same as `source.getLineNumber()`
 * @param columnNumber The column number of the source text. Same as `source.getColumnNumber()`
 * @param sourceText The source text that caused the warning. Same as `source.text()`
 * @return A warnings collector object
 */
// TODO: rename to createWarnings
public static Warnings createOnlyWarnings(
    DriverContext.WarningsMode warningsMode,
    int lineNumber,
    int columnNumber,
    String sourceText
) {
    // Header template handed to the shared private factory together with the source text.
    final String headerTemplate = "warnings during evaluation of [{}]";
    return createWarnings(warningsMode, lineNumber, columnNumber, sourceText, headerTemplate);
}

private static Warnings createWarnings(
Expand Down Expand Up @@ -78,14 +98,7 @@ private static Warnings createWarnings(

private Warnings(int lineNumber, int columnNumber, String sourceText, String first) {
this.location = format("Line {}:{}: ", lineNumber, columnNumber);
this.first = format(
null,
"{}evaluation of [{}] failed, {}. Only first {} failures recorded.",
location,
sourceText,
first,
MAX_ADDED_WARNINGS
);
this.first = format(null, "{}" + first + ". Only first {} failures recorded.", location, sourceText, MAX_ADDED_WARNINGS);
}

public void registerException(Exception exception) {
Expand Down
Loading