Skip to content

Commit 86a3377

Browse files
authored
Pipe IT: Add table model related IT (apache#14086)
1 parent c6fa44d commit 86a3377

24 files changed

+7048
-8
lines changed

.github/workflows/pipe-it-2cluster.yml

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,3 +234,40 @@ jobs:
234234
name: cluster-log-subscription-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }}
235235
path: integration-test/target/cluster-logs
236236
retention-days: 30
237+
table-model:
238+
strategy:
239+
fail-fast: false
240+
max-parallel: 15
241+
matrix:
242+
java: [17]
243+
# StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet.
244+
cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode]
245+
os: [ ubuntu-latest ]
246+
runs-on: ${{ matrix.os }}
247+
steps:
248+
- uses: actions/checkout@v4
249+
- name: Set up JDK ${{ matrix.java }}
250+
uses: actions/setup-java@v4
251+
with:
252+
distribution: liberica
253+
java-version: ${{ matrix.java }}
254+
- name: IT Test
255+
shell: bash
256+
# we do not compile client-cpp for saving time, it is tested in client.yml
257+
# we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml
258+
run: |
259+
mvn clean verify \
260+
-P with-integration-tests \
261+
-DskipUTs \
262+
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
263+
-DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \
264+
-pl integration-test \
265+
-am -PMultiClusterIT2TableModel \
266+
-ntp
267+
- name: Upload Artifact
268+
if: failure()
269+
uses: actions/upload-artifact@v4
270+
with:
271+
name: cluster-log-table-model-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }}
272+
path: integration-test/target/cluster-logs
273+
retention-days: 30

integration-test/pom.xml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@
415415
</activation>
416416
<properties>
417417
<integrationTest.excludedGroups/>
418-
<integrationTest.includedGroups>org.apache.iotdb.itbase.category.MultiClusterIT1,org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema,org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema,org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionArchVerification,org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionConsumer,org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMisc,org.apache.iotdb.itbase.category.MultiClusterIT3</integrationTest.includedGroups>
418+
<integrationTest.includedGroups>org.apache.iotdb.itbase.category.MultiClusterIT1,org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema,org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema,org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionArchVerification,org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionConsumer,org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMisc,org.apache.iotdb.itbase.category.MultiClusterIT3,org.apache.iotdb.itbase.category.MultiClusterIT2TableModel</integrationTest.includedGroups>
419419
<integrationTest.launchNodeInSameJVM>false</integrationTest.launchNodeInSameJVM>
420420
<integrationTest.randomSelectWriteNode>true</integrationTest.randomSelectWriteNode>
421421
<integrationTest.readAndVerifyWithMultiNode>true</integrationTest.readAndVerifyWithMultiNode>
@@ -464,6 +464,20 @@
464464
<integrationTest.testEnv>MultiCluster</integrationTest.testEnv>
465465
</properties>
466466
</profile>
467+
<profile>
468+
<id>MultiClusterIT2TableModel</id>
469+
<activation>
470+
<activeByDefault>false</activeByDefault>
471+
</activation>
472+
<properties>
473+
<integrationTest.excludedGroups/>
474+
<integrationTest.includedGroups>org.apache.iotdb.itbase.category.MultiClusterIT2TableModel</integrationTest.includedGroups>
475+
<integrationTest.launchNodeInSameJVM>false</integrationTest.launchNodeInSameJVM>
476+
<integrationTest.randomSelectWriteNode>true</integrationTest.randomSelectWriteNode>
477+
<integrationTest.readAndVerifyWithMultiNode>true</integrationTest.readAndVerifyWithMultiNode>
478+
<integrationTest.testEnv>MultiCluster</integrationTest.testEnv>
479+
</properties>
480+
</profile>
467481
<profile>
468482
<id>MultiClusterIT2SubscriptionArchVerification</id>
469483
<activation>

integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,11 +436,18 @@ public Connection getConnection(
436436

437437
@Override
438438
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
439-
final DataNodeWrapper dataNode, final String username, final String password)
439+
final DataNodeWrapper dataNode,
440+
final String username,
441+
final String password,
442+
String sqlDialect)
440443
throws SQLException {
441444
return new ClusterTestConnection(
442445
getWriteConnectionWithSpecifiedDataNode(
443-
dataNode, null, username, password, TREE_SQL_DIALECT),
446+
dataNode,
447+
null,
448+
username,
449+
password,
450+
TABLE_SQL_DIALECT.equals(sqlDialect) ? TABLE_SQL_DIALECT : TREE_SQL_DIALECT),
444451
Collections.emptyList());
445452
}
446453

integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public Connection getConnection(String username, String password, String sqlDial
137137

138138
@Override
139139
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
140-
DataNodeWrapper dataNode, String username, String password) {
140+
DataNodeWrapper dataNode, String username, String password, String sqlDialect) {
141141
throw new UnsupportedOperationException();
142142
}
143143

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.itbase.category;
21+
22+
public class MultiClusterIT2TableModel {}

integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,19 @@ default Connection getConnection(String username, String password) throws SQLExc
147147
default Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode)
148148
throws SQLException {
149149
return getWriteOnlyConnectionWithSpecifiedDataNode(
150-
dataNode, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
150+
dataNode, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, TREE_SQL_DIALECT);
151+
}
152+
153+
default Connection getWriteOnlyConnectionWithSpecifiedDataNode(
154+
DataNodeWrapper dataNode, String sqlDialect) throws SQLException {
155+
return getWriteOnlyConnectionWithSpecifiedDataNode(
156+
dataNode, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, sqlDialect);
151157
}
152158

153159
// This is useful when you shut down a dataNode.
154160
Connection getWriteOnlyConnectionWithSpecifiedDataNode(
155-
DataNodeWrapper dataNode, String username, String password) throws SQLException;
161+
DataNodeWrapper dataNode, String username, String password, String sqlDialect)
162+
throws SQLException;
156163

157164
default Connection getConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode)
158165
throws SQLException {

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 139 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -638,24 +638,82 @@ public static boolean tryExecuteNonQueryWithRetry(BaseEnv env, String sql) {
638638
env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
639639
}
640640

641+
public static boolean tryExecuteNonQueryWithRetry(
642+
String dataBaseName, String sqlDialect, BaseEnv env, String sql) {
643+
return tryExecuteNonQueryWithRetry(
644+
env,
645+
sql,
646+
SessionConfig.DEFAULT_USER,
647+
SessionConfig.DEFAULT_PASSWORD,
648+
dataBaseName,
649+
sqlDialect);
650+
}
651+
641652
public static boolean tryExecuteNonQueryWithRetry(
642653
BaseEnv env, String sql, String userName, String password) {
643654
return tryExecuteNonQueriesWithRetry(env, Collections.singletonList(sql), userName, password);
644655
}
645656

657+
public static boolean tryExecuteNonQueryWithRetry(
658+
BaseEnv env,
659+
String sql,
660+
String userName,
661+
String password,
662+
String dataBaseName,
663+
String sqlDialect) {
664+
return tryExecuteNonQueriesWithRetry(
665+
env, Collections.singletonList(sql), userName, password, dataBaseName, sqlDialect);
666+
}
667+
646668
public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env, List<String> sqlList) {
647669
return tryExecuteNonQueriesWithRetry(
648-
env, sqlList, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
670+
env,
671+
sqlList,
672+
SessionConfig.DEFAULT_USER,
673+
SessionConfig.DEFAULT_PASSWORD,
674+
null,
675+
BaseEnv.TREE_SQL_DIALECT);
676+
}
677+
678+
public static boolean tryExecuteNonQueriesWithRetry(
679+
String dataBase, String sqlDialect, BaseEnv env, List<String> sqlList) {
680+
return tryExecuteNonQueriesWithRetry(
681+
env,
682+
sqlList,
683+
SessionConfig.DEFAULT_USER,
684+
SessionConfig.DEFAULT_PASSWORD,
685+
dataBase,
686+
sqlDialect);
649687
}
650688

651689
// This method will not throw failure given that a failure is encountered.
652690
// Instead, it returns a flag to indicate the result of the execution.
653691
public static boolean tryExecuteNonQueriesWithRetry(
654692
BaseEnv env, List<String> sqlList, String userName, String password) {
693+
return tryExecuteNonQueriesWithRetry(
694+
env, sqlList, userName, password, null, BaseEnv.TREE_SQL_DIALECT);
695+
}
696+
697+
public static boolean tryExecuteNonQueriesWithRetry(
698+
BaseEnv env,
699+
List<String> sqlList,
700+
String userName,
701+
String password,
702+
String dataBase,
703+
String sqlDialect) {
655704
int lastIndex = 0;
656705
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
657-
try (Connection connection = env.getConnection(userName, password);
706+
try (Connection connection =
707+
env.getConnection(
708+
userName,
709+
password,
710+
BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect)
711+
? BaseEnv.TABLE_SQL_DIALECT
712+
: BaseEnv.TREE_SQL_DIALECT);
658713
Statement statement = connection.createStatement()) {
714+
if (BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect) && dataBase != null) {
715+
statement.execute("use " + dataBase);
716+
}
659717
for (int i = lastIndex; i < sqlList.size(); ++i) {
660718
lastIndex = i;
661719
statement.execute(sqlList.get(i));
@@ -731,6 +789,42 @@ public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
731789
return false;
732790
}
733791

792+
public static boolean tryExecuteNonQueriesOnSpecifiedDataNodeWithRetry(
793+
BaseEnv env,
794+
DataNodeWrapper wrapper,
795+
List<String> sqlList,
796+
String dataBase,
797+
String sqlDialect) {
798+
int lastIndex = 0;
799+
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {
800+
try (Connection connection =
801+
env.getWriteOnlyConnectionWithSpecifiedDataNode(wrapper, sqlDialect);
802+
Statement statement = connection.createStatement()) {
803+
804+
if (BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect) && dataBase != null) {
805+
statement.execute("use " + dataBase);
806+
}
807+
808+
for (int i = lastIndex; i < sqlList.size(); ++i) {
809+
statement.execute(sqlList.get(i));
810+
lastIndex = i;
811+
}
812+
return true;
813+
} catch (SQLException e) {
814+
if (retryCountLeft > 0) {
815+
try {
816+
Thread.sleep(10000);
817+
} catch (InterruptedException ignored) {
818+
}
819+
} else {
820+
e.printStackTrace();
821+
return false;
822+
}
823+
}
824+
}
825+
return false;
826+
}
827+
734828
public static void executeQuery(String sql) {
735829
executeQuery(sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
736830
}
@@ -909,6 +1003,49 @@ public static void assertDataEventuallyOnEnv(
9091003
}
9101004
}
9111005

1006+
public static void assertDataEventuallyOnEnv(
1007+
BaseEnv env,
1008+
String sql,
1009+
String expectedHeader,
1010+
Set<String> expectedResSet,
1011+
String dataBaseName) {
1012+
assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 300, dataBaseName);
1013+
}
1014+
1015+
public static void assertDataEventuallyOnEnv(
1016+
BaseEnv env,
1017+
String sql,
1018+
String expectedHeader,
1019+
Set<String> expectedResSet,
1020+
long timeoutSeconds,
1021+
String dataBaseName) {
1022+
try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
1023+
Statement statement = connection.createStatement()) {
1024+
// Keep retrying if there are execution failures
1025+
await()
1026+
.pollInSameThread()
1027+
.pollDelay(1L, TimeUnit.SECONDS)
1028+
.pollInterval(1L, TimeUnit.SECONDS)
1029+
.atMost(timeoutSeconds, TimeUnit.SECONDS)
1030+
.untilAsserted(
1031+
() -> {
1032+
try {
1033+
if (dataBaseName != null) {
1034+
statement.execute("use " + dataBaseName);
1035+
}
1036+
if (sql != null && !sql.equals("")) {
1037+
TestUtils.assertResultSetEqual(
1038+
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
1039+
}
1040+
} catch (Exception e) {
1041+
Assert.fail();
1042+
}
1043+
});
1044+
} catch (Exception e) {
1045+
fail(e.getMessage());
1046+
}
1047+
}
1048+
9121049
public static void assertDataEventuallyOnEnv(
9131050
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
9141051
assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.pipe.it.tablemodel;
21+
22+
import org.apache.iotdb.consensus.ConsensusFactory;
23+
import org.apache.iotdb.it.env.MultiEnvFactory;
24+
import org.apache.iotdb.itbase.env.BaseEnv;
25+
26+
import org.junit.After;
27+
import org.junit.Before;
28+
29+
abstract class AbstractPipeTableModelTestIT {
30+
31+
protected BaseEnv senderEnv;
32+
protected BaseEnv receiverEnv;
33+
34+
@Before
35+
public void setUp() {
36+
MultiEnvFactory.createEnv(2);
37+
senderEnv = MultiEnvFactory.getEnv(0);
38+
receiverEnv = MultiEnvFactory.getEnv(1);
39+
setupConfig();
40+
senderEnv.initClusterEnvironment();
41+
receiverEnv.initClusterEnvironment();
42+
}
43+
44+
protected void setupConfig() {
45+
// TODO: delete ratis configurations
46+
senderEnv
47+
.getConfig()
48+
.getCommonConfig()
49+
.setAutoCreateSchemaEnabled(true)
50+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
51+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
52+
receiverEnv
53+
.getConfig()
54+
.getCommonConfig()
55+
.setAutoCreateSchemaEnabled(true)
56+
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
57+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);
58+
59+
// 10 min, assert that the operations will not time out
60+
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
61+
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
62+
}
63+
64+
@After
65+
public final void tearDown() {
66+
senderEnv.cleanClusterEnvironment();
67+
receiverEnv.cleanClusterEnvironment();
68+
}
69+
}

0 commit comments

Comments
 (0)