[Fix][E2E-MongoDB CDC] Stabilize concurrent submission test and ensure CI trigger#10474
[Fix][E2E-MongoDB CDC] Stabilize concurrent submission test and ensure CI trigger#10474yzeng1618 wants to merge 4 commits intoapache:devfrom
Conversation
…on ConditionTimeout
Issue 1: Missing method documentation commentsLocation: private void cleanMongoSourceTable() {
mongodbContainer.executeCommandFileInDatabase("inventoryClean", MONGODB_DATABASE);
}Related context:
Problem description:
Potential risks:
Impact scope:
Severity: MINOR Improvement suggestions: /**
* Cleans only the MongoDB source collections without touching the MySQL sink tables.
* This is specifically used in concurrent CDC tests to avoid race conditions where:
* <ul>
* <li>TRUNCATE on MySQL happens immediately</li>
* <li>CDC stream continues processing and may replay/upsert data to the emptied table</li>
* <li>Result: source is empty but sink is partially filled (e.g., 2/5 records)</li>
* </ul>
*
* <p>By only cleaning MongoDB source, the CDC stream naturally stops producing new events,
* allowing the test to verify final consistency without interference from replay operations.
*
* <p>Use {@link #cleanSourceTable()} when you need to clean both source and sink.
*
* @see #cleanSourceTable()
* @see #testMongodbCdcMultiTaskConcurrentSubmission()
*/
private void cleanMongoSourceTable() {
mongodbContainer.executeCommandFileInDatabase("inventoryClean", MONGODB_DATABASE);
}Rationale:
Issue 2: CI trigger conditions may be too broadLocation: all-connectors-it-1:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true' || contains(needs.changes.outputs.it-modules, 'connector-cdc-mongodb-e2e')Related context:
Problem description:
Potential risks:
Impact scope:
Severity: MINOR Improvement suggestions: Option A: Create dedicated MongoDB CDC E2E job mongodb-cdc-connector-it:
needs: [ changes, sanity-check ]
if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true' || contains(needs.changes.outputs.it-modules, 'connector-cdc-mongodb-e2e')
runs-on: ${{ matrix.os }}
strategy:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
timeout-minutes: 180
steps:
# ... Steps similar to all-connectors-it, but only runs connector-cdc-mongodb-e2eOption B: Add conditions in multiple all-connectors-it jobs # Add the same condition to all-connectors-it-1, 2, ..., 7
if: needs.changes.outputs.api == 'true' || needs.changes.outputs.engine == 'true' || contains(needs.changes.outputs.it-modules, 'connector-cdc-mongodb-e2e')Rationale:
Note: This requires deep understanding of the project's CI/CD architecture and strategy. Recommend discussing with CI/CD maintainers. Issue 3: Fix may be incomplete - other test methods may also be affectedLocation: @TestTemplate
public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container)
throws InterruptedException {
cleanSourceTable(); // Line 183
// ...
cleanSourceTable(); // Line 200 - Called after running CDC task
TimeUnit.SECONDS.sleep(20);
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
}
@TestTemplate
public void testMongodbCdcMultiTableToMysqlE2e(TestContainer container)
throws InterruptedException {
cleanSourceTable(); // Line 208
// ...
cleanSourceTable(); // Line 226 - Called after running CDC task
TimeUnit.SECONDS.sleep(20);
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
assertionsSourceAndSink(MONGODB_COLLECTION_2, SINK_SQL_ORDERS);
}Related context:
Problem description:
Potential risks:
Impact scope:
Severity: MAJOR Improvement suggestions: Option A: Uniformly fix all similar scenarios @TestTemplate
public void testMongodbCdcToMysqlCheckDataE2e(TestContainer container)
throws InterruptedException {
cleanSourceTable();
CompletableFuture.supplyAsync(
() -> {
try {
container.executeJob("/mongodbcdc_to_mysql.conf");
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException();
}
return null;
});
TimeUnit.SECONDS.sleep(10);
// insert update delete
upsertDeleteSourceTable();
TimeUnit.SECONDS.sleep(20);
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
// Fix: Only clear MongoDB after running CDC task
cleanMongoSourceTable(); // Change to cleanMongoSourceTable()
TimeUnit.SECONDS.sleep(20);
assertionsSourceAndSink(MONGODB_COLLECTION_1, SINK_SQL_PRODUCTS);
}Option B: Analyze each test scenario, selectively fix
Option C: Add comments explaining why not modifying // This test intentionally cleans both source and sink to verify CDC can recover
// from a complete reset scenario.
cleanSourceTable();Rationale:
Suggested actions:
|
|
Modify backend.yaml to enable the CI pipeline to cover the E2E tests for all-connectors-it-1. The CI pipeline now runs successfully as expected. https://github.com/yzeng1618/seatunnel-dev/actions/runs/21851023643 |

Purpose of this pull request
This PR fixes a flaky MongoDB CDC E2E case and ensures the related CI job is triggered when this module changes.
Root cause: The cleanSourceTable() function executes Mongo deleteMany and MySQL TRUNCATE simultaneously. When the job is running continuously, the sink is emptied and then backfilled to 2/5 by replay/upsert within the stream, resulting in a persistent inconsistency where the source is empty but the sink is not.
Does this PR introduce any user-facing change?
No. This PR only changed this test to keep the same goal, but remove a flaky step.
What changed:
Why:
How was this patch tested?
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.