diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java index fb937f512df5..334f895dcb47 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java @@ -78,6 +78,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -282,6 +283,8 @@ public void testMongodbCdcMultiTaskConcurrentSubmission(TestContainer container) }); TimeUnit.SECONDS.sleep(20); + assertTaskNotCompletedExceptionally(task1, "products"); + assertTaskNotCompletedExceptionally(task2, "orders"); // insert update delete operations upsertDeleteSourceTable(); @@ -291,12 +294,16 @@ public void testMongodbCdcMultiTaskConcurrentSubmission(TestContainer container) // Verify both tasks work correctly without cache interference assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + assertTaskNotCompletedExceptionally(task1, "products"); + assertTaskNotCompletedExceptionally(task2, "orders"); - // Clean and verify again to ensure CDC continues to work - cleanSourceTable(); + // Append incremental changes and verify again to ensure CDC continues to work + appendIncrementalSourceTableData(); TimeUnit.SECONDS.sleep(20); assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS); assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS); + assertTaskNotCompletedExceptionally(task1, "products"); + assertTaskNotCompletedExceptionally(task2, "orders"); } @TestTemplate @@ -667,12 +674,53 @@ private void upsertDeleteSourceTable() { mongodbContainer.executeCommandFileInDatabase("inventoryDDL", MONGODB_DATABASE); } + private void appendIncrementalSourceTableData() { + MongoDatabase mongoDatabase = client.getDatabase(MONGODB_DATABASE); + MongoCollection products = mongoDatabase.getCollection(MONGODB_COLLECTION_1); + MongoCollection orders = mongoDatabase.getCollection(MONGODB_COLLECTION_2); + + ObjectId productId = new ObjectId("100000000000000000000120"); + Document product = new Document(); + product.put("_id", productId); + product.put("name", "usb-c cable"); + product.put("description", "durable usb-c charging cable"); + product.put("weight", "50"); + products.insertOne(product); + products.updateOne( + Filters.eq("_id", productId), Updates.set("description", "durable usb-c cable 1m")); + + Document order = new Document(); + order.put("_id", new ObjectId("100000000000000000000121")); + order.put("order_number", 102600); + order.put("order_date", "2023-11-18"); + order.put("quantity", 7); + order.put("product_id", productId); + orders.insertOne(order); + } + private void cleanSourceTable() { mongodbContainer.executeCommandFileInDatabase("inventoryClean", MONGODB_DATABASE); truncateMysqlTable(MONGODB_COLLECTION_1); truncateMysqlTable(MONGODB_COLLECTION_2); } + private void assertTaskNotCompletedExceptionally( + CompletableFuture task, String taskName) { + if (!task.isCompletedExceptionally()) { + return; + } + try { + task.join(); + } catch (CompletionException e) { + Throwable cause = e.getCause() == null ? e : e.getCause(); + throw new AssertionError( + String.format( + "Concurrent MongoDB CDC task for [%s] failed during submission", + taskName), + cause); + } + } + public void initConnection() { String ipAddress = mongodbContainer.getHost(); Integer port = mongodbContainer.getFirstMappedPort();