Skip to content

Commit 07cd88e

Browse files
branch-4.0: [Fix](Streamingjob) fix postgres incr consumer too slow #59919 (#59977)
Cherry-picked from #59919 Co-authored-by: wudi <[email protected]>
1 parent d704481 commit 07cd88e

File tree

14 files changed

+102
-127
lines changed

14 files changed

+102
-127
lines changed

fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ public boolean isVarchar() {
556556
return isScalarType(PrimitiveType.VARCHAR);
557557
}
558558

559+
public boolean isChar() {
560+
return isScalarType(PrimitiveType.CHAR);
561+
}
562+
559563
public boolean isJsonbType() {
560564
return isScalarType(PrimitiveType.JSONB);
561565
}

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public String getTimeoutReason() {
306306
log.warn("Failed to get task timeout reason, response: {}", response);
307307
}
308308
} catch (ExecutionException | InterruptedException ex) {
309-
log.error("Send get fail reason request failed: ", ex);
309+
log.error("Send get task fail reason request failed: ", ex);
310310
}
311311
return "";
312312
}

fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,14 @@ public static List<Column> getColumns(JdbcClient jdbcClient,
355355
} else {
356356
col.setType(ScalarType.createVarcharType(len));
357357
}
358+
} else if (col.getType().isChar()) {
359+
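// The length of char needs to be multiplied by 3 (presumably because the source length is in characters while the target length is in bytes — TODO confirm).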
360+
int len = col.getType().getLength() * 3;
361+
if (len > ScalarType.MAX_CHAR_LENGTH) {
362+
col.setType(ScalarType.createVarcharType(len));
363+
} else {
364+
col.setType(ScalarType.createCharType(len));
365+
}
358366
}
359367

360368
// string cannot be a key

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
public class Constants {
2121
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
22-
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
2322
public static final long POLL_SPLIT_RECORDS_TIMEOUTS = 15000L;
23+
24+
// Debezium default properties
25+
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
26+
public static final String DEBEZIUM_MAX_QUEUE_SIZE = "162580";
27+
public static final String DEBEZIUM_MAX_BATCH_SIZE = "40960";
28+
public static final String DEBEZIUM_POLL_INTERVAL_MS = "50";
2429
}

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,16 @@ private RecordWithMeta buildRecordResponse(
111111
lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
112112
recordResponse.setMeta(lastMeta);
113113
}
114-
if (count >= fetchRecord.getFetchSize()) {
115-
return recordResponse;
116-
}
117114
}
118115
}
119116
} finally {
117+
// The LSN in the commit is the current offset, which is the offset from the last
118+
// successful write.
119+
// Therefore, even if a subsequent write fails, it will not affect the commit.
120+
sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
121+
122+
// This must be called after commitSourceOffset; otherwise,
123+
// PG's confirmed LSN will not advance.
120124
sourceReader.finishSplitRecords();
121125
}
122126

@@ -140,7 +144,6 @@ private RecordWithMeta buildRecordResponse(
140144
throw new RuntimeException("split state is null");
141145
}
142146

143-
sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
144147
return recordResponse;
145148
}
146149

@@ -222,9 +225,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
222225
}
223226
}
224227
} finally {
228+
if (readResult != null) {
229+
// The LSN in the commit is the current offset, which is the offset from the last
230+
// successful write.
231+
// Therefore, even if a subsequent write fails, it will not affect the commit.
232+
sourceReader.commitSourceOffset(
233+
writeRecordRequest.getJobId(), readResult.getSplit());
234+
}
235+
236+
// This must be called after commitSourceOffset; otherwise,
237+
// PG's confirmed LSN will not advance.
238+
// This operation must be performed before batchStreamLoad.commitOffset;
239+
// otherwise, the FE might issue the next task for this job.
225240
sourceReader.finishSplitRecords();
226241
}
227-
242+
// get offset from split state
228243
try {
229244
if (readResult.getSplitState() != null) {
230245
// Set meta information for hw
@@ -251,9 +266,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
251266

252267
// request fe api
253268
batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
254-
255-
// commit source offset if need
256-
sourceReader.commitSourceOffset(writeRecordRequest.getJobId(), readResult.getSplit());
257269
} finally {
258270
batchStreamLoad.resetTaskId();
259271
}

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
8484
private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
8585
private static ObjectMapper objectMapper = new ObjectMapper();
8686
private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
87-
private IncrementalSourceScanFetcher snapshotReader;
88-
private IncrementalSourceStreamFetcher binlogReader;
8987
private Fetcher<SourceRecords, SourceSplitBase> currentReader;
9088
private Map<TableId, TableChanges.TableChange> tableSchemas;
9189
private SplitRecords currentSplitRecords;
@@ -164,7 +162,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
164162
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
165163
if (currentSplitRecords == null) {
166164
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
167-
if (currentReader == null || baseReq.isReload()) {
165+
if (baseReq.isReload() || currentReader == null) {
168166
LOG.info(
169167
"No current reader or reload {}, create new split reader for job {}",
170168
baseReq.isReload(),
@@ -476,7 +474,7 @@ private SplitRecords pollSplitRecordsWithSplit(SourceSplitBase split, JobBaseCon
476474
sourceRecords =
477475
pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
478476
if (currentReader instanceof IncrementalSourceScanFetcher) {
479-
closeSnapshotReader();
477+
closeCurrentReader();
480478
}
481479
return new SplitRecords(currentSplitId, sourceRecords.iterator());
482480
}
@@ -540,30 +538,12 @@ private SourceRecords pollUntilDataAvailable(
540538
return new SourceRecords(new ArrayList<>());
541539
}
542540

543-
private void closeSnapshotReader() {
544-
IncrementalSourceScanFetcher reusedSnapshotReader = this.getSnapshotReader();
545-
if (reusedSnapshotReader != null) {
546-
LOG.info(
547-
"Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
548-
reusedSnapshotReader.close();
549-
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
550-
if (reusedSnapshotReader == currentReader) {
551-
this.setCurrentReader(null);
552-
}
553-
this.setSnapshotReader(null);
554-
}
555-
}
556-
557-
private void closeBinlogReader() {
558-
IncrementalSourceStreamFetcher reusedBinlogReader = this.getBinlogReader();
559-
if (reusedBinlogReader != null) {
560-
LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
561-
reusedBinlogReader.close();
562-
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
563-
if (reusedBinlogReader == currentReader) {
564-
this.setCurrentReader(null);
565-
}
566-
this.setBinlogReader(null);
541+
private void closeCurrentReader() {
542+
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
543+
if (currentReader != null) {
544+
LOG.info("Close current reader {}", currentReader.getClass().getCanonicalName());
545+
currentReader.close();
546+
this.setCurrentReader(null);
567547
}
568548
}
569549

@@ -617,8 +597,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
617597
public void finishSplitRecords() {
618598
this.setCurrentSplitRecords(null);
619599
// Close after each read; otherwise the binlog client will keep occupying the connection.
620-
closeBinlogReader();
621-
this.setCurrentReader(null);
600+
closeCurrentReader();
622601
}
623602

624603
private Map<TableId, TableChanges.TableChange> getTableSchemas(JobBaseConfig config) {
@@ -636,8 +615,7 @@ protected abstract Map<TableId, TableChanges.TableChange> discoverTableSchemas(
636615
@Override
637616
public void close(JobBaseConfig jobConfig) {
638617
LOG.info("Close source reader for job {}", jobConfig.getJobId());
639-
closeSnapshotReader();
640-
closeBinlogReader();
618+
closeCurrentReader();
641619
currentReader = null;
642620
currentSplitRecords = null;
643621
currentSplit = null;

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java

Lines changed: 24 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,6 @@ public class MySqlSourceReader implements SourceReader {
108108
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
109109
new FlinkJsonTableChangeSerializer();
110110
private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
111-
private SnapshotSplitReader snapshotReader;
112-
private BinlogSplitReader binlogReader;
113111
private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
114112
private Map<TableId, TableChanges.TableChange> tableSchemas;
115113
private SplitRecords currentSplitRecords;
@@ -180,7 +178,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
180178
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
181179
if (currentSplitRecords == null) {
182180
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
183-
if (currentReader == null || baseReq.isReload()) {
181+
if (baseReq.isReload() || currentReader == null) {
184182
LOG.info(
185183
"No current reader or reload {}, create new split reader",
186184
baseReq.isReload());
@@ -446,7 +444,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j
446444
sourceRecords =
447445
pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
448446
if (currentReader instanceof SnapshotSplitReader) {
449-
closeSnapshotReader();
447+
closeCurrentReader();
450448
}
451449
return new SplitRecords(currentSplitId, sourceRecords.iterator());
452450
}
@@ -514,61 +512,33 @@ private SourceRecords pollUntilDataAvailable(
514512

515513
private SnapshotSplitReader getSnapshotSplitReader(JobBaseConfig config) {
516514
MySqlSourceConfig sourceConfig = getSourceConfig(config);
517-
SnapshotSplitReader snapshotReader = this.getSnapshotReader();
518-
if (snapshotReader == null) {
519-
final MySqlConnection jdbcConnection =
520-
DebeziumUtils.createMySqlConnection(sourceConfig);
521-
final BinaryLogClient binaryLogClient =
522-
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
523-
final StatefulTaskContext statefulTaskContext =
524-
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
525-
snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
526-
this.setSnapshotReader(snapshotReader);
527-
}
515+
final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
516+
final BinaryLogClient binaryLogClient =
517+
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
518+
final StatefulTaskContext statefulTaskContext =
519+
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
520+
SnapshotSplitReader snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
528521
return snapshotReader;
529522
}
530523

531524
private BinlogSplitReader getBinlogSplitReader(JobBaseConfig config) {
532525
MySqlSourceConfig sourceConfig = getSourceConfig(config);
533-
BinlogSplitReader binlogReader = this.getBinlogReader();
534-
if (binlogReader == null) {
535-
final MySqlConnection jdbcConnection =
536-
DebeziumUtils.createMySqlConnection(sourceConfig);
537-
final BinaryLogClient binaryLogClient =
538-
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
539-
final StatefulTaskContext statefulTaskContext =
540-
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
541-
binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
542-
this.setBinlogReader(binlogReader);
543-
}
526+
final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
527+
final BinaryLogClient binaryLogClient =
528+
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
529+
final StatefulTaskContext statefulTaskContext =
530+
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
531+
BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
544532
return binlogReader;
545533
}
546534

547-
private void closeSnapshotReader() {
548-
SnapshotSplitReader reusedSnapshotReader = this.getSnapshotReader();
549-
if (reusedSnapshotReader != null) {
550-
LOG.info(
551-
"Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
552-
reusedSnapshotReader.close();
553-
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
554-
if (reusedSnapshotReader == currentReader) {
555-
this.setCurrentReader(null);
556-
}
557-
this.setSnapshotReader(null);
558-
}
559-
}
560-
561-
private void closeBinlogReader() {
562-
BinlogSplitReader reusedBinlogReader = this.getBinlogReader();
563-
if (reusedBinlogReader != null) {
564-
LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
565-
reusedBinlogReader.close();
566-
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
567-
if (reusedBinlogReader == currentReader) {
568-
this.setCurrentReader(null);
569-
}
570-
this.setBinlogReader(null);
535+
private void closeCurrentReader() {
536+
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
537+
if (currentReader != null) {
538+
LOG.info("Close current reader {}", currentReader.getClass().getCanonicalName());
539+
currentReader.close();
571540
}
541+
this.setCurrentReader(null);
572542
}
573543

574544
private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
@@ -649,14 +619,8 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str
649619
jdbcProperteis.putAll(cu.getOriginalProperties());
650620
configFactory.jdbcProperties(jdbcProperteis);
651621

652-
// Properties dbzProps = new Properties();
653-
// dbzProps.setProperty(
654-
// MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(),
655-
// String.valueOf(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
656-
// configFactory.debeziumProperties(dbzProps);
657-
//
658-
// configFactory.heartbeatInterval(
659-
// Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
622+
Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
623+
configFactory.debeziumProperties(dbzProps);
660624
if (cdcConfig.containsKey(DataSourceConfigKeys.SPLIT_SIZE)) {
661625
configFactory.splitSize(
662626
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
@@ -719,8 +683,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
719683
public void finishSplitRecords() {
720684
this.setCurrentSplitRecords(null);
721685
// Close after each read; otherwise the binlog client will keep occupying the connection.
722-
closeBinlogReader();
723-
this.setCurrentReader(null);
686+
closeCurrentReader();
724687
}
725688

726689
@Override
@@ -777,8 +740,7 @@ private Map<TableId, TableChanges.TableChange> discoverTableSchemas(JobBaseConfi
777740
@Override
778741
public void close(JobBaseConfig jobConfig) {
779742
LOG.info("Close source reader for job {}", jobConfig.getJobId());
780-
closeSnapshotReader();
781-
closeBinlogReader();
743+
closeCurrentReader();
782744
currentReader = null;
783745
currentSplitRecords = null;
784746
if (tableSchemas != null) {

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ private PostgresSourceConfig generatePostgresConfig(Map<String, String> cdcConfi
200200
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
201201
}
202202

203-
Properties dbzProps = new Properties();
203+
Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
204204
dbzProps.put("interval.handling.mode", "string");
205205
configFactory.debeziumProperties(dbzProps);
206206

@@ -219,28 +219,22 @@ private String getSlotName(Long jobId) {
219219
@Override
220220
protected IncrementalSourceScanFetcher getSnapshotSplitReader(JobBaseConfig config) {
221221
PostgresSourceConfig sourceConfig = getSourceConfig(config);
222-
IncrementalSourceScanFetcher snapshotReader = this.getSnapshotReader();
223-
if (snapshotReader == null) {
224-
PostgresDialect dialect = new PostgresDialect(sourceConfig);
225-
PostgresSourceFetchTaskContext taskContext =
226-
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
227-
snapshotReader = new IncrementalSourceScanFetcher(taskContext, 0);
228-
this.setSnapshotReader(snapshotReader);
229-
}
222+
PostgresDialect dialect = new PostgresDialect(sourceConfig);
223+
PostgresSourceFetchTaskContext taskContext =
224+
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
225+
IncrementalSourceScanFetcher snapshotReader =
226+
new IncrementalSourceScanFetcher(taskContext, 0);
230227
return snapshotReader;
231228
}
232229

233230
@Override
234231
protected IncrementalSourceStreamFetcher getBinlogSplitReader(JobBaseConfig config) {
235232
PostgresSourceConfig sourceConfig = getSourceConfig(config);
236-
IncrementalSourceStreamFetcher binlogReader = this.getBinlogReader();
237-
if (binlogReader == null) {
238-
PostgresDialect dialect = new PostgresDialect(sourceConfig);
239-
PostgresSourceFetchTaskContext taskContext =
240-
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
241-
binlogReader = new IncrementalSourceStreamFetcher(taskContext, 0);
242-
this.setBinlogReader(binlogReader);
243-
}
233+
PostgresDialect dialect = new PostgresDialect(sourceConfig);
234+
PostgresSourceFetchTaskContext taskContext =
235+
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
236+
IncrementalSourceStreamFetcher binlogReader =
237+
new IncrementalSourceStreamFetcher(taskContext, 0);
244238
return binlogReader;
245239
}
246240

0 commit comments

Comments
 (0)