diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
index 094766c51e4f96..3cd318aa6c5fe6 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/Type.java
@@ -556,6 +556,10 @@ public boolean isVarchar() {
         return isScalarType(PrimitiveType.VARCHAR);
     }
 
+    public boolean isChar() {
+        return isScalarType(PrimitiveType.CHAR);
+    }
+
     public boolean isJsonbType() {
         return isScalarType(PrimitiveType.JSONB);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 1f955f0a2c3d3d..76dc77dfab7836 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -306,7 +306,7 @@ public String getTimeoutReason() {
                 log.warn("Failed to get task timeout reason, response: {}", response);
             }
         } catch (ExecutionException | InterruptedException ex) {
-            log.error("Send get fail reason request failed: ", ex);
+            log.error("Send get task fail reason request failed: ", ex);
         }
         return "";
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 05c7570712484f..4164dcaa262827 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -355,6 +355,14 @@ public static List getColumns(JdbcClient jdbcClient,
             } else {
                 col.setType(ScalarType.createVarcharType(len));
             }
+        } else if (col.getType().isChar()) {
+            // Doris measures CHAR length in bytes, and a UTF-8 character can take
+            // up to 3 bytes, so widen the source char length by a factor of 3.
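+            // For example, char(5) on MySQL becomes char(15) and char(10) on
+            // PostgreSQL becomes char(30), matching the regression-test .out
+            // changes below.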
+            int len = col.getType().getLength() * 3;
+            if (len > ScalarType.MAX_CHAR_LENGTH) {
+                col.setType(ScalarType.createVarcharType(len));
+            } else {
+                col.setType(ScalarType.createCharType(len));
+            }
         }
         // string can not to be key
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
index 2afacf6b9e7cdd..93b141ae600c6a 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java
@@ -19,6 +19,11 @@ public class Constants {
     public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
-    public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
     public static final long POLL_SPLIT_RECORDS_TIMEOUTS = 15000L;
+
+    // Debezium default properties
+    public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 10000L;
+    public static final String DEBEZIUM_MAX_QUEUE_SIZE = "162580";
+    public static final String DEBEZIUM_MAX_BATCH_SIZE = "40960";
+    public static final String DEBEZIUM_POLL_INTERVAL_MS = "50";
 }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 73be6f16828878..3245e00d4ccb74 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -111,12 +111,16 @@ private RecordWithMeta buildRecordResponse(
                         lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
                         recordResponse.setMeta(lastMeta);
                     }
-                    if (count >= fetchRecord.getFetchSize()) {
-                        return recordResponse;
-                    }
                 }
             }
         } finally {
+            // The LSN committed here is the offset of the last successful write, so
+            // the commit stays correct even if a subsequent write fails.
+            sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
+
+            // This must be called after commitSourceOffset; otherwise PostgreSQL's
+            // confirmed LSN will not advance.
             sourceReader.finishSplitRecords();
         }
@@ -140,7 +144,6 @@ private RecordWithMeta buildRecordResponse(
             throw new RuntimeException("split state is null");
         }
 
-        sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
         return recordResponse;
     }
@@ -222,9 +225,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
                 }
             }
         } finally {
+            if (readResult != null) {
+                // The LSN committed here is the offset of the last successful write,
+                // so the commit stays correct even if a subsequent write fails.
+                sourceReader.commitSourceOffset(
+                        writeRecordRequest.getJobId(), readResult.getSplit());
+            }
+
+            // This must be called after commitSourceOffset; otherwise PostgreSQL's
+            // confirmed LSN will not advance. It must also run before
+            // batchStreamLoad.commitOffset; otherwise FE might issue the next task
+            // for this job.
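+            // finishSplitRecords also closes the current reader, since the binlog
+            // client would otherwise keep occupying the connection.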
             sourceReader.finishSplitRecords();
         }
-
+        // get offset from split state
         try {
             if (readResult.getSplitState() != null) {
                 // Set meta information for hw
@@ -251,9 +266,6 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
 
             // request fe api
             batchStreamLoad.commitOffset(metaResponse, scannedRows, scannedBytes);
-
-            // commit source offset if need
-            sourceReader.commitSourceOffset(writeRecordRequest.getJobId(), readResult.getSplit());
         } finally {
             batchStreamLoad.resetTaskId();
         }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 70ab3961acc3a1..5b404932c3f2d7 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -84,8 +84,6 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class);
     private static ObjectMapper objectMapper = new ObjectMapper();
     private SourceRecordDeserializer> serializer;
-    private IncrementalSourceScanFetcher snapshotReader;
-    private IncrementalSourceStreamFetcher binlogReader;
     private Fetcher currentReader;
     private Map tableSchemas;
     private SplitRecords currentSplitRecords;
@@ -164,7 +162,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
         if (currentSplitRecords == null) {
             Fetcher currentReader = this.getCurrentReader();
-            if (currentReader == null || baseReq.isReload()) {
+            if (baseReq.isReload() || currentReader == null) {
                 LOG.info(
                         "No current reader or reload {}, create new split reader for job {}",
                         baseReq.isReload(),
@@ -476,7 +474,7 @@ private SplitRecords pollSplitRecordsWithSplit(SourceSplitBase split, JobBaseCon
         sourceRecords =
                 pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
         if (currentReader instanceof IncrementalSourceScanFetcher) {
-            closeSnapshotReader();
+            closeCurrentReader();
         }
         return new SplitRecords(currentSplitId, sourceRecords.iterator());
     }
@@ -540,30 +538,12 @@ private SourceRecords pollUntilDataAvailable(
         return new SourceRecords(new ArrayList<>());
     }
 
-    private void closeSnapshotReader() {
-        IncrementalSourceScanFetcher reusedSnapshotReader = this.getSnapshotReader();
-        if (reusedSnapshotReader != null) {
-            LOG.info(
-                    "Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
-            reusedSnapshotReader.close();
-            Fetcher currentReader = this.getCurrentReader();
-            if (reusedSnapshotReader == currentReader) {
-                this.setCurrentReader(null);
-            }
-            this.setSnapshotReader(null);
-        }
-    }
-
-    private void closeBinlogReader() {
-        IncrementalSourceStreamFetcher reusedBinlogReader = this.getBinlogReader();
-        if (reusedBinlogReader != null) {
-            LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
-            reusedBinlogReader.close();
-            Fetcher currentReader = this.getCurrentReader();
-            if (reusedBinlogReader == currentReader) {
-                this.setCurrentReader(null);
-            }
-            this.setBinlogReader(null);
+    private void closeCurrentReader() {
+        Fetcher currentReader = this.getCurrentReader();
+        if (currentReader != null) {
+            LOG.info("Close current reader {}",
+                    currentReader.getClass().getCanonicalName());
+            currentReader.close();
+            this.setCurrentReader(null);
         }
     }
@@ -617,8 +597,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
         // Close after each read, the binlog client will occupy the connection.
-        closeBinlogReader();
-        this.setCurrentReader(null);
+        closeCurrentReader();
     }
 
     private Map getTableSchemas(JobBaseConfig config) {
@@ -636,8 +615,7 @@ protected abstract Map discoverTableSchemas(
     @Override
     public void close(JobBaseConfig jobConfig) {
         LOG.info("Close source reader for job {}", jobConfig.getJobId());
-        closeSnapshotReader();
-        closeBinlogReader();
+        closeCurrentReader();
         currentReader = null;
         currentSplitRecords = null;
         currentSplit = null;
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 795deb55c261d4..d20225c0974cbb 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -108,8 +108,6 @@ public class MySqlSourceReader implements SourceReader {
     private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
             new FlinkJsonTableChangeSerializer();
     private SourceRecordDeserializer> serializer;
-    private SnapshotSplitReader snapshotReader;
-    private BinlogSplitReader binlogReader;
     private DebeziumReader currentReader;
     private Map tableSchemas;
     private SplitRecords currentSplitRecords;
@@ -180,7 +178,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
         if (currentSplitRecords == null) {
             DebeziumReader currentReader = this.getCurrentReader();
-            if (currentReader == null || baseReq.isReload()) {
+            if (baseReq.isReload() || currentReader == null) {
                 LOG.info(
                         "No current reader or reload {}, create new split reader",
                         baseReq.isReload());
@@ -446,7 +444,7 @@ private SplitRecords pollSplitRecordsWithSplit(MySqlSplit split, JobBaseConfig j
         sourceRecords =
                 pollUntilDataAvailable(currentReader, Constants.POLL_SPLIT_RECORDS_TIMEOUTS, 500);
         if (currentReader instanceof SnapshotSplitReader) {
-            closeSnapshotReader();
+            closeCurrentReader();
         }
         return new SplitRecords(currentSplitId, sourceRecords.iterator());
     }
@@ -514,61 +512,33 @@ private SourceRecords pollUntilDataAvailable(
 
     private SnapshotSplitReader getSnapshotSplitReader(JobBaseConfig config) {
         MySqlSourceConfig sourceConfig = getSourceConfig(config);
-        SnapshotSplitReader snapshotReader = this.getSnapshotReader();
-        if (snapshotReader == null) {
-            final MySqlConnection jdbcConnection =
-                    DebeziumUtils.createMySqlConnection(sourceConfig);
-            final BinaryLogClient binaryLogClient =
-                    DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
-            final StatefulTaskContext statefulTaskContext =
-                    new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
-            snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
-            this.setSnapshotReader(snapshotReader);
-        }
+        final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+        final BinaryLogClient binaryLogClient =
+                DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+        final StatefulTaskContext statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
+        SnapshotSplitReader snapshotReader = new SnapshotSplitReader(statefulTaskContext, 0);
         return snapshotReader;
     }
 
     private BinlogSplitReader getBinlogSplitReader(JobBaseConfig config) {
         MySqlSourceConfig sourceConfig = getSourceConfig(config);
-        BinlogSplitReader binlogReader = this.getBinlogReader();
-        if (binlogReader == null) {
-            final MySqlConnection jdbcConnection =
-                    DebeziumUtils.createMySqlConnection(sourceConfig);
-            final BinaryLogClient binaryLogClient =
-                    DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
-            final StatefulTaskContext statefulTaskContext =
-                    new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
-            binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
-            this.setBinlogReader(binlogReader);
-        }
+        final MySqlConnection jdbcConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
+        final BinaryLogClient binaryLogClient =
+                DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
+        final StatefulTaskContext statefulTaskContext =
+                new StatefulTaskContext(sourceConfig, binaryLogClient, jdbcConnection);
+        BinlogSplitReader binlogReader = new BinlogSplitReader(statefulTaskContext, 0);
         return binlogReader;
     }
 
-    private void closeSnapshotReader() {
-        SnapshotSplitReader reusedSnapshotReader = this.getSnapshotReader();
-        if (reusedSnapshotReader != null) {
-            LOG.info(
-                    "Close snapshot reader {}", reusedSnapshotReader.getClass().getCanonicalName());
-            reusedSnapshotReader.close();
-            DebeziumReader currentReader = this.getCurrentReader();
-            if (reusedSnapshotReader == currentReader) {
-                this.setCurrentReader(null);
-            }
-            this.setSnapshotReader(null);
-        }
-    }
-
-    private void closeBinlogReader() {
-        BinlogSplitReader reusedBinlogReader = this.getBinlogReader();
-        if (reusedBinlogReader != null) {
-            LOG.info("Close binlog reader {}", reusedBinlogReader.getClass().getCanonicalName());
-            reusedBinlogReader.close();
-            DebeziumReader currentReader = this.getCurrentReader();
-            if (reusedBinlogReader == currentReader) {
-                this.setCurrentReader(null);
-            }
-            this.setBinlogReader(null);
+    private void closeCurrentReader() {
+        DebeziumReader currentReader = this.getCurrentReader();
+        if (currentReader != null) {
+            LOG.info("Close current reader {}", currentReader.getClass().getCanonicalName());
+            currentReader.close();
         }
+        this.setCurrentReader(null);
     }
 
     private MySqlSourceConfig getSourceConfig(JobBaseConfig config) {
@@ -649,14 +619,8 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str
         jdbcProperteis.putAll(cu.getOriginalProperties());
         configFactory.jdbcProperties(jdbcProperteis);
 
-        // Properties dbzProps = new Properties();
-        // dbzProps.setProperty(
-        //         MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS.name(),
-        //         String.valueOf(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
-        // configFactory.debeziumProperties(dbzProps);
-        //
-        // configFactory.heartbeatInterval(
-        //         Duration.ofMillis(Constants.DEBEZIUM_HEARTBEAT_INTERVAL_MS));
+        Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
+        configFactory.debeziumProperties(dbzProps);
         if (cdcConfig.containsKey(DataSourceConfigKeys.SPLIT_SIZE)) {
             configFactory.splitSize(
                     Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
@@ -719,8 +683,7 @@ public boolean isSnapshotSplit(SourceSplit split) {
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
         // Close after each read, the binlog client will occupy the connection.
-        closeBinlogReader();
-        this.setCurrentReader(null);
+        closeCurrentReader();
     }
 
     @Override
@@ -777,8 +740,7 @@ private Map discoverTableSchemas(JobBaseConfi
     @Override
     public void close(JobBaseConfig jobConfig) {
         LOG.info("Close source reader for job {}", jobConfig.getJobId());
-        closeSnapshotReader();
-        closeBinlogReader();
+        closeCurrentReader();
         currentReader = null;
         currentSplitRecords = null;
         if (tableSchemas != null) {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 53b648bf38afcb..c1e062df5a6b34 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -200,7 +200,7 @@ private PostgresSourceConfig generatePostgresConfig(Map cdcConfi
                     Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SPLIT_SIZE)));
         }
 
-        Properties dbzProps = new Properties();
+        Properties dbzProps = ConfigUtil.getDefaultDebeziumProps();
         dbzProps.put("interval.handling.mode", "string");
         configFactory.debeziumProperties(dbzProps);
 
@@ -219,28 +219,22 @@ private String getSlotName(Long jobId) {
     @Override
     protected IncrementalSourceScanFetcher getSnapshotSplitReader(JobBaseConfig config) {
         PostgresSourceConfig sourceConfig = getSourceConfig(config);
-        IncrementalSourceScanFetcher snapshotReader = this.getSnapshotReader();
-        if (snapshotReader == null) {
-            PostgresDialect dialect = new PostgresDialect(sourceConfig);
-            PostgresSourceFetchTaskContext taskContext =
-                    new PostgresSourceFetchTaskContext(sourceConfig, dialect);
-            snapshotReader = new IncrementalSourceScanFetcher(taskContext, 0);
-            this.setSnapshotReader(snapshotReader);
-        }
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        IncrementalSourceScanFetcher snapshotReader =
+                new IncrementalSourceScanFetcher(taskContext, 0);
         return snapshotReader;
     }
 
     @Override
     protected IncrementalSourceStreamFetcher getBinlogSplitReader(JobBaseConfig config) {
         PostgresSourceConfig sourceConfig = getSourceConfig(config);
-        IncrementalSourceStreamFetcher binlogReader = this.getBinlogReader();
-        if (binlogReader == null) {
-            PostgresDialect dialect = new PostgresDialect(sourceConfig);
-            PostgresSourceFetchTaskContext taskContext =
-                    new PostgresSourceFetchTaskContext(sourceConfig, dialect);
-            binlogReader = new IncrementalSourceStreamFetcher(taskContext, 0);
-            this.setBinlogReader(binlogReader);
-        }
+        PostgresDialect dialect = new PostgresDialect(sourceConfig);
+        PostgresSourceFetchTaskContext taskContext =
+                new PostgresSourceFetchTaskContext(sourceConfig, dialect);
+        IncrementalSourceStreamFetcher binlogReader =
+                new IncrementalSourceStreamFetcher(taskContext, 0);
         return binlogReader;
     }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index b8503adf7b5302..016c9ddf312e0d 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -17,10 +17,13 @@
 
 package org.apache.doris.cdcclient.utils;
 
+import org.apache.doris.cdcclient.common.Constants;
+
 import org.apache.commons.lang3.StringUtils;
 
 import java.time.ZoneId;
 import java.util.Map;
+import java.util.Properties;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -93,6 +96,15 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro
         return ZoneId.systemDefault();
     }
 
+    /** Default Debezium properties, tuned for better throughput. */
+    public static Properties getDefaultDebeziumProps() {
+        Properties properties = new Properties();
+        properties.setProperty("max.queue.size", Constants.DEBEZIUM_MAX_QUEUE_SIZE);
+        properties.setProperty("max.batch.size", Constants.DEBEZIUM_MAX_BATCH_SIZE);
+        properties.setProperty("poll.interval.ms", Constants.DEBEZIUM_POLL_INTERVAL_MS);
+        return properties;
+    }
+
     public static boolean is13Timestamp(String s) {
         return s != null && s.matches("\\d{13}");
     }
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
index ca7379dbcf5182..e4878ae835914f 100644
--- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.out
@@ -34,7 +34,7 @@ datetime datetime Yes false \N NONE
 timestamp1 datetime Yes false \N NONE
 timestamp2 datetime(3) Yes false \N NONE
 timestamp3 datetime(6) Yes false \N NONE
-char char(5) Yes false \N NONE
+char char(15) Yes false \N NONE
 varchar varchar(30) Yes false \N NONE
 text text Yes false \N NONE
 blob text Yes false \N NONE
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
index 1efdc563a1474b..878a4a01f72cff 100644
--- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out
@@ -7,7 +7,7 @@ bigint_col bigint Yes false \N NONE
 real_col float Yes false \N NONE
 double_col double Yes false \N NONE
 numeric_col decimal(20,6) Yes false \N NONE
-char_col char(10) Yes false \N NONE
+char_col char(30) Yes false \N NONE
 varchar_col text Yes false \N NONE
 text_col text Yes false \N NONE
 boolean_col boolean Yes false \N NONE
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index 2da9437ab4b4ec..2febce4cb36909 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -146,7 +146,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
         sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'A1';"""
     }
 
-    sleep(30000); // wait for cdc incremental data
+    sleep(60000); // wait for cdc incremental data
 
     // check incremental data
     qt_select_binlog_table1 """ SELECT * FROM ${table1} order by name asc """
@@ -163,7 +163,7 @@ suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_do
         sql """INSERT INTO ${mysqlDb}.${table1} (name,age) VALUES ('Apache',40);"""
     }
 
-    sleep(30000); // wait for cdc incremental data
+    sleep(60000); // wait for cdc incremental data
 
     // check incremental data
     qt_select_next_binlog_table1 """ SELECT * FROM ${table1} order by name asc """
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
index 564f4d6c4abde8..1955476bee154c 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy
@@ -140,7 +140,7 @@ suite("test_streaming_mysql_job_all_type", "p0,external,mysql,external_docker,ex
         sql """insert into ${mysqlDb}.${table1} values (2,100,100000,1000000000,100000000000,12345,12345.67,123456789.12345,12345678901234567890.1234567890,123456789012345678901234567890.123456789012345678901234567890,12345.6789,123.456,true,-10,-1000,-100000,-100000000,-1000000000000,-12345.6789,-123.456,-12345,-12345.67,-123456789.12345,-12345678901234567890.1234567890,-123456789012345678901234567890.123456789012345678901234567890,2024,'12:34:56','12:34:56.789','12:34:56.789123','2024-01-01','2024-01-01 12:34:56','2024-01-01 12:34:56','2024-01-01 12:34:56.789','2024-01-01 12:34:56.789123','hello','hello123','this is a text field','this is a blob','{"id":10,"name":"mock"}','Option2',b'101010','bin_data_123','varbin_data','Value2');"""
     }
 
-    sleep(30000); // wait for cdc incremental data
+    sleep(60000); // wait for cdc incremental data
 
     // check incremental data
     qt_select_all_types_null2 """select * from ${currentDb}.${table1} order by 1;"""
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
index 541941b816cf74..57666ddf5db16a 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.groovy
@@ -128,7 +128,7 @@ suite("test_streaming_postgres_job_all_type", "p0,external,pg,external_docker,ex
         sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (2,2,200,2000,7.89,0.12,99999.000001,'char2','varchar2','another text',false,'2025-01-01','23:59:59','23:59:59+00','2025-01-01 23:59:59','2025-01-01 23:59:59+00','2 hours',decode('DEADBEEF', 'hex'),'11111111-2222-3333-4444-555555555556'::uuid,'{"x":10}','{"y":20}','10.0.0.1','10.0.0.0/16','08:00:2b:aa:bb:cc',B'11110000',B'1111',ARRAY[10,20],ARRAY['x','y'],'(3,4)');"""
     }
 
-    sleep(30000); // wait for cdc incremental data
+    sleep(60000); // wait for cdc incremental data
 
     // check incremental data
    qt_select_all_types_null2 """select * from ${currentDb}.${table1} order by 1;"""
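
Reviewer note (illustrative, not part of the patch): a minimal, self-contained sketch of the CHAR widening rule added in StreamingJobUtils above. It assumes ScalarType.MAX_CHAR_LENGTH is 255 (Doris's CHAR byte-length cap); the class, constant, and method names below are stand-ins for illustration, not the real catalog types.

    // Sketch of the char-widening rule, assuming a 255-byte CHAR cap.
    public class CharWideningSketch {
        private static final int MAX_CHAR_LENGTH = 255; // assumed ScalarType.MAX_CHAR_LENGTH

        // Maps a source char(n) column to the Doris type the patch would choose.
        static String widen(int sourceCharLen) {
            int len = sourceCharLen * 3; // a UTF-8 character can take up to 3 bytes
            return (len > MAX_CHAR_LENGTH) ? "varchar(" + len + ")" : "char(" + len + ")";
        }

        public static void main(String[] args) {
            System.out.println(widen(5));   // char(15): matches the MySQL .out change
            System.out.println(widen(10));  // char(30): matches the PostgreSQL .out change
            System.out.println(widen(100)); // varchar(300): 300 bytes exceeds the CHAR cap
        }
    }

The varchar fallback matters because a wide source column such as char(100) would need 300 bytes, past the CHAR cap, so only the tripled lengths that still fit stay CHAR.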