Skip to content

Commit 085684b

Browse files
authored
[FLINK-36811][mysql] MySQL CDC sets isProcessingBacklog to true during the snapshot phase and resets it to false when the snapshot phase finishes (#3793)
1 parent 1fb68a8 commit 085684b

File tree

10 files changed

+216
-31
lines changed

10 files changed

+216
-31
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
208208
sourceConfig,
209209
enumContext.currentParallelism(),
210210
new ArrayList<>(),
211-
isTableIdCaseSensitive);
211+
isTableIdCaseSensitive,
212+
enumContext);
212213
} catch (Exception e) {
213214
throw new FlinkRuntimeException(
214215
"Failed to discover captured tables for enumerator", e);
@@ -233,7 +234,8 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
233234
new MySqlHybridSplitAssigner(
234235
sourceConfig,
235236
enumContext.currentParallelism(),
236-
(HybridPendingSplitsState) checkpoint);
237+
(HybridPendingSplitsState) checkpoint,
238+
enumContext);
237239
} else if (checkpoint instanceof BinlogPendingSplitsState) {
238240
splitAssigner =
239241
new MySqlBinlogSplitAssigner(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.source.assigners;
1919

20+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2021
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
2122
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
2223
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
@@ -59,23 +60,32 @@ public MySqlHybridSplitAssigner(
5960
MySqlSourceConfig sourceConfig,
6061
int currentParallelism,
6162
List<TableId> remainingTables,
62-
boolean isTableIdCaseSensitive) {
63+
boolean isTableIdCaseSensitive,
64+
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
6365
this(
6466
sourceConfig,
6567
new MySqlSnapshotSplitAssigner(
66-
sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
68+
sourceConfig,
69+
currentParallelism,
70+
remainingTables,
71+
isTableIdCaseSensitive,
72+
enumeratorContext),
6773
false,
6874
sourceConfig.getSplitMetaGroupSize());
6975
}
7076

7177
public MySqlHybridSplitAssigner(
7278
MySqlSourceConfig sourceConfig,
7379
int currentParallelism,
74-
HybridPendingSplitsState checkpoint) {
80+
HybridPendingSplitsState checkpoint,
81+
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
7582
this(
7683
sourceConfig,
7784
new MySqlSnapshotSplitAssigner(
78-
sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
85+
sourceConfig,
86+
currentParallelism,
87+
checkpoint.getSnapshotPendingSplits(),
88+
enumeratorContext),
7989
checkpoint.isBinlogSplitAssigned(),
8090
sourceConfig.getSplitMetaGroupSize());
8191
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.source.assigners;
1919

20+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
2021
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
2122
import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
2223
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
@@ -79,6 +80,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
7980
private final int currentParallelism;
8081
private final List<TableId> remainingTables;
8182
private final boolean isRemainingTablesCheckpointed;
83+
private final SplitEnumeratorContext<MySqlSplit> enumeratorContext;
8284

8385
private final MySqlPartition partition;
8486
private final Object lock = new Object();
@@ -95,7 +97,8 @@ public MySqlSnapshotSplitAssigner(
9597
MySqlSourceConfig sourceConfig,
9698
int currentParallelism,
9799
List<TableId> remainingTables,
98-
boolean isTableIdCaseSensitive) {
100+
boolean isTableIdCaseSensitive,
101+
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
99102
this(
100103
sourceConfig,
101104
currentParallelism,
@@ -108,13 +111,15 @@ public MySqlSnapshotSplitAssigner(
108111
remainingTables,
109112
isTableIdCaseSensitive,
110113
true,
111-
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
114+
ChunkSplitterState.NO_SPLITTING_TABLE_STATE,
115+
enumeratorContext);
112116
}
113117

114118
public MySqlSnapshotSplitAssigner(
115119
MySqlSourceConfig sourceConfig,
116120
int currentParallelism,
117-
SnapshotPendingSplitsState checkpoint) {
121+
SnapshotPendingSplitsState checkpoint,
122+
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
118123
this(
119124
sourceConfig,
120125
currentParallelism,
@@ -127,7 +132,8 @@ public MySqlSnapshotSplitAssigner(
127132
checkpoint.getRemainingTables(),
128133
checkpoint.isTableIdCaseSensitive(),
129134
checkpoint.isRemainingTablesCheckpointed(),
130-
checkpoint.getChunkSplitterState());
135+
checkpoint.getChunkSplitterState(),
136+
enumeratorContext);
131137
}
132138

133139
private MySqlSnapshotSplitAssigner(
@@ -142,7 +148,8 @@ private MySqlSnapshotSplitAssigner(
142148
List<TableId> remainingTables,
143149
boolean isTableIdCaseSensitive,
144150
boolean isRemainingTablesCheckpointed,
145-
ChunkSplitterState chunkSplitterState) {
151+
ChunkSplitterState chunkSplitterState,
152+
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
146153
this.sourceConfig = sourceConfig;
147154
this.currentParallelism = currentParallelism;
148155
this.alreadyProcessedTables = alreadyProcessedTables;
@@ -168,10 +175,12 @@ private MySqlSnapshotSplitAssigner(
168175
createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSplitterState);
169176
this.partition =
170177
new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
178+
this.enumeratorContext = enumeratorContext;
171179
}
172180

173181
@Override
174182
public void open() {
183+
shouldEnterProcessingBacklog();
175184
chunkSplitter.open();
176185
discoveryCaptureTables();
177186
captureNewlyAddedTables();
@@ -397,17 +406,20 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
397406
@Override
398407
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
399408
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
400-
if (allSnapshotSplitsFinished()
401-
&& AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
402-
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
403-
// to care about the global output data order of snapshot splits and binlog split.
404-
if (currentParallelism == 1) {
405-
assignerStatus = assignerStatus.onFinish();
406-
LOG.info(
407-
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
408-
} else {
409-
LOG.info(
410-
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
409+
if (allSnapshotSplitsFinished()) {
410+
enumeratorContext.setIsProcessingBacklog(false);
411+
if (AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
412+
// Skip the waiting checkpoint when current parallelism is 1, which means we do not
413+
// need to care about the global output data order of snapshot splits and the
414+
// binlog split.
415+
if (currentParallelism == 1) {
416+
assignerStatus = assignerStatus.onFinish();
417+
LOG.info(
418+
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
419+
} else {
420+
LOG.info(
421+
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
422+
}
411423
}
412424
}
413425
}
@@ -607,4 +619,10 @@ private static MySqlChunkSplitter createChunkSplitter(
607619
}
608620
return new MySqlChunkSplitter(mySqlSchema, sourceConfig);
609621
}
622+
623+
private void shouldEnterProcessingBacklog() {
624+
if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING) {
625+
enumeratorContext.setIsProcessingBacklog(true);
626+
}
627+
}
610628
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
8787
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
8888
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
89+
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
8990
import static org.assertj.core.api.Assertions.assertThat;
9091
import static org.junit.Assert.assertEquals;
9192
import static org.junit.Assert.assertNotNull;
@@ -1182,7 +1183,11 @@ private List<MySqlSnapshotSplit> getMySqlSplits(
11821183

11831184
final MySqlSnapshotSplitAssigner assigner =
11841185
new MySqlSnapshotSplitAssigner(
1185-
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
1186+
sourceConfig,
1187+
DEFAULT_PARALLELISM,
1188+
remainingTables,
1189+
false,
1190+
getMySqlSplitEnumeratorContext());
11861191
assigner.open();
11871192
List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
11881193
while (true) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import java.util.Optional;
5151
import java.util.stream.Collectors;
5252

53+
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
5354
import static org.junit.Assert.assertNotNull;
5455
import static org.junit.Assert.assertTrue;
5556
import static org.junit.Assert.fail;
@@ -629,7 +630,11 @@ private List<MySqlSplit> getMySqlSplits(
629630
MySqlSourceConfig sourceConfig, List<TableId> remainingTables) {
630631
final MySqlSnapshotSplitAssigner assigner =
631632
new MySqlSnapshotSplitAssigner(
632-
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
633+
sourceConfig,
634+
DEFAULT_PARALLELISM,
635+
remainingTables,
636+
false,
637+
getMySqlSplitEnumeratorContext());
633638
assigner.open();
634639
List<MySqlSplit> mySqlSplitList = new ArrayList<>();
635640
while (true) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
3030
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
3131
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
32+
import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
3233
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
3334
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
3435
import org.apache.flink.table.api.DataTypes;
@@ -50,6 +51,8 @@
5051
import java.util.Optional;
5152
import java.util.stream.Collectors;
5253

54+
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
55+
import static org.assertj.core.api.Assertions.assertThat;
5356
import static org.junit.Assert.assertEquals;
5457
import static org.junit.Assert.assertTrue;
5558

@@ -109,8 +112,11 @@ public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
109112
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
110113
HybridPendingSplitsState checkpoint =
111114
new HybridPendingSplitsState(snapshotPendingSplitsState, false);
115+
MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
116+
getMySqlSplitEnumeratorContext();
112117
final MySqlHybridSplitAssigner assigner =
113-
new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
118+
new MySqlHybridSplitAssigner(
119+
configuration, DEFAULT_PARALLELISM, checkpoint, enumeratorContext);
114120

115121
// step 2. Get the MySqlBinlogSplit after all snapshot splits finished
116122
Optional<MySqlSplit> binlogSplit = assigner.getNext();
@@ -152,7 +158,12 @@ public void testAssigningInSnapshotOnlyMode() {
152158

153159
// Create and initialize assigner
154160
MySqlHybridSplitAssigner assigner =
155-
new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false);
161+
new MySqlHybridSplitAssigner(
162+
sourceConfig,
163+
1,
164+
new ArrayList<>(),
165+
false,
166+
getMySqlSplitEnumeratorContext());
156167
assigner.open();
157168

158169
// Get all snapshot splits
@@ -201,6 +212,57 @@ private MySqlSourceConfig getConfig(String[] captureTables, StartupOptions start
201212
.createConfig(0);
202213
}
203214

215+
@Test
216+
public void testSetProcessingBacklog() {
217+
final String captureTable = "customers";
218+
MySqlSourceConfig configuration = getConfig(new String[] {captureTable});
219+
MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
220+
getMySqlSplitEnumeratorContext();
221+
final MySqlHybridSplitAssigner assigner =
222+
new MySqlHybridSplitAssigner(
223+
configuration,
224+
DEFAULT_PARALLELISM,
225+
new ArrayList<>(),
226+
false,
227+
enumeratorContext);
228+
assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
229+
assigner.open();
230+
assertThat(enumeratorContext.isProcessingBacklog()).isTrue();
231+
// Get all snapshot splits
232+
List<MySqlSnapshotSplit> snapshotSplits = drainSnapshotSplits(assigner);
233+
Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
234+
int i = 0;
235+
for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) {
236+
BinlogOffset binlogOffset =
237+
BinlogOffset.builder().setBinlogFilePosition("foo", i++).build();
238+
finishedOffsets.put(snapshotSplit.splitId(), binlogOffset);
239+
}
240+
assigner.onFinishedSplits(finishedOffsets);
241+
assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
242+
assigner.close();
243+
}
244+
245+
private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
246+
String[] captureTableIds =
247+
Arrays.stream(captureTables)
248+
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
249+
.toArray(String[]::new);
250+
251+
return new MySqlSourceConfigFactory()
252+
.startupOptions(StartupOptions.initial())
253+
.databaseList(customerDatabase.getDatabaseName())
254+
.tableList(captureTableIds)
255+
.hostname(MYSQL_CONTAINER.getHost())
256+
.port(MYSQL_CONTAINER.getDatabasePort())
257+
.username(customerDatabase.getUsername())
258+
.password(customerDatabase.getPassword())
259+
.serverTimeZone(ZoneId.of("UTC").toString());
260+
}
261+
262+
private MySqlSourceConfig getConfig(String[] captureTables) {
263+
return getConfigFactory(captureTables).createConfig(0);
264+
}
265+
204266
private List<MySqlSnapshotSplit> drainSnapshotSplits(MySqlHybridSplitAssigner assigner) {
205267
List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
206268
while (true) {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
5151
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
5252
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
53+
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
5354
import static org.junit.Assert.assertEquals;
5455
import static org.junit.Assert.assertFalse;
5556
import static org.junit.Assert.assertTrue;
@@ -475,7 +476,11 @@ public void testEnumerateTablesLazily() {
475476

476477
final MySqlSnapshotSplitAssigner assigner =
477478
new MySqlSnapshotSplitAssigner(
478-
configuration, DEFAULT_PARALLELISM, new ArrayList<>(), false);
479+
configuration,
480+
DEFAULT_PARALLELISM,
481+
new ArrayList<>(),
482+
false,
483+
getMySqlSplitEnumeratorContext());
479484

480485
assertTrue(assigner.needToDiscoveryTables());
481486
assigner.open();
@@ -549,7 +554,11 @@ private List<String> getTestAssignSnapshotSplits(
549554
.collect(Collectors.toList());
550555
final MySqlSnapshotSplitAssigner assigner =
551556
new MySqlSnapshotSplitAssigner(
552-
configuration, DEFAULT_PARALLELISM, remainingTables, false);
557+
configuration,
558+
DEFAULT_PARALLELISM,
559+
remainingTables,
560+
false,
561+
getMySqlSplitEnumeratorContext());
553562
return getSplitsFromAssigner(assigner);
554563
}
555564

@@ -642,7 +651,11 @@ private List<String> getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as
642651
true,
643652
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
644653
final MySqlSnapshotSplitAssigner assigner =
645-
new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
654+
new MySqlSnapshotSplitAssigner(
655+
configuration,
656+
DEFAULT_PARALLELISM,
657+
checkpoint,
658+
getMySqlSplitEnumeratorContext());
646659
return getSplitsFromAssigner(assigner);
647660
}
648661

0 commit comments

Comments
 (0)