Skip to content

Commit fac58ce

Browse files
authored
ESQL: Remove AggregateMapper reflection, and delegate intermediate state to suppliers (elastic#122023) (elastic#122380)
Manual 8.x backport of elastic#122023 To avoid having AggregateMapper find aggregators based on their names with reflection, I'm doing some changes: - Make the suppliers have methods returning the intermediate states - To allow this, the suppliers constructor won't receive the chanells as params. Instead, its methods will ask for them - Most changes in this PR are because of this - After those changes, I'm leaving AggregateMapper still there, as it still converts AggregateFunctions to its NamedExpressions
1 parent 0196787 commit fac58ce

File tree

192 files changed

+1398
-1011
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

192 files changed

+1398
-1011
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private static Operator operator(DriverContext driverContext, String grouping, S
155155

156156
if (grouping.equals("none")) {
157157
return new AggregationOperator(
158-
List.of(supplier(op, dataType, filter, 0).aggregatorFactory(AggregatorMode.SINGLE).apply(driverContext)),
158+
List.of(supplier(op, dataType, filter).aggregatorFactory(AggregatorMode.SINGLE, List.of(0)).apply(driverContext)),
159159
driverContext
160160
);
161161
}
@@ -182,33 +182,33 @@ private static Operator operator(DriverContext driverContext, String grouping, S
182182
default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
183183
};
184184
return new HashAggregationOperator(
185-
List.of(supplier(op, dataType, filter, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
185+
List.of(supplier(op, dataType, filter).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(groups.size()))),
186186
() -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false),
187187
driverContext
188188
);
189189
}
190190

191-
private static AggregatorFunctionSupplier supplier(String op, String dataType, String filter, int dataChannel) {
191+
private static AggregatorFunctionSupplier supplier(String op, String dataType, String filter) {
192192
return filtered(switch (op) {
193-
case COUNT -> CountAggregatorFunction.supplier(List.of(dataChannel));
193+
case COUNT -> CountAggregatorFunction.supplier();
194194
case COUNT_DISTINCT -> switch (dataType) {
195-
case LONGS -> new CountDistinctLongAggregatorFunctionSupplier(List.of(dataChannel), 3000);
196-
case DOUBLES -> new CountDistinctDoubleAggregatorFunctionSupplier(List.of(dataChannel), 3000);
195+
case LONGS -> new CountDistinctLongAggregatorFunctionSupplier(3000);
196+
case DOUBLES -> new CountDistinctDoubleAggregatorFunctionSupplier(3000);
197197
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
198198
};
199199
case MAX -> switch (dataType) {
200-
case LONGS -> new MaxLongAggregatorFunctionSupplier(List.of(dataChannel));
201-
case DOUBLES -> new MaxDoubleAggregatorFunctionSupplier(List.of(dataChannel));
200+
case LONGS -> new MaxLongAggregatorFunctionSupplier();
201+
case DOUBLES -> new MaxDoubleAggregatorFunctionSupplier();
202202
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
203203
};
204204
case MIN -> switch (dataType) {
205-
case LONGS -> new MinLongAggregatorFunctionSupplier(List.of(dataChannel));
206-
case DOUBLES -> new MinDoubleAggregatorFunctionSupplier(List.of(dataChannel));
205+
case LONGS -> new MinLongAggregatorFunctionSupplier();
206+
case DOUBLES -> new MinDoubleAggregatorFunctionSupplier();
207207
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
208208
};
209209
case SUM -> switch (dataType) {
210-
case LONGS -> new SumLongAggregatorFunctionSupplier(List.of(dataChannel));
211-
case DOUBLES -> new SumDoubleAggregatorFunctionSupplier(List.of(dataChannel));
210+
case LONGS -> new SumLongAggregatorFunctionSupplier();
211+
case DOUBLES -> new SumDoubleAggregatorFunctionSupplier();
212212
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
213213
};
214214
default -> throw new IllegalArgumentException("unsupported op [" + op + "]");

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import static org.elasticsearch.compute.gen.Types.AGGREGATOR_FUNCTION_SUPPLIER;
3333
import static org.elasticsearch.compute.gen.Types.DRIVER_CONTEXT;
34+
import static org.elasticsearch.compute.gen.Types.LIST_AGG_FUNC_DESC;
3435
import static org.elasticsearch.compute.gen.Types.LIST_INTEGER;
3536
import static org.elasticsearch.compute.gen.Types.STRING;
3637
import static org.elasticsearch.compute.gen.Types.WARNINGS;
@@ -67,7 +68,6 @@ public AggregatorFunctionSupplierImplementer(
6768
createParameters.addAll(groupingAggregatorImplementer.createParameters());
6869
}
6970
this.createParameters = new ArrayList<>(createParameters);
70-
this.createParameters.add(0, new Parameter(LIST_INTEGER, "channels"));
7171

7272
this.implementation = ClassName.get(
7373
elements.getPackageOf(declarationType).toString(),
@@ -99,11 +99,9 @@ private TypeSpec type() {
9999
}
100100
createParameters.stream().forEach(p -> p.declareField(builder));
101101
builder.addMethod(ctor());
102-
if (aggregatorImplementer != null) {
103-
builder.addMethod(aggregator());
104-
} else {
105-
builder.addMethod(unsupportedNonGroupingAggregator());
106-
}
102+
builder.addMethod(nonGroupingIntermediateStateDesc());
103+
builder.addMethod(groupingIntermediateStateDesc());
104+
builder.addMethod(aggregator());
107105
builder.addMethod(groupingAggregator());
108106
builder.addMethod(describe());
109107
return builder.build();
@@ -123,19 +121,43 @@ private MethodSpec ctor() {
123121
return builder.build();
124122
}
125123

126-
private MethodSpec unsupportedNonGroupingAggregator() {
127-
MethodSpec.Builder builder = MethodSpec.methodBuilder("aggregator")
128-
.addParameter(DRIVER_CONTEXT, "driverContext")
129-
.returns(Types.AGGREGATOR_FUNCTION);
124+
private MethodSpec nonGroupingIntermediateStateDesc() {
125+
MethodSpec.Builder builder = MethodSpec.methodBuilder("nonGroupingIntermediateStateDesc");
130126
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
131-
builder.addStatement("throw new UnsupportedOperationException($S)", "non-grouping aggregator is not supported");
127+
builder.returns(LIST_AGG_FUNC_DESC);
128+
129+
if (aggregatorImplementer == null) {
130+
builder.addStatement("throw new UnsupportedOperationException($S)", "non-grouping aggregator is not supported");
131+
return builder.build();
132+
}
133+
134+
builder.addStatement("return $T.intermediateStateDesc()", aggregatorImplementer.implementation());
135+
136+
return builder.build();
137+
}
138+
139+
private MethodSpec groupingIntermediateStateDesc() {
140+
MethodSpec.Builder builder = MethodSpec.methodBuilder("groupingIntermediateStateDesc");
141+
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
142+
builder.returns(LIST_AGG_FUNC_DESC);
143+
144+
builder.addStatement("return $T.intermediateStateDesc()", groupingAggregatorImplementer.implementation());
145+
132146
return builder.build();
133147
}
134148

135149
private MethodSpec aggregator() {
136150
MethodSpec.Builder builder = MethodSpec.methodBuilder("aggregator");
137151
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
138152
builder.addParameter(DRIVER_CONTEXT, "driverContext");
153+
builder.addParameter(LIST_INTEGER, "channels");
154+
155+
if (aggregatorImplementer == null) {
156+
builder.returns(Types.AGGREGATOR_FUNCTION);
157+
builder.addStatement("throw new UnsupportedOperationException($S)", "non-grouping aggregator is not supported");
158+
return builder.build();
159+
}
160+
139161
builder.returns(aggregatorImplementer.implementation());
140162

141163
if (hasWarnings) {
@@ -162,6 +184,7 @@ private MethodSpec groupingAggregator() {
162184
MethodSpec.Builder builder = MethodSpec.methodBuilder("groupingAggregator");
163185
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
164186
builder.addParameter(DRIVER_CONTEXT, "driverContext");
187+
builder.addParameter(LIST_INTEGER, "channels");
165188
builder.returns(groupingAggregatorImplementer.implementation());
166189

167190
if (hasWarnings) {

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanAggregatorFunctionSupplier.java

Lines changed: 13 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunctionSupplier.java

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunctionSupplier.java

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctFloatAggregatorFunctionSupplier.java

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionSupplier.java

Lines changed: 15 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionSupplier.java

Lines changed: 15 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)