Skip to content

Commit aa16713

Browse files
authored
Implement ASOF INNER JOIN
1 parent 21dfc00 commit aa16713

File tree

35 files changed

+2352
-81
lines changed

35 files changed

+2352
-81
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.db.it;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.TableClusterIT;
25+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
26+
27+
import org.junit.AfterClass;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runner.RunWith;
32+
33+
import java.sql.Connection;
34+
import java.sql.Statement;
35+
36+
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
37+
import static org.junit.Assert.fail;
38+
39+
@RunWith(IoTDBTestRunner.class)
40+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
41+
public class IoTDBAsofJoinTableIT {
42+
private static final String DATABASE_NAME = "test";
43+
44+
private static final String[] sql =
45+
new String[] {
46+
"create database test",
47+
"use test",
48+
"create table table1(device string tag, value int32 field)",
49+
"insert into table1(time,device,value) values(2020-01-01 00:00:01.000,'d1',1)",
50+
"insert into table1(time,device,value) values(2020-01-01 00:00:03.000,'d1',3)",
51+
"insert into table1(time,device,value) values(2020-01-01 00:00:05.000,'d1',5)",
52+
"insert into table1(time,device,value) values(2020-01-01 00:00:08.000,'d2',8)",
53+
"create table table2(device string tag, value int32 field)",
54+
"insert into table2(time,device,value) values(2020-01-01 00:00:02.000,'d1',20)",
55+
"insert into table2(time,device,value) values(2020-01-01 00:00:03.000,'d1',30)",
56+
"insert into table2(time,device,value) values(2020-01-01 00:00:04.000,'d2',40)",
57+
"insert into table2(time,device,value) values(2020-01-01 00:00:05.000,'d2',50)"
58+
};
59+
String[] expectedHeader =
60+
new String[] {"time1", "device1", "value1", "time2", "device2", "value2"};
61+
;
62+
String[] retArray;
63+
64+
@BeforeClass
65+
public static void setUp() throws Exception {
66+
EnvFactory.getEnv().initClusterEnvironment();
67+
EnvFactory.getEnv()
68+
.getConfig()
69+
.getCommonConfig()
70+
.setMaxTsBlockLineNumber(2)
71+
.setMaxNumberOfPointsInPage(5);
72+
insertData();
73+
}
74+
75+
@AfterClass
76+
public static void tearDown() throws Exception {
77+
EnvFactory.getEnv().cleanClusterEnvironment();
78+
}
79+
80+
private static void insertData() {
81+
try (Connection connection = EnvFactory.getEnv().getTableConnection();
82+
Statement statement = connection.createStatement()) {
83+
for (String sql : sql) {
84+
statement.execute(sql);
85+
}
86+
} catch (Exception e) {
87+
e.printStackTrace();
88+
fail(e.getMessage());
89+
}
90+
}
91+
92+
@Test
93+
public void innerJoinTest() {
94+
retArray =
95+
new String[] {
96+
"2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
97+
"2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:05.000Z,d2,50,",
98+
"2020-01-01T00:00:08.000Z,d2,8,2020-01-01T00:00:05.000Z,d2,50,"
99+
};
100+
tableResultSetEqualTest(
101+
"SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
102+
+ " t2.time as time2, t2.device as device2, t2.value as value2 \n"
103+
+ "FROM \n"
104+
+ "table1 t1 ASOF INNER JOIN table2 t2\n"
105+
+ "ON\n"
106+
+ "t1.time>=t2.time\n"
107+
+ "order by time1",
108+
expectedHeader,
109+
retArray,
110+
DATABASE_NAME);
111+
112+
retArray =
113+
new String[] {
114+
"2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
115+
"2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:05.000Z,d2,50,",
116+
};
117+
tableResultSetEqualTest(
118+
"SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
119+
+ " t2.time as time2, t2.device as device2, t2.value as value2 \n"
120+
+ "FROM \n"
121+
+ "table1 t1 ASOF(tolerance 2s) INNER JOIN table2 t2\n"
122+
+ "ON\n"
123+
+ "t1.time>=t2.time\n"
124+
+ "order by time1",
125+
expectedHeader,
126+
retArray,
127+
DATABASE_NAME);
128+
129+
retArray =
130+
new String[] {
131+
"2020-01-01T00:00:03.000Z,d1,3,2020-01-01T00:00:03.000Z,d1,30,",
132+
"2020-01-01T00:00:05.000Z,d1,5,2020-01-01T00:00:03.000Z,d1,30,",
133+
};
134+
tableResultSetEqualTest(
135+
"SELECT t1.time as time1, t1.device as device1, t1.value as value1, \n"
136+
+ " t2.time as time2, t2.device as device2, t2.value as value2 \n"
137+
+ "FROM \n"
138+
+ "table1 t1 ASOF(tolerance 2s) INNER JOIN table2 t2\n"
139+
+ "ON\n"
140+
+ "t1.device=t2.device AND t1.time>=t2.time\n"
141+
+ "order by time1",
142+
expectedHeader,
143+
retArray,
144+
DATABASE_NAME);
145+
}
146+
}

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2213,6 +2213,121 @@ public void lastCacheTest() {
22132213
repeatTest(sql, expectedHeader, retArray, DATABASE_NAME, 3);
22142214
}
22152215

2216+
@Test
2217+
public void asofJoinTest() {
2218+
expectedHeader = new String[] {"time", "device", "level", "time", "device", "level"};
2219+
retArray =
2220+
new String[] {
2221+
"1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.100Z,d1,l5,",
2222+
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
2223+
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d999,null,",
2224+
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,null,l999,",
2225+
"1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.010Z,d11,l11,",
2226+
"1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,",
2227+
"1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,d999,null,",
2228+
"1971-01-01T00:00:00.100Z,d1,l2,1971-01-01T00:00:00.000Z,null,l999,",
2229+
"1971-04-26T17:46:40.000Z,d1,l2,1971-01-01T00:00:00.000Z,d1,l1,",
2230+
"1971-04-26T17:46:40.000Z,d1,l2,1971-01-01T00:00:00.000Z,d999,null,"
2231+
};
2232+
// test single join condition
2233+
tableResultSetEqualTest(
2234+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2235+
+ "table0.time>table1.time "
2236+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
2237+
expectedHeader,
2238+
retArray,
2239+
DATABASE_NAME);
2240+
// test expr and '>=' in ASOF condition
2241+
tableResultSetEqualTest(
2242+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2243+
+ "table0.time>=table1.time+1 "
2244+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
2245+
expectedHeader,
2246+
retArray,
2247+
DATABASE_NAME);
2248+
2249+
retArray =
2250+
new String[] {
2251+
"1971-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.000Z,d1,l1,",
2252+
"1971-01-01T00:01:40.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
2253+
"1971-01-01T00:00:00.100Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
2254+
"1971-04-26T17:46:40.000Z,d1,l2,1970-01-01T00:00:00.020Z,d1,l2,",
2255+
"1971-01-01T00:00:00.500Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
2256+
"1971-04-26T17:46:40.020Z,d1,l3,1970-01-01T00:00:00.040Z,d1,l3,",
2257+
"1971-01-01T00:00:01.000Z,d1,l4,1970-01-01T00:00:00.080Z,d1,l4,",
2258+
"1971-04-26T18:01:40.000Z,d1,l4,1970-01-01T00:00:00.080Z,d1,l4,",
2259+
"1971-01-01T00:00:10.000Z,d1,l5,1970-01-01T00:00:00.100Z,d1,l5,",
2260+
"1971-08-20T11:33:20.000Z,d1,l5,1970-01-01T00:00:00.100Z,d1,l5,"
2261+
};
2262+
// test multi join conditions
2263+
tableResultSetEqualTest(
2264+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2265+
+ "table0.device=table1.device and table1.level=table0.level and table0.time>table1.time "
2266+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
2267+
expectedHeader,
2268+
retArray,
2269+
DATABASE_NAME);
2270+
// test expr and '>=' in ASOF condition
2271+
tableResultSetEqualTest(
2272+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2273+
+ "table0.device=table1.device and table1.level=table0.level and table0.time>=table1.time+1 "
2274+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
2275+
expectedHeader,
2276+
retArray,
2277+
DATABASE_NAME);
2278+
2279+
retArray =
2280+
new String[] {
2281+
"1970-01-01T00:00:00.000Z,d1,l1,1970-01-01T00:00:00.010Z,d11,l11,",
2282+
"1970-01-01T00:00:00.020Z,d1,l2,1970-01-01T00:00:00.030Z,d11,l11,",
2283+
"1970-01-01T00:00:00.040Z,d1,l3,1970-01-01T00:00:00.080Z,d1,l4,",
2284+
"1970-01-01T00:00:00.080Z,d1,l4,1970-01-01T00:00:00.100Z,d1,l5,",
2285+
"1970-01-01T00:00:00.100Z,d1,l5,1971-01-01T00:00:00.000Z,d1,l1,",
2286+
"1970-01-01T00:00:00.100Z,d1,l5,1971-01-01T00:00:00.000Z,d999,null,",
2287+
"1970-01-01T00:00:00.100Z,d1,l5,1971-01-01T00:00:00.000Z,null,l999,",
2288+
"1970-01-01T00:00:00.000Z,d2,l1,1970-01-01T00:00:00.010Z,d11,l11,",
2289+
"1970-01-01T00:00:00.020Z,d2,l2,1970-01-01T00:00:00.030Z,d11,l11,",
2290+
"1970-01-01T00:00:00.040Z,d2,l3,1970-01-01T00:00:00.080Z,d1,l4,"
2291+
};
2292+
// test single join condition
2293+
tableResultSetEqualTest(
2294+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2295+
+ "table0.time<table1.time "
2296+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
2297+
expectedHeader,
2298+
retArray,
2299+
DATABASE_NAME);
2300+
// test expr and '<=' in ASOF condition
2301+
tableResultSetEqualTest(
2302+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2303+
+ "table0.time<=table1.time-1 "
2304+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level limit 10",
2305+
expectedHeader,
2306+
retArray,
2307+
DATABASE_NAME);
2308+
2309+
retArray =
2310+
new String[] {
2311+
"1970-01-01T00:00:00.000Z,d1,l1,1971-01-01T00:00:00.000Z,d1,l1,",
2312+
};
2313+
// test multi join conditions
2314+
tableResultSetEqualTest(
2315+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2316+
+ "table0.device=table1.device and table1.level=table0.level and table0.time<table1.time "
2317+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
2318+
expectedHeader,
2319+
retArray,
2320+
DATABASE_NAME);
2321+
// test expr and '>=' in ASOF condition
2322+
tableResultSetEqualTest(
2323+
"select table0.time,table0.device,table0.level,table1.time,table1.device,table1.level from table0 asof join table1 on "
2324+
+ "table0.device=table1.device and table1.level=table0.level and table0.time<=table1.time-1 "
2325+
+ "order by table0.device,table0.level,table0.time,table1.device,table1.level",
2326+
expectedHeader,
2327+
retArray,
2328+
DATABASE_NAME);
2329+
}
2330+
22162331
@Test
22172332
public void exceptionTest() {
22182333
String errMsg = TSStatusCode.SEMANTIC_ERROR.getStatusCode() + ": " + ONLY_SUPPORT_EQUI_JOIN;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator;
21+
22+
import org.apache.tsfile.read.common.block.TsBlock;
23+
24+
import java.util.Optional;
25+
26+
// This Comparator is used to handle the case where the join condition is l<=r, making its interface
27+
// consistent with the case that l<r, such unity helps to avoid branch judgments during the process
28+
// of operator.
29+
public class AscLongTypeIgnoreEqualJoinKeyComparator implements JoinKeyComparator {
30+
31+
private static final AscLongTypeIgnoreEqualJoinKeyComparator INSTANCE =
32+
new AscLongTypeIgnoreEqualJoinKeyComparator();
33+
34+
private AscLongTypeIgnoreEqualJoinKeyComparator() {
35+
// hide constructor
36+
}
37+
38+
public static AscLongTypeIgnoreEqualJoinKeyComparator getInstance() {
39+
return INSTANCE;
40+
}
41+
42+
@Override
43+
public Optional<Boolean> lessThan(
44+
TsBlock left,
45+
int leftColumnIndex,
46+
int leftRowIndex,
47+
TsBlock right,
48+
int rightColumnIndex,
49+
int rightRowIndex) {
50+
if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
51+
|| right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
52+
return Optional.empty();
53+
}
54+
55+
return Optional.of(
56+
left.getColumn(leftColumnIndex).getLong(leftRowIndex)
57+
<= right.getColumn(rightColumnIndex).getLong(rightRowIndex));
58+
}
59+
60+
@Override
61+
public Optional<Boolean> equalsTo(
62+
TsBlock left,
63+
int leftColumnIndex,
64+
int leftRowIndex,
65+
TsBlock right,
66+
int rightColumnIndex,
67+
int rightRowIndex) {
68+
if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
69+
|| right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
70+
return Optional.empty();
71+
}
72+
73+
return Optional.of(
74+
left.getColumn(leftColumnIndex).getLong(leftRowIndex)
75+
== right.getColumn(rightColumnIndex).getLong(rightRowIndex));
76+
}
77+
78+
@Override
79+
public Optional<Boolean> lessThanOrEqual(
80+
TsBlock left,
81+
int leftColumnIndex,
82+
int leftRowIndex,
83+
TsBlock right,
84+
int rightColumnIndex,
85+
int rightRowIndex) {
86+
if (left.getColumn(leftColumnIndex).isNull(leftRowIndex)
87+
|| right.getColumn(rightColumnIndex).isNull(rightRowIndex)) {
88+
return Optional.empty();
89+
}
90+
91+
return Optional.of(
92+
left.getColumn(leftColumnIndex).getLong(leftRowIndex)
93+
< right.getColumn(rightColumnIndex).getLong(rightRowIndex));
94+
}
95+
}

0 commit comments

Comments
 (0)