Skip to content

Commit 35542a1

Browse files
authored
Implement SELECT DISTINCT in TableModel
1 parent 12e1fb2 commit 35542a1

File tree

7 files changed

+420
-1
lines changed

7 files changed

+420
-1
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ public static void tearDown() throws Exception {
125125
EnvFactory.getEnv().cleanClusterEnvironment();
126126
}
127127

128+
// ==================================================================
129+
// ==================== Normal Aggregation Test =====================
130+
// ==================================================================
128131
@Test
129132
public void countTest() {
130133
String[] expectedHeader = new String[] {"_col0"};
@@ -3745,4 +3748,130 @@ public void exceptionTest() {
37453748
"701: Aggregate functions [last_by] should only have three arguments",
37463749
DATABASE_NAME);
37473750
}
3751+
3752+
// ==================================================================
3753+
// ===================== Select Distinct Test =======================
3754+
// ==================================================================
3755+
3756+
// SELECT DISTINCT is in fact a special kind of aggregation query, so its ITs are placed here to
3757+
// reuse the test data.
3758+
3759+
/**
* Checks plain SELECT DISTINCT queries against table1, each with an ORDER BY over the selected
* columns: one column (s1), two columns (region, s1), and the four columns province, city,
* region, device_id. Each query's header and rows are compared with the expected values through
* tableResultSetEqualTest.
*/
@Test
3760+
public void simpleTest() {
3761+
// Case 1: distinct over the single column s1.
String[] expectedHeader = new String[] {"s1"};
3762+
String[] retArray = new String[] {"30,", "36,", "40,", "41,", "55,", "null,"};
3763+
tableResultSetEqualTest(
3764+
"select distinct s1 from table1 order by s1", expectedHeader, retArray, DATABASE_NAME);
3765+
3766+
// Case 2: distinct over the column pair (region, s1).
expectedHeader = new String[] {"region", "s1"};
3767+
retArray =
3768+
new String[] {
3769+
"chaoyang,30,",
3770+
"chaoyang,36,",
3771+
"chaoyang,40,",
3772+
"chaoyang,41,",
3773+
"chaoyang,55,",
3774+
"chaoyang,null,",
3775+
"haidian,30,",
3776+
"haidian,36,",
3777+
"haidian,40,",
3778+
"haidian,41,",
3779+
"haidian,55,",
3780+
"haidian,null,",
3781+
"huangpu,30,",
3782+
"huangpu,36,",
3783+
"huangpu,40,",
3784+
"huangpu,41,",
3785+
"huangpu,55,",
3786+
"huangpu,null,",
3787+
"pudong,30,",
3788+
"pudong,36,",
3789+
"pudong,40,",
3790+
"pudong,41,",
3791+
"pudong,55,",
3792+
"pudong,null,"
3793+
};
3794+
tableResultSetEqualTest(
3795+
"select distinct region, s1 from table1 order by region, s1",
3796+
expectedHeader,
3797+
retArray,
3798+
DATABASE_NAME);
3799+
3800+
// Case 3: distinct over province, city, region, device_id, i.e. show all devices.
expectedHeader = new String[] {"province", "city", "region", "device_id"};
3801+
retArray =
3802+
new String[] {
3803+
"beijing,beijing,chaoyang,d09,",
3804+
"beijing,beijing,chaoyang,d10,",
3805+
"beijing,beijing,chaoyang,d11,",
3806+
"beijing,beijing,chaoyang,d12,",
3807+
"beijing,beijing,haidian,d13,",
3808+
"beijing,beijing,haidian,d14,",
3809+
"beijing,beijing,haidian,d15,",
3810+
"beijing,beijing,haidian,d16,",
3811+
"shanghai,shanghai,huangpu,d01,",
3812+
"shanghai,shanghai,huangpu,d02,",
3813+
"shanghai,shanghai,huangpu,d03,",
3814+
"shanghai,shanghai,huangpu,d04,",
3815+
"shanghai,shanghai,pudong,d05,",
3816+
"shanghai,shanghai,pudong,d06,",
3817+
"shanghai,shanghai,pudong,d07,",
3818+
"shanghai,shanghai,pudong,d08,",
3819+
};
3820+
tableResultSetEqualTest(
3821+
"select distinct province,city,region,device_id from table1 order by province,city,region,device_id",
3822+
expectedHeader,
3823+
retArray,
3824+
DATABASE_NAME);
3825+
}
3827+
3828+
/**
* Checks SELECT DISTINCT combined with GROUP BY on table1. Three select lists are covered (the
* grouping column s1, avg(s1) and count(*)), each grouped by s1 and by s1,s2, and every result is
* compared with the expected header and rows through tableResultSetEqualTest.
*/
@Test
3829+
public void withGroupByTest() {
3830+
// Distinct over the grouping column: the same rows are expected for both GROUP BY variants.
String[] expectedHeader = new String[] {"s1"};
3831+
String[] retArray = new String[] {"30,", "36,", "40,", "41,", "55,", "null,"};
3832+
tableResultSetEqualTest(
3833+
"select distinct s1 from table1 group by s1 order by s1",
3834+
expectedHeader,
3835+
retArray,
3836+
DATABASE_NAME);
3837+
tableResultSetEqualTest(
3838+
"select distinct s1 from table1 group by s1,s2 order by s1",
3839+
expectedHeader,
3840+
retArray,
3841+
DATABASE_NAME);
3842+
3843+
// Distinct over an aggregate result; the unnamed output column is expected as "_col0".
expectedHeader = new String[] {"_col0"};
3844+
retArray = new String[] {"30.0,", "36.0,", "40.0,", "41.0,", "55.0,", "null,"};
3845+
tableResultSetEqualTest(
3846+
"select distinct avg(s1) from table1 group by s1 order by 1",
3847+
expectedHeader,
3848+
retArray,
3849+
DATABASE_NAME);
3850+
tableResultSetEqualTest(
3851+
"select distinct avg(s1) from table1 group by s1,s2 order by 1",
3852+
expectedHeader,
3853+
retArray,
3854+
DATABASE_NAME);
3855+
3856+
// Distinct over count(*): the expected distinct counts differ between the two groupings.
retArray = new String[] {"4,", "8,", "32,"};
3857+
tableResultSetEqualTest(
3858+
"select distinct count(*) from table1 group by s1 order by 1",
3859+
expectedHeader,
3860+
retArray,
3861+
DATABASE_NAME);
3862+
retArray = new String[] {"4,", "8,"};
3863+
tableResultSetEqualTest(
3864+
"select distinct count(*) from table1 group by s1, s2 order by 1",
3865+
expectedHeader,
3866+
retArray,
3867+
DATABASE_NAME);
3868+
}
3869+
3870+
/**
* Checks that a SELECT DISTINCT query whose ORDER BY expression (s2) is not in the select list
* (s1) fails with the expected 701 error message.
*/
@Test
3871+
public void exceptionTest1() {
3872+
tableAssertTestFail(
3873+
"select distinct s1 from table1 order by s2",
3874+
"701: For SELECT DISTINCT, ORDER BY expressions must appear in select list",
3875+
DATABASE_NAME);
3876+
}
37483877
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ public class Analysis implements IAnalysis {
139139
private final Map<NodeRef<Offset>, Long> offset = new LinkedHashMap<>();
140140
private final Map<NodeRef<Node>, OptionalLong> limit = new LinkedHashMap<>();
141141
private final Map<NodeRef<AllColumns>, List<Field>> selectAllResultFields = new LinkedHashMap<>();
142+
private boolean containsSelectDistinct;
142143

143144
private final Map<NodeRef<Join>, Expression> joins = new LinkedHashMap<>();
144145
private final Map<NodeRef<Join>, JoinUsingAnalysis> joinUsing = new LinkedHashMap<>();
@@ -420,7 +421,7 @@ public boolean isAggregation(QuerySpecification node) {
420421
}
421422

422423
/**
* Returns whether the analyzed statement counts as an aggregation query.
*
* <p>In this diff hunk the removed ("-") line consulted only groupingSets; the added ("+") line
* also returns true once the containsSelectDistinct flag has been set.
*/
public boolean containsAggregationQuery() {
423-
return !groupingSets.isEmpty();
424+
return !groupingSets.isEmpty() || containsSelectDistinct;
424425
}
425426

426427
public GroupingSetAnalysis getGroupingSets(QuerySpecification node) {
@@ -502,6 +503,10 @@ public List<SelectExpression> getSelectExpressions(Node node) {
502503
return selectExpressions.get(NodeRef.of(node));
503504
}
504505

506+
/**
* Marks this analysis as containing a SELECT DISTINCT. The flag can only be turned on; this
* method never resets it to false.
*/
public void setContainsSelectDistinct() {
507+
this.containsSelectDistinct = true;
508+
}
509+
505510
public void setHaving(QuerySpecification node, Expression expression) {
506511
having.put(NodeRef.of(node), expression);
507512
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,10 @@ private List<Expression> analyzeSelect(QuerySpecification node, Scope scope) {
10411041
}
10421042
analysis.setSelectExpressions(node, selectExpressionBuilder.build());
10431043

1044+
if (node.getSelect().isDistinct()) {
1045+
analysis.setContainsSelectDistinct();
1046+
}
1047+
10441048
return outputExpressionBuilder.build();
10451049
}
10461050

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@
9292
import static org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator.GROUP_KEY_SUFFIX;
9393
import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GapFillStartAndEndTimeExtractVisitor.CAN_NOT_INFER_TIME_RANGE;
9494
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.groupingSets;
95+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation;
96+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
9597

9698
public class QueryPlanner {
9799
private final Analysis analysis;
@@ -260,6 +262,7 @@ public RelationPlan plan(QuerySpecification node) {
260262
Iterables.concat(orderBy, outputs), symbolAllocator, queryContext);
261263
}
262264

265+
builder = distinct(builder, node, outputs);
263266
Optional<OrderingScheme> orderingScheme =
264267
orderingScheme(builder, node.getOrderBy(), analysis.getOrderByExpressions(node));
265268
builder = sort(builder, orderingScheme);
@@ -801,6 +804,23 @@ private PlanBuilder fillGroup(
801804
queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), orderingScheme, false, false));
802805
}
803806

807+
/**
* Adds the plan node that implements SELECT DISTINCT, if the query asks for it.
*
* @param subPlan plan built so far; its root becomes the child of the new node
* @param node query specification whose select clause is checked for DISTINCT
* @param expressions output expressions, translated to symbols and used as the grouping keys
* @return subPlan with a new single-aggregation root when the select is DISTINCT, otherwise
*     subPlan unchanged
*/
private PlanBuilder distinct(
808+
PlanBuilder subPlan, QuerySpecification node, List<Expression> expressions) {
809+
if (node.getSelect().isDistinct()) {
810+
// Translate every output expression into the symbol that carries it in the current plan.
List<Symbol> symbols =
811+
expressions.stream().map(subPlan::translate).collect(Collectors.toList());
812+
813+
return subPlan.withNewRoot(
814+
singleAggregation(
815+
queryIdAllocator.genPlanNodeId(),
816+
subPlan.getRoot(),
817+
// Empty map: presumably "no aggregate functions", leaving a pure grouping node - TODO
// confirm against the singleAggregation signature.
ImmutableMap.of(),
818+
singleGroupingSet(symbols)));
819+
}
820+
821+
return subPlan;
822+
}
823+
804824
private Optional<OrderingScheme> orderingScheme(
805825
PlanBuilder subPlan, Optional<OrderBy> orderBy, List<Expression> orderByExpressions) {
806826
if (!orderBy.isPresent() || (analysis.isOrderByRedundant(orderBy.get()))) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
21+
22+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
23+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
24+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
25+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
26+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
27+
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
28+
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
29+
30+
import com.google.common.collect.ImmutableList;
31+
32+
import java.util.List;
33+
34+
import static com.google.common.collect.ImmutableList.toImmutableList;
35+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.ChildReplacer.replaceChildren;
36+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.aggregation;
37+
38+
/**
* Iterative-optimizer rule that matches aggregation nodes without any aggregate function (treated
* here as "distinct operators") and removes further distinct operators found directly beneath
* them.
*
* <p>The subtree under the matched node is walked with a visitor that carries a boolean context:
* it is true only while the nearest visited ancestor is a distinct operator. A nested distinct
* operator reached with a true context is replaced by its (rewritten) child; any other node
* resets the context to false for its own children.
*/
public class PruneDistinctAggregation implements Rule<AggregationNode> {
39+
// Matches only aggregation nodes for which isDistinctOperator returns true.
private static final Pattern<AggregationNode> PATTERN =
40+
aggregation().matching(PruneDistinctAggregation::isDistinctOperator);
41+
42+
@Override
43+
public Pattern<AggregationNode> getPattern() {
44+
return PATTERN;
45+
}
46+
47+
/**
* Rewrites the resolved children of the matched distinct aggregation.
*
* @return a result carrying the node with replaced children if the rewriter removed at least
*     one nested distinct aggregation, otherwise an empty result
*/
@Override
48+
public Result apply(AggregationNode node, Captures captures, Context context) {
49+
Lookup lookup = context.getLookup();
50+
DistinctAggregationRewriter rewriter = new DistinctAggregationRewriter(lookup);
51+
52+
// The matched node is itself a distinct operator, so its children start with context = true.
List<PlanNode> newSources =
53+
node.getChildren().stream()
54+
.map(lookup::resolve)
55+
.map(source -> source.accept(rewriter, true))
56+
.collect(toImmutableList());
57+
58+
if (rewriter.isRewritten()) {
59+
return Result.ofPlanNode(replaceChildren(node, newSources));
60+
}
61+
return Result.empty();
62+
}
63+
64+
/** Returns true when the aggregation node has no aggregations, i.e. it only groups. */
private static boolean isDistinctOperator(AggregationNode node) {
65+
return node.getAggregations().isEmpty();
66+
}
67+
68+
/**
* Visitor that rebuilds a plan subtree and drops distinct aggregations that sit directly under
* another distinct aggregation. The Boolean context tells each visited node whether its parent
* in the walk was a distinct operator.
*/
private static class DistinctAggregationRewriter extends PlanVisitor<PlanNode, Boolean> {
69+
private final Lookup lookup;
70+
// Set to true once at least one nested distinct aggregation has been removed.
private boolean rewritten;
71+
72+
public DistinctAggregationRewriter(Lookup lookup) {
73+
this.lookup = lookup;
74+
this.rewritten = false;
75+
}
76+
77+
public boolean isRewritten() {
78+
return rewritten;
79+
}
80+
81+
/** Resolves and visits every child with the given context, then rebuilds the node. */
private PlanNode rewriteChildren(PlanNode node, Boolean context) {
82+
List<PlanNode> newSources =
83+
node.getChildren().stream()
84+
.map(lookup::resolve)
85+
.map(source -> source.accept(this, context))
86+
.collect(toImmutableList());
87+
88+
return replaceChildren(node, newSources);
89+
}
90+
91+
@Override
92+
public PlanNode visitPlan(PlanNode node, Boolean context) {
93+
// Unable to remove distinct aggregation anymore.
94+
return rewriteChildren(node, false);
95+
}
96+
97+
// NOTE(review): the set-operation overrides below are disabled (commented out) - presumably
// the corresponding node types are not available in this planner yet; confirm before enabling.
/*@Override
98+
public PlanNode visitUnion(UnionNode node, Boolean context)
99+
{
100+
return rewriteChildren(node, context);
101+
}
102+
103+
@Override
104+
public PlanNode visitIntersect(IntersectNode node, Boolean context)
105+
{
106+
if (node.isDistinct()) {
107+
return rewriteChildren(node, context);
108+
}
109+
return visitPlan(node, context);
110+
}
111+
112+
@Override
113+
public PlanNode visitExcept(ExceptNode node, Boolean context)
114+
{
115+
if (node.isDistinct()) {
116+
return rewriteChildren(node, context);
117+
}
118+
return visitPlan(node, context);
119+
}*/
120+
121+
/**
* Visits the child first, passing down whether this node is a distinct operator. If both this
* node and its parent in the walk are distinct operators, this node is dropped and the
* rewritten child is returned in its place; otherwise the node is rebuilt on the new child.
*/
@Override
122+
public PlanNode visitAggregation(AggregationNode node, Boolean context) {
123+
boolean distinct = isDistinctOperator(node);
124+
125+
PlanNode rewrittenNode = lookup.resolve(node.getChild()).accept(this, distinct);
126+
127+
if (context && distinct) {
128+
this.rewritten = true;
129+
// Assumes underlying node has same output symbols as this distinct node
130+
return rewrittenNode;
131+
}
132+
133+
// The rebuilt node gets an empty pre-grouped symbol list - presumably because the rewritten
// source may no longer provide that grouping; TODO confirm.
return AggregationNode.builderFrom(node)
134+
.setSource(rewrittenNode)
135+
.setPreGroupedSymbols(ImmutableList.of())
136+
.build();
137+
}
138+
}
139+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationSourceColumns;
3535
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinColumns;
3636
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneCorrelatedJoinCorrelation;
37+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneDistinctAggregation;
3738
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneEnforceSingleRowColumns;
3839
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneFillColumns;
3940
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneFilterColumns;
@@ -206,6 +207,8 @@ public LogicalOptimizeFactory(PlannerContext plannerContext) {
206207
new RemoveRedundantEnforceSingleRowNode(),
207208
new TransformUncorrelatedSubqueryToJoin())),
208209
new CheckSubqueryNodesAreRewritten(),
210+
new IterativeOptimizer(
211+
plannerContext, ruleStats, ImmutableSet.of(new PruneDistinctAggregation())),
209212
simplifyOptimizer,
210213
new PushPredicateIntoTableScan(),
211214
// redo columnPrune and inlineProjections after pushPredicateIntoTableScan

0 commit comments

Comments
 (0)