Skip to content

Commit 4ebb89b

Browse files
authored
Implement INTERSECT (DISTINCT | ALL) for the table model (apache#16700)
1 parent ababbf3 commit 4ebb89b

File tree

15 files changed

+1015
-18
lines changed

15 files changed

+1015
-18
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.query.recent;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.TableClusterIT;
25+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
26+
27+
import org.junit.AfterClass;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runner.RunWith;
32+
33+
import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
34+
import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
35+
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
36+
37+
/**
 * Integration tests for {@code INTERSECT}, {@code INTERSECT DISTINCT} and {@code INTERSECT ALL}
 * queries in the table model.
 *
 * <p>Runs with {@code IoTDBTestRunner} in the {@code TableLocalStandaloneIT} and {@code
 * TableClusterIT} categories. The fixture is created once per class from {@code createSqls}.
 */
@RunWith(IoTDBTestRunner.class)
38+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
39+
public class IoTDBIntersectTableIT {
40+
// Database that every statement and query in this class is executed against.
protected static final String DATABASE_NAME = "test";
41+
// Statements handed to prepareTableData in setUp. The first value of each insert is presumably
// the row timestamp (exceptionTest expects "select *" to yield 4 columns) - TODO confirm.
protected static final String[] createSqls =
42+
new String[] {
43+
"CREATE DATABASE " + DATABASE_NAME,
44+
"USE " + DATABASE_NAME,
45+
// table1 (INT32 fields): ('d1', 1, 1) x 2, ('d1', 2, 2) x 1
"create table table1(device STRING TAG, s1 INT32 FIELD, s2 INT32 FIELD)",
46+
"insert into table1 values (1, 'd1', 1, 1)",
47+
"insert into table1 values (2, 'd1', 1, 1)",
48+
"insert into table1 values (3, 'd1', 2, 2)",
49+
// table2 (INT64/DOUBLE fields): ('d1', 1, 1.0) x 3, ('d1', 3, 3.0) x 1
"create table table2(device STRING TAG, s1 INT64 FIELD, s2 DOUBLE FIELD)",
50+
"insert into table2 values (1, 'd1', 1, 1.0)",
51+
"insert into table2 values (2, 'd1', 1, 1.0)",
52+
"insert into table2 values (3, 'd1', 1, 1.0)",
53+
"insert into table2 values (4, 'd1', 3, 3.0)",
54+
// table3: same rows as table2 but with different field names; used to check that
// INTERSECT matches columns by position rather than by name
"create table table3(device STRING TAG, s1_testName INT64 FIELD, s2_testName DOUBLE FIELD)",
55+
"insert into table3 values (1, 'd1', 1, 1.0)",
56+
"insert into table3 values (2, 'd1', 1, 1.0)",
57+
"insert into table3 values (3, 'd1', 1, 1.0)",
58+
"insert into table3 values (4, 'd1', 3, 3.0)",
59+
// table4: s1 is TEXT and no rows are inserted; used only by the type-incompatibility
// and column-count error cases
"create table table4(device STRING TAG, s1 TEXT FIELD, s2 DOUBLE FIELD)"
60+
};
61+
62+
/** Initializes the cluster environment and executes {@code createSqls} once for the class. */
@BeforeClass
63+
public static void setUp() throws Exception {
64+
EnvFactory.getEnv().initClusterEnvironment();
65+
prepareTableData(createSqls);
66+
}
67+
68+
/** Cleans the cluster environment after all tests in the class have run. */
@AfterClass
69+
public static void tearDown() throws Exception {
70+
EnvFactory.getEnv().cleanClusterEnvironment();
71+
}
72+
73+
/**
 * Checks the results of {@code INTERSECT}, {@code INTERSECT DISTINCT} and {@code INTERSECT ALL}
 * over {@code (device, s1, s2)} for table1 against table2 and table3.
 */
@Test
74+
public void normalTest() {
75+
String[] expectedHeader = new String[] {"device", "s1", "s2"};
76+
77+
// --- INTERSECT (DISTINCT) ---
78+
// table1 and table2 share one distinct tuple: ('d1', 1, 1.0).
// Both the bare INTERSECT and the explicit INTERSECT DISTINCT are expected to return it once.
String[] retArray =
79+
new String[] {
80+
"d1,1,1.0,",
81+
};
82+
tableResultSetEqualTest(
83+
"select device, s1, s2 from table1 intersect select device, s1, s2 from table2",
84+
expectedHeader,
85+
retArray,
86+
DATABASE_NAME);
87+
tableResultSetEqualTest(
88+
"select device, s1, s2 from table1 intersect distinct select device, s1, s2 from table2",
89+
expectedHeader,
90+
retArray,
91+
DATABASE_NAME);
92+
93+
// --- INTERSECT ALL ---
94+
// ('d1', 1, 1.0) appears twice in table1 and three times in table2,
95+
// so the expected multiplicity is min(2, 3) = 2 tuples
retArray = new String[] {"d1,1,1.0,", "d1,1,1.0,"};
96+
tableResultSetEqualTest(
97+
"select device, s1, s2 from table1 intersect all select device, s1, s2 from table2",
98+
expectedHeader,
99+
retArray,
100+
DATABASE_NAME);
101+
// same check against table3, whose column names differ; the header is still expected to
// come from the first (table1) side
tableResultSetEqualTest(
102+
"select device, s1, s2 from table1 intersect all select device, s1_testName, s2_testName from table3",
103+
expectedHeader,
104+
retArray,
105+
DATABASE_NAME);
106+
}
107+
108+
/**
 * Checks INTERSECT when both inputs alias different source columns to the same output names
 * and the result is consumed by an outer query.
 */
@Test
109+
public void mappingTest() {
110+
// table1 (aliased): (s1 as col_a) -> (1), (1), (2)
111+
// table2 (aliased): (s2 as col_a) -> (1.0), (1.0), (1.0), (3.0)
112+
// common value: (1.0); the expected output is rendered as a DOUBLE even though table1.s1
// is INT32
113+
114+
String[] expectedHeader = new String[] {"col_a"};
115+
116+
// --- INTERSECT (DISTINCT) with alias ---
117+
String[] retArray = new String[] {"1.0,"};
118+
tableResultSetEqualTest(
119+
"select col_a from ((select s1 as col_a, device as col_b from table1) intersect (select s2 as col_a, device as col_b from table2)) order by col_a",
120+
expectedHeader,
121+
retArray,
122+
DATABASE_NAME);
123+
124+
// --- INTERSECT ALL with alias ---
// min(2, 3) = 2 copies of the common value are expected
125+
retArray = new String[] {"1.0,", "1.0,"};
126+
tableResultSetEqualTest(
127+
"select col_a from ((select s1 as col_a, device as col_b from table1) intersect all (select s2 as col_a, device as col_b from table2)) order by col_a",
128+
expectedHeader,
129+
retArray,
130+
DATABASE_NAME);
131+
}
132+
133+
/** Checks the error messages expected for INTERSECT inputs that cannot be combined. */
@Test
134+
public void exceptionTest() {
135+
// column types are incompatible (table1.s1 is INT32, table4.s1 is TEXT)
136+
tableAssertTestFail(
137+
"(select * from table1) intersect all (select * from table4)",
138+
"has incompatible types: INT32, TEXT",
139+
DATABASE_NAME);
140+
141+
// column counts differ: "select *" is expected to yield 4 columns, "select time" yields 1
tableAssertTestFail(
142+
"(select * from table1) intersect all (select time from table4)",
143+
"INTERSECT query has different number of fields: 4, 1",
144+
DATABASE_NAME);
145+
}
146+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
7676
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
7777
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
78+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
7879
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
7980
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
8081
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1106,6 +1107,15 @@ public List<String> visitUnion(UnionNode node, GraphContext context) {
11061107
return render(node, boxValue, context);
11071108
}
11081109

1110+
/**
 * Builds the text box for an {@link IntersectNode} in the plan graph.
 *
 * <p>The box has three lines: {@code Intersect-<plan node id>}, the node's output symbols, and
 * its {@code isDistinct} flag. Layout, including the node's children, is delegated to {@code
 * render}.
 *
 * @param node the intersect node to print
 * @param context the graph context passed through to {@code render}
 * @return the rendered lines returned by {@code render}
 */
@Override
1111+
public List<String> visitIntersect(IntersectNode node, GraphContext context) {
1112+
List<String> boxValue = new ArrayList<>();
1113+
boxValue.add(String.format("Intersect-%s", node.getPlanNodeId().getId()));
1114+
boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
1115+
boxValue.add(String.format("isDistinct: %s", node.isDistinct()));
1116+
return render(node, boxValue, context);
1117+
}
1118+
11091119
private List<String> render(PlanNode node, List<String> nodeBoxString, GraphContext context) {
11101120
Box box = new Box(nodeBoxString);
11111121
List<List<String>> children = new ArrayList<>();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
123123
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
124124
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
125+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
125126
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
126127
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
127128
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -312,6 +313,7 @@ public enum PlanNodeType {
312313
TABLE_WINDOW_FUNCTION((short) 1032),
313314
TABLE_INTO_NODE((short) 1033),
314315
TABLE_UNION_NODE((short) 1034),
316+
TABLE_INTERSECT_NODE((short) 1035),
315317

316318
RELATIONAL_INSERT_TABLET((short) 2000),
317319
RELATIONAL_INSERT_ROW((short) 2001),
@@ -701,6 +703,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) {
701703
buffer);
702704
case 1034:
703705
return UnionNode.deserialize(buffer);
706+
case 1035:
707+
return IntersectNode.deserialize(buffer);
704708
case 2000:
705709
return RelationalInsertTabletNode.deserialize(buffer);
706710
case 2001:

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
127127
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
128128
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
129+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
129130
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
130131
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
131132
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -840,4 +841,8 @@ public R visitPatternRecognition(PatternRecognitionNode node, C context) {
840841
/**
 * Visits a {@link UnionNode}. This default implementation forwards to {@code visitPlan};
 * visitors that need union-specific handling override it.
 */
public R visitUnion(UnionNode node, C context) {
841842
return visitPlan(node, context);
842843
}
844+
845+
/**
 * Visits an {@link IntersectNode}. This default implementation forwards to {@code visitPlan};
 * visitors that need intersect-specific handling override it.
 */
public R visitIntersect(IntersectNode node, C context) {
846+
return visitPlan(node, context);
847+
}
843848
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
5858
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
5959
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
60+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
6061
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
6162
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure;
6263
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1145,6 +1146,24 @@ protected RelationPlan visitUnion(Union node, Void context) {
11451146
planNode, analysis.getScope(node), planNode.getOutputSymbols(), outerContext);
11461147
}
11471148

1149+
/**
 * Plans an {@code INTERSECT} AST node into an {@link IntersectNode}.
 *
 * <p>The inputs are planned by {@code process(node)}, which supplies the child plan nodes and
 * the output-to-input symbol mapping. The mapping's key set becomes the node's output symbols,
 * and the distinct flag is taken from the AST node.
 *
 * @param node the INTERSECT AST node; must have at least one relation
 * @param context unused
 * @return a relation plan rooted at the new intersect node, using the scope recorded for {@code
 *     node} in the analysis
 * @throws IllegalArgumentException if {@code node} has no relations
 */
@Override
1150+
protected RelationPlan visitIntersect(Intersect node, Void context) {
1151+
Preconditions.checkArgument(
1152+
!node.getRelations().isEmpty(), "No relations specified for intersect");
1153+
SetOperationPlan setOperationPlan = process(node);
1154+
1155+
// output symbols are the keys of the symbol mapping, copied into an immutable list
PlanNode intersectNode =
1156+
new IntersectNode(
1157+
idAllocator.genPlanNodeId(),
1158+
setOperationPlan.getChildren(),
1159+
setOperationPlan.getSymbolMapping(),
1160+
ImmutableList.copyOf(setOperationPlan.getSymbolMapping().keySet()),
1161+
node.isDistinct());
1162+
1163+
return new RelationPlan(
1164+
intersectNode, analysis.getScope(node), intersectNode.getOutputSymbols(), outerContext);
1165+
}
1166+
11481167
private SetOperationPlan process(SetOperation node) {
11491168
RelationType outputFields = analysis.getOutputDescriptor(node);
11501169
List<Symbol> outputs =
@@ -1191,11 +1210,6 @@ protected RelationPlan visitValues(Values node, Void context) {
11911210
throw new IllegalStateException("Values is not supported in current version.");
11921211
}
11931212

1194-
@Override
1195-
protected RelationPlan visitIntersect(Intersect node, Void context) {
1196-
throw new IllegalStateException("Intersect is not supported in current version.");
1197-
}
1198-
11991213
@Override
12001214
protected RelationPlan visitExcept(Except node, Void context) {
12011215
throw new IllegalStateException("Except is not supported in current version.");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
21+
22+
import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction;
23+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
24+
import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
25+
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
26+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
27+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
28+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
29+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
30+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
31+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
32+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
33+
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
34+
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
35+
36+
import com.google.common.collect.ImmutableList;
37+
38+
import static java.util.Objects.requireNonNull;
39+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.Intersect.distinct;
40+
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.intersect;
41+
42+
/**
 * Optimizer rule that rewrites a non-distinct {@link IntersectNode} ({@code INTERSECT ALL}) into
 * a plan built from other node types.
 *
 * <p>The resulting shape is {@code Project(Filter(translated plan))}:
 *
 * <ul>
 *   <li>the translated plan comes from {@code
 *       SetOperationNodeTranslator.makeSetContainmentPlanForAll} and exposes a row-number symbol
 *       plus a list of count symbols (presumably one per intersect input - TODO confirm);
 *   <li>the filter keeps rows where {@code row_number <= least(count_1, ..., count_n)};
 *   <li>the projection keeps only the intersect node's original output symbols.
 * </ul>
 */
public class ImplementIntersectAll implements Rule<IntersectNode> {
43+
44+
// Matches only intersect nodes whose distinct flag is false, i.e. INTERSECT ALL.
private static final Pattern<IntersectNode> PATTERN = intersect().with(distinct().equalTo(false));
45+
46+
// Passed to the SetOperationNodeTranslator created in apply.
private final Metadata metadata;
47+
48+
/**
 * @param metadata metadata handed to the set-operation translator; must not be null
 * @throws NullPointerException if {@code metadata} is null
 */
public ImplementIntersectAll(Metadata metadata) {
49+
this.metadata = requireNonNull(metadata, "metadata is null");
50+
}
51+
52+
/** Returns the pattern selecting non-distinct intersect nodes. */
@Override
53+
public Pattern<IntersectNode> getPattern() {
54+
return PATTERN;
55+
}
56+
57+
/**
 * Rewrites the matched {@code INTERSECT ALL} node.
 *
 * @param node the matched non-distinct intersect node
 * @param captures unused
 * @param context supplies the symbol allocator and plan node id allocator
 * @return a result wrapping the replacement {@code ProjectNode}
 */
@Override
58+
public Result apply(IntersectNode node, Captures captures, Context context) {
59+
60+
SetOperationNodeTranslator translator =
61+
new SetOperationNodeTranslator(
62+
metadata, context.getSymbolAllocator(), context.getIdAllocator());
63+
64+
// 1. translate the intersect (all) node into other plan nodes
65+
SetOperationNodeTranslator.TranslationResult translationResult =
66+
translator.makeSetContainmentPlanForAll(node);
67+
68+
// 2. add a filter node above the plan produced by the translation
69+
// filter condition: row_number <= least(countA, countB, ...)
// The minimum is built as a left-nested chain of two-argument LEAST calls; with a single
// count symbol it is just that symbol's reference.
70+
Expression minCount = translationResult.getCountSymbols().get(0).toSymbolReference();
71+
for (int i = 1; i < translationResult.getCountSymbols().size(); i++) {
72+
minCount =
73+
new FunctionCall(
74+
QualifiedName.of(TableBuiltinScalarFunction.LEAST.getFunctionName()),
75+
ImmutableList.of(
76+
minCount, translationResult.getCountSymbols().get(i).toSymbolReference()));
77+
}
78+
79+
FilterNode filterNode =
80+
new FilterNode(
81+
context.getIdAllocator().genPlanNodeId(),
82+
translationResult.getPlanNode(),
83+
new ComparisonExpression(
84+
ComparisonExpression.Operator.LESS_THAN_OR_EQUAL,
85+
translationResult.getRowNumberSymbol().toSymbolReference(),
86+
minCount));
87+
88+
// 3. add a project node that drops the helper columns and keeps only the intersect outputs
89+
return Result.ofPlanNode(
90+
new ProjectNode(
91+
context.getIdAllocator().genPlanNodeId(),
92+
filterNode,
93+
Assignments.identity(node.getOutputSymbols())));
94+
}
95+
}

0 commit comments

Comments
 (0)