Skip to content

Commit 6bee2d9

Browse files
committed
fix
1 parent 022929f commit 6bee2d9

File tree

7 files changed

+112
-165
lines changed

7 files changed

+112
-165
lines changed

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java

Lines changed: 64 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -113,36 +113,38 @@ private RecordWithMeta buildRecordResponse(
113113
}
114114
}
115115
}
116+
} finally {
117+
// The LSN in the commit is the current offset, which is the offset from the last
118+
// successful write.
119+
// Therefore, even if a subsequent write fails, it will not affect the commit.
120+
sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
116121

117-
if (readResult.getSplitState() != null) {
118-
// Set meta information for hw
119-
if (sourceReader.isSnapshotSplit(split)) {
120-
Map<String, String> offsetRes =
121-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
122-
offsetRes.put(SPLIT_ID, split.splitId());
123-
recordResponse.setMeta(offsetRes);
124-
}
122+
// This must be called after commitSourceOffset; otherwise,
123+
// PG's confirmed LSN will not advance.
124+
sourceReader.finishSplitRecords();
125+
}
125126

126-
// set meta for binlog event
127-
if (sourceReader.isBinlogSplit(split)) {
128-
Map<String, String> offsetRes =
129-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
130-
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
131-
recordResponse.setMeta(offsetRes);
132-
}
133-
} else {
134-
throw new RuntimeException("split state is null");
127+
if (readResult.getSplitState() != null) {
128+
// Set meta information for hw
129+
if (sourceReader.isSnapshotSplit(split)) {
130+
Map<String, String> offsetRes =
131+
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
132+
offsetRes.put(SPLIT_ID, split.splitId());
133+
recordResponse.setMeta(offsetRes);
135134
}
136135

137-
sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
138-
return recordResponse;
139-
} finally {
140-
// This must be called after commitSourceOffset; otherwise, PG's confirmed lsn will not
141-
// proceed.
142-
if (sourceReader != null) {
143-
sourceReader.finishSplitRecords();
136+
// set meta for binlog event
137+
if (sourceReader.isBinlogSplit(split)) {
138+
Map<String, String> offsetRes =
139+
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
140+
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
141+
recordResponse.setMeta(offsetRes);
144142
}
143+
} else {
144+
throw new RuntimeException("split state is null");
145145
}
146+
147+
return recordResponse;
146148
}
147149

148150
public CompletableFuture<Void> writeRecordsAsync(WriteRecordRequest writeRecordRequest) {
@@ -222,47 +224,50 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
222224
break;
223225
}
224226
}
227+
} finally {
228+
if (readResult != null) {
229+
// The LSN in the commit is the current offset, which is the offset from the last
230+
// successful write.
231+
// Therefore, even if a subsequent write fails, it will not affect the commit.
232+
sourceReader.commitSourceOffset(
233+
writeRecordRequest.getJobId(), readResult.getSplit());
234+
}
225235

226-
// get offset from split state
227-
try {
228-
if (readResult.getSplitState() != null) {
229-
// Set meta information for hw
230-
if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
231-
Map<String, String> offsetRes =
232-
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
233-
offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
234-
metaResponse = offsetRes;
235-
}
236-
237-
// set meta for binlog event
238-
if (sourceReader.isBinlogSplit(readResult.getSplit())) {
239-
Map<String, String> offsetRes =
240-
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
241-
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
242-
metaResponse = offsetRes;
243-
}
244-
} else {
245-
throw new RuntimeException("split state is null");
236+
// This must be called after commitSourceOffset; otherwise,
237+
// PG's confirmed LSN will not advance.
238+
// This operation must be performed before batchStreamLoad.commitOffset;
239+
// otherwise, the FE might issue the next task for this job.
240+
sourceReader.finishSplitRecords();
241+
}
242+
// get offset from split state
243+
try {
244+
if (readResult.getSplitState() != null) {
245+
// Set meta information for hw
246+
if (sourceReader.isSnapshotSplit(readResult.getSplit())) {
247+
Map<String, String> offsetRes =
248+
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
249+
offsetRes.put(SPLIT_ID, readResult.getSplit().splitId());
250+
metaResponse = offsetRes;
246251
}
247252

248-
// wait all stream load finish
249-
batchStreamLoad.forceFlush();
253+
// set meta for binlog event
254+
if (sourceReader.isBinlogSplit(readResult.getSplit())) {
255+
Map<String, String> offsetRes =
256+
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
257+
offsetRes.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
258+
metaResponse = offsetRes;
259+
}
260+
} else {
261+
throw new RuntimeException("split state is null");
262+
}
250263

251-
// request fe api
252-
batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
264+
// wait all stream load finish
265+
batchStreamLoad.forceFlush();
253266

254-
// commit source offset if need
255-
sourceReader.commitSourceOffset(
256-
writeRecordRequest.getJobId(), readResult.getSplit());
257-
} finally {
258-
batchStreamLoad.resetTaskId();
259-
}
267+
// request fe api
268+
batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
260269
} finally {
261-
// This must be called after commitSourceOffset; otherwise, PG's confirmed lsn will not
262-
// proceed.
263-
if (sourceReader != null) {
264-
sourceReader.finishSplitRecords();
265-
}
270+
batchStreamLoad.resetTaskId();
266271
}
267272
}
268273

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
8484
private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
8585
private static ObjectMapper objectMapper = new ObjectMapper();
8686
private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
87-
private IncrementalSourceScanFetcher snapshotReader;
88-
private IncrementalSourceStreamFetcher binlogReader;
87+
// private IncrementalSourceScanFetcher snapshotReader;
88+
// private IncrementalSourceStreamFetcher binlogReader;
8989
private Fetcher<SourceRecords, SourceSplitBase> currentReader;
9090
private Map<TableId, TableChanges.TableChange> tableSchemas;
9191
private SplitRecords currentSplitRecords;
@@ -164,7 +164,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
164164
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
165165
if (currentSplitRecords == null) {
166166
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
167-
if (currentReader == null || baseReq.isReload()) {
167+
if (baseReq.isReload() || currentReader == null) {
168168
LOG.info(
169169
"No current reader or reload {}, create new split reader for job {}",
170170
baseReq.isReload(),
@@ -476,7 +476,7 @@ private SplitRecords pollSplitRecordsWithSplit(SourceSplitBase split, JobBaseCon
476476
sourceRecords =
477477
pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
478478
if (currentReader instanceof IncrementalSourceScanFetcher) {
479-
closeSnapshotReader();
479+
closeCurrentReader();
480480
}
481481
return new SplitRecords(currentSplitId, sourceRecords.iterator());
482482
}
@@ -540,30 +540,12 @@ private SourceRecords pollUntilDataAvailable(
540540
return new SourceRecords(new ArrayList<>());
541541
}
542542

543-
private void closeSnapshotReader() {
544-
IncrementalSourceScanFetcher reusedSnapshotReader = this.getSnapshotReader();
545-
if (reusedSnapshotReader != null) {
546-
LOG.info(
547-
"Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
548-
reusedSnapshotReader.close();
549-
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
550-
if (reusedSnapshotReader == currentReader) {
551-
this.setCurrentReader(null);
552-
}
553-
this.setSnapshotReader(null);
554-
}
555-
}
556-
557-
private void closeBinlogReader() {
558-
IncrementalSourceStreamFetcher reusedBinlogReader = this.getBinlogReader();
559-
if (reusedBinlogReader != null) {
560-
LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
561-
reusedBinlogReader.close();
562-
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
563-
if (reusedBinlogReader == currentReader) {
564-
this.setCurrentReader(null);
565-
}
566-
this.setBinlogReader(null);
543+
private void closeCurrentReader() {
544+
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
545+
if (currentReader != null) {
546+
LOG.info("Close current reader {}", currentReader.getClass().getCanonicalName());
547+
currentReader.close();
548+
this.setCurrentReader(null);
567549
}
568550
}
569551

@@ -617,8 +599,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
617599
public void finishSplitRecords() {
618600
this.setCurrentSplitRecords(null);
619601
// Close the reader after each read, because the binlog client would otherwise keep occupying the connection.
620-
closeBinlogReader();
621-
this.setCurrentReader(null);
602+
closeCurrentReader();
622603
}
623604

624605
private Map<TableId, TableChanges.TableChange> getTableSchemas(JobBaseConfig config) {
@@ -636,8 +617,7 @@ protected abstract Map<TableId, TableChanges.TableChange> discoverTableSchemas(
636617
@Override
637618
public void close(JobBaseConfig jobConfig) {
638619
LOG.info("Close source reader for job {}", jobConfig.getJobId());
639-
closeSnapshotReader();
640-
closeBinlogReader();
620+
closeCurrentReader();
641621
currentReader = null;
642622
currentSplitRecords = null;
643623
currentSplit = null;

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java

Lines changed: 22 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,6 @@ public class MySqlSourceReader implements SourceReader {
108108
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
109109
new FlinkJsonTableChangeSerializer();
110110
private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
111-
private SnapshotSplitReader snapshotReader;
112-
private BinlogSplitReader binlogReader;
113111
private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
114112
private Map<TableId, TableChanges.TableChange> tableSchemas;
115113
private SplitRecords currentSplitRecords;
@@ -180,7 +178,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
180178
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
181179
if (currentSplitRecords == null) {
182180
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
183-
if (currentReader == null || baseReq.isReload()) {
181+
if (baseReq.isReload() || currentReader == null) {
184182
LOG.info(
185183
"No current reader or reload {}, create new split reader",
186184
baseReq.isReload());
@@ -446,7 +444,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j
446444
sourceRecords =
447445
pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
448446
if (currentReader instanceof SnapshotSplitReader) {
449-
closeSnapshotReader();
447+
closeCurrentReader();
450448
}
451449
return new SplitRecords(currentSplitId, sourceRecords.iterator());
452450
}
@@ -514,61 +512,33 @@ private SourceRecords pollUntilDataAvailable(
514512

515513
private SnapshotSplitReader getSnapshotSplitReader(JobBaseConfig config) {
516514
MySqlSourceConfig sourceConfig = getSourceConfig(config);
517-
SnapshotSplitReader snapshotReader = this.getSnapshotReader();
518-
if (snapshotReader == null) {
519-
final MySqlConnection jdbcConnection =
520-
DebeziumUtils.createMySqlConnection(sourceConfig);
521-
final BinaryLogClient binaryLogClient =
522-
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
523-
final StatefulTaskContext statefulTaskContext =
524-
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
525-
snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
526-
this.setSnapshotReader(snapshotReader);
527-
}
515+
final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
516+
final BinaryLogClient binaryLogClient =
517+
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
518+
final StatefulTaskContext statefulTaskContext =
519+
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
520+
SnapshotSplitReader snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
528521
return snapshotReader;
529522
}
530523

531524
private BinlogSplitReader getBinlogSplitReader(JobBaseConfig config) {
532525
MySqlSourceConfig sourceConfig = getSourceConfig(config);
533-
BinlogSplitReader binlogReader = this.getBinlogReader();
534-
if (binlogReader == null) {
535-
final MySqlConnection jdbcConnection =
536-
DebeziumUtils.createMySqlConnection(sourceConfig);
537-
final BinaryLogClient binaryLogClient =
538-
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
539-
final StatefulTaskContext statefulTaskContext =
540-
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
541-
binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
542-
this.setBinlogReader(binlogReader);
543-
}
526+
final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
527+
final BinaryLogClient binaryLogClient =
528+
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
529+
final StatefulTaskContext statefulTaskContext =
530+
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
531+
BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
544532
return binlogReader;
545533
}
546534

547-
private void closeSnapshotReader() {
548-
SnapshotSplitReader reusedSnapshotReader = this.getSnapshotReader();
549-
if (reusedSnapshotReader != null) {
550-
LOG.info(
551-
"Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
552-
reusedSnapshotReader.close();
553-
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
554-
if (reusedSnapshotReader == currentReader) {
555-
this.setCurrentReader(null);
556-
}
557-
this.setSnapshotReader(null);
558-
}
559-
}
560-
561-
private void closeBinlogReader() {
562-
BinlogSplitReader reusedBinlogReader = this.getBinlogReader();
563-
if (reusedBinlogReader != null) {
564-
LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
565-
reusedBinlogReader.close();
566-
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
567-
if (reusedBinlogReader == currentReader) {
568-
this.setCurrentReader(null);
569-
}
570-
this.setBinlogReader(null);
535+
private void closeCurrentReader() {
536+
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
537+
if (currentReader != null) {
538+
LOG.info("Close current reader {}", currentReader.getClass().getCanonicalName());
539+
currentReader.close();
571540
}
541+
this.setCurrentReader(null);
572542
}
573543

574544
private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
@@ -719,8 +689,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
719689
public void finishSplitRecords() {
720690
this.setCurrentSplitRecords(null);
721691
// Close the reader after each read, because the binlog client would otherwise keep occupying the connection.
722-
closeBinlogReader();
723-
this.setCurrentReader(null);
692+
closeCurrentReader();
724693
}
725694

726695
@Override
@@ -777,8 +746,7 @@ private Map<TableId, TableChanges.TableChange> discoverTableSchemas(JobBaseConfi
777746
@Override
778747
public void close(JobBaseConfig jobConfig) {
779748
LOG.info("Close source reader for job {}", jobConfig.getJobId());
780-
closeSnapshotReader();
781-
closeBinlogReader();
749+
closeCurrentReader();
782750
currentReader = null;
783751
currentSplitRecords = null;
784752
if (tableSchemas != null) {

0 commit comments

Comments
 (0)