Skip to content

Commit a719e49

Browse files
authored
[To dev/1.3] Pipe IT: always throw exception with failure when executing non-queries & Fixed some semantic errors of IT (#16331) (#16771)
* setup * spotless * cont * try fix * reset * reset * remove TODO
1 parent 460a570 commit a719e49

30 files changed

+636
-845
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,60 @@ public static boolean tryExecuteNonQueriesWithRetry(
491491
return false;
492492
}
493493

494+
public static void executeNonQuery(BaseEnv env, String sql, Connection defaultConnection) {
495+
executeNonQuery(
496+
env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, defaultConnection);
497+
}
498+
499+
public static void executeNonQuery(
500+
BaseEnv env, String sql, String userName, String password, Connection defaultConnection) {
501+
executeNonQueries(env, Collections.singletonList(sql), userName, password, defaultConnection);
502+
}
503+
504+
public static void executeNonQueries(
505+
BaseEnv env, List<String> sqlList, Connection defaultConnection) {
506+
executeNonQueries(
507+
env,
508+
sqlList,
509+
SessionConfig.DEFAULT_USER,
510+
SessionConfig.DEFAULT_PASSWORD,
511+
defaultConnection);
512+
}
513+
514+
public static void executeNonQueries(
515+
BaseEnv env,
516+
List<String> sqlList,
517+
String userName,
518+
String password,
519+
Connection defaultConnection) {
520+
int lastIndex = 0;
521+
Connection localConnection = null;
522+
Connection connectionToUse = defaultConnection;
523+
Statement statement;
524+
try {
525+
// create a new connection if default is not provided or the previous is broken
526+
if (connectionToUse == null) {
527+
localConnection = env.getConnection(userName, password);
528+
connectionToUse = localConnection;
529+
}
530+
statement = connectionToUse.createStatement();
531+
for (int i = lastIndex; i < sqlList.size(); ++i) {
532+
statement.execute(sqlList.get(i));
533+
}
534+
} catch (SQLException e) {
535+
// the default connection should be closed by the upper level
536+
// while the local connection should be closed here
537+
if (connectionToUse == localConnection && localConnection != null) {
538+
try {
539+
localConnection.close();
540+
} catch (SQLException ex) {
541+
// ignore
542+
}
543+
}
544+
throw new RuntimeException(e);
545+
}
546+
}
547+
494548
public static void executeNonQueryOnSpecifiedDataNodeWithRetry(
495549
BaseEnv env, DataNodeWrapper wrapper, String sql) {
496550
for (int retryCountLeft = 10; retryCountLeft >= 0; retryCountLeft--) {

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -476,13 +476,12 @@ public void testAlterPipeSourceAndProcessor() {
476476
}
477477

478478
// Insert data on sender
479-
if (!TestUtils.tryExecuteNonQueriesWithRetry(
479+
TestUtils.executeNonQueries(
480480
senderEnv,
481481
Arrays.asList(
482-
"insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2), (2000, 3), (2500, 4), (3000, 5)",
483-
"flush"))) {
484-
fail();
485-
}
482+
"insert into root.db.d1 (time,at1) values (1000,1),(1500,2),(2000,3),(2500,4),(3000,5)",
483+
"flush"),
484+
null);
486485

487486
// Check data on receiver
488487
final Set<String> expectedResSet = new HashSet<>();
@@ -503,22 +502,20 @@ public void testAlterPipeSourceAndProcessor() {
503502
}
504503

505504
// Insert data on sender
506-
if (!TestUtils.tryExecuteNonQueriesWithRetry(
505+
TestUtils.executeNonQueries(
507506
senderEnv,
508507
Arrays.asList(
509-
"insert into root.db.d2 (time, at1) values (11000, 1), (11500, 2), (12000, 3), (12500, 4), (13000, 5)",
510-
"flush"))) {
511-
fail();
512-
}
508+
"insert into root.db.d2 (time,at1) values (11000,1),(11500,2),(12000,3),(12500,4),(13000,5)",
509+
"flush"),
510+
null);
513511

514512
// Insert data on sender
515-
if (!TestUtils.tryExecuteNonQueriesWithRetry(
513+
TestUtils.executeNonQueries(
516514
senderEnv,
517515
Arrays.asList(
518-
"insert into root.db.d1 (time, at1) values (11000, 1), (11500, 2), (12000, 3), (12500, 4), (13000, 5)",
519-
"flush"))) {
520-
fail();
521-
}
516+
"insert into root.db.d1 (time,at1) values (11000,1),(11500,2),(12000,3),(12500,4),(13000,5)",
517+
"flush"),
518+
null);
522519

523520
// Check data on receiver
524521
expectedResSet.clear();
@@ -531,10 +528,7 @@ public void testAlterPipeSourceAndProcessor() {
531528
expectedResSet);
532529

533530
// Create database on sender
534-
if (!TestUtils.tryExecuteNonQueryWithRetry(
535-
senderEnv, "create timeSeries root.db.d2.at2 int32")) {
536-
fail();
537-
}
531+
TestUtils.executeNonQuery(senderEnv, "create timeSeries root.db.d2.at2 int32", null);
538532

539533
// Check database on receiver
540534
TestUtils.assertDataEventuallyOnEnv(

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoConflictIT.java

Lines changed: 37 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,10 @@ public void testDoubleLivingAutoConflict() throws Exception {
9191
final int receiverPort = receiverDataNode.getPort();
9292

9393
for (int i = 0; i < 100; ++i) {
94-
if (!TestUtils.tryExecuteNonQueryWithRetry(
95-
senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
96-
return;
97-
}
98-
}
99-
if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
100-
return;
94+
TestUtils.executeNonQuery(
95+
senderEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
10196
}
97+
TestUtils.executeNonQuery(senderEnv, "flush", null);
10298

10399
try (final SyncConfigNodeIServiceClient client =
104100
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
@@ -127,24 +123,16 @@ public void testDoubleLivingAutoConflict() throws Exception {
127123
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
128124
}
129125
for (int i = 100; i < 200; ++i) {
130-
if (!TestUtils.tryExecuteNonQueryWithRetry(
131-
senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
132-
return;
133-
}
134-
}
135-
if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
136-
return;
126+
TestUtils.executeNonQuery(
127+
senderEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
137128
}
129+
TestUtils.executeNonQuery(senderEnv, "flush", null);
138130

139131
for (int i = 200; i < 300; ++i) {
140-
if (!TestUtils.tryExecuteNonQueryWithRetry(
141-
receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
142-
return;
143-
}
144-
}
145-
if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
146-
return;
132+
TestUtils.executeNonQuery(
133+
receiverEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
147134
}
135+
TestUtils.executeNonQuery(receiverEnv, "flush", null);
148136

149137
try (final SyncConfigNodeIServiceClient client =
150138
(SyncConfigNodeIServiceClient) receiverEnv.getLeaderConfigNodeConnection()) {
@@ -173,14 +161,10 @@ public void testDoubleLivingAutoConflict() throws Exception {
173161
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
174162
}
175163
for (int i = 300; i < 400; ++i) {
176-
if (!TestUtils.tryExecuteNonQueryWithRetry(
177-
receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
178-
return;
179-
}
180-
}
181-
if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
182-
return;
164+
TestUtils.executeNonQuery(
165+
receiverEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
183166
}
167+
TestUtils.executeNonQuery(receiverEnv, "flush", null);
184168

185169
final Set<String> expectedResSet = new HashSet<>();
186170
for (int i = 0; i < 400; ++i) {
@@ -201,23 +185,15 @@ public void testDoubleLivingAutoConflict() throws Exception {
201185
}
202186

203187
for (int i = 400; i < 500; ++i) {
204-
if (!TestUtils.tryExecuteNonQueryWithRetry(
205-
senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
206-
return;
207-
}
208-
}
209-
if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
210-
return;
188+
TestUtils.executeNonQuery(
189+
senderEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
211190
}
191+
TestUtils.executeNonQuery(senderEnv, "flush", null);
212192
for (int i = 500; i < 600; ++i) {
213-
if (!TestUtils.tryExecuteNonQueryWithRetry(
214-
receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
215-
return;
216-
}
217-
}
218-
if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
219-
return;
193+
TestUtils.executeNonQuery(
194+
receiverEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
220195
}
196+
TestUtils.executeNonQuery(receiverEnv, "flush", null);
221197

222198
for (int i = 400; i < 600; ++i) {
223199
expectedResSet.add(i + ",1.0,");
@@ -290,37 +266,28 @@ public void testDoubleLivingAutoConflictTemplate() throws Exception {
290266
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());
291267
}
292268

293-
if (!TestUtils.tryExecuteNonQueriesWithRetry(
269+
// Auto extend s1
270+
TestUtils.executeNonQueries(
294271
senderEnv,
295272
Arrays.asList(
296-
// Auto extend s1
297-
"create schema template t1 (s2 INT64 encoding=RLE, s3 INT64 encoding=RLE compression=SNAPPY)",
273+
"create schema template t1 (s2 INT64 encoding=RLE,s3 INT64 encoding=RLE compression=SNAPPY)",
298274
"create database root.db",
299-
"set device template t1 to root.db"))) {
300-
return;
301-
}
275+
"set device template t1 to root.db"),
276+
null);
302277

303278
for (int i = 0; i < 200; ++i) {
304-
if (!TestUtils.tryExecuteNonQueryWithRetry(
305-
senderEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
306-
return;
307-
}
279+
TestUtils.executeNonQuery(
280+
senderEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
308281
}
309282

310283
for (int i = 200; i < 400; ++i) {
311-
if (!TestUtils.tryExecuteNonQueryWithRetry(
312-
receiverEnv, String.format("insert into root.db.d1(time, s1) values (%s, 1)", i))) {
313-
return;
314-
}
284+
TestUtils.executeNonQuery(
285+
receiverEnv, String.format("insert into root.db.d1(time,s1) values (%s,1)", i), null);
315286
}
316287

317-
if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "flush")) {
318-
return;
319-
}
288+
TestUtils.executeNonQuery(senderEnv, "flush", null);
320289

321-
if (!TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, "flush")) {
322-
return;
323-
}
290+
TestUtils.executeNonQuery(receiverEnv, "flush", null);
324291

325292
final Set<String> expectedResSet = new HashSet<>();
326293
for (int i = 0; i < 400; ++i) {
@@ -363,16 +330,13 @@ public void testAutoManualCreateRace() throws Exception {
363330
Assert.assertEquals(
364331
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
365332

366-
if (!TestUtils.tryExecuteNonQueryWithRetry(
367-
receiverEnv, "create timeSeries root.ln.wf01.wt01.status with datatype=BOOLEAN")) {
368-
return;
369-
}
333+
TestUtils.executeNonQuery(
334+
receiverEnv, "create timeSeries root.ln.wf01.wt01.status with datatype=BOOLEAN", null);
370335

371-
if (!TestUtils.tryExecuteNonQueryWithRetry(
336+
TestUtils.executeNonQuery(
372337
senderEnv,
373-
"create timeSeries root.ln.wf01.wt01.status with datatype=BOOLEAN tags (tag3=v3) attributes (attr4=v4)")) {
374-
return;
375-
}
338+
"create timeSeries root.ln.wf01.wt01.status with datatype=BOOLEAN tags (tag3=v3) attributes (attr4=v4)",
339+
null);
376340

377341
TestUtils.assertDataEventuallyOnEnv(
378342
receiverEnv,
@@ -393,7 +357,7 @@ public void testHistoricalActivationRace() throws Exception {
393357
try (final SyncConfigNodeIServiceClient client =
394358
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
395359

396-
if (!TestUtils.tryExecuteNonQueriesWithRetry(
360+
TestUtils.executeNonQueries(
397361
senderEnv,
398362
Arrays.asList(
399363
"create database root.sg_aligned",
@@ -404,9 +368,8 @@ public void testHistoricalActivationRace() throws Exception {
404368
"insert into root.sg_aligned.device_aligned.d10(time, s0, s1, s2,s3,s4,s5) values (1706659200,1706659200,10,20.245,25.24555,true,''),(1706662800,null,1706662800,20.241,25.24111,false,'2'),(1706666400,3,null,20.242,25.24222,true,'3'),(1706670000,4,40,null,35.5474,true,'4'),(1706670600,5,1706670600000,20.246,null,false,'5'),(1706671200,6,60,20.248,25.24888,null,'6'),(1706671800,7,1706671800,20.249,25.24999,false,null),(1706672400,8,80,1245.392,75.51234,false,'8'),(1706672600,9,90,2345.397,2285.58734,false,'9'),(1706673000,10,100,20.241,25.24555,false,'10'),(1706673600,11,110,3345.394,4105.544,false,'11'),(1706674200,12,1706674200,30.245,35.24555,false,'12'),(1706674800,13,130,5.39,125.51234,false,'13'),(1706675400,14,1706675400,5.39,135.51234,false,'14'),(1706676000,15,150,5.39,145.51234,false,'15'),(1706676600,16,160,5.39,155.51234,false,'16'),(1706677200,17,170,5.39,165.51234,false,'17'),(1706677600,18,180,5.39,175.51234,false,'18'),(1706677800,19,190,5.39,185.51234,false,'19'),(1706678000,20,200,5.39,195.51234,false,'20'),(1706678200,21,210,5.39,null,false,'21')",
405369
"insert into root.sg_aligned.device_aligned.d10(time, s0, s1, s2,s3,s4,s5) values (-1,1,10,5.39,5.51234,false,'negative')",
406370
"insert into root.sg_aligned.device_aligned.d11(time, s0, s1, s2,s3,s4,s5) values (-1,-11,-110,-5.39,-5.51234,false,'activate:1')",
407-
"insert into root.sg_aligned.device_aligned.d10(time, s0, s1, s2,s3,s4,s5,s6) values(1706678800,1,1706678800,5.39,5.51234,false,'add:s6',32);"))) {
408-
return;
409-
}
371+
"insert into root.sg_aligned.device_aligned.d10(time, s0, s1, s2,s3,s4,s5,s6) values(1706678800,1,1706678800,5.39,5.51234,false,'add:s6',32);"),
372+
null);
410373

411374
final Map<String, String> extractorAttributes = new HashMap<>();
412375
final Map<String, String> processorAttributes = new HashMap<>();

integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAutoDropIT.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ public void testAutoDropInHistoricalTransfer() throws Exception {
5151
try (final SyncConfigNodeIServiceClient client =
5252
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
5353

54-
if (!TestUtils.tryExecuteNonQueryWithRetry(
55-
senderEnv, "insert into root.db.d1(time, s1) values (1, 1)")) {
56-
return;
57-
}
54+
TestUtils.executeNonQuery(senderEnv, "insert into root.db.d1(time,s1) values (1,1)", null);
5855

5956
final Map<String, String> extractorAttributes = new HashMap<>();
6057
final Map<String, String> processorAttributes = new HashMap<>();
@@ -101,11 +98,10 @@ public void testAutoDropInHistoricalTransferWithTimeRange() throws Exception {
10198
try (final SyncConfigNodeIServiceClient client =
10299
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
103100

104-
if (!TestUtils.tryExecuteNonQueryWithRetry(
101+
TestUtils.executeNonQuery(
105102
senderEnv,
106-
"insert into root.db.d1(time, s1) values (1000, 1), (2000, 2), (3000, 3), (4000, 4), (5000, 5)")) {
107-
return;
108-
}
103+
"insert into root.db.d1(time,s1) values (1000,1),(2000,2),(3000,3),(4000,4),(5000,5)",
104+
null);
109105

110106
final Map<String, String> extractorAttributes = new HashMap<>();
111107
final Map<String, String> processorAttributes = new HashMap<>();

0 commit comments

Comments
 (0)