From 306a3dc79571920b9812858fc856d852d4cf0bdd Mon Sep 17 00:00:00 2001
From: wudi
Date: Mon, 12 Jan 2026 23:48:37 +0800
Subject: [PATCH 1/6] fix pg consumer in multi-backend mode

---
 .../cdcclient/source/reader/JdbcIncrementalSourceReader.java | 5 ++++-
 .../cdcclient/source/reader/mysql/MySqlSourceReader.java     | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index f9e11f6b029aa3..e1aaa1a821a268 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -171,7 +171,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         // build split
         Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq);
         split = splitFlag.f0;
-        closeBinlogReader();
+        // closeBinlogReader();
         currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
         this.setCurrentSplitRecords(currentSplitRecords);
         this.setCurrentSplit(split);
@@ -615,6 +615,9 @@ public boolean isSnapshotSplit(SourceSplit split) {
     @Override
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
+        // Close after each read; otherwise the binlog client keeps occupying the connection.
+        closeBinlogReader();
+        this.setCurrentReader(null);
     }

     private Map getTableSchemas(JobBaseConfig config) {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 27fbf3be88b363..a3f14a953b6231 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -188,7 +188,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq);
         split = splitFlag.f0;
         // reset binlog reader
-        closeBinlogReader();
+        // closeBinlogReader();
         currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
         this.setCurrentSplitRecords(currentSplitRecords);
         this.setCurrentSplit(split);
@@ -718,6 +718,9 @@ public boolean isSnapshotSplit(SourceSplit split) {
     @Override
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
+        // Close after each read; otherwise the binlog client keeps occupying the connection.
+        closeBinlogReader();
+        this.setCurrentReader(null);
     }

     @Override
From 2fa81dc0487f2371d61b7b2d8b1c31809a57b32a Mon Sep 17 00:00:00 2001
From: wudi
Date: Tue, 13 Jan 2026 00:13:19 +0800
Subject: [PATCH 2/6] add log

---
 .../job/extensions/insert/streaming/StreamingMultiTblTask.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 07d9acf9d3f99c..a4f92fae2e9e13 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -107,12 +107,13 @@ public void run() throws JobException {
             log.info("task has been canceled, task id is {}", getTaskId());
             return;
         }
-        log.info("start to run streaming multi task, offset is {}", runningOffset.toString());
         sendWriteRequest();
     }

     private void sendWriteRequest() throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
+        log.info("start to run streaming multi task {} on backend {}/{}, offset is {}",
+                taskId, backend.getId(), backend.getHost(), runningOffset.toString());
         WriteRecordRequest params = buildRequestParams();
         InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/writeRecords")
From 8cf107498158967824a733a06b3096c4d511d79a Mon Sep 17 00:00:00 2001
From: wudi
Date: Tue, 13 Jan 2026 12:07:59 +0800
Subject: [PATCH 3/6] add init reader to create pg slot

---
 .../offset/jdbc/JdbcSourceOffsetProvider.java | 57 ++++++++++++++++++-
 .../controller/ClientController.java          | 14 +++++
 .../reader/postgres/PostgresSourceReader.java |  3 +
 .../test_streaming_postgres_job_priv.groovy   | 43 +++++++-------
 4 files changed, 95 insertions(+), 22 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 560887d61ad1c0..f2314ddc68dfd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -18,6 +18,7 @@
 package org.apache.doris.job.offset.jdbc;
 import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -40,7 +41,6 @@
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,7 +51,6 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -430,6 +429,10 @@ public void splitChunks(List createTbls) throws JobException {
             this.remainingSplits = tableSplits.values().stream()
                     .flatMap(List::stream)
                     .collect(Collectors.toList());
+        } else {
+            // The source reader is automatically initialized when the split is obtained.
+            // In latest mode, a separate init is required.
+            initSourceReader();
+        }
     }

@@ -490,6 +493,56 @@ private boolean checkNeedSplitChunks(Map sourceProperties) {
         return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
     }

+    /**
+     * Source reader needs to be initialized here.
+     * For example, PG slots need to be created first;
+     * otherwise, conflicts will occur in multi-backend scenarios.
+     */
+    private void initSourceReader() throws JobException {
+        Backend backend = StreamingJobUtils.selectBackend();
+        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
+        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                .setApi("/api/initReader")
+                .setParams(new Gson().toJson(requestParams)).build();
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        InternalService.PRequestCdcClientResult result = null;
+        try {
+            Future<InternalService.PRequestCdcClientResult> future =
+                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
+            result = future.get();
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                log.warn("Failed to init job {} reader, {}", getJobId(), result.getStatus().getErrorMsgs(0));
+                throw new JobException(
+                        "Failed to init source reader, " + result.getStatus().getErrorMsgs(0) + ", response: "
+                                + result.getResponse());
+            }
+            String response = result.getResponse();
+            try {
+                ResponseBody responseObj = objectMapper.readValue(
+                        response,
+                        new TypeReference<ResponseBody>() {
+                        }
+                );
+                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+                    log.info("Init {} source reader successfully, response: {}", getJobId(), responseObj.getData());
+                    return;
+                } else {
+                    throw new JobException("Failed to init source reader, error: " + responseObj.getData());
+                }
+            } catch (JobException jobex) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException(jobex.getMessage());
+            } catch (Exception e) {
+                log.warn("Failed to init {} source reader, {}", getJobId(), response);
+                throw new JobException("Failed to init source reader, cause: " + e.getMessage());
+            }
+        } catch (ExecutionException | InterruptedException ex) {
+            log.warn("Failed to init source reader: ", ex);
+            throw new JobException(ex);
+        }
+    }
+
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 2ca45ad247458d..b3302e2c78519c 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -27,6 +27,8 @@
 import org.apache.doris.job.cdc.request.JobBaseConfig;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;

+import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import java.util.List;

 import org.slf4j.Logger;
@@ -44,6 +46,18 @@ public class ClientController {
     @Autowired private PipelineCoordinator pipelineCoordinator;

+    /** Init source reader */
+    @RequestMapping(path = "/api/initReader", method = RequestMethod.POST)
+    public Object initSourceReader(@RequestBody JobBaseConfig jobConfig) {
+        try {
+            SourceReader reader = Env.getCurrentEnv().getReader(jobConfig);
+            return RestResponse.success("Source reader initialized successfully");
+        } catch (Exception ex) {
+            LOG.error("Failed to create reader, jobId={}", jobConfig.getJobId(), ex);
+            return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
+        }
+    }
+
     /** Fetch source splits for snapshot */
     @RequestMapping(path = "/api/fetchSplits", method = RequestMethod.POST)
     public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 52c3674444b37c..53b648bf38afcb 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -107,6 +107,9 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
                         postgresDialect.getSlotName(), postgresDialect.getPluginName());
         // skip creating the replication slot when the slot exists.
         if (slotInfo != null) {
+            LOG.info(
+                    "The replication slot {} already exists, skip creating it.",
+                    postgresDialect.getSlotName());
             return;
         }
         PostgresReplicationConnection replicationConnection =
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 682e575596e823..1211099bec9335 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -68,8 +68,9 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
         sql """GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA ${pgSchema} TO ${newPgUser}"""
     }

-    // create job by new user
-    sql """CREATE JOB ${jobName}
+    test {
+        // create job by new user
+        sql """CREATE JOB ${jobName}
         ON STREAMING
         FROM POSTGRES (
             "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
             "driver_url" = "${driver_url}",
             "driver_class" = "org.postgresql.Driver",
             "user" = "${newPgUser}",
             "password" = "${newPgPassword}",
             "database" = "${pgDB}",
             "schema" = "${pgSchema}",
             "include_tables" = "${tableName}",
             "offset" = "latest"
         )
         TO DATABASE ${currentDb} (
             "table.create.properties.replication_num" = "1"
         )
         """
-
-    // check job running
-    try {
-        Awaitility.await().atMost(300, SECONDS)
-                .pollInterval(1, SECONDS).until(
-                {
-                    def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
-                    log.info("jobStatus: " + jobStatus)
-                    // check job status
-                    jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
-                }
-        )
-    } catch (Exception ex){
-        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
-        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
-        log.info("show job: " + showjob)
-        log.info("show task: " + showtask)
-        throw ex;
+        exception "Failed to init source reader"
     }

     // grant replication to user

     }

+    // create job by new user
+    sql """CREATE JOB ${jobName}
+        ON STREAMING
+        FROM POSTGRES (
+            "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+            "driver_url" = "${driver_url}",
+            "driver_class" = "org.postgresql.Driver",
"org.postgresql.Driver", + "user" = "${newPgUser}", + "password" = "${newPgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${tableName}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(300, SECONDS) .pollInterval(3, SECONDS).until( { From 0897c8b0f2b0473ce60c75edd77bb2c6386668ce Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 13 Jan 2026 12:12:20 +0800 Subject: [PATCH 4/6] fix --- .../apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index f2314ddc68dfd9..66343d90a415fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -41,6 +41,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TStatusCode; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -49,6 +50,7 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; + import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import java.util.ArrayList; From 01d0b0d0ee11a0389ee2a193476af72656d361ce Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 13 Jan 2026 12:26:49 +0800 Subject: [PATCH 5/6] fix style --- .../apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index 66343d90a415fd..0c114ae8e64d3f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -50,9 +50,9 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.log4j.Log4j2; - import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; + import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; From 8426e972918ddf9563c6f4c196577b1f860d3c79 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 13 Jan 2026 15:52:34 +0800 Subject: [PATCH 6/6] fix unstale case --- .../cdc/test_streaming_mysql_job_priv.groovy | 22 +++++++++++++++++-- .../test_streaming_postgres_job_priv.groovy | 9 +++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy index 23f3ce7f6793f9..d16bc57e73ecbe 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy @@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern sql """FLUSH PRIVILEGES""" } - sleep(30000) + def jobSucceedTaskCnt = sql """ select SucceedTaskCount from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt) + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobStatus: " + jobStatus) + // check job status running and increase a success task + jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1) > jobSucceedTaskCnt.get(0).get(0) + } + ) + } catch (Exception ex){ + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } // check incremental data qt_select """ SELECT * FROM ${tableName} order by name asc """ - sql """DROP USER IF EXISTS '${user}'""" sql """ DROP JOB IF EXISTS where jobname = '${jobName}' diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy index 1211099bec9335..9c0cd6a464c8ca 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy @@ -138,7 +138,14 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) VALUES ('Doris',18);""" } - sleep(30000) + Awaitility.await().atMost(300, SECONDS) + .pollInterval(3, SECONDS).until( + { + def jobSucceedTaskCount = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'""" + log.info("jobSucceedTaskCount: " + jobSucceedTaskCount) + jobSucceedTaskCount.size() == 1 && jobSucceedTaskCount.get(0).get(0) >= '2' + } + ) // check incremental data qt_select """ SELECT * FROM ${tableName} order by name asc """