Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/119458.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119458
summary: ES|QL change point
area: Machine Learning
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,8 @@
*/
Class<? extends Exception>[] warnExceptions() default {};

/**
* If {@code true} then the {@code @timestamp} LongVector will be appended to the input blocks of the aggregation function.
* Defaults to {@code false}.
*/
boolean includeTimestamps() default false;
}
22 changes: 22 additions & 0 deletions x-pack/plugin/esql/compute/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,28 @@ tasks.named('stringTemplates').configure {
it.outputFile = "org/elasticsearch/compute/aggregation/TopIpAggregator.java"
}

// ChangePoint aggregators: one generated class per numeric value type, all rendered from the same template.
// The input is resolved with file(...), relative to the project directory, like the other template inputs here.
File changePointAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-ChangePointAggregator.java.st")
template {
  it.properties = intProperties
  it.inputFile = changePointAggregatorInputFile
  it.outputFile = "org/elasticsearch/compute/aggregation/ChangePointIntAggregator.java"
}
template {
  it.properties = longProperties
  it.inputFile = changePointAggregatorInputFile
  it.outputFile = "org/elasticsearch/compute/aggregation/ChangePointLongAggregator.java"
}
template {
  it.properties = floatProperties
  it.inputFile = changePointAggregatorInputFile
  it.outputFile = "org/elasticsearch/compute/aggregation/ChangePointFloatAggregator.java"
}
template {
  it.properties = doubleProperties
  it.inputFile = changePointAggregatorInputFile
  it.outputFile = "org/elasticsearch/compute/aggregation/ChangePointDoubleAggregator.java"
}

File multivalueDedupeInputFile = file("src/main/java/org/elasticsearch/compute/operator/mvdedupe/X-MultivalueDedupe.java.st")
template {
it.properties = intProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@ public class AggregatorImplementer {
private final boolean valuesIsBytesRef;
private final List<IntermediateStateDesc> intermediateState;
private final List<Parameter> createParameters;
private final boolean includeTimestampVector;

public AggregatorImplementer(
Elements elements,
TypeElement declarationType,
IntermediateState[] interStateAnno,
List<TypeMirror> warnExceptions
List<TypeMirror> warnExceptions,
boolean includeTimestampVector
) {
this.declarationType = declarationType;
this.warnExceptions = warnExceptions;
Expand Down Expand Up @@ -128,6 +130,7 @@ public AggregatorImplementer(
);
this.valuesIsBytesRef = BYTES_REF.equals(TypeName.get(combine.getParameters().get(combine.getParameters().size() - 1).asType()));
intermediateState = Arrays.stream(interStateAnno).map(IntermediateStateDesc::newIntermediateStateDesc).toList();
this.includeTimestampVector = includeTimestampVector;
}

ClassName implementation() {
Expand Down Expand Up @@ -359,34 +362,44 @@ private MethodSpec addRawInput() {
builder.addStatement("return");
}
builder.endControlFlow();
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
if (includeTimestampVector) {
builder.addStatement("$T timestampsBlock = page.getBlock(channels.get(1))", LONG_BLOCK);
builder.addStatement("$T timestampsVector = timestampsBlock.asVector()", LONG_VECTOR);
builder.beginControlFlow("if (timestampsVector == null) ");
builder.addStatement("throw new IllegalStateException($S)", "expected @timestamp vector; but got a block");
builder.endControlFlow();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine. But it's worth saying out loud: Things like this usually should become null and a warning. They'd put the agg in an "I'm broken" state and produce a warning on output. Like sum should do if it overflows. It doesn't do that now, but it should.

Anyway, I think this is fine to just hard fail here - just like this - because we're going to want to build machinery around the agg to make sure that its input is a time series. Which will have the constraint that the timestamp is always single valued. And, probably, descending.

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First arity 2 aggregation function!

I suppose it's ok to make it timestamp-specific at this point. At some point we'll rework this when we add correlation or something, but it's all good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RATE aggregation already takes two args (also a timestamp, also as the 2nd arg), but that's still in snapshot mode. Unfortunately, that's just a GroupingAggregator. So, I basically ported the includeTimestamps from that.


builder.beginControlFlow("if (mask.allTrue())");
String extra = includeTimestampVector ? ", timestampsVector" : "";
{
builder.addComment("No masking");
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
builder.beginControlFlow("if (vector != null)");
builder.addStatement("addRawVector(vector)");
builder.addStatement("addRawVector(vector$L)", extra);
builder.nextControlFlow("else");
builder.addStatement("addRawBlock(block)");
builder.addStatement("addRawBlock(block$L)", extra);
builder.endControlFlow();
builder.addStatement("return");
}
builder.endControlFlow();

builder.nextControlFlow("else");
builder.addComment("Some positions masked away, others kept");
builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
builder.beginControlFlow("if (vector != null)");
builder.addStatement("addRawVector(vector, mask)");
builder.addStatement("addRawVector(vector$L, mask)", extra);
builder.nextControlFlow("else");
builder.addStatement("addRawBlock(block, mask)");
builder.addStatement("addRawBlock(block$L, mask)", extra);
builder.endControlFlow();
builder.endControlFlow();
return builder.build();
}

private MethodSpec addRawVector(boolean masked) {
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawVector");
builder.addModifiers(Modifier.PRIVATE).addParameter(valueVectorType(init, combine), "vector");
if (includeTimestampVector) {
builder.addParameter(LONG_VECTOR, "timestamps");
}
if (masked) {
builder.addParameter(BOOLEAN_VECTOR, "mask");
}
Expand Down Expand Up @@ -416,6 +429,9 @@ private MethodSpec addRawVector(boolean masked) {
private MethodSpec addRawBlock(boolean masked) {
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawBlock");
builder.addModifiers(Modifier.PRIVATE).addParameter(valueBlockType(init, combine), "block");
if (includeTimestampVector) {
builder.addParameter(LONG_VECTOR, "timestamps");
}
if (masked) {
builder.addParameter(BOOLEAN_VECTOR, "mask");
}
Expand Down Expand Up @@ -455,6 +471,8 @@ private void combineRawInput(MethodSpec.Builder builder, String blockVariable) {
}
if (valuesIsBytesRef) {
combineRawInputForBytesRef(builder, blockVariable);
} else if (includeTimestampVector) {
combineRawInputWithTimestamp(builder, blockVariable);
} else if (returnType.isPrimitive()) {
combineRawInputForPrimitive(returnType, builder, blockVariable);
} else if (returnType == TypeName.VOID) {
Expand Down Expand Up @@ -492,6 +510,12 @@ private void combineRawInputForVoid(MethodSpec.Builder builder, String blockVari
);
}

/**
 * Emits the generated statement that feeds one timestamped value into the aggregation state.
 * <p>
 * The emitted code calls the declaring class's static {@code combine(state, timestamp, value)}. The timestamp is
 * read with {@code timestamps.getLong(i)}; the value is read from {@code blockVariable} through a getter whose
 * suffix is the capitalized type name of the last {@code combine} parameter (for example {@code getLong}).
 * <p>
 * NOTE(review): the emitted code assumes {@code state}, {@code timestamps} and the index {@code i} are in scope
 * at the call site, and uses the same index for both the timestamp and the value. Confirm that this holds when
 * {@code blockVariable} refers to a block rather than a vector.
 *
 * @param builder       the method builder receiving the statement
 * @param blockVariable name of the generated variable holding the input values
 */
private void combineRawInputWithTimestamp(MethodSpec.Builder builder, String blockVariable) {
    int lastParameter = combine.getParameters().size() - 1;
    String valueTypeName = TypeName.get(combine.getParameters().get(lastParameter).asType()).toString();
    // Capitalize the first letter so that e.g. "long" selects the getLong accessor.
    String accessorSuffix = valueTypeName.substring(0, 1).toUpperCase(Locale.ROOT) + valueTypeName.substring(1);
    builder.addStatement(
        "$T.combine(state, timestamps.getLong(i), $L.get$L(i))",
        declarationType,
        blockVariable,
        accessorSuffix
    );
}

private void combineRawInputForBytesRef(MethodSpec.Builder builder, String blockVariable) {
// scratch is a BytesRef var that must have been defined before the iteration starts
builder.addStatement("$T.combine(state, $L.getBytesRef(i, scratch))", declarationType, blockVariable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
);
if (aggClass.getAnnotation(Aggregator.class) != null) {
IntermediateState[] intermediateState = aggClass.getAnnotation(Aggregator.class).value();
implementer = new AggregatorImplementer(env.getElementUtils(), aggClass, intermediateState, warnExceptionsTypes);
boolean includeTimestamps = aggClass.getAnnotation(Aggregator.class).includeTimestamps();
implementer = new AggregatorImplementer(
env.getElementUtils(),
aggClass,
intermediateState,
warnExceptionsTypes,
includeTimestamps
);
write(aggClass, "aggregator", implementer.sourceFile(), env);
}
GroupingAggregatorImplementer groupingAggregatorImplementer = null;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading