|
48 | 48 | import java.util.HashMap; |
49 | 49 | import java.util.List; |
50 | 50 | import java.util.Map; |
| 51 | +import java.util.Objects; |
51 | 52 | import java.util.stream.Collectors; |
52 | 53 | import java.util.stream.Stream; |
53 | 54 |
|
@@ -99,8 +100,7 @@ public AggDef withoutExtra() { |
99 | 100 |
|
100 | 101 | /** Map of AggDef types to intermediate named expressions. */ |
101 | 102 | private static final Map<AggDef, List<IntermediateStateDesc>> MAPPER = AGG_FUNCTIONS.stream() |
102 | | - .flatMap(AggregateMapper::typeAndNames) |
103 | | - .flatMap(AggregateMapper::groupingAndNonGrouping) |
| 103 | + .flatMap(AggregateMapper::aggDefs) |
104 | 104 | .collect(Collectors.toUnmodifiableMap(aggDef -> aggDef, AggregateMapper::lookupIntermediateState)); |
105 | 105 |
|
106 | 106 | /** Cache of aggregates to intermediate expressions. */ |
@@ -167,7 +167,7 @@ private static List<IntermediateStateDesc> getNonNull(AggDef aggDef) { |
167 | 167 | return l; |
168 | 168 | } |
169 | 169 |
|
170 | | - private static Stream<Tuple<Class<?>, Tuple<String, String>>> typeAndNames(Class<?> clazz) { |
| 170 | + private static Stream<AggDef> aggDefs(Class<?> clazz) { |
171 | 171 | List<String> types; |
172 | 172 | List<String> extraConfigs = List.of(""); |
173 | 173 | if (NumericAggregate.class.isAssignableFrom(clazz)) { |
@@ -197,32 +197,26 @@ private static Stream<Tuple<Class<?>, Tuple<String, String>>> typeAndNames(Class |
197 | 197 | assert false : "unknown aggregate type " + clazz; |
198 | 198 | throw new IllegalArgumentException("unknown aggregate type " + clazz); |
199 | 199 | } |
200 | | - return combine(clazz, types, extraConfigs); |
201 | | - } |
202 | | - |
203 | | - private static Stream<Tuple<Class<?>, Tuple<String, String>>> combine(Class<?> clazz, List<String> types, List<String> extraConfigs) { |
204 | | - return combinations(types, extraConfigs).map(combo -> new Tuple<>(clazz, combo)); |
| 200 | + return combinations(types, extraConfigs).flatMap(typeAndExtraConfig -> { |
| 201 | + var type = typeAndExtraConfig.v1(); |
| 202 | + var extra = typeAndExtraConfig.v2(); |
| 203 | + |
| 204 | + if (clazz.isAssignableFrom(Rate.class)) { |
| 205 | + // rate doesn't support non-grouping aggregations |
| 206 | + return Stream.of(new AggDef(clazz, type, extra, true)); |
| 207 | + } else if (Objects.equals(type, "AggregateMetricDouble")) { |
| 208 | + // TODO: support grouping aggregations for aggregate metric double |
| 209 | + return Stream.of(new AggDef(clazz, type, extra, false)); |
| 210 | + } else { |
| 211 | + return Stream.of(new AggDef(clazz, type, extra, true), new AggDef(clazz, type, extra, false)); |
| 212 | + } |
| 213 | + }); |
205 | 214 | } |
206 | 215 |
|
207 | 216 | private static Stream<Tuple<String, String>> combinations(List<String> types, List<String> extraConfigs) { |
208 | 217 | return types.stream().flatMap(type -> extraConfigs.stream().map(config -> new Tuple<>(type, config))); |
209 | 218 | } |
210 | 219 |
|
211 | | - private static Stream<AggDef> groupingAndNonGrouping(Tuple<Class<?>, Tuple<String, String>> tuple) { |
212 | | - if (tuple.v1().isAssignableFrom(Rate.class)) { |
213 | | - // rate doesn't support non-grouping aggregations |
214 | | - return Stream.of(new AggDef(tuple.v1(), tuple.v2().v1(), tuple.v2().v2(), true)); |
215 | | - } else if (tuple.v2().v1().equals("AggregateMetricDouble")) { |
216 | | - // TODO: support grouping aggregations for aggregate metric double |
217 | | - return Stream.of(new AggDef(tuple.v1(), tuple.v2().v1(), tuple.v2().v2(), false)); |
218 | | - } else { |
219 | | - return Stream.of( |
220 | | - new AggDef(tuple.v1(), tuple.v2().v1(), tuple.v2().v2(), true), |
221 | | - new AggDef(tuple.v1(), tuple.v2().v1(), tuple.v2().v2(), false) |
222 | | - ); |
223 | | - } |
224 | | - } |
225 | | - |
226 | 220 | /** Retrieves the intermediate state description for a given class, type, and grouping. */ |
227 | 221 | private static List<IntermediateStateDesc> lookupIntermediateState(AggDef aggDef) { |
228 | 222 | try { |
|
0 commit comments