Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
Class<? extends Exception>[] warnExceptions() default {};

/**
* If {@code true} then the @timestamp LongVector will be appended to the input blocks of the aggregation function.
* {@code true} if this is a time-series aggregation
*/
boolean includeTimestamps() default false;
boolean timeseries() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
aggClass,
intermediateState,
warnExceptionsTypes,
aggClass.getAnnotation(GroupingAggregator.class).includeTimestamps()
aggClass.getAnnotation(GroupingAggregator.class).timeseries()
);
write(aggClass, "grouping aggregator", groupingAggregatorImplementer.sourceFile(), env);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.elasticsearch.compute.gen.Types.BYTES_REF;
import static org.elasticsearch.compute.gen.Types.DRIVER_CONTEXT;
import static org.elasticsearch.compute.gen.Types.ELEMENT_TYPE;
import static org.elasticsearch.compute.gen.Types.GROUPING_AGGREGATOR_EVALUATOR_CONTEXT;
import static org.elasticsearch.compute.gen.Types.GROUPING_AGGREGATOR_FUNCTION;
import static org.elasticsearch.compute.gen.Types.GROUPING_AGGREGATOR_FUNCTION_ADD_INPUT;
import static org.elasticsearch.compute.gen.Types.INTERMEDIATE_STATE_DESC;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class GroupingAggregatorImplementer {
private final List<Parameter> createParameters;
private final ClassName implementation;
private final List<AggregatorImplementer.IntermediateStateDesc> intermediateState;
private final boolean includeTimestampVector;
private final boolean timseries;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/timseries/timeSeries


private final AggregationState aggState;
private final AggregationParameter aggParam;
Expand All @@ -92,7 +93,7 @@ public GroupingAggregatorImplementer(
TypeElement declarationType,
IntermediateState[] interStateAnno,
List<TypeMirror> warnExceptions,
boolean includeTimestampVector
boolean timseries
) {
this.declarationType = declarationType;
this.warnExceptions = warnExceptions;
Expand All @@ -109,7 +110,7 @@ public GroupingAggregatorImplementer(
declarationType,
aggState.declaredType().isPrimitive() ? requireType(aggState.declaredType()) : requireVoidType(),
requireName("combine"),
combineArgs(aggState, includeTimestampVector)
combineArgs(aggState, timseries)
);
// TODO support multiple parameters
this.aggParam = AggregationParameter.create(combine.getParameters().getLast().asType());
Expand All @@ -128,7 +129,7 @@ public GroupingAggregatorImplementer(
this.intermediateState = Arrays.stream(interStateAnno)
.map(AggregatorImplementer.IntermediateStateDesc::newIntermediateStateDesc)
.toList();
this.includeTimestampVector = includeTimestampVector;
this.timseries = timseries;
}

private static Methods.ArgumentMatcher combineArgs(AggregationState aggState, boolean includeTimestampVector) {
Expand Down Expand Up @@ -318,7 +319,7 @@ private MethodSpec prepareProcessPage() {

builder.addStatement("$T valuesBlock = page.getBlock(channels.get(0))", blockType(aggParam.type()));
builder.addStatement("$T valuesVector = valuesBlock.asVector()", vectorType(aggParam.type()));
if (includeTimestampVector) {
if (timseries) {
builder.addStatement("$T timestampsBlock = page.getBlock(channels.get(1))", LONG_BLOCK);
builder.addStatement("$T timestampsVector = timestampsBlock.asVector()", LONG_VECTOR);

Expand All @@ -327,7 +328,7 @@ private MethodSpec prepareProcessPage() {
builder.endControlFlow();
}
builder.beginControlFlow("if (valuesVector == null)");
String extra = includeTimestampVector ? ", timestampsVector" : "";
String extra = timseries ? ", timestampsVector" : "";
{
builder.beginControlFlow("if (valuesBlock.mayHaveNulls())");
builder.addStatement("state.enableGroupIdTracking(seenGroupIds)");
Expand Down Expand Up @@ -373,7 +374,7 @@ private MethodSpec addRawInputLoop(TypeName groupsType, TypeName valuesType) {
MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
builder.addModifiers(Modifier.PRIVATE);
builder.addParameter(TypeName.INT, "positionOffset").addParameter(groupsType, "groups").addParameter(valuesType, "values");
if (includeTimestampVector) {
if (timseries) {
builder.addParameter(LONG_VECTOR, "timestamps");
}
if (aggParam.isBytesRef()) {
Expand Down Expand Up @@ -456,7 +457,7 @@ private void combineRawInput(MethodSpec.Builder builder, String blockVariable, S

private void combineRawInputForBytesRef(MethodSpec.Builder builder, String blockVariable, String offsetVariable) {
// scratch is a BytesRef var that must have been defined before the iteration starts
if (includeTimestampVector) {
if (timseries) {
if (offsetVariable.contains(" + ")) {
builder.addStatement("var valuePosition = $L", offsetVariable);
offsetVariable = "valuePosition";
Expand All @@ -474,7 +475,7 @@ private void combineRawInputForBytesRef(MethodSpec.Builder builder, String block
}

private void combineRawInputForPrimitive(MethodSpec.Builder builder, String blockVariable, String offsetVariable) {
if (includeTimestampVector) {
if (timseries) {
if (offsetVariable.contains(" + ")) {
builder.addStatement("var valuePosition = $L", offsetVariable);
offsetVariable = "valuePosition";
Expand All @@ -498,7 +499,7 @@ private void combineRawInputForPrimitive(MethodSpec.Builder builder, String bloc
}

private void combineRawInputForVoid(MethodSpec.Builder builder, String blockVariable, String offsetVariable) {
if (includeTimestampVector) {
if (timseries) {
if (offsetVariable.contains(" + ")) {
builder.addStatement("var valuePosition = $L", offsetVariable);
offsetVariable = "valuePosition";
Expand Down Expand Up @@ -683,18 +684,30 @@ private MethodSpec evaluateFinal() {
.addParameter(BLOCK_ARRAY, "blocks")
.addParameter(TypeName.INT, "offset")
.addParameter(INT_VECTOR, "selected")
.addParameter(DRIVER_CONTEXT, "driverContext");
.addParameter(GROUPING_AGGREGATOR_EVALUATOR_CONTEXT, "evaluatorContext");

if (aggState.declaredType().isPrimitive()) {
builder.addStatement("blocks[offset] = state.toValuesBlock(selected, driverContext)");
builder.addStatement("blocks[offset] = state.toValuesBlock(selected, evaluatorContext.driverContext())");
} else if (timseries) {
requireStaticMethod(
declarationType,
requireType(BLOCK),
requireName("evaluateFinal"),
requireArgs(
requireType(aggState.declaredType()),
requireType(INT_VECTOR),
requireType(GROUPING_AGGREGATOR_EVALUATOR_CONTEXT)
)
);
builder.addStatement("blocks[offset] = $T.evaluateFinal(state, selected, evaluatorContext)", declarationType);
} else {
requireStaticMethod(
declarationType,
requireType(BLOCK),
requireName("evaluateFinal"),
requireArgs(requireType(aggState.declaredType()), requireType(INT_VECTOR), requireType(DRIVER_CONTEXT))
);
builder.addStatement("blocks[offset] = $T.evaluateFinal(state, selected, driverContext)", declarationType);
builder.addStatement("blocks[offset] = $T.evaluateFinal(state, selected, evaluatorContext.driverContext())", declarationType);
}
return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ public class Types {
static final TypeName LIST_AGG_FUNC_DESC = ParameterizedTypeName.get(ClassName.get(List.class), INTERMEDIATE_STATE_DESC);

static final ClassName DRIVER_CONTEXT = ClassName.get(OPERATOR_PACKAGE, "DriverContext");
static final ClassName GROUPING_AGGREGATOR_EVALUATOR_CONTEXT = ClassName.get(
AGGREGATION_PACKAGE,
"GroupingAggregatorEvaluationContext"
);

static final ClassName EXPRESSION_EVALUATOR = ClassName.get(OPERATOR_PACKAGE, "EvalOperator", "ExpressionEvaluator");
static final ClassName EXPRESSION_EVALUATOR_FACTORY = ClassName.get(OPERATOR_PACKAGE, "EvalOperator", "ExpressionEvaluator", "Factory");
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading