Skip to content

Commit d39e9a3

Browse files
CaideyipiVGalaxiesluoluoyuyu
authored
Pipe IT: always throw exception with failure when executing non-queries & Fixed some semantic errors of IT (apache#16331)
* always throw ex * fix-IT * fix-n * fix-source * fix-error * fix * fix * fix * fix * fix * refactor * fix-pom * always flush * fix * fix * fix * revert * fix-semantic * revert * try * remove insert result * refactor * Refactor-2 * fix * Comp fix * revert * add log * fix * try fix * fixup! try fix * try skip sub test * store any thrown exception * try fix * fix * fix * spotless * fix * fix * fix * ignore testHistoryAndRealtime * update --------- Co-authored-by: VGalaxies <[email protected]> Co-authored-by: luoluoyuyu <[email protected]>
1 parent c49c068 commit d39e9a3

File tree

53 files changed

+1004
-1874
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1004
-1874
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -939,19 +939,18 @@ public static void executeNonQueriesWithRetry(
939939
}
940940
}
941941

942-
public static boolean tryExecuteNonQueryWithRetry(
943-
BaseEnv env, String sql, Connection defaultConnection) {
944-
return tryExecuteNonQueryWithRetry(
942+
public static void executeNonQuery(BaseEnv env, String sql, Connection defaultConnection) {
943+
executeNonQuery(
945944
env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, defaultConnection);
946945
}
947946

948-
public static boolean tryExecuteNonQueryWithRetry(
947+
public static void executeNonQuery(
949948
String dataBaseName,
950949
String sqlDialect,
951950
BaseEnv env,
952951
String sql,
953952
Connection defaultConnection) {
954-
return tryExecuteNonQueryWithRetry(
953+
executeNonQuery(
955954
env,
956955
sql,
957956
SessionConfig.DEFAULT_USER,
@@ -961,21 +960,20 @@ public static boolean tryExecuteNonQueryWithRetry(
961960
defaultConnection);
962961
}
963962

964-
public static boolean tryExecuteNonQueryWithRetry(
963+
public static void executeNonQuery(
965964
BaseEnv env, String sql, String userName, String password, Connection defaultConnection) {
966-
return tryExecuteNonQueriesWithRetry(
967-
env, Collections.singletonList(sql), userName, password, defaultConnection);
965+
executeNonQueries(env, Collections.singletonList(sql), userName, password, defaultConnection);
968966
}
969967

970-
public static boolean tryExecuteNonQueryWithRetry(
968+
public static void executeNonQuery(
971969
BaseEnv env,
972970
String sql,
973971
String userName,
974972
String password,
975973
String dataBaseName,
976974
String sqlDialect,
977975
Connection defaultConnection) {
978-
return tryExecuteNonQueriesWithRetry(
976+
executeNonQueries(
979977
env,
980978
Collections.singletonList(sql),
981979
userName,
@@ -985,9 +983,9 @@ public static boolean tryExecuteNonQueryWithRetry(
985983
defaultConnection);
986984
}
987985

988-
public static boolean tryExecuteNonQueriesWithRetry(
986+
public static void executeNonQueries(
989987
BaseEnv env, List<String> sqlList, Connection defaultConnection) {
990-
return tryExecuteNonQueriesWithRetry(
988+
executeNonQueries(
991989
env,
992990
sqlList,
993991
SessionConfig.DEFAULT_USER,
@@ -997,13 +995,13 @@ public static boolean tryExecuteNonQueriesWithRetry(
997995
defaultConnection);
998996
}
999997

1000-
public static boolean tryExecuteNonQueriesWithRetry(
998+
public static void executeNonQueries(
1001999
String dataBase,
10021000
String sqlDialect,
10031001
BaseEnv env,
10041002
List<String> sqlList,
10051003
Connection defaultConnection) {
1006-
return tryExecuteNonQueriesWithRetry(
1004+
executeNonQueries(
10071005
env,
10081006
sqlList,
10091007
SessionConfig.DEFAULT_USER,
@@ -1015,6 +1013,71 @@ public static boolean tryExecuteNonQueriesWithRetry(
10151013

10161014
// This method will not throw failure given that a failure is encountered.
10171015
// Instead, it returns a flag to indicate the result of the execution.
1016+
public static void executeNonQueries(
1017+
BaseEnv env,
1018+
List<String> sqlList,
1019+
String userName,
1020+
String password,
1021+
Connection defaultConnection) {
1022+
executeNonQueries(env, sqlList, userName, password, null, TREE_SQL_DIALECT, defaultConnection);
1023+
}
1024+
1025+
public static void executeNonQueries(
1026+
BaseEnv env,
1027+
List<String> sqlList,
1028+
String userName,
1029+
String password,
1030+
String dataBase,
1031+
String sqlDialect,
1032+
Connection defaultConnection) {
1033+
int lastIndex = 0;
1034+
Connection localConnection = null;
1035+
Connection connectionToUse = defaultConnection;
1036+
Statement statement;
1037+
try {
1038+
// create a new connection if default is not provided or the previous is broken
1039+
if (connectionToUse == null) {
1040+
localConnection =
1041+
env.getConnection(
1042+
userName,
1043+
password,
1044+
BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect)
1045+
? BaseEnv.TABLE_SQL_DIALECT
1046+
: TREE_SQL_DIALECT);
1047+
connectionToUse = localConnection;
1048+
}
1049+
statement = connectionToUse.createStatement();
1050+
if (BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect) && dataBase != null) {
1051+
statement.execute("use " + dataBase);
1052+
}
1053+
for (int i = lastIndex; i < sqlList.size(); ++i) {
1054+
statement.execute(sqlList.get(i));
1055+
}
1056+
} catch (SQLException e) {
1057+
// the default connection should be closed by the upper level
1058+
// while the local connection should be closed here
1059+
if (connectionToUse == localConnection && localConnection != null) {
1060+
try {
1061+
localConnection.close();
1062+
} catch (SQLException ex) {
1063+
// ignore
1064+
}
1065+
}
1066+
throw new RuntimeException(e);
1067+
}
1068+
}
1069+
1070+
public static boolean tryExecuteNonQuery(BaseEnv env, String sql, Connection defaultConnection) {
1071+
return tryExecuteNonQuery(
1072+
env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, defaultConnection);
1073+
}
1074+
1075+
public static boolean tryExecuteNonQuery(
1076+
BaseEnv env, String sql, String userName, String password, Connection defaultConnection) {
1077+
return tryExecuteNonQueriesWithRetry(
1078+
env, Collections.singletonList(sql), userName, password, defaultConnection);
1079+
}
1080+
10181081
public static boolean tryExecuteNonQueriesWithRetry(
10191082
BaseEnv env,
10201083
List<String> sqlList,

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/TableModelUtils.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public static void createDataBaseAndTable(
7575
statement.execute("use " + database);
7676
statement.execute(
7777
String.format(
78-
"CREATE TABLE %s(s0 string tag, s1 string tag, s2 string tag, s3 string tag,s4 int64 field, s5 float field, s6 string field, s7 timestamp field, s8 int32 field, s9 double field, s10 date field, s11 text field )",
78+
"CREATE TABLE IF NOT EXISTS %s(s0 string tag, s1 string tag, s2 string tag, s3 string tag,s4 int64 field, s5 float field, s6 string field, s7 timestamp field, s8 int32 field, s9 double field, s10 date field, s11 text field )",
7979
table));
8080
} catch (Exception e) {
8181
fail(e.getMessage());
@@ -98,7 +98,7 @@ public static void createDatabase(final BaseEnv baseEnv, final String database,
9898
}
9999
}
100100

101-
public static boolean insertData(
101+
public static void insertData(
102102
final String dataBaseName,
103103
final String tableName,
104104
final int startInclusive,
@@ -112,11 +112,10 @@ public static boolean insertData(
112112
tableName, i, i, i, i, i, i, i, i, i, i, getDateStr(i), i, i));
113113
}
114114
list.add("flush");
115-
return TestUtils.tryExecuteNonQueriesWithRetry(
116-
dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
115+
TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
117116
}
118117

119-
public static boolean insertData(
118+
public static void insertData(
120119
final String dataBaseName,
121120
final String tableName,
122121
final int start,
@@ -163,8 +162,7 @@ public static boolean insertData(
163162
values[11],
164163
i));
165164
}
166-
return TestUtils.tryExecuteNonQueriesWithRetry(
167-
dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
165+
TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
168166
}
169167

170168
public static boolean insertDataNotThrowError(
@@ -180,8 +178,12 @@ public static boolean insertDataNotThrowError(
180178
"insert into %s (s0, s3, s2, s1, s4, s5, s6, s7, s8, s9, s10, s11, time) values ('t%s','t%s','t%s','t%s','%s', %s.0, %s, %s, %d, %d.0, '%s', '%s', %s)",
181179
tableName, i, i, i, i, i, i, i, i, i, i, getDateStr(i), i, i));
182180
}
183-
return TestUtils.tryExecuteNonQueriesWithRetry(
184-
dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
181+
try {
182+
TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
183+
return true;
184+
} catch (final Throwable e) {
185+
return false;
186+
}
185187
}
186188

187189
public static boolean insertData(
@@ -246,10 +248,7 @@ public static void deleteData(
246248
List<String> list = new ArrayList<>(end - start + 1);
247249
list.add(
248250
String.format("delete from %s where time >= %s and time <= %s", tableName, start, end));
249-
if (!TestUtils.tryExecuteNonQueriesWithRetry(
250-
dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null)) {
251-
fail();
252-
}
251+
TestUtils.executeNonQueries(dataBaseName, BaseEnv.TABLE_SQL_DIALECT, baseEnv, list, null);
253252
}
254253

255254
// s0 string, s1 string, s2 string, s3 string, s4 int64, s5 float, s6 string s7 timestamp, s8

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeAlterIT.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,6 @@ public void testAlterPipeFailure() {
456456
@Test
457457
public void testAlterPipeSourceAndSink() {
458458
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
459-
boolean insertResult = true;
460459

461460
final Consumer<String> handleFailure =
462461
o -> {
@@ -478,11 +477,8 @@ public void testAlterPipeSourceAndSink() {
478477
fail(e.getMessage());
479478
}
480479

481-
insertResult = TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
482-
insertResult = insertResult && TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
483-
if (!insertResult) {
484-
return;
485-
}
480+
TableModelUtils.insertData("test", "test", 0, 100, senderEnv);
481+
TableModelUtils.insertData("test1", "test1", 0, 100, senderEnv);
486482

487483
// Check data on receiver
488484
TableModelUtils.assertData("test", "test", 0, 100, receiverEnv, handleFailure);
@@ -496,12 +492,10 @@ public void testAlterPipeSourceAndSink() {
496492
} catch (final SQLException e) {
497493
fail(e.getMessage());
498494
}
499-
insertResult = TableModelUtils.insertData("test", "test", 100, 200, senderEnv);
500-
insertResult =
501-
insertResult && TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
502-
if (!insertResult) {
503-
return;
504-
}
495+
TableModelUtils.insertData("test", "test", 100, 200, senderEnv);
496+
497+
TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
498+
505499
TestUtils.assertDataEventuallyOnEnv(
506500
receiverEnv,
507501
TableModelUtils.getQuerySql("test"),

0 commit comments

Comments
 (0)