Skip to content

Commit 1782b57

Browse files
authored
ESQL: Remove AggregateMapper reflection, and delegate intermediate state to suppliers (#122023) (#122183)
To avoid having AggregateMapper find aggregators based on their names with reflection, I'm doing some changes: - Make the suppliers have methods returning the intermediate states - To allow this, the suppliers constructor won't receive the chanells as params. Instead, its methods will ask for them - Most changes in this PR are because of this - After those changes, I'm leaving AggregateMapper still there, as it still converts AggregateFunctions to its NamedExpressions (cherry picked from commit 7bea3a5) # Conflicts: # x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java
1 parent 546d184 commit 1782b57

File tree

192 files changed

+1398
-1014
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

192 files changed

+1398
-1014
lines changed

benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private static Operator operator(DriverContext driverContext, String grouping, S
155155

156156
if (grouping.equals("none")) {
157157
return new AggregationOperator(
158-
List.of(supplier(op, dataType, filter, 0).aggregatorFactory(AggregatorMode.SINGLE).apply(driverContext)),
158+
List.of(supplier(op, dataType, filter).aggregatorFactory(AggregatorMode.SINGLE, List.of(0)).apply(driverContext)),
159159
driverContext
160160
);
161161
}
@@ -182,33 +182,33 @@ private static Operator operator(DriverContext driverContext, String grouping, S
182182
default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]");
183183
};
184184
return new HashAggregationOperator(
185-
List.of(supplier(op, dataType, filter, groups.size()).groupingAggregatorFactory(AggregatorMode.SINGLE)),
185+
List.of(supplier(op, dataType, filter).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(groups.size()))),
186186
() -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false),
187187
driverContext
188188
);
189189
}
190190

191-
private static AggregatorFunctionSupplier supplier(String op, String dataType, String filter, int dataChannel) {
191+
private static AggregatorFunctionSupplier supplier(String op, String dataType, String filter) {
192192
return filtered(switch (op) {
193-
case COUNT -> CountAggregatorFunction.supplier(List.of(dataChannel));
193+
case COUNT -> CountAggregatorFunction.supplier();
194194
case COUNT_DISTINCT -> switch (dataType) {
195-
case LONGS -> new CountDistinctLongAggregatorFunctionSupplier(List.of(dataChannel), 3000);
196-
case DOUBLES -> new CountDistinctDoubleAggregatorFunctionSupplier(List.of(dataChannel), 3000);
195+
case LONGS -> new CountDistinctLongAggregatorFunctionSupplier(3000);
196+
case DOUBLES -> new CountDistinctDoubleAggregatorFunctionSupplier(3000);
197197
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
198198
};
199199
case MAX -> switch (dataType) {
200-
case LONGS -> new MaxLongAggregatorFunctionSupplier(List.of(dataChannel));
201-
case DOUBLES -> new MaxDoubleAggregatorFunctionSupplier(List.of(dataChannel));
200+
case LONGS -> new MaxLongAggregatorFunctionSupplier();
201+
case DOUBLES -> new MaxDoubleAggregatorFunctionSupplier();
202202
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
203203
};
204204
case MIN -> switch (dataType) {
205-
case LONGS -> new MinLongAggregatorFunctionSupplier(List.of(dataChannel));
206-
case DOUBLES -> new MinDoubleAggregatorFunctionSupplier(List.of(dataChannel));
205+
case LONGS -> new MinLongAggregatorFunctionSupplier();
206+
case DOUBLES -> new MinDoubleAggregatorFunctionSupplier();
207207
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
208208
};
209209
case SUM -> switch (dataType) {
210-
case LONGS -> new SumLongAggregatorFunctionSupplier(List.of(dataChannel));
211-
case DOUBLES -> new SumDoubleAggregatorFunctionSupplier(List.of(dataChannel));
210+
case LONGS -> new SumLongAggregatorFunctionSupplier();
211+
case DOUBLES -> new SumDoubleAggregatorFunctionSupplier();
212212
default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]");
213213
};
214214
default -> throw new IllegalArgumentException("unsupported op [" + op + "]");

x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorFunctionSupplierImplementer.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import static org.elasticsearch.compute.gen.Types.AGGREGATOR_FUNCTION_SUPPLIER;
3333
import static org.elasticsearch.compute.gen.Types.DRIVER_CONTEXT;
34+
import static org.elasticsearch.compute.gen.Types.LIST_AGG_FUNC_DESC;
3435
import static org.elasticsearch.compute.gen.Types.LIST_INTEGER;
3536
import static org.elasticsearch.compute.gen.Types.STRING;
3637

@@ -66,7 +67,6 @@ public AggregatorFunctionSupplierImplementer(
6667
createParameters.addAll(groupingAggregatorImplementer.createParameters());
6768
}
6869
this.createParameters = new ArrayList<>(createParameters);
69-
this.createParameters.add(0, new Parameter(LIST_INTEGER, "channels"));
7070

7171
this.implementation = ClassName.get(
7272
elements.getPackageOf(declarationType).toString(),
@@ -98,11 +98,9 @@ private TypeSpec type() {
9898
}
9999
createParameters.stream().forEach(p -> p.declareField(builder));
100100
builder.addMethod(ctor());
101-
if (aggregatorImplementer != null) {
102-
builder.addMethod(aggregator());
103-
} else {
104-
builder.addMethod(unsupportedNonGroupingAggregator());
105-
}
101+
builder.addMethod(nonGroupingIntermediateStateDesc());
102+
builder.addMethod(groupingIntermediateStateDesc());
103+
builder.addMethod(aggregator());
106104
builder.addMethod(groupingAggregator());
107105
builder.addMethod(describe());
108106
return builder.build();
@@ -122,19 +120,43 @@ private MethodSpec ctor() {
122120
return builder.build();
123121
}
124122

125-
private MethodSpec unsupportedNonGroupingAggregator() {
126-
MethodSpec.Builder builder = MethodSpec.methodBuilder("aggregator")
127-
.addParameter(DRIVER_CONTEXT, "driverContext")
128-
.returns(Types.AGGREGATOR_FUNCTION);
123+
private MethodSpec nonGroupingIntermediateStateDesc() {
124+
MethodSpec.Builder builder = MethodSpec.methodBuilder("nonGroupingIntermediateStateDesc");
129125
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
130-
builder.addStatement("throw new UnsupportedOperationException($S)", "non-grouping aggregator is not supported");
126+
builder.returns(LIST_AGG_FUNC_DESC);
127+
128+
if (aggregatorImplementer == null) {
129+
builder.addStatement("throw new UnsupportedOperationException($S)", "non-grouping aggregator is not supported");
130+
return builder.build();
131+
}
132+
133+
builder.addStatement("return $T.intermediateStateDesc()", aggregatorImplementer.implementation());
134+
135+
return builder.build();
136+
}
137+
138+
private MethodSpec groupingIntermediateStateDesc() {
139+
MethodSpec.Builder builder = MethodSpec.methodBuilder("groupingIntermediateStateDesc");
140+
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
141+
builder.returns(LIST_AGG_FUNC_DESC);
142+
143+
builder.addStatement("return $T.intermediateStateDesc()", groupingAggregatorImplementer.implementation());
144+
131145
return builder.build();
132146
}
133147

134148
private MethodSpec aggregator() {
135149
MethodSpec.Builder builder = MethodSpec.methodBuilder("aggregator");
136150
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
137151
builder.addParameter(DRIVER_CONTEXT, "driverContext");
152+
builder.addParameter(LIST_INTEGER, "channels");
153+
154+
if (aggregatorImplementer == null) {
155+
builder.returns(Types.AGGREGATOR_FUNCTION);
156+
builder.addStatement("throw new UnsupportedOperationException($S)", "non-grouping aggregator is not supported");
157+
return builder.build();
158+
}
159+
138160
builder.returns(aggregatorImplementer.implementation());
139161

140162
if (hasWarnings) {
@@ -160,6 +182,7 @@ private MethodSpec groupingAggregator() {
160182
MethodSpec.Builder builder = MethodSpec.methodBuilder("groupingAggregator");
161183
builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
162184
builder.addParameter(DRIVER_CONTEXT, "driverContext");
185+
builder.addParameter(LIST_INTEGER, "channels");
163186
builder.returns(groupingAggregatorImplementer.implementation());
164187

165188
if (hasWarnings) {

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanAggregatorFunctionSupplier.java

Lines changed: 13 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunctionSupplier.java

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunctionSupplier.java

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctFloatAggregatorFunctionSupplier.java

Lines changed: 14 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionSupplier.java

Lines changed: 15 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionSupplier.java

Lines changed: 15 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)