Skip to content

Commit b006058

Browse files
committed
Fixed ES and Phoenix Aggregate Tests
1 parent 133a894 commit b006058

File tree

7 files changed

+378
-8
lines changed

7 files changed

+378
-8
lines changed

contrib/storage-drill/src/test/java/org/apache/drill/exec/store/drill/plugin/DrillPluginQueriesTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public void testAggregationPushDown() throws Exception {
223223
queryBuilder()
224224
.sql(query, TABLE_NAME)
225225
.planMatcher()
226-
.include("query=\"SELECT COUNT\\(\\*\\)")
226+
.include("query=\"SELECT COUNT\\(")
227227
.match();
228228

229229
testBuilder()

contrib/storage-elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/CalciteUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
public class CalciteUtils {
4040

4141
private static final List<String> BANNED_RULES =
42-
Arrays.asList("ElasticsearchProjectRule", "ElasticsearchFilterRule");
42+
Arrays.asList("ElasticsearchProjectRule", "ElasticsearchFilterRule", "ElasticsearchAggregateRule");
4343

4444
public static final Predicate<RelOptRule> RULE_PREDICATE =
4545
relOptRule -> BANNED_RULES.stream()
@@ -61,6 +61,8 @@ public static Set<RelOptRule> elasticSearchRules() {
6161
rules.add(ELASTIC_DREL_CONVERTER_RULE);
6262
rules.add(ElasticsearchProjectRule.INSTANCE);
6363
rules.add(ElasticsearchFilterRule.INSTANCE);
64+
rules.add(ElasticsearchAggregateRule.INSTANCE);
65+
rules.add(ElasticsearchAggregateRule.DRILL_LOGICAL_INSTANCE);
6466
return rules;
6567
}
6668

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.calcite.adapter.elasticsearch;
19+
20+
import org.apache.calcite.plan.Convention;
21+
import org.apache.calcite.plan.RelOptRuleCall;
22+
import org.apache.calcite.plan.RelTraitSet;
23+
import org.apache.calcite.rel.InvalidRelException;
24+
import org.apache.calcite.rel.RelNode;
25+
import org.apache.calcite.rel.convert.ConverterRule;
26+
import org.apache.calcite.rel.core.Aggregate;
27+
import org.apache.calcite.rel.core.AggregateCall;
28+
import org.apache.calcite.rel.logical.LogicalAggregate;
29+
import org.apache.calcite.sql.SqlKind;
30+
import org.apache.calcite.sql.SqlSyntax;
31+
import org.apache.calcite.util.Optionality;
32+
import org.apache.drill.exec.planner.logical.DrillRel;
33+
import org.apache.drill.exec.planner.logical.DrillRelFactories;
34+
import org.apache.drill.exec.planner.sql.DrillSqlAggOperator;
35+
36+
import java.util.ArrayList;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.function.Predicate;
41+
42+
/**
43+
* Rule to convert a {@link org.apache.calcite.rel.logical.LogicalAggregate} to an
44+
* {@link org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate}.
45+
* Matches aggregates with inputs in either Convention.NONE or DrillRel.DRILL_LOGICAL.
46+
*/
47+
public class ElasticsearchAggregateRule extends ConverterRule {
48+
49+
public static final ElasticsearchAggregateRule INSTANCE = ((ConverterRule.Config) Config.INSTANCE
50+
.withConversion(LogicalAggregate.class, (Predicate<RelNode>) r -> true,
51+
Convention.NONE, ElasticsearchRel.CONVENTION, "ElasticsearchAggregateRule:NONE")
52+
.withRelBuilderFactory(DrillRelFactories.LOGICAL_BUILDER)
53+
.as(Config.class))
54+
.withRuleFactory(ElasticsearchAggregateRule::new)
55+
.toRule(ElasticsearchAggregateRule.class);
56+
57+
public static final ElasticsearchAggregateRule DRILL_LOGICAL_INSTANCE = ((ConverterRule.Config) Config.INSTANCE
58+
.withConversion(LogicalAggregate.class, (Predicate<RelNode>) r -> true,
59+
DrillRel.DRILL_LOGICAL, ElasticsearchRel.CONVENTION, "ElasticsearchAggregateRule:DRILL_LOGICAL")
60+
.withRelBuilderFactory(DrillRelFactories.LOGICAL_BUILDER)
61+
.as(Config.class))
62+
.withRuleFactory(ElasticsearchAggregateRule::new)
63+
.toRule(ElasticsearchAggregateRule.class);
64+
65+
private static final Map<String, SqlKind> DRILL_AGG_TO_SQL_KIND = new HashMap<>();
66+
static {
67+
DRILL_AGG_TO_SQL_KIND.put("COUNT", SqlKind.COUNT);
68+
DRILL_AGG_TO_SQL_KIND.put("SUM", SqlKind.SUM);
69+
DRILL_AGG_TO_SQL_KIND.put("MIN", SqlKind.MIN);
70+
DRILL_AGG_TO_SQL_KIND.put("MAX", SqlKind.MAX);
71+
DRILL_AGG_TO_SQL_KIND.put("AVG", SqlKind.AVG);
72+
DRILL_AGG_TO_SQL_KIND.put("ANY_VALUE", SqlKind.ANY_VALUE);
73+
}
74+
75+
public ElasticsearchAggregateRule(ConverterRule.Config config) {
76+
super(config);
77+
}
78+
79+
/**
80+
* Wrapper for DrillSqlAggOperator that overrides getKind() to return the correct SqlKind
81+
* based on the function name instead of OTHER_FUNCTION.
82+
*/
83+
private static class DrillSqlAggOperatorWrapper extends org.apache.calcite.sql.SqlAggFunction {
84+
private final DrillSqlAggOperator wrapped;
85+
private final SqlKind kind;
86+
private final boolean isCount;
87+
88+
public DrillSqlAggOperatorWrapper(DrillSqlAggOperator wrapped, SqlKind kind) {
89+
super(wrapped.getName(), wrapped.getSqlIdentifier(), kind,
90+
wrapped.getReturnTypeInference(), wrapped.getOperandTypeInference(),
91+
wrapped.getOperandTypeChecker(), wrapped.getFunctionType(),
92+
wrapped.requiresOrder(), wrapped.requiresOver(), Optionality.FORBIDDEN);
93+
this.wrapped = wrapped;
94+
this.kind = kind;
95+
this.isCount = kind == SqlKind.COUNT;
96+
}
97+
98+
@Override
99+
public SqlKind getKind() {
100+
return kind;
101+
}
102+
103+
@Override
104+
public SqlSyntax getSyntax() {
105+
// COUNT with zero arguments should use FUNCTION_STAR syntax for COUNT(*)
106+
if (isCount) {
107+
return SqlSyntax.FUNCTION_STAR;
108+
}
109+
return super.getSyntax();
110+
}
111+
}
112+
113+
/**
114+
* Transform aggregate calls that use DrillSqlAggOperator (which has SqlKind.OTHER_FUNCTION)
115+
* to use a wrapped version with the correct SqlKind based on the function name.
116+
* This is needed because ElasticsearchAggregate validates aggregates by SqlKind, but
117+
* DrillSqlAggOperator always uses SqlKind.OTHER_FUNCTION.
118+
*/
119+
private List<AggregateCall> transformDrillAggCalls(List<AggregateCall> aggCalls, Aggregate agg) {
120+
List<AggregateCall> transformed = new ArrayList<>();
121+
for (AggregateCall aggCall : aggCalls) {
122+
if (aggCall.getAggregation() instanceof DrillSqlAggOperator) {
123+
String funcName = aggCall.getAggregation().getName().toUpperCase();
124+
SqlKind kind = DRILL_AGG_TO_SQL_KIND.get(funcName);
125+
if (kind != null) {
126+
// Wrap the DrillSqlAggOperator with the correct SqlKind
127+
DrillSqlAggOperatorWrapper wrappedOp = new DrillSqlAggOperatorWrapper(
128+
(DrillSqlAggOperator) aggCall.getAggregation(), kind);
129+
130+
// Create a new AggregateCall with the wrapped operator
131+
AggregateCall newCall = AggregateCall.create(
132+
wrappedOp,
133+
aggCall.isDistinct(),
134+
aggCall.isApproximate(),
135+
aggCall.ignoreNulls(),
136+
aggCall.getArgList(),
137+
aggCall.filterArg,
138+
aggCall.distinctKeys,
139+
aggCall.collation,
140+
agg.getGroupCount(),
141+
agg.getInput(),
142+
aggCall.type,
143+
aggCall.name
144+
);
145+
transformed.add(newCall);
146+
} else {
147+
transformed.add(aggCall);
148+
}
149+
} else {
150+
transformed.add(aggCall);
151+
}
152+
}
153+
return transformed;
154+
}
155+
156+
@Override
157+
public RelNode convert(RelNode rel) {
158+
Aggregate agg = (Aggregate) rel;
159+
RelTraitSet traitSet = agg.getTraitSet().replace(out);
160+
161+
// Transform DrillSqlAggOperator calls to have correct SqlKind
162+
List<AggregateCall> transformedCalls = transformDrillAggCalls(agg.getAggCallList(), agg);
163+
164+
try {
165+
return new org.apache.calcite.adapter.elasticsearch.ElasticsearchAggregate(
166+
agg.getCluster(),
167+
traitSet,
168+
convert(agg.getInput(), traitSet.simplify()),
169+
agg.getGroupSet(),
170+
agg.getGroupSets(),
171+
transformedCalls);
172+
} catch (InvalidRelException e) {
173+
return null;
174+
}
175+
}
176+
177+
@Override
178+
public boolean matches(RelOptRuleCall call) {
179+
Aggregate agg = call.rel(0);
180+
// Only single group sets are supported
181+
if (agg.getGroupSets().size() != 1) {
182+
return false;
183+
}
184+
return super.matches(call);
185+
}
186+
}

contrib/storage-elasticsearch/src/test/java/org/apache/drill/exec/store/elasticsearch/ElasticSearchPlanTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,10 @@ public void testFilterPushDownWithJoin() throws Exception {
135135

136136
@Test
137137
public void testAggregationPushDown() throws Exception {
138-
// Calcite 1.35: Aggregate pushdown behavior changed, aggregates are handled by Drill
139138
queryBuilder()
140139
.sql("select count(*) from elastic.`nation`")
141140
.planMatcher()
142-
.include("StreamAgg")
141+
.include("ElasticsearchAggregate")
143142
.match();
144143
}
145144

@@ -154,11 +153,10 @@ public void testLimitWithSortPushDown() throws Exception {
154153

155154
@Test
156155
public void testAggregationWithGroupByPushDown() throws Exception {
157-
// Calcite 1.35: Aggregate pushdown behavior changed, aggregates are handled by Drill
158156
queryBuilder()
159157
.sql("select sum(n_nationkey) from elastic.`nation` group by n_regionkey")
160158
.planMatcher()
161-
.include("HashAgg")
159+
.include("ElasticsearchAggregate")
162160
.match();
163161
}
164162
}

contrib/storage-phoenix/src/main/java/org/apache/drill/exec/store/phoenix/PhoenixStoragePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public Set<? extends RelOptRule> getOptimizerRules(
9595
PlannerPhase phase
9696
) {
9797
switch (phase) {
98+
case LOGICAL:
9899
case PHYSICAL:
99100
return convention.getRules();
100101
default:

0 commit comments

Comments
 (0)