@@ -556,6 +556,10 @@ public boolean isVarchar() {
return isScalarType(PrimitiveType.VARCHAR);
}

public boolean isChar() {
return isScalarType(PrimitiveType.CHAR);
}

public boolean isJsonbType() {
return isScalarType(PrimitiveType.JSONB);
}
@@ -306,7 +306,7 @@ public String getTimeoutReason() {
log.warn("Failed to get task timeout reason, response: {}", response);
}
} catch (ExecutionException | InterruptedException ex) {
log.error("Send get fail reason request failed: ", ex);
log.error("Send get task fail reason request failed: ", ex);
}
return "";
}
@@ -355,6 +355,14 @@ public static List<Column> getColumns(JdbcClient jdbcClient,
} else {
col.setType(ScalarType.createVarcharType(len));
}
} else if (col.getType().isChar()) {
// The char length is multiplied by 3: Doris lengths are in bytes and
// a UTF-8 character can take up to 3 bytes.
int len = col.getType().getLength() * 3;
if (len > ScalarType.MAX_CHAR_LENGTH) {
col.setType(ScalarType.createVarcharType(len));
} else {
col.setType(ScalarType.createCharType(len));
}
}

// string cannot be a key
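To make the CHAR rule above concrete, here is a minimal, self-contained sketch of the same mapping. MAX_CHAR_LENGTH = 255 mirrors Doris's documented CHAR limit, and the class and method names are illustrative rather than the project's API:

public class CharMappingSketch {
    // Stand-in for ScalarType.MAX_CHAR_LENGTH; Doris caps CHAR at 255 bytes.
    static final int MAX_CHAR_LENGTH = 255;

    // Same branch as the diff: triple the declared length for UTF-8,
    // then fall back to VARCHAR once the result no longer fits in CHAR.
    static String mapCharColumn(int declaredLen) {
        int len = declaredLen * 3;
        return len > MAX_CHAR_LENGTH ? "VARCHAR(" + len + ")" : "CHAR(" + len + ")";
    }

    public static void main(String[] args) {
        System.out.println(mapCharColumn(10));  // CHAR(30)
        System.out.println(mapCharColumn(100)); // VARCHAR(300)
    }
}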
@@ -19,6 +19,11 @@

public class Constants {
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final long POLL_SPLIT_RECORDS_TIMEOUTS = 15000L;

// Debezium default properties
public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
public static final String DEBEZIUM_MAX_QUEUE_SIZE = "162580";
public static final String DEBEZIUM_MAX_BATCH_SIZE = "40960";
public static final String DEBEZIUM_POLL_INTERVAL_MS = "50";
}
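The readers below call ConfigUtil.getDefaultDebeziumProps(), whose body is not shown in this diff. A plausible sketch, assuming it merely packs the constants above into Debezium's standard tuning options (max.queue.size, max.batch.size, and poll.interval.ms are real Debezium settings; the heartbeat line is a guess based on the commented-out code this PR deletes):

import java.util.Properties;

public final class ConfigUtilSketch {
    public static Properties getDefaultDebeziumProps() {
        Properties props = new Properties();
        props.setProperty("max.queue.size", Constants.DEBEZIUM_MAX_QUEUE_SIZE);
        props.setProperty("max.batch.size", Constants.DEBEZIUM_MAX_BATCH_SIZE);
        props.setProperty("poll.interval.ms", Constants.DEBEZIUM_POLL_INTERVAL_MS);
        // Possibly also the heartbeat interval, kept as a long in Constants:
        props.setProperty(
                "heartbeat.interval.ms",
                String.valueOf(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
        return props;
    }
}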
@@ -111,12 +111,16 @@ private RecordWithMeta buildRecordResponse(
lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
recordResponse.setMeta(lastMeta);
}
if (count >= fetchRecord.getFetchSize()) {
return recordResponse;
}
}
}
} finally {
// The LSN committed here is the offset of the last successful write,
// so even if a subsequent write fails, the commit is unaffected.
sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());

// This must be called after commitSourceOffset; otherwise,
// PG's confirmed LSN will not advance.
sourceReader.finishSplitRecords();
}

Expand All @@ -140,7 +144,6 @@ private RecordWithMeta buildRecordResponse(
throw new RuntimeException("split state is null");
}

sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
return recordResponse;
}
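The at-least-once contract described by the comments above can be reduced to a toy model: the offset published in the finally block is only ever the offset of the last batch that was written successfully, so a failing write cannot advance it. All names here are illustrative:

import java.util.List;

class ToyOffsetCommit {
    private long lastSuccessfulOffset = -1L;

    void process(List<List<String>> batches) {
        long offset = -1L;
        try {
            for (List<String> batch : batches) {
                write(batch);                  // may throw at any point
                offset += batch.size();
                lastSuccessfulOffset = offset; // advanced only after success
            }
        } finally {
            // Like commitSourceOffset above: never ahead of a successful
            // write, so a replay resumes from the last durable position.
            commit(lastSuccessfulOffset);
        }
    }

    private void write(List<String> batch) { /* sink I/O, may fail */ }

    private void commit(long offset) {
        System.out.println("commit " + offset);
    }
}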

@@ -222,9 +225,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
}
}
} finally {
if (readResult != null) {
// The LSN committed here is the offset of the last successful write,
// so even if a subsequent write fails, the commit is unaffected.
sourceReader.commitSourceOffset(
writeRecordRequest.getJobId(), readResult.getSplit());
}

// This must be called after commitSourceOffset; otherwise,
// PG's confirmed LSN will not advance.
// It must also run before batchStreamLoad.commitOffset;
// otherwise, fe might issue the next task for this job.
sourceReader.finishSplitRecords();
}

// get offset from split state
try {
if (readResult.getSplitState() != null) {
// Set meta information for hw (the split's high watermark)
@@ -251,9 +266,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception

// request fe api
batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);

// commit source offset if need
sourceReader.commitSourceOffset(writeRecordRequest.getJobId(), readResult.getSplit());
} finally {
batchStreamLoad.resetTaskId();
}
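Condensed, the ordering that the writeRecords changes above establish looks roughly like this (a sketch only; the fields and signatures are assumed from the surrounding diff):

void writeRecordsOrderingSketch() throws Exception {
    SplitReadResult readResult = sourceReader.readSplitRecords(request);
    try {
        // ... stream-load the fetched records ...
    } finally {
        // 1) Acknowledge the offset of the last successful write.
        sourceReader.commitSourceOffset(request.getJobId(), readResult.getSplit());
        // 2) Must follow (1), or PG's confirmed LSN never advances, and must
        //    precede (3), or fe might issue the next task for this job early.
        sourceReader.finishSplitRecords();
    }
    // 3) Report progress to fe only after the split is fully released.
    batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
    batchStreamLoad.resetTaskId();
}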
@@ -84,8 +84,6 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
private static ObjectMapper objectMapper = new ObjectMapper();
private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
private IncrementalSourceScanFetcher snapshotReader;
private IncrementalSourceStreamFetcher binlogReader;
private Fetcher<SourceRecords, SourceSplitBase> currentReader;
private Map<TableId, TableChanges.TableChange> tableSchemas;
private SplitRecords currentSplitRecords;
@@ -164,7 +162,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
if (currentSplitRecords == null) {
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (currentReader == null || baseReq.isReload()) {
if (baseReq.isReload() || currentReader == null) {
LOG.info(
"No current reader or reload {}, create new split reader for job {}",
baseReq.isReload(),
@@ -476,7 +474,7 @@ private SplitRecords pollSplitRecordsWithSplit(SourceSplitBase split, JobBaseCon
sourceRecords =
pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
if (currentReader instanceof IncrementalSourceScanFetcher) {
closeSnapshotReader();
closeCurrentReader();
}
return new SplitRecords(currentSplitId, sourceRecords.iterator());
}
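pollUntilDataAvailable is collapsed in this view; it presumably loops over the fetcher until a non-empty batch arrives or the 15 s budget (Constants.POLL_SPLIT_RECORDS_TIMEOUTS) runs out, sleeping 500 ms between attempts. A sketch under those assumptions (Fetcher.pollSplitRecords() is the real flink-cdc interface method; the rest is illustrative):

private SourceRecords pollUntilDataAvailable(
        Fetcher<SourceRecords, SourceSplitBase> reader, long timeoutMs, long intervalMs)
        throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
        Iterator<SourceRecords> batch = reader.pollSplitRecords(); // may be null or empty
        if (batch != null && batch.hasNext()) {
            return batch.next();
        }
        Thread.sleep(intervalMs); // back off before the next poll
    }
    // Timed out: return an empty batch, as the next hunk's fallback does.
    return new SourceRecords(new ArrayList<>());
}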
@@ -540,30 +538,12 @@ private SourceRecords pollUntilDataAvailable(
return new SourceRecords(new ArrayList<>());
}

private void closeSnapshotReader() {
IncrementalSourceScanFetcher reusedSnapshotReader = this.getSnapshotReader();
if (reusedSnapshotReader != null) {
LOG.info(
"Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
reusedSnapshotReader.close();
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (reusedSnapshotReader == currentReader) {
this.setCurrentReader(null);
}
this.setSnapshotReader(null);
}
}

private void closeBinlogReader() {
IncrementalSourceStreamFetcher reusedBinlogReader = this.getBinlogReader();
if (reusedBinlogReader != null) {
LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
reusedBinlogReader.close();
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (reusedBinlogReader == currentReader) {
this.setCurrentReader(null);
}
this.setBinlogReader(null);
private void closeCurrentReader() {
Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader();
if (currentReader != null) {
LOG.info("Close current reader {}", currentReader.getClass().getCanonicalName());
currentReader.close();
this.setCurrentReader(null);
}
}

@@ -617,8 +597,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
public void finishSplitRecords() {
this.setCurrentSplitRecords(null);
// Close after each read; otherwise the binlog client will keep holding the connection.
closeBinlogReader();
this.setCurrentReader(null);
closeCurrentReader();
}

private Map<TableId, TableChanges.TableChange> getTableSchemas(JobBaseConfig config) {
@@ -636,8 +615,7 @@ protected abstract Map<TableId, TableChanges.TableChange> discoverTableSchemas(
@Override
public void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
closeSnapshotReader();
closeBinlogReader();
closeCurrentReader();
currentReader = null;
currentSplitRecords = null;
currentSplit = null;
@@ -108,8 +108,6 @@ public class MySqlSourceReader implements SourceReader {
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
new FlinkJsonTableChangeSerializer();
private SourceRecordDeserializer<SourceRecord, List<String>> serializer;
private SnapshotSplitReader snapshotReader;
private BinlogSplitReader binlogReader;
private DebeziumReader<SourceRecords, MySqlSplit> currentReader;
private Map<TableId, TableChanges.TableChange> tableSchemas;
private SplitRecords currentSplitRecords;
@@ -180,7 +178,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
if (currentSplitRecords == null) {
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
if (currentReader == null || baseReq.isReload()) {
if (baseReq.isReload() || currentReader == null) {
LOG.info(
"No current reader or reload {}, create new split reader",
baseReq.isReload());
@@ -446,7 +444,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j
sourceRecords =
pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
if (currentReader instanceof SnapshotSplitReader) {
closeSnapshotReader();
closeCurrentReader();
}
return new SplitRecords(currentSplitId, sourceRecords.iterator());
}
@@ -514,61 +512,33 @@ private SourceRecords pollUntilDataAvailable(

private SnapshotSplitReader getSnapshotSplitReader(JobBaseConfig config) {
MySqlSourceConfig sourceConfig = getSourceConfig(config);
SnapshotSplitReader snapshotReader = this.getSnapshotReader();
if (snapshotReader == null) {
final MySqlConnection jdbcConnection =
DebeziumUtils.createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
this.setSnapshotReader(snapshotReader);
}
final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
SnapshotSplitReader snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
return snapshotReader;
}

private BinlogSplitReader getBinlogSplitReader(JobBaseConfig config) {
MySqlSourceConfig sourceConfig = getSourceConfig(config);
BinlogSplitReader binlogReader = this.getBinlogReader();
if (binlogReader == null) {
final MySqlConnection jdbcConnection =
DebeziumUtils.createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
this.setBinlogReader(binlogReader);
}
final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
final BinaryLogClient binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
final StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
return binlogReader;
}
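With the cached snapshotReader/binlogReader fields removed, each reader now lives for exactly one split. The intended lifecycle, sketched with the names used in this diff (DebeziumReader's submitSplit and pollSplitRecords are the real flink-cdc interface; the wiring is illustrative):

void readOneSplitSketch(JobBaseConfig config, MySqlSplit split) throws Exception {
    // Fresh instance per call; nothing is reused across splits any more.
    DebeziumReader<SourceRecords, MySqlSplit> reader =
            split.isSnapshotSplit()
                    ? getSnapshotSplitReader(config)
                    : getBinlogSplitReader(config);
    this.setCurrentReader(reader);
    reader.submitSplit(split);
    SourceRecords records =
            pollUntilDataAvailable(reader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
    // ... deserialize and forward the records ...
    closeCurrentReader(); // releases the connection held by the reader
}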

private void closeSnapshotReader() {
SnapshotSplitReader reusedSnapshotReader = this.getSnapshotReader();
if (reusedSnapshotReader != null) {
LOG.info(
"Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
reusedSnapshotReader.close();
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
if (reusedSnapshotReader == currentReader) {
this.setCurrentReader(null);
}
this.setSnapshotReader(null);
}
}

private void closeBinlogReader() {
BinlogSplitReader reusedBinlogReader = this.getBinlogReader();
if (reusedBinlogReader != null) {
LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
reusedBinlogReader.close();
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
if (reusedBinlogReader == currentReader) {
this.setCurrentReader(null);
}
this.setBinlogReader(null);
private void closeCurrentReader() {
DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
if (currentReader != null) {
LOG.info("Close current reader {}", currentReader.getClass().getCanonicalName());
currentReader.close();
}
this.setCurrentReader(null);
}

private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
@@ -649,14 +619,8 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str
jdbcProperteis.putAll(cu.getOriginalProperties());
configFactory.jdbcProperties(jdbcProperteis);

// Properties dbzProps = new Properties();
// dbzProps.setProperty(
// MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(),
// String.valueOf(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
// configFactory.debeziumProperties(dbzProps);
//
// configFactory.heartbeatInterval(
// Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
configFactory.debeziumProperties(dbzProps);
if (cdcConfig.containsKey(DataSourceConfigKeys.SPLIT_SIZE)) {
configFactory.splitSize(
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
@@ -719,8 +683,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
public void finishSplitRecords() {
this.setCurrentSplitRecords(null);
// Close after each read; otherwise the binlog client will keep holding the connection.
closeBinlogReader();
this.setCurrentReader(null);
closeCurrentReader();
}

@Override
@@ -777,8 +740,7 @@ private Map<TableId, TableChanges.TableChange> discoverTableSchemas(JobBaseConfi
@Override
public void close(JobBaseConfig jobConfig) {
LOG.info("Close source reader for job {}", jobConfig.getJobId());
closeSnapshotReader();
closeBinlogReader();
closeCurrentReader();
currentReader = null;
currentSplitRecords = null;
if (tableSchemas != null) {
@@ -200,7 +200,7 @@ private PostgresSourceConfig generatePostgresConfig(Map<String, String> cdcConfi
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
}

Properties dbzProps = new Properties();
Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
dbzProps.put("interval.handling.mode", "string");
configFactory.debeziumProperties(dbzProps);

@@ -219,28 +219,22 @@ private String getSlotName(Long jobId) {
@Override
protected IncrementalSourceScanFetcher getSnapshotSplitReader(JobBaseConfig config) {
PostgresSourceConfig sourceConfig = getSourceConfig(config);
IncrementalSourceScanFetcher snapshotReader = this.getSnapshotReader();
if (snapshotReader == null) {
PostgresDialect dialect = new PostgresDialect(sourceConfig);
PostgresSourceFetchTaskContext taskContext =
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
snapshotReader = new IncrementalSourceScanFetcher(taskContext, 0);
this.setSnapshotReader(snapshotReader);
}
PostgresDialect dialect = new PostgresDialect(sourceConfig);
PostgresSourceFetchTaskContext taskContext =
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
IncrementalSourceScanFetcher snapshotReader =
new IncrementalSourceScanFetcher(taskContext, 0);
return snapshotReader;
}

@Override
protected IncrementalSourceStreamFetcher getBinlogSplitReader(JobBaseConfig config) {
PostgresSourceConfig sourceConfig = getSourceConfig(config);
IncrementalSourceStreamFetcher binlogReader = this.getBinlogReader();
if (binlogReader == null) {
PostgresDialect dialect = new PostgresDialect(sourceConfig);
PostgresSourceFetchTaskContext taskContext =
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
binlogReader = new IncrementalSourceStreamFetcher(taskContext, 0);
this.setBinlogReader(binlogReader);
}
PostgresDialect dialect = new PostgresDialect(sourceConfig);
PostgresSourceFetchTaskContext taskContext =
new PostgresSourceFetchTaskContext(sourceConfig, dialect);
IncrementalSourceStreamFetcher binlogReader =
new IncrementalSourceStreamFetcher(taskContext, 0);
return binlogReader;
}
