Skip to content

Commit 908949b

Browse files
[FLINK-36326][source-connector/mysql] Fix auto scan newly-added table failure
This closes #3661. Co-authored-by: Hang Ruan <ruanhang1993@hotmail.com>
1 parent 40db6d5 commit 908949b

File tree

2 files changed

+152
-1
lines changed

2 files changed

+152
-1
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class MySqlSourceEnumerator implements SplitEnumerator<MySqlSplit, Pendin
8181

8282
@Nullable private Integer binlogSplitTaskId;
8383

84+
private boolean isBinlogSplitUpdateRequestAlreadySent = false;
85+
8486
public MySqlSourceEnumerator(
8587
SplitEnumeratorContext<MySqlSplit> context,
8688
MySqlSourceConfig sourceConfig,
@@ -273,7 +275,9 @@ private void syncWithReaders(int[] subtaskIds, Throwable t) {
273275
}
274276

275277
private void requestBinlogSplitUpdateIfNeed() {
276-
if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
278+
if (!isBinlogSplitUpdateRequestAlreadySent
279+
&& isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
280+
isBinlogSplitUpdateRequestAlreadySent = true;
277281
for (int subtaskId : getRegisteredReader()) {
278282
LOG.info(
279283
"The enumerator requests subtask {} to update the binlog split after newly added table.",

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/NewlyAddedTableITCase.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.stream.Stream;
7979

8080
import static java.lang.String.format;
81+
import static org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
8182
import static org.apache.flink.util.Preconditions.checkState;
8283

8384
/** IT tests to cover various newly added tables during capture process. */
@@ -511,6 +512,12 @@ public void testRemoveAndAddNewTable() throws Exception {
511512
temporaryFolder.delete();
512513
}
513514

515+
@Test
516+
public void testNewlyAddedEmptyTableAndInsertAfterJobStart() throws Exception {
517+
testNewlyAddedTableOneByOneWithCreateBeforeStart(
518+
1, new HashMap<>(), "address_hangzhou", "address_beijing");
519+
}
520+
514521
/** Add a collect sink in the job. */
515522
protected CollectResultIterator<RowData> addCollectSink(DataStream<RowData> stream) {
516523
TypeSerializer<RowData> serializer =
@@ -1108,4 +1115,144 @@ private static int sinkSize(String sinkName) {
11081115
}
11091116
}
11101117
}
1118+
1119+
private void testNewlyAddedTableOneByOneWithCreateBeforeStart(
1120+
int parallelism, Map<String, String> sourceOptions, String... captureAddressTables)
1121+
throws Exception {
1122+
final TemporaryFolder temporaryFolder = new TemporaryFolder();
1123+
temporaryFolder.create();
1124+
final String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
1125+
String finishedSavePointPath = null;
1126+
List<String> fetchedDataList = new ArrayList<>();
1127+
for (int round = 0; round < captureAddressTables.length; round++) {
1128+
boolean insertData = round == 0;
1129+
initialAddressTables(getConnection(), captureAddressTables, round, insertData);
1130+
String[] captureTablesThisRound =
1131+
Arrays.asList(captureAddressTables)
1132+
.subList(0, round + 1)
1133+
.toArray(new String[0]);
1134+
String newlyAddedTable = captureAddressTables[round];
1135+
StreamExecutionEnvironment env =
1136+
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
1137+
env.setRestartStrategy(noRestart());
1138+
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
1139+
String createTableStatement =
1140+
getCreateTableStatement(sourceOptions, captureTablesThisRound);
1141+
tEnv.executeSql(createTableStatement);
1142+
tEnv.executeSql(
1143+
"CREATE TABLE sink ("
1144+
+ " table_name STRING,"
1145+
+ " id BIGINT,"
1146+
+ " country STRING,"
1147+
+ " city STRING,"
1148+
+ " detail_address STRING,"
1149+
+ " primary key (city, id) not enforced"
1150+
+ ") WITH ("
1151+
+ " 'connector' = 'values',"
1152+
+ " 'sink-insert-only' = 'false'"
1153+
+ ")");
1154+
TableResult tableResult = tEnv.executeSql("insert into sink select * from address");
1155+
JobClient jobClient = tableResult.getJobClient().get();
1156+
Thread.sleep(3_000);
1157+
String tableName = captureAddressTables[round];
1158+
if (!insertData) {
1159+
insertData(
1160+
getConnection(),
1161+
customDatabase.getDatabaseName() + "." + tableName,
1162+
tableName.split("_")[1]);
1163+
}
1164+
// step 2: assert fetched snapshot data in this round
1165+
String cityName = newlyAddedTable.split("_")[1];
1166+
List<String> expectedSnapshotDataThisRound =
1167+
Arrays.asList(
1168+
format(
1169+
"+I[%s, 416874195632735147, China, %s, %s West Town address 1]",
1170+
newlyAddedTable, cityName, cityName),
1171+
format(
1172+
"+I[%s, 416927583791428523, China, %s, %s West Town address 2]",
1173+
newlyAddedTable, cityName, cityName),
1174+
format(
1175+
"+I[%s, 417022095255614379, China, %s, %s West Town address 3]",
1176+
newlyAddedTable, cityName, cityName));
1177+
fetchedDataList.addAll(expectedSnapshotDataThisRound);
1178+
waitForUpsertSinkSize("sink", fetchedDataList.size());
1179+
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
1180+
// step 3: make some binlog data for this round
1181+
makeFirstPartBinlogForAddressTable(getConnection(), newlyAddedTable);
1182+
makeSecondPartBinlogForAddressTable(getConnection(), newlyAddedTable);
1183+
// step 4: assert fetched binlog data in this round
1184+
// retract the old data with id 416874195632735147
1185+
fetchedDataList =
1186+
fetchedDataList.stream()
1187+
.filter(
1188+
r ->
1189+
!r.contains(
1190+
format(
1191+
"%s, 416874195632735147",
1192+
newlyAddedTable)))
1193+
.collect(Collectors.toList());
1194+
List<String> expectedBinlogUpsertDataThisRound =
1195+
Arrays.asList(
1196+
// add the new data with id 416874195632735147
1197+
format(
1198+
"+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]",
1199+
newlyAddedTable, cityName, cityName),
1200+
format(
1201+
"+I[%s, 417022095255614380, China, %s, %s West Town address 4]",
1202+
newlyAddedTable, cityName, cityName));
1203+
// step 5: assert fetched binlog data in this round
1204+
fetchedDataList.addAll(expectedBinlogUpsertDataThisRound);
1205+
waitForUpsertSinkSize("sink", fetchedDataList.size());
1206+
// the result size of sink may arrive fetchedDataList.size() with old data, wait one
1207+
// checkpoint to wait retract old record and send new record
1208+
Thread.sleep(1000);
1209+
assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));
1210+
// step 6: trigger savepoint
1211+
if (round != captureAddressTables.length - 1) {
1212+
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
1213+
}
1214+
jobClient.cancel().get();
1215+
}
1216+
}
1217+
1218+
private void initialAddressTables(
1219+
JdbcConnection connection, String[] addressTables, int round, boolean insertData)
1220+
throws SQLException {
1221+
try {
1222+
connection.setAutoCommit(false);
1223+
String tableName = addressTables[round];
1224+
String tableId = customDatabase.getDatabaseName() + "." + tableName;
1225+
String cityName = tableName.split("_")[1];
1226+
connection.execute(
1227+
"CREATE TABLE "
1228+
+ tableId
1229+
+ "("
1230+
+ " id BIGINT UNSIGNED NOT NULL PRIMARY KEY,"
1231+
+ " country VARCHAR(255) NOT NULL,"
1232+
+ " city VARCHAR(255) NOT NULL,"
1233+
+ " detail_address VARCHAR(1024)"
1234+
+ ");");
1235+
if (insertData) {
1236+
insertData(connection, tableId, cityName);
1237+
}
1238+
connection.commit();
1239+
} finally {
1240+
connection.close();
1241+
}
1242+
}
1243+
1244+
private void insertData(JdbcConnection connection, String tableId, String cityName)
1245+
throws SQLException {
1246+
try {
1247+
connection.execute(
1248+
format(
1249+
"INSERT INTO %s "
1250+
+ "VALUES (416874195632735147, 'China', '%s', '%s West Town address 1'),"
1251+
+ " (416927583791428523, 'China', '%s', '%s West Town address 2'),"
1252+
+ " (417022095255614379, 'China', '%s', '%s West Town address 3');",
1253+
tableId, cityName, cityName, cityName, cityName, cityName, cityName));
1254+
} finally {
1255+
connection.close();
1256+
}
1257+
}
11111258
}

0 commit comments

Comments
 (0)