Skip to content

Commit 67a9fa1

Browse files
authored
with_clause feature
1 parent 51fe585 commit 67a9fa1

File tree

3 files changed

+356
-14
lines changed

3 files changed

+356
-14
lines changed
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.query.recent;
21+
22+
import org.apache.iotdb.isession.ITableSession;
23+
import org.apache.iotdb.isession.SessionConfig;
24+
import org.apache.iotdb.isession.SessionDataSet;
25+
import org.apache.iotdb.it.env.EnvFactory;
26+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
27+
import org.apache.iotdb.itbase.category.TableClusterIT;
28+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
29+
import org.apache.iotdb.itbase.env.BaseEnv;
30+
import org.apache.iotdb.rpc.IoTDBConnectionException;
31+
import org.apache.iotdb.rpc.StatementExecutionException;
32+
33+
import org.junit.After;
34+
import org.junit.AfterClass;
35+
import org.junit.Assert;
36+
import org.junit.Before;
37+
import org.junit.BeforeClass;
38+
import org.junit.Test;
39+
import org.junit.experimental.categories.Category;
40+
import org.junit.runner.RunWith;
41+
42+
import java.sql.Connection;
43+
import java.sql.DriverManager;
44+
import java.sql.ResultSet;
45+
import java.sql.ResultSetMetaData;
46+
import java.sql.SQLException;
47+
import java.sql.Statement;
48+
import java.util.Locale;
49+
50+
import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
51+
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
52+
import static org.junit.Assert.assertEquals;
53+
import static org.junit.Assert.fail;
54+
55+
@RunWith(IoTDBTestRunner.class)
56+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
57+
public class IoTDBCteIT {
58+
private static final String DATABASE_NAME = "testdb";
59+
60+
private static final String[] creationSqls =
61+
new String[] {
62+
"CREATE DATABASE IF NOT EXISTS testdb",
63+
"USE testdb",
64+
"CREATE TABLE IF NOT EXISTS testtb(deviceid STRING TAG, voltage FLOAT FIELD)",
65+
"INSERT INTO testtb VALUES(1000, 'd1', 100.0)",
66+
"INSERT INTO testtb VALUES(2000, 'd1', 200.0)",
67+
"INSERT INTO testtb VALUES(1000, 'd2', 300.0)",
68+
};
69+
70+
private static final String dropDbSqls = "DROP DATABASE IF EXISTS testdb";
71+
72+
@BeforeClass
73+
public static void setUpClass() {
74+
Locale.setDefault(Locale.ENGLISH);
75+
76+
EnvFactory.getEnv()
77+
.getConfig()
78+
.getCommonConfig()
79+
.setPartitionInterval(1000)
80+
.setMemtableSizeThreshold(10000);
81+
EnvFactory.getEnv().initClusterEnvironment();
82+
}
83+
84+
@AfterClass
85+
public static void tearDownClass() {
86+
EnvFactory.getEnv().cleanClusterEnvironment();
87+
}
88+
89+
@Before
90+
public void setUp() throws SQLException {
91+
prepareData();
92+
}
93+
94+
@After
95+
public void tearDown() {
96+
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
97+
Statement statement = connection.createStatement()) {
98+
statement.execute(dropDbSqls);
99+
} catch (Exception e) {
100+
fail(e.getMessage());
101+
}
102+
}
103+
104+
@Test
105+
public void testQuery() {
106+
String[] expectedHeader = new String[] {"time", "deviceid", "voltage"};
107+
String[] retArray =
108+
new String[] {
109+
"1970-01-01T00:00:01.000Z,d1,100.0,",
110+
"1970-01-01T00:00:02.000Z,d1,200.0,",
111+
"1970-01-01T00:00:01.000Z,d2,300.0,"
112+
};
113+
tableResultSetEqualTest(
114+
"with cte as (select * from testtb) select * from cte order by deviceid",
115+
expectedHeader,
116+
retArray,
117+
DATABASE_NAME);
118+
119+
expectedHeader = new String[] {"deviceid", "voltage"};
120+
retArray = new String[] {"d1,100.0,", "d1,200.0,", "d2,300.0,"};
121+
tableResultSetEqualTest(
122+
"with cte as (select deviceid, voltage from testtb) select * from cte order by deviceid",
123+
expectedHeader,
124+
retArray,
125+
DATABASE_NAME);
126+
127+
expectedHeader = new String[] {"deviceid", "avg_voltage"};
128+
retArray = new String[] {"d1,150.0,", "d2,300.0,"};
129+
tableResultSetEqualTest(
130+
"with cte as (select deviceid, avg(voltage) as avg_voltage from testtb group by deviceid) select * from cte order by deviceid",
131+
expectedHeader,
132+
retArray,
133+
DATABASE_NAME);
134+
}
135+
136+
@Test
137+
public void testPartialColumn() {
138+
String[] expectedHeader = new String[] {"id", "v"};
139+
String[] retArray = new String[] {"d1,100.0,", "d1,200.0,", "d2,300.0,"};
140+
tableResultSetEqualTest(
141+
"with cte(id, v) as (select deviceid, voltage from testtb) select * from cte order by id",
142+
expectedHeader,
143+
retArray,
144+
DATABASE_NAME);
145+
146+
tableAssertTestFail(
147+
"with cte(v) as (select deviceid, voltage from testtb) select * from cte order by id",
148+
"701: Column alias list has 1 entries but relation has 2 columns",
149+
DATABASE_NAME);
150+
}
151+
152+
@Test
153+
public void testExplain() throws SQLException {
154+
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
155+
Statement statement = connection.createStatement()) {
156+
statement.execute("USE testdb");
157+
// explain
158+
ResultSet resultSet =
159+
statement.executeQuery(
160+
"explain with cte as (select * from testtb) select * from cte order by deviceid");
161+
ResultSetMetaData metaData = resultSet.getMetaData();
162+
assertEquals(metaData.getColumnCount(), 1);
163+
assertEquals(metaData.getColumnName(1), "distribution plan");
164+
165+
// explain analyze
166+
resultSet =
167+
statement.executeQuery(
168+
"explain analyze with cte as (select * from testtb) select * from cte order by deviceid");
169+
metaData = resultSet.getMetaData();
170+
assertEquals(metaData.getColumnCount(), 1);
171+
assertEquals(metaData.getColumnName(1), "Explain Analyze");
172+
}
173+
}
174+
175+
@Test
176+
public void testMultiReference() {
177+
String[] expectedHeader = new String[] {"time", "deviceid", "voltage"};
178+
String[] retArray = new String[] {"1970-01-01T00:00:01.000Z,d2,300.0,"};
179+
tableResultSetEqualTest(
180+
"with cte as (select * from testtb) select * from cte where voltage > (select avg(voltage) from cte)",
181+
expectedHeader,
182+
retArray,
183+
DATABASE_NAME);
184+
}
185+
186+
@Test
187+
public void testDomain() {
188+
String[] expectedHeader = new String[] {"deviceid", "voltage"};
189+
String[] retArray = new String[] {"d1,100.0,", "d1,200.0,", "d2,300.0,"};
190+
tableResultSetEqualTest(
191+
"with testtb as (select deviceid, voltage from testtb) select * from testtb order by deviceid",
192+
expectedHeader,
193+
retArray,
194+
DATABASE_NAME);
195+
196+
tableAssertTestFail(
197+
"with testtb as (select voltage from testtb) select * from testtb order by deviceid",
198+
"616: Column 'deviceid' cannot be resolved",
199+
DATABASE_NAME);
200+
}
201+
202+
@Test
203+
public void testSession() throws IoTDBConnectionException, StatementExecutionException {
204+
try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) {
205+
session.executeNonQueryStatement("use testdb");
206+
SessionDataSet dataSet =
207+
session.executeQueryStatement("with cte as (select * from testtb) select * from cte");
208+
209+
assertEquals(dataSet.getColumnNames().size(), 3);
210+
assertEquals(dataSet.getColumnNames().get(0), "time");
211+
assertEquals(dataSet.getColumnNames().get(1), "deviceid");
212+
assertEquals(dataSet.getColumnNames().get(2), "voltage");
213+
int cnt = 0;
214+
while (dataSet.hasNext()) {
215+
dataSet.next();
216+
cnt++;
217+
}
218+
Assert.assertEquals(3, cnt);
219+
}
220+
}
221+
222+
@Test
223+
public void testJdbc() throws ClassNotFoundException, SQLException {
224+
BaseEnv env = EnvFactory.getEnv();
225+
String uri = String.format("jdbc:iotdb://%s:%s?sql_dialect=table", env.getIP(), env.getPort());
226+
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
227+
try (Connection connection =
228+
DriverManager.getConnection(
229+
uri, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
230+
Statement statement = connection.createStatement()) {
231+
statement.executeUpdate("use testdb");
232+
ResultSet resultSet =
233+
statement.executeQuery("with cte as (select * from testtb) select * from cte");
234+
235+
final ResultSetMetaData metaData = resultSet.getMetaData();
236+
assertEquals(metaData.getColumnCount(), 3);
237+
assertEquals(metaData.getColumnLabel(1), "time");
238+
assertEquals(metaData.getColumnLabel(2), "deviceid");
239+
assertEquals(metaData.getColumnLabel(3), "voltage");
240+
241+
int cnt = 0;
242+
while (resultSet.next()) {
243+
cnt++;
244+
}
245+
Assert.assertEquals(3, cnt);
246+
}
247+
}
248+
249+
@Test
250+
public void testNest() {
251+
String sql1 =
252+
"WITH"
253+
+ " cte1 AS (select deviceid, voltage from testtb where voltage > 200),"
254+
+ " cte2 AS (SELECT voltage FROM cte1)"
255+
+ " SELECT * FROM cte2";
256+
257+
String sql2 =
258+
"WITH"
259+
+ " cte2 AS (SELECT voltage FROM cte1),"
260+
+ " cte1 AS (select deviceid, voltage from testtb where voltage > 200)"
261+
+ " SELECT * FROM cte2";
262+
263+
String[] expectedHeader = new String[] {"voltage"};
264+
String[] retArray = new String[] {"300.0,"};
265+
tableResultSetEqualTest(sql1, expectedHeader, retArray, DATABASE_NAME);
266+
267+
tableAssertTestFail(sql2, "550: Table 'testdb.cte1' does not exist.", DATABASE_NAME);
268+
}
269+
270+
@Test
271+
public void testRecursive() {
272+
String sql =
273+
"WITH RECURSIVE t(n) AS ("
274+
+ " VALUES (1)"
275+
+ " UNION ALL"
276+
+ " SELECT n+1 FROM t WHERE n < 100)"
277+
+ " SELECT sum(n) FROM t";
278+
279+
tableAssertTestFail(sql, "701: recursive cte is not supported yet", DATABASE_NAME);
280+
}
281+
282+
@Test
283+
public void testPrivileges() throws SQLException {
284+
Connection adminCon = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
285+
Statement adminStmt = adminCon.createStatement();
286+
try {
287+
adminStmt.execute("CREATE USER tmpuser 'tmppw123456789'");
288+
adminStmt.execute("USE testdb");
289+
adminStmt.execute(
290+
"CREATE TABLE IF NOT EXISTS testtb1(deviceid STRING TAG, voltage FLOAT FIELD)");
291+
adminStmt.execute("GRANT SELECT ON testdb.testtb TO USER tmpuser");
292+
293+
try (Connection connection =
294+
EnvFactory.getEnv()
295+
.getConnection("tmpuser", "tmppw123456789", BaseEnv.TABLE_SQL_DIALECT);
296+
Statement statement = connection.createStatement()) {
297+
statement.execute("USE testdb");
298+
statement.execute("with cte as (select * from testtb) select * from cte");
299+
}
300+
301+
try (Connection connection =
302+
EnvFactory.getEnv()
303+
.getConnection("tmpuser", "tmppw123456789", BaseEnv.TABLE_SQL_DIALECT);
304+
Statement statement = connection.createStatement()) {
305+
statement.execute("USE testdb");
306+
statement.execute("with cte as (select * from testtb1) select * from testtb");
307+
fail("No exception!");
308+
} catch (Exception e) {
309+
Assert.assertTrue(
310+
e.getMessage(),
311+
e.getMessage()
312+
.contains(
313+
"803: Access Denied: No permissions for this operation, please add privilege SELECT ON testdb.testtb1"));
314+
}
315+
} finally {
316+
adminStmt.execute("DROP USER tmpuser");
317+
adminStmt.execute("DROP TABLE IF EXISTS testtb1");
318+
}
319+
}
320+
321+
private static void prepareData() {
322+
try (Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
323+
Statement statement = connection.createStatement()) {
324+
325+
for (String sql : creationSqls) {
326+
statement.execute(sql);
327+
}
328+
} catch (Exception e) {
329+
fail(e.getMessage());
330+
}
331+
}
332+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -898,20 +898,7 @@ private Scope analyzeWith(Query node, Optional<Scope> scope) {
898898

899899
boolean isRecursive = false;
900900
if (with.isRecursive()) {
901-
// cannot nest pattern recognition within recursive query
902-
903-
isRecursive = tryProcessRecursiveQuery(withQuery, name, withScopeBuilder);
904-
// WITH query is not shaped accordingly to the rules for expandable query and will be
905-
// processed like a plain WITH query.
906-
// Since RECURSIVE is specified, any reference to WITH query name is considered a
907-
// recursive reference and is not allowed.
908-
if (!isRecursive) {
909-
List<Node> recursiveReferences =
910-
findReferences(withQuery.getQuery(), withQuery.getName());
911-
if (!recursiveReferences.isEmpty()) {
912-
throw new SemanticException("recursive reference not allowed in this context");
913-
}
914-
}
901+
throw new SemanticException("recursive cte is not supported yet.");
915902
}
916903

917904
if (!isRecursive) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,29 @@ protected RelationPlan visitTable(final Table table, final Void context) {
236236
}
237237

238238
final Scope scope = analysis.getScope(table);
239+
240+
// Common Table Expression
241+
final Query namedQuery = analysis.getNamedQuery(table);
242+
if (namedQuery != null) {
243+
RelationPlan subPlan;
244+
if (analysis.isExpandableQuery(namedQuery)) {
245+
// recursive cte
246+
throw new SemanticException("unexpected recursive cte");
247+
} else {
248+
subPlan = process(namedQuery, null);
249+
}
250+
251+
// Add implicit coercions if view query produces types that don't match the declared output
252+
// types of the view (e.g., if the underlying tables referenced by the view changed)
253+
List<Type> types =
254+
analysis.getOutputDescriptor(table).getAllFields().stream()
255+
.map(Field::getType)
256+
.collect(toImmutableList());
257+
258+
NodeAndMappings coerced = coerce(subPlan, types, symbolAllocator, idAllocator);
259+
return new RelationPlan(coerced.getNode(), scope, coerced.getFields(), outerContext);
260+
}
261+
239262
final ImmutableList.Builder<Symbol> outputSymbolsBuilder = ImmutableList.builder();
240263
final ImmutableMap.Builder<Symbol, ColumnSchema> symbolToColumnSchema = ImmutableMap.builder();
241264
final Collection<Field> fields = scope.getRelationType().getAllFields();

0 commit comments

Comments
 (0)