Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,4 @@
* into a warning and turn into a null value.
*/
Class<? extends Exception>[] warnExceptions() default {};

/**
* If {@code true} then the @timestamp LongVector will be appended to the input blocks of the aggregation function.
*/
boolean includeTimestamps() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,4 @@
* into a warning and turn into a null value.
*/
Class<? extends Exception>[] warnExceptions() default {};

/**
* {@code true} if this is a time-series aggregation
*/
boolean timeseries() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.elasticsearch.compute.gen.Methods.requireAnyArgs;
import static org.elasticsearch.compute.gen.Methods.requireAnyType;
import static org.elasticsearch.compute.gen.Methods.requireArgs;
import static org.elasticsearch.compute.gen.Methods.requireArgsStartsWith;
import static org.elasticsearch.compute.gen.Methods.requireName;
import static org.elasticsearch.compute.gen.Methods.requirePrimitiveOrImplements;
import static org.elasticsearch.compute.gen.Methods.requireStaticMethod;
Expand Down Expand Up @@ -87,8 +88,7 @@ public AggregatorImplementer(
Elements elements,
TypeElement declarationType,
IntermediateState[] interStateAnno,
List<TypeMirror> warnExceptions,
boolean includeTimestampVector
List<TypeMirror> warnExceptions
) {
this.declarationType = declarationType;
this.warnExceptions = warnExceptions;
Expand All @@ -105,8 +105,18 @@ public AggregatorImplementer(
declarationType,
aggState.declaredType().isPrimitive() ? requireType(aggState.declaredType()) : requireVoidType(),
requireName("combine"),
combineArgs(aggState, includeTimestampVector)
requireArgsStartsWith(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"))
);
switch (combine.getParameters().size()) {
case 2 -> includeTimestampVector = false;
case 3 -> {
if (false == TypeName.get(combine.getParameters().get(1).asType()).equals(TypeName.LONG)) {
throw new IllegalArgumentException("combine/3's second parameter must be long but was: " + combine);
}
includeTimestampVector = true;
}
default -> throw new IllegalArgumentException("combine must have 2 or 3 parameters but was: " + combine);
}
// TODO support multiple parameters
this.aggParam = AggregationParameter.create(combine.getParameters().getLast().asType());

Expand All @@ -121,19 +131,6 @@ public AggregatorImplementer(
(declarationType.getSimpleName() + "AggregatorFunction").replace("AggregatorAggregator", "Aggregator")
);
this.intermediateState = Arrays.stream(interStateAnno).map(IntermediateStateDesc::newIntermediateStateDesc).toList();
this.includeTimestampVector = includeTimestampVector;
}

private static Methods.ArgumentMatcher combineArgs(AggregationState aggState, boolean includeTimestampVector) {
if (includeTimestampVector) {
return requireArgs(
requireType(aggState.declaredType()),
requireType(TypeName.LONG), // @timestamp
requireAnyType("<aggregation input column type>")
);
} else {
return requireArgs(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"));
}
}

ClassName implementation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,7 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
);
if (aggClass.getAnnotation(Aggregator.class) != null) {
IntermediateState[] intermediateState = aggClass.getAnnotation(Aggregator.class).value();
implementer = new AggregatorImplementer(
env.getElementUtils(),
aggClass,
intermediateState,
warnExceptionsTypes,
aggClass.getAnnotation(Aggregator.class).includeTimestamps()
);
implementer = new AggregatorImplementer(env.getElementUtils(), aggClass, intermediateState, warnExceptionsTypes);
write(aggClass, "aggregator", implementer.sourceFile(), env);
}
GroupingAggregatorImplementer groupingAggregatorImplementer = null;
Expand All @@ -106,8 +100,7 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
env.getElementUtils(),
aggClass,
intermediateState,
warnExceptionsTypes,
aggClass.getAnnotation(GroupingAggregator.class).timeseries()
warnExceptionsTypes
);
write(aggClass, "grouping aggregator", groupingAggregatorImplementer.sourceFile(), env);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.elasticsearch.compute.gen.Methods.requireAnyArgs;
import static org.elasticsearch.compute.gen.Methods.requireAnyType;
import static org.elasticsearch.compute.gen.Methods.requireArgs;
import static org.elasticsearch.compute.gen.Methods.requireArgsStartsWith;
import static org.elasticsearch.compute.gen.Methods.requireName;
import static org.elasticsearch.compute.gen.Methods.requirePrimitiveOrImplements;
import static org.elasticsearch.compute.gen.Methods.requireStaticMethod;
Expand Down Expand Up @@ -98,8 +99,7 @@ public GroupingAggregatorImplementer(
Elements elements,
TypeElement declarationType,
IntermediateState[] interStateAnno,
List<TypeMirror> warnExceptions,
boolean timseries
List<TypeMirror> warnExceptions
) {
this.declarationType = declarationType;
this.warnExceptions = warnExceptions;
Expand All @@ -116,8 +116,18 @@ public GroupingAggregatorImplementer(
declarationType,
aggState.declaredType().isPrimitive() ? requireType(aggState.declaredType()) : requireVoidType(),
requireName("combine"),
combineArgs(aggState, timseries)
combineArgs(aggState)
);
switch (combine.getParameters().size()) {
case 2, 3 -> timseries = false;
case 4 -> {
if (false == TypeName.get(combine.getParameters().get(2).asType()).equals(TypeName.LONG)) {
throw new IllegalArgumentException("combine/4's third parameter must be long but was: " + combine);
}
timseries = true;
}
default -> throw new IllegalArgumentException("combine must have 2, 3, or 4 parameters but was: " + combine);
}
// TODO support multiple parameters
this.aggParam = AggregationParameter.create(combine.getParameters().getLast().asType());

Expand All @@ -135,21 +145,13 @@ public GroupingAggregatorImplementer(
this.intermediateState = Arrays.stream(interStateAnno)
.map(AggregatorImplementer.IntermediateStateDesc::newIntermediateStateDesc)
.toList();
this.timseries = timseries;
}

private static Methods.ArgumentMatcher combineArgs(AggregationState aggState, boolean includeTimestampVector) {
private static Methods.ArgumentMatcher combineArgs(AggregationState aggState) {
if (aggState.declaredType().isPrimitive()) {
return requireArgs(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"));
} else if (includeTimestampVector) {
return requireArgs(
requireType(aggState.declaredType()),
requireType(TypeName.INT),
requireType(TypeName.LONG), // @timestamp
requireAnyType("<aggregation input column type>")
);
} else {
return requireArgs(
return requireArgsStartsWith(
requireType(aggState.declaredType()),
requireType(TypeName.INT),
requireAnyType("<aggregation input column type>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ static ArgumentMatcher requireArgs(TypeMatcher... argTypes) {
);
}

static ArgumentMatcher requireArgsStartsWith(TypeMatcher... argTypes) {
return new ArgumentMatcher(
args -> args.size() >= argTypes.length && IntStream.range(0, argTypes.length).allMatch(i -> argTypes[i].test(args.get(i))),
Stream.of(argTypes).map(TypeMatcher::toString).collect(joining(", ")) + ", ..."
);
}

record NameMatcher(Set<String> names) implements Predicate<String> {
@Override
public boolean test(String name) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import org.elasticsearch.core.Releasables;
* This class is generated. Edit `X-RateAggregator.java.st` instead.
*/
@GroupingAggregator(
timeseries = true,
value = {
@IntermediateState(name = "timestamps", type = "LONG_BLOCK"),
@IntermediateState(name = "values", type = "$TYPE$_BLOCK"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.elasticsearch.core.Releasables;
* This class is generated. Edit `X-ValueOverTimeAggregator.java.st` instead.
*/
@GroupingAggregator(
timeseries = true,
value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "$TYPE$_BLOCK") }
)
public class $Occurrence$OverTime$Type$Aggregator {
Expand Down