Skip to content

Commit 9a3f801

Browse files
committed
ESQL: Detect time series agg
This replaces the explicit tagging of `timeseries = true` on time series aggregations with a scan of the `combine` method. This is a step towards supporting many inputs to aggs universally.
1 parent 4be260a commit 9a3f801

20 files changed

+37
-57
lines changed

x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/Aggregator.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,4 @@
5454
* into a warning and turn into a null value.
5555
*/
5656
Class<? extends Exception>[] warnExceptions() default {};
57-
58-
/**
59-
* If {@code true} then the @timestamp LongVector will be appended to the input blocks of the aggregation function.
60-
*/
61-
boolean includeTimestamps() default false;
6257
}

x-pack/plugin/esql/compute/ann/src/main/java/org/elasticsearch/compute/ann/GroupingAggregator.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,4 @@
2727
* into a warning and turn into a null value.
2828
*/
2929
Class<? extends Exception>[] warnExceptions() default {};
30-
31-
/**
32-
* {@code true} if this is a time-series aggregation
33-
*/
34-
boolean timeseries() default false;
3530
}

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import static org.elasticsearch.compute.gen.Methods.requireAnyArgs;
3838
import static org.elasticsearch.compute.gen.Methods.requireAnyType;
3939
import static org.elasticsearch.compute.gen.Methods.requireArgs;
40+
import static org.elasticsearch.compute.gen.Methods.requireArgsStartsWith;
4041
import static org.elasticsearch.compute.gen.Methods.requireName;
4142
import static org.elasticsearch.compute.gen.Methods.requirePrimitiveOrImplements;
4243
import static org.elasticsearch.compute.gen.Methods.requireStaticMethod;
@@ -87,8 +88,7 @@ public AggregatorImplementer(
8788
Elements elements,
8889
TypeElement declarationType,
8990
IntermediateState[] interStateAnno,
90-
List<TypeMirror> warnExceptions,
91-
boolean includeTimestampVector
91+
List<TypeMirror> warnExceptions
9292
) {
9393
this.declarationType = declarationType;
9494
this.warnExceptions = warnExceptions;
@@ -105,8 +105,18 @@ public AggregatorImplementer(
105105
declarationType,
106106
aggState.declaredType().isPrimitive() ? requireType(aggState.declaredType()) : requireVoidType(),
107107
requireName("combine"),
108-
combineArgs(aggState, includeTimestampVector)
108+
requireArgsStartsWith(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"))
109109
);
110+
switch (combine.getParameters().size()) {
111+
case 2 -> includeTimestampVector = false;
112+
case 3 -> {
113+
if (false == TypeName.get(combine.getParameters().get(1).asType()).equals(TypeName.LONG)) {
114+
throw new IllegalArgumentException("combine/3's second parameter must be long but was: " + combine);
115+
}
116+
includeTimestampVector = true;
117+
}
118+
default -> throw new IllegalArgumentException("combine must have 2 or 3 parameters but was: " + combine);
119+
}
110120
// TODO support multiple parameters
111121
this.aggParam = AggregationParameter.create(combine.getParameters().getLast().asType());
112122

@@ -121,19 +131,6 @@ public AggregatorImplementer(
121131
(declarationType.getSimpleName() + "AggregatorFunction").replace("AggregatorAggregator", "Aggregator")
122132
);
123133
this.intermediateState = Arrays.stream(interStateAnno).map(IntermediateStateDesc::newIntermediateStateDesc).toList();
124-
this.includeTimestampVector = includeTimestampVector;
125-
}
126-
127-
private static Methods.ArgumentMatcher combineArgs(AggregationState aggState, boolean includeTimestampVector) {
128-
if (includeTimestampVector) {
129-
return requireArgs(
130-
requireType(aggState.declaredType()),
131-
requireType(TypeName.LONG), // @timestamp
132-
requireAnyType("<aggregation input column type>")
133-
);
134-
} else {
135-
return requireArgs(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"));
136-
}
137134
}
138135

139136
ClassName implementation() {

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorProcessor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
9191
env.getElementUtils(),
9292
aggClass,
9393
intermediateState,
94-
warnExceptionsTypes,
95-
aggClass.getAnnotation(Aggregator.class).includeTimestamps()
94+
warnExceptionsTypes
9695
);
9796
write(aggClass, "aggregator", implementer.sourceFile(), env);
9897
}
@@ -106,8 +105,7 @@ public boolean process(Set<? extends TypeElement> set, RoundEnvironment roundEnv
106105
env.getElementUtils(),
107106
aggClass,
108107
intermediateState,
109-
warnExceptionsTypes,
110-
aggClass.getAnnotation(GroupingAggregator.class).timeseries()
108+
warnExceptionsTypes
111109
);
112110
write(aggClass, "grouping aggregator", groupingAggregatorImplementer.sourceFile(), env);
113111
}

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import static org.elasticsearch.compute.gen.Methods.requireAnyArgs;
4141
import static org.elasticsearch.compute.gen.Methods.requireAnyType;
4242
import static org.elasticsearch.compute.gen.Methods.requireArgs;
43+
import static org.elasticsearch.compute.gen.Methods.requireArgsStartsWith;
4344
import static org.elasticsearch.compute.gen.Methods.requireName;
4445
import static org.elasticsearch.compute.gen.Methods.requirePrimitiveOrImplements;
4546
import static org.elasticsearch.compute.gen.Methods.requireStaticMethod;
@@ -98,8 +99,7 @@ public GroupingAggregatorImplementer(
9899
Elements elements,
99100
TypeElement declarationType,
100101
IntermediateState[] interStateAnno,
101-
List<TypeMirror> warnExceptions,
102-
boolean timseries
102+
List<TypeMirror> warnExceptions
103103
) {
104104
this.declarationType = declarationType;
105105
this.warnExceptions = warnExceptions;
@@ -116,8 +116,18 @@ public GroupingAggregatorImplementer(
116116
declarationType,
117117
aggState.declaredType().isPrimitive() ? requireType(aggState.declaredType()) : requireVoidType(),
118118
requireName("combine"),
119-
combineArgs(aggState, timseries)
119+
combineArgs(aggState)
120120
);
121+
switch (combine.getParameters().size()) {
122+
case 2, 3 -> timseries = false;
123+
case 4 -> {
124+
if (false == TypeName.get(combine.getParameters().get(2).asType()).equals(TypeName.LONG)) {
125+
throw new IllegalArgumentException("combine/4's third parameter must be long but was: " + combine);
126+
}
127+
timseries = true;
128+
}
129+
default -> throw new IllegalArgumentException("combine must have 2, 3, or 4 parameters but was: " + combine);
130+
}
121131
// TODO support multiple parameters
122132
this.aggParam = AggregationParameter.create(combine.getParameters().getLast().asType());
123133

@@ -135,21 +145,13 @@ public GroupingAggregatorImplementer(
135145
this.intermediateState = Arrays.stream(interStateAnno)
136146
.map(AggregatorImplementer.IntermediateStateDesc::newIntermediateStateDesc)
137147
.toList();
138-
this.timseries = timseries;
139148
}
140149

141-
private static Methods.ArgumentMatcher combineArgs(AggregationState aggState, boolean includeTimestampVector) {
150+
private static Methods.ArgumentMatcher combineArgs(AggregationState aggState) {
142151
if (aggState.declaredType().isPrimitive()) {
143152
return requireArgs(requireType(aggState.declaredType()), requireAnyType("<aggregation input column type>"));
144-
} else if (includeTimestampVector) {
145-
return requireArgs(
146-
requireType(aggState.declaredType()),
147-
requireType(TypeName.INT),
148-
requireType(TypeName.LONG), // @timestamp
149-
requireAnyType("<aggregation input column type>")
150-
);
151153
} else {
152-
return requireArgs(
154+
return requireArgsStartsWith(
153155
requireType(aggState.declaredType()),
154156
requireType(TypeName.INT),
155157
requireAnyType("<aggregation input column type>")

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Methods.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ static ArgumentMatcher requireArgs(TypeMatcher... argTypes) {
123123
);
124124
}
125125

126+
static ArgumentMatcher requireArgsStartsWith(TypeMatcher... argTypes) {
127+
return new ArgumentMatcher(
128+
args -> args.size() >= argTypes.length && IntStream.range(0, argTypes.length).allMatch(i -> argTypes[i].test(args.get(i))),
129+
Stream.of(argTypes).map(TypeMatcher::toString).collect(joining(", ")) + ", ..."
130+
);
131+
}
132+
126133
record NameMatcher(Set<String> names) implements Predicate<String> {
127134
@Override
128135
public boolean test(String name) {

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeDoubleAggregator.java

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeFloatAggregator.java

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeIntAggregator.java

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/FirstOverTimeLongAggregator.java

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)