Skip to content

Commit 621693f

Browse files
Pipe IT: table model tests for consensus pipes × user pipes (apache#14200)
Co-authored-by: Zhenyu Luo <[email protected]>
1 parent 950ecff commit 621693f

17 files changed

+654
-135
lines changed

.github/workflows/pipe-it-2cluster.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ jobs:
264264
matrix:
265265
java: [17]
266266
# StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet.
267-
cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode]
267+
cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode]
268268
os: [ ubuntu-latest ]
269269
runs-on: ${{ matrix.os }}
270270
steps:

integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java

Lines changed: 157 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Set;
5252
import java.util.TreeMap;
5353
import java.util.concurrent.TimeUnit;
54+
import java.util.function.Consumer;
5455

5556
import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
5657
import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
@@ -538,6 +539,35 @@ public static void assertResultSetEqual(
538539
}
539540
}
540541

542+
public static void assertResultSetEqual(
543+
ResultSet actualResultSet,
544+
String expectedHeader,
545+
Set<String> expectedRetSet,
546+
Consumer consumer) {
547+
try {
548+
ResultSetMetaData resultSetMetaData = actualResultSet.getMetaData();
549+
StringBuilder header = new StringBuilder();
550+
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
551+
header.append(resultSetMetaData.getColumnName(i)).append(",");
552+
}
553+
assertEquals(expectedHeader, header.toString());
554+
555+
Set<String> actualRetSet = new HashSet<>();
556+
557+
while (actualResultSet.next()) {
558+
StringBuilder builder = new StringBuilder();
559+
for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
560+
builder.append(actualResultSet.getString(i)).append(",");
561+
}
562+
actualRetSet.add(builder.toString());
563+
}
564+
assertEquals(expectedRetSet, actualRetSet);
565+
} catch (Exception e) {
566+
e.printStackTrace();
567+
Assert.fail(String.valueOf(e));
568+
}
569+
}
570+
541571
public static void assertSingleResultSetEqual(
542572
ResultSet actualResultSet, Map<String, String> expectedHeaderWithResult) {
543573
try {
@@ -974,6 +1004,15 @@ public static void assertDataEventuallyOnEnv(
9741004
assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600);
9751005
}
9761006

1007+
public static void assertDataEventuallyOnEnv(
1008+
final BaseEnv env,
1009+
final String sql,
1010+
final String expectedHeader,
1011+
final Set<String> expectedResSet,
1012+
final Consumer<String> handleFailure) {
1013+
assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600, handleFailure);
1014+
}
1015+
9771016
public static void assertDataEventuallyOnEnv(
9781017
BaseEnv env,
9791018
String sql,
@@ -1004,21 +1043,71 @@ public static void assertDataEventuallyOnEnv(
10041043
}
10051044

10061045
public static void assertDataEventuallyOnEnv(
1007-
BaseEnv env,
1008-
String sql,
1009-
String expectedHeader,
1010-
Set<String> expectedResSet,
1011-
String dataBaseName) {
1012-
assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600, dataBaseName);
1046+
final BaseEnv env,
1047+
final String sql,
1048+
final String expectedHeader,
1049+
final Set<String> expectedResSet,
1050+
final long timeoutSeconds,
1051+
final Consumer<String> handleFailure) {
1052+
try (Connection connection = env.getConnection();
1053+
Statement statement = connection.createStatement()) {
1054+
// Keep retrying if there are execution failures
1055+
await()
1056+
.pollInSameThread()
1057+
.pollDelay(1L, TimeUnit.SECONDS)
1058+
.pollInterval(1L, TimeUnit.SECONDS)
1059+
.atMost(timeoutSeconds, TimeUnit.SECONDS)
1060+
.untilAsserted(
1061+
() -> {
1062+
try {
1063+
TestUtils.assertResultSetEqual(
1064+
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
1065+
} catch (Exception e) {
1066+
if (handleFailure != null) {
1067+
handleFailure.accept(e.getMessage());
1068+
}
1069+
Assert.fail();
1070+
} catch (Error e) {
1071+
if (handleFailure != null) {
1072+
handleFailure.accept(e.getMessage());
1073+
}
1074+
throw e;
1075+
}
1076+
});
1077+
} catch (Exception e) {
1078+
e.printStackTrace();
1079+
fail();
1080+
}
10131081
}
10141082

10151083
public static void assertDataEventuallyOnEnv(
1016-
BaseEnv env,
1017-
String sql,
1018-
String expectedHeader,
1019-
Set<String> expectedResSet,
1020-
long timeoutSeconds,
1021-
String dataBaseName) {
1084+
final BaseEnv env,
1085+
final String sql,
1086+
final String expectedHeader,
1087+
final Set<String> expectedResSet,
1088+
final String dataBaseName) {
1089+
assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600, dataBaseName, null);
1090+
}
1091+
1092+
public static void assertDataEventuallyOnEnv(
1093+
final BaseEnv env,
1094+
final String sql,
1095+
final String expectedHeader,
1096+
final Set<String> expectedResSet,
1097+
final String dataBaseName,
1098+
final Consumer<String> handleFailure) {
1099+
assertDataEventuallyOnEnv(
1100+
env, sql, expectedHeader, expectedResSet, 600, dataBaseName, handleFailure);
1101+
}
1102+
1103+
public static void assertDataEventuallyOnEnv(
1104+
final BaseEnv env,
1105+
final String sql,
1106+
final String expectedHeader,
1107+
final Set<String> expectedResSet,
1108+
final long timeoutSeconds,
1109+
final String dataBaseName,
1110+
final Consumer<String> handleFailure) {
10221111
try (Connection connection = env.getConnection(BaseEnv.TABLE_SQL_DIALECT);
10231112
Statement statement = connection.createStatement()) {
10241113
// Keep retrying if there are execution failures
@@ -1033,12 +1122,20 @@ public static void assertDataEventuallyOnEnv(
10331122
if (dataBaseName != null) {
10341123
statement.execute("use " + dataBaseName);
10351124
}
1036-
if (sql != null && !sql.equals("")) {
1125+
if (sql != null && !sql.isEmpty()) {
10371126
TestUtils.assertResultSetEqual(
10381127
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
10391128
}
10401129
} catch (Exception e) {
1130+
if (handleFailure != null) {
1131+
handleFailure.accept(e.getMessage());
1132+
}
10411133
Assert.fail();
1134+
} catch (Error e) {
1135+
if (handleFailure != null) {
1136+
handleFailure.accept(e.getMessage());
1137+
}
1138+
throw e;
10421139
}
10431140
});
10441141
} catch (Exception e) {
@@ -1081,6 +1178,15 @@ public static void assertDataAlwaysOnEnv(
10811178
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10);
10821179
}
10831180

1181+
public static void assertDataAlwaysOnEnv(
1182+
BaseEnv env,
1183+
String sql,
1184+
String expectedHeader,
1185+
Set<String> expectedResSet,
1186+
Consumer<String> handleFailure) {
1187+
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10, handleFailure);
1188+
}
1189+
10841190
public static void assertDataAlwaysOnEnv(
10851191
BaseEnv env,
10861192
String sql,
@@ -1110,6 +1216,44 @@ public static void assertDataAlwaysOnEnv(
11101216
}
11111217
}
11121218

1219+
public static void assertDataAlwaysOnEnv(
1220+
BaseEnv env,
1221+
String sql,
1222+
String expectedHeader,
1223+
Set<String> expectedResSet,
1224+
long consistentSeconds,
1225+
Consumer<String> handleFailure) {
1226+
try (Connection connection = env.getConnection();
1227+
Statement statement = connection.createStatement()) {
1228+
// Keep retrying if there are execution failures
1229+
await()
1230+
.pollInSameThread()
1231+
.pollDelay(1L, TimeUnit.SECONDS)
1232+
.pollInterval(1L, TimeUnit.SECONDS)
1233+
.atMost(consistentSeconds, TimeUnit.SECONDS)
1234+
.failFast(
1235+
() -> {
1236+
try {
1237+
TestUtils.assertResultSetEqual(
1238+
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
1239+
} catch (Exception e) {
1240+
if (handleFailure != null) {
1241+
handleFailure.accept(e.getMessage());
1242+
}
1243+
Assert.fail();
1244+
} catch (Error e) {
1245+
if (handleFailure != null) {
1246+
handleFailure.accept(e.getMessage());
1247+
}
1248+
throw e;
1249+
}
1250+
});
1251+
} catch (Exception e) {
1252+
e.printStackTrace();
1253+
fail();
1254+
}
1255+
}
1256+
11131257
public static void restartDataNodes() {
11141258
EnvFactory.getEnv().shutdownAllDataNodes();
11151259
EnvFactory.getEnv().startAllDataNodes();

integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.sql.SQLException;
3838
import java.sql.Statement;
3939
import java.util.List;
40+
import java.util.function.Consumer;
4041

4142
import static org.junit.Assert.fail;
4243

@@ -405,6 +406,12 @@ public void testAlterPipeSourceAndSink() {
405406
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
406407
boolean insertResult = true;
407408

409+
final Consumer<String> handleFailure =
410+
o -> {
411+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
412+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
413+
};
414+
408415
TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
409416
TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");
410417
// Create pipe
@@ -426,7 +433,7 @@ public void testAlterPipeSourceAndSink() {
426433
}
427434

428435
// Check data on receiver
429-
TableModelUtils.assertData("test", "test", 0, 100, receiverEnv);
436+
TableModelUtils.assertData("test", "test", 0, 100, receiverEnv, handleFailure);
430437

431438
// Alter pipe (modify 'source.path', 'source.inclusion' and
432439
// 'processor.tumbling-time.interval-seconds')
@@ -448,12 +455,14 @@ public void testAlterPipeSourceAndSink() {
448455
TableModelUtils.getQuerySql("test"),
449456
TableModelUtils.generateHeaderResults(),
450457
TableModelUtils.generateExpectedResults(0, 100),
451-
"test");
458+
"test",
459+
handleFailure);
452460
TestUtils.assertDataEventuallyOnEnv(
453461
receiverEnv,
454462
TableModelUtils.getQuerySql("test1"),
455463
TableModelUtils.generateHeaderResults(),
456464
TableModelUtils.generateExpectedResults(0, 200),
457-
"test1");
465+
"test1",
466+
handleFailure);
458467
}
459468
}

integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAutoConflictIT.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
import java.util.HashMap;
4141
import java.util.Map;
42+
import java.util.function.Consumer;
4243

4344
import static org.junit.Assert.fail;
4445

@@ -81,6 +82,12 @@ public void testDoubleLivingAutoConflict() throws Exception {
8182
final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
8283
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
8384

85+
final Consumer<String> handleFailure =
86+
o -> {
87+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
88+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
89+
};
90+
8491
createDataBaseAndTable(senderEnv);
8592
createDataBaseAndTable(receiverEnv);
8693

@@ -151,7 +158,7 @@ public void testDoubleLivingAutoConflict() throws Exception {
151158
}
152159
insertData("test", "test1", 300, 400, receiverEnv);
153160

154-
TableModelUtils.assertData("test", "test1", 0, 400, receiverEnv);
161+
TableModelUtils.assertData("test", "test1", 0, 400, receiverEnv, handleFailure);
155162

156163
try {
157164
TestUtils.restartCluster(senderEnv);
@@ -163,14 +170,20 @@ public void testDoubleLivingAutoConflict() throws Exception {
163170
insertData("test", "test1", 400, 500, senderEnv);
164171
insertData("test", "test1", 500, 600, receiverEnv);
165172

166-
TableModelUtils.assertData("test", "test1", 0, 600, receiverEnv);
173+
TableModelUtils.assertData("test", "test1", 0, 600, receiverEnv, handleFailure);
167174
}
168175

169176
@Test
170177
public void testDoubleLivingAutoConflictTemplate() throws Exception {
171178
final DataNodeWrapper senderDataNode = senderEnv.getDataNodeWrapper(0);
172179
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
173180

181+
final Consumer<String> handleFailure =
182+
o -> {
183+
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
184+
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
185+
};
186+
174187
final String senderIp = senderDataNode.getIp();
175188
final int senderPort = senderDataNode.getPort();
176189
final String receiverIp = receiverDataNode.getIp();
@@ -245,14 +258,16 @@ public void testDoubleLivingAutoConflictTemplate() throws Exception {
245258
TableModelUtils.getQuerySql("test"),
246259
TableModelUtils.generateHeaderResults(),
247260
TableModelUtils.generateExpectedResults(0, 200),
248-
"test");
261+
"test",
262+
handleFailure);
249263

250264
TestUtils.assertDataEventuallyOnEnv(
251265
senderEnv,
252266
TableModelUtils.getQuerySql("test1"),
253267
TableModelUtils.generateHeaderResults(),
254268
TableModelUtils.generateExpectedResults(200, 400),
255-
"test");
269+
"test",
270+
handleFailure);
256271
}
257272

258273
private void createDataBaseAndTable(BaseEnv baseEnv) {

0 commit comments

Comments
 (0)