Skip to content

Commit c39abcb

Browse files
authored
Integrate window function optimization rules into IoTDB (#16953)
1 parent f7589ab commit c39abcb

40 files changed

+4825
-43
lines changed
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.db.it;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.TableClusterIT;
25+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
26+
27+
import org.junit.AfterClass;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runner.RunWith;
32+
33+
import java.sql.Connection;
34+
import java.sql.Statement;
35+
36+
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
37+
import static org.junit.Assert.fail;
38+
39+
@RunWith(IoTDBTestRunner.class)
40+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
41+
public class IoTDBWindowFunction3IT {
42+
private static final String DATABASE_NAME = "test";
43+
private static final String[] sqls =
44+
new String[] {
45+
"CREATE DATABASE " + DATABASE_NAME,
46+
"USE " + DATABASE_NAME,
47+
"create table demo (device string tag, value double field)",
48+
"insert into demo values (2021-01-01T09:05:00, 'd1', 3)",
49+
"insert into demo values (2021-01-01T09:07:00, 'd1', 5)",
50+
"insert into demo values (2021-01-01T09:09:00, 'd1', 3)",
51+
"insert into demo values (2021-01-01T09:10:00, 'd1', 1)",
52+
"insert into demo values (2021-01-01T09:08:00, 'd2', 2)",
53+
"insert into demo values (2021-01-01T09:15:00, 'd2', 4)",
54+
"FLUSH",
55+
"CLEAR ATTRIBUTE CACHE",
56+
};
57+
58+
protected static void insertData() {
59+
try (Connection connection = EnvFactory.getEnv().getTableConnection();
60+
Statement statement = connection.createStatement()) {
61+
for (String sql : sqls) {
62+
statement.execute(sql);
63+
}
64+
} catch (Exception e) {
65+
fail("insertData failed.");
66+
}
67+
}
68+
69+
@BeforeClass
70+
public static void setUp() {
71+
EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024);
72+
EnvFactory.getEnv().initClusterEnvironment();
73+
insertData();
74+
}
75+
76+
@AfterClass
77+
public static void tearDown() {
78+
EnvFactory.getEnv().cleanClusterEnvironment();
79+
}
80+
81+
@Test
82+
public void testMergeWindowFunctions() {
83+
String[] expectedHeader = new String[] {"time", "device", "value", "a", "b"};
84+
String[] retArray =
85+
new String[] {
86+
"2021-01-01T09:05:00.000Z,d1,3.0,3.0,4.0,",
87+
"2021-01-01T09:07:00.000Z,d1,5.0,5.0,6.0,",
88+
"2021-01-01T09:09:00.000Z,d1,3.0,3.0,4.0,",
89+
"2021-01-01T09:10:00.000Z,d1,1.0,1.0,2.0,",
90+
"2021-01-01T09:08:00.000Z,d2,2.0,2.0,4.0,",
91+
"2021-01-01T09:15:00.000Z,d2,4.0,4.0,6.0,",
92+
};
93+
tableResultSetEqualTest(
94+
"SELECT *, a + min(value) OVER (PARTITION BY device ORDER BY value) as b FROM (SELECT *, max(value) OVER (PARTITION BY device ORDER BY value) as a FROM demo) ORDER BY device, time",
95+
expectedHeader,
96+
retArray,
97+
DATABASE_NAME);
98+
}
99+
100+
@Test
101+
public void testSwapWindowFunctions() {
102+
String[] expectedHeader = new String[] {"time", "device", "value", "p1", "p2"};
103+
String[] retArray =
104+
new String[] {
105+
"2021-01-01T09:05:00.000Z,d1,3.0,1.0,6.0,",
106+
"2021-01-01T09:07:00.000Z,d1,5.0,1.0,5.0,",
107+
"2021-01-01T09:09:00.000Z,d1,3.0,1.0,6.0,",
108+
"2021-01-01T09:10:00.000Z,d1,1.0,1.0,1.0,",
109+
"2021-01-01T09:08:00.000Z,d2,2.0,2.0,2.0,",
110+
"2021-01-01T09:15:00.000Z,d2,4.0,2.0,4.0,",
111+
};
112+
tableResultSetEqualTest(
113+
"SELECT *, min(value) OVER (PARTITION BY device) as p1, sum(value) OVER (PARTITION BY device, value) as p2 FROM demo ORDER BY device, time",
114+
expectedHeader,
115+
retArray,
116+
DATABASE_NAME);
117+
}
118+
119+
@Test
120+
public void testPushDownFilterIntoWindow() {
121+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
122+
String[] retArray =
123+
new String[] {
124+
"2021-01-01T09:10:00.000Z,d1,1.0,1,",
125+
"2021-01-01T09:05:00.000Z,d1,3.0,2,",
126+
"2021-01-01T09:08:00.000Z,d2,2.0,1,",
127+
"2021-01-01T09:15:00.000Z,d2,4.0,2,",
128+
};
129+
tableResultSetEqualTest(
130+
"SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY value) as rn FROM demo) WHERE rn <= 2 ORDER BY device, time",
131+
expectedHeader,
132+
retArray,
133+
DATABASE_NAME);
134+
}
135+
136+
@Test
137+
public void testPushDownLimitIntoWindow() {
138+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
139+
String[] retArray =
140+
new String[] {
141+
"2021-01-01T09:05:00.000Z,d1,3.0,2,", "2021-01-01T09:07:00.000Z,d1,5.0,4,",
142+
};
143+
tableResultSetEqualTest(
144+
"SELECT * FROM (SELECT *, row_number() OVER (PARTITION BY device ORDER BY value) as rn FROM demo) ORDER BY device, time LIMIT 2 ",
145+
expectedHeader,
146+
retArray,
147+
DATABASE_NAME);
148+
}
149+
150+
@Test
151+
public void testReplaceWindowWithRowNumber() {
152+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
153+
String[] retArray =
154+
new String[] {
155+
"2021-01-01T09:05:00.000Z,d1,3.0,1,",
156+
"2021-01-01T09:07:00.000Z,d1,5.0,2,",
157+
"2021-01-01T09:09:00.000Z,d1,3.0,3,",
158+
"2021-01-01T09:10:00.000Z,d1,1.0,4,",
159+
"2021-01-01T09:08:00.000Z,d2,2.0,1,",
160+
"2021-01-01T09:15:00.000Z,d2,4.0,2,",
161+
};
162+
tableResultSetEqualTest(
163+
"SELECT *, row_number() OVER (PARTITION BY device) AS rn FROM demo ORDER BY device, time",
164+
expectedHeader,
165+
retArray,
166+
DATABASE_NAME);
167+
}
168+
169+
@Test
170+
public void testRemoveRedundantWindow() {
171+
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
172+
String[] retArray = new String[] {};
173+
tableResultSetEqualTest(
174+
"SELECT *, row_number() OVER (PARTITION BY device) AS rn FROM demo WHERE 1 = 2",
175+
expectedHeader,
176+
retArray,
177+
DATABASE_NAME);
178+
}
179+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.operator;
21+
22+
import org.apache.tsfile.read.common.block.TsBlock;
23+
24+
import java.util.Iterator;
25+
26+
public interface GroupedTopNBuilder {
27+
void addTsBlock(TsBlock tsBlock);
28+
29+
Iterator<TsBlock> getResult();
30+
31+
long getEstimatedSizeInBytes();
32+
}

0 commit comments

Comments
 (0)