
Commit c9a2732

[FLINK-37509][base&mysql] Increment scan source no need to buffer data when enable skipping backfill. (#3964)
Parent: e35180d

17 files changed: +625 −380 lines
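For context, the behavior this commit optimizes is driven by the source configuration: the scan fetcher below checks taskContext.getSourceConfig().isSkipSnapshotBackfill() and only buffers records while backfill is still enabled. A minimal sketch of building a MySQL CDC source with that option follows; the skipSnapshotBackfill(true) builder call is an assumption inferred from that config flag (the exact setter or table option name is not shown in this commit), and package names follow the 3.x org.apache.flink.cdc layout.

import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class SkipBackfillSourceSketch {
    public static void main(String[] args) {
        // Assumption: the builder exposes skipSnapshotBackfill(boolean), matching the
        // SourceConfig#isSkipSnapshotBackfill() flag read in this commit; verify the
        // exact option name against the connector documentation for your release.
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("app_db")
                        .tableList("app_db.orders")
                        .username("flink_user")
                        .password("flink_pw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .skipSnapshotBackfill(true) // assumed option name
                        .build();
        // The source can then be attached to a StreamExecutionEnvironment via fromSource(...).
    }
}

Skipping backfill is generally documented as relaxing the snapshot-phase guarantee from exactly-once to at-least-once; in exchange there is no bounded backfill read to merge, which is what allows the fetcher below to stop buffering.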

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java

Lines changed: 20 additions & 25 deletions
@@ -45,6 +45,7 @@
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -92,7 +93,6 @@ public IncrementalSourceSplitReader(

     @Override
     public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
-
         try {
             suspendStreamReaderIfNeed();
             return pollSplitRecords();
@@ -145,13 +145,13 @@ private ChangeEventRecords pollSplitRecords() throws InterruptedException {
         Iterator<SourceRecords> dataIt = null;
         if (currentFetcher == null) {
             // (1) Reads stream split firstly and then read snapshot split
-            if (streamSplits.size() > 0) {
+            if (!streamSplits.isEmpty()) {
                 // the stream split may come from:
                 // (a) the initial stream split
                 // (b) added back stream-split in newly added table process
                 StreamSplit nextSplit = streamSplits.poll();
                 submitStreamSplit(nextSplit);
-            } else if (snapshotSplits.size() > 0) {
+            } else if (!snapshotSplits.isEmpty()) {
                 submitSnapshotSplit(snapshotSplits.poll());
             } else {
                 LOG.info("No available split to read.");
@@ -162,19 +162,21 @@ private ChangeEventRecords pollSplitRecords() throws InterruptedException {
             } else {
                 currentSplitId = null;
             }
-            return dataIt == null ? finishedSplit() : forRecords(dataIt);
+            return dataIt == null ? finishedSplit(true) : forUnfinishedRecords(dataIt);
         } else if (currentFetcher instanceof IncrementalSourceScanFetcher) {
-            // (2) try to switch to stream split reading util current snapshot split finished
             dataIt = currentFetcher.pollSplitRecords();
             if (dataIt != null) {
                 // first fetch data of snapshot split, return and emit the records of snapshot split
-                ChangeEventRecords records;
+                return forUnfinishedRecords(dataIt);
+            } else {
+                // (2) try to switch to stream split reading util current snapshot split finished
+                ChangeEventRecords finishedRecords;
                 if (context.isHasAssignedStreamSplit()) {
-                    records = forNewAddedTableFinishedSplit(currentSplitId, dataIt);
+                    finishedRecords = forNewAddedTableFinishedSplit(currentSplitId);
                     closeScanFetcher();
                     closeStreamFetcher();
                 } else {
-                    records = forRecords(dataIt);
+                    finishedRecords = finishedSplit(false);
                     SnapshotSplit nextSplit = snapshotSplits.poll();
                     if (nextSplit != null) {
                         checkState(reusedScanFetcher != null);
@@ -183,9 +185,7 @@ private ChangeEventRecords pollSplitRecords() throws InterruptedException {
                         closeScanFetcher();
                     }
                 }
-                return records;
-            } else {
-                return finishedSplit();
+                return finishedRecords;
             }
         } else if (currentFetcher instanceof IncrementalSourceStreamFetcher) {
             // (3) switch to snapshot split reading if there are newly added snapshot splits
@@ -203,7 +203,7 @@ private ChangeEventRecords pollSplitRecords() throws InterruptedException {
                 // null will be returned after receiving suspend stream event
                 // finish current stream split reading
                 closeStreamFetcher();
-                return finishedSplit();
+                return finishedSplit(true);
             }
         } else {
             throw new IllegalStateException("Unsupported reader type.");
@@ -215,9 +215,12 @@ public boolean canAssignNextSplit() {
         return currentFetcher == null || currentFetcher.isFinished();
     }

-    private ChangeEventRecords finishedSplit() {
+    private ChangeEventRecords finishedSplit(boolean recycleScanFetcher) {
         final ChangeEventRecords finishedRecords =
                 ChangeEventRecords.forFinishedSplit(currentSplitId);
+        if (recycleScanFetcher) {
+            closeScanFetcher();
+        }
         currentSplitId = null;
         return finishedRecords;
     }
@@ -226,24 +229,16 @@ private ChangeEventRecords finishedSplit() {
      * Finishes new added snapshot split, mark the stream split as finished too, we will add the
      * stream split back in {@code MySqlSourceReader}.
      */
-    private ChangeEventRecords forNewAddedTableFinishedSplit(
-            final String splitId, final Iterator<SourceRecords> recordsForSplit) {
+    private ChangeEventRecords forNewAddedTableFinishedSplit(final String splitId) {
         final Set<String> finishedSplits = new HashSet<>();
         finishedSplits.add(splitId);
         finishedSplits.add(STREAM_SPLIT_ID);
         currentSplitId = null;
-        return new ChangeEventRecords(splitId, recordsForSplit, finishedSplits);
+        return new ChangeEventRecords(splitId, Collections.emptyIterator(), finishedSplits);
     }

-    private ChangeEventRecords forRecords(Iterator<SourceRecords> dataIt) {
-        if (currentFetcher instanceof IncrementalSourceScanFetcher) {
-            final ChangeEventRecords finishedRecords =
-                    ChangeEventRecords.forSnapshotRecords(currentSplitId, dataIt);
-            closeScanFetcher();
-            return finishedRecords;
-        } else {
-            return ChangeEventRecords.forRecords(currentSplitId, dataIt);
-        }
+    private ChangeEventRecords forUnfinishedRecords(Iterator<SourceRecords> dataIt) {
+        return ChangeEventRecords.forRecords(currentSplitId, dataIt);
    }

    private void submitSnapshotSplit(SnapshotSplit snapshotSplit) {
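To summarize the reworked control flow without the Flink plumbing: the reader now keeps forwarding scan-fetcher batches as unfinished records and only emits a finished-split signal (with an empty iterator) once the fetcher returns null, rather than bundling the last batch together with the finished marker. A self-contained toy sketch of that pattern follows; every name in it is an illustrative stand-in, not a connector class.

import java.util.Iterator;
import java.util.List;

// Toy stand-ins only: "forward batches until exhausted, then signal finished".
class ToySplitReaderSketch {
    interface ToyFetcher {
        Iterator<List<String>> poll(); // null once the current split is exhausted
    }

    private String currentSplitId = "snapshot-split-0";

    String pollOnce(ToyFetcher fetcher) {
        Iterator<List<String>> dataIt = fetcher.poll();
        if (dataIt != null) {
            // corresponds to forUnfinishedRecords(dataIt): data flows out, split stays open
            return "records for " + currentSplitId;
        }
        // corresponds to finishedSplit(...): no data left, report the split as finished
        String finished = "finished " + currentSplitId;
        currentSplitId = null;
        return finished;
    }
}

In the real reader, the new recycleScanFetcher flag on finishedSplit(...) decides whether the scan fetcher is closed together with the split or kept around for the next queued snapshot split.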

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/IncrementalSourceScanFetcher.java

Lines changed: 74 additions & 50 deletions
@@ -35,6 +35,7 @@
 import javax.annotation.Nullable;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -115,64 +116,87 @@ public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
         checkReadException();

         if (hasNextElement.get()) {
-            // eg:
-            // data input: [low watermark event][snapshot events][high watermark event][change
-            // events][end watermark event]
-            // data output: [low watermark event][normalized events][high watermark event]
-            boolean reachChangeLogStart = false;
-            boolean reachChangeLogEnd = false;
-            SourceRecord lowWatermark = null;
-            SourceRecord highWatermark = null;
-            Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
-            while (!reachChangeLogEnd) {
-                checkReadException();
-                List<DataChangeEvent> batch = queue.poll();
-                for (DataChangeEvent event : batch) {
-                    SourceRecord record = event.getRecord();
-                    if (lowWatermark == null) {
-                        lowWatermark = record;
-                        assertLowWatermark(lowWatermark);
-                        continue;
-                    }
+            if (taskContext.getSourceConfig().isSkipSnapshotBackfill()) {
+                return pollWithoutBuffer();
+            } else {
+                return pollWithBuffer();
+            }
+        }
+        // the data has been polled, no more data
+        reachEnd.compareAndSet(false, true);
+        return null;
+    }

-                    if (highWatermark == null && isHighWatermarkEvent(record)) {
-                        highWatermark = record;
-                        // snapshot events capture end and begin to capture stream events
-                        reachChangeLogStart = true;
-                        continue;
-                    }
+    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
+        checkReadException();
+        List<DataChangeEvent> batch = queue.poll();
+        final List<SourceRecord> records = new ArrayList<>();
+        for (DataChangeEvent event : batch) {
+            if (isEndWatermarkEvent(event.getRecord())) {
+                hasNextElement.set(false);
+                break;
+            }
+            records.add(event.getRecord());
+        }

-                    if (reachChangeLogStart && isEndWatermarkEvent(record)) {
-                        // capture to end watermark events, stop the loop
-                        reachChangeLogEnd = true;
-                        break;
-                    }
+        return Collections.singletonList(new SourceRecords(records)).iterator();
+    }
+
+    public Iterator<SourceRecords> pollWithBuffer() throws InterruptedException {
+        // eg:
+        // data input: [low watermark event][snapshot events][high watermark event][change
+        // events][end watermark event]
+        // data output: [low watermark event][normalized events][high watermark event]
+        boolean reachChangeLogStart = false;
+        boolean reachChangeLogEnd = false;
+        SourceRecord lowWatermark = null;
+        SourceRecord highWatermark = null;
+        Map<Struct, SourceRecord> outputBuffer = new HashMap<>();
+        while (!reachChangeLogEnd) {
+            checkReadException();
+            List<DataChangeEvent> batch = queue.poll();
+            for (DataChangeEvent event : batch) {
+                SourceRecord record = event.getRecord();
+                if (lowWatermark == null) {
+                    lowWatermark = record;
+                    assertLowWatermark(lowWatermark);
+                    continue;
+                }
+
+                if (highWatermark == null && isHighWatermarkEvent(record)) {
+                    highWatermark = record;
+                    // snapshot events capture end and begin to capture stream events
+                    reachChangeLogStart = true;
+                    continue;
+                }
+
+                if (reachChangeLogStart && isEndWatermarkEvent(record)) {
+                    // capture to end watermark events, stop the loop
+                    reachChangeLogEnd = true;
+                    break;
+                }

-                    if (!reachChangeLogStart) {
-                        outputBuffer.put((Struct) record.key(), record);
-                    } else {
-                        if (isChangeRecordInChunkRange(record)) {
-                            // rewrite overlapping snapshot records through the record key
-                            taskContext.rewriteOutputBuffer(outputBuffer, record);
-                        }
+                if (!reachChangeLogStart) {
+                    outputBuffer.put((Struct) record.key(), record);
+                } else {
+                    if (isChangeRecordInChunkRange(record)) {
+                        // rewrite overlapping snapshot records through the record key
+                        taskContext.rewriteOutputBuffer(outputBuffer, record);
                    }
                }
            }
-            // snapshot split return its data once
-            hasNextElement.set(false);
+        }
+        // snapshot split return its data once
+        hasNextElement.set(false);

-            final List<SourceRecord> normalizedRecords = new ArrayList<>();
-            normalizedRecords.add(lowWatermark);
-            normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
-            normalizedRecords.add(highWatermark);
+        final List<SourceRecord> normalizedRecords = new ArrayList<>();
+        normalizedRecords.add(lowWatermark);
+        normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
+        normalizedRecords.add(highWatermark);

-            final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
-            sourceRecordsSet.add(new SourceRecords(normalizedRecords));
-            return sourceRecordsSet.iterator();
-        }
-        // the data has been polled, no more data
-        reachEnd.compareAndSet(false, true);
-        return null;
+        final List<SourceRecords> sourceRecordsSet = new ArrayList<>();
+        sourceRecordsSet.add(new SourceRecords(normalizedRecords));
+        return sourceRecordsSet.iterator();
     }

     private void checkReadException() {
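The split into pollWithoutBuffer() and pollWithBuffer() can be illustrated outside the connector: with backfill, change events read between the low and high watermark must rewrite the snapshot rows by key, so rows are staged in a keyed buffer first; with backfill skipped, there are no such events and rows can be forwarded as they are polled. The sketch below uses plain strings purely for illustration and is not connector code.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of why the buffer is only needed when a backfill phase exists.
// Each String[] is {key, value}.
public class BackfillMergeSketch {
    // With backfill: snapshot rows are staged in a keyed buffer so that change events
    // read between the low and high watermark can overwrite them before emission.
    static List<String> withBuffer(List<String[]> snapshotRows, List<String[]> backfillChanges) {
        Map<String, String> buffer = new LinkedHashMap<>();
        for (String[] row : snapshotRows) {
            buffer.put(row[0], row[1]);
        }
        for (String[] change : backfillChanges) {
            buffer.put(change[0], change[1]); // rewrite overlapping rows by key
        }
        return new ArrayList<>(buffer.values());
    }

    // Without backfill (skip enabled): nothing to merge, so rows can be forwarded
    // as they are polled, with no per-key buffering.
    static List<String> withoutBuffer(List<String[]> snapshotRows) {
        List<String> out = new ArrayList<>();
        for (String[] row : snapshotRows) {
            out.add(row[1]);
        }
        return out;
    }
}

This is the sense in which the commit title says the incremental scan source no longer needs to buffer data when backfill is skipped: the keyed map, and the memory it holds for large chunks, only exists on the pollWithBuffer() path.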

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java

Lines changed: 9 additions & 0 deletions
@@ -517,6 +517,15 @@ private List<String> testBackfillWhenWritingEvents(
             mongoCollection.updateOne(
                     Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
             mongoCollection.deleteOne(Filters.eq("cid", 1019L));
+
+            // Rarely happens, but if there's no operation or heartbeat events between
+            // watermark #a (the ChangeStream opLog caused by the last event in this hook)
+            // and watermark #b (the calculated high watermark that limits the bounded
+            // back-filling stream fetch task), the last event of hook will be missed since
+            // back-filling task reads between [loW, hiW) (high watermark not included).
+            // Workaround: insert a dummy event in another collection to forcefully push
+            // opLog forward.
+            database.getCollection("customers_1").insertOne(new Document());
         };

         switch (hookType) {
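The comment in this hunk hinges on the bounded backfill reading the half-open interval [loW, hiW): an event that sits exactly at the high watermark position is not replayed, which is why the test pushes the oplog forward with one more write. A tiny illustrative check (not connector code) makes the boundary explicit:

public class BackfillRangeSketch {
    // Illustrative only: backfill replays positions p with loW <= p < hiW, so an event
    // exactly at hiW is skipped unless a later write pushes the log past it.
    static boolean inBackfillRange(long position, long lowWatermark, long highWatermark) {
        return position >= lowWatermark && position < highWatermark;
    }

    public static void main(String[] args) {
        System.out.println(inBackfillRange(10, 5, 10)); // false: event at the high watermark is missed
    }
}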

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.java

Lines changed: 2 additions & 0 deletions
@@ -786,6 +786,8 @@ private void testNewlyAddedCollectionOneByOne(
             waitForUpsertSinkSize("sink", fetchedDataList.size());
             MongoDBAssertUtils.assertEqualsInAnyOrder(
                     fetchedDataList, TestValuesTableFactory.getResultsAsStrings("sink"));
+            // Wait 1s until snapshot phase finished, make sure the binlog data is not lost.
+            Thread.sleep(1000L);

             // step 3: make some changelog data for this round
             makeFirstPartOplogForAddressCollection(
