diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 08f1bb5ccaf6fe..1f955f0a2c3d3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -108,12 +108,13 @@ public void run() throws JobException {
             log.info("task has been canceled, task id is {}", getTaskId());
             return;
         }
-        log.info("start to run streaming multi task, offset is {}", runningOffset.toString());
         sendWriteRequest();
     }
 
     private void sendWriteRequest() throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
+        log.info("start to run streaming multi task {} in backend {}/{}, offset is {}",
+                taskId, backend.getId(), backend.getHost(), runningOffset.toString());
         this.runningBackendId = backend.getId();
         WriteRecordRequest params = buildRequestParams();
         InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
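The next file teaches JdbcSourceOffsetProvider to initialize the source reader explicitly when the job starts from the latest offset: in initial mode the reader comes up as a side effect of fetching snapshot splits, so only the split-less path needs the extra call. A minimal sketch of that branching; StartMode and the printed method names are illustrative stand-ins for the patch's logic, not Doris APIs:

    // Hedged sketch of the start-mode branch added below; names are stand-ins.
    public class StartModeSketch {
        enum StartMode { INITIAL, LATEST }

        static void prepare(StartMode mode) {
            if (mode == StartMode.INITIAL) {
                // snapshot mode: fetching splits also initializes the source reader
                System.out.println("splitChunks(): reader initialized while splits are fetched");
            } else {
                // latest mode: no splits, so init explicitly (e.g. create the PG slot once)
                System.out.println("initSourceReader(): explicit one-time init");
            }
        }

        public static void main(String[] args) {
            prepare(StartMode.INITIAL);
            prepare(StartMode.LATEST);
        }
    }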
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 560887d61ad1c0..0c114ae8e64d3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -18,6 +18,7 @@
 package org.apache.doris.job.offset.jdbc;
 
 import org.apache.doris.httpv2.entity.ResponseBody;
+import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.CompareOffsetRequest;
 import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -430,6 +431,10 @@ public void splitChunks(List createTbls) throws JobException {
             this.remainingSplits = tableSplits.values().stream()
                     .flatMap(List::stream)
                     .collect(Collectors.toList());
+        } else {
+            // The source reader is initialized automatically while splits are being fetched.
+            // In latest mode there are no splits, so the reader must be initialized explicitly.
+            initSourceReader();
         }
     }
 
@@ -490,6 +495,56 @@ private boolean checkNeedSplitChunks(Map sourceProperties) {
         return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
     }
 
+    /**
+     * The source reader needs to be initialized here.
+     * For example, PG replication slots must be created up front;
+     * otherwise, conflicts will occur in multi-backend scenarios.
+     */
+    private void initSourceReader() throws JobException {
+        Backend backend = StreamingJobUtils.selectBackend();
+        JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
+        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                .setApi("/api/initReader")
+                .setParams(new Gson().toJson(requestParams)).build();
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        InternalService.PRequestCdcClientResult result = null;
+        try {
+            Future<InternalService.PRequestCdcClientResult> future =
+                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
+            result = future.get();
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                log.warn("Failed to init source reader for job {}, {}", getJobId(), result.getStatus().getErrorMsgs(0));
+                throw new JobException(
+                        "Failed to init source reader, " + result.getStatus().getErrorMsgs(0) + ", response: "
+                                + result.getResponse());
+            }
+            String response = result.getResponse();
+            try {
+                ResponseBody<Object> responseObj = objectMapper.readValue(
+                        response,
+                        new TypeReference<ResponseBody<Object>>() {
+                        }
+                );
+                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
+                    log.info("Init source reader for job {} successfully, response: {}", getJobId(), responseObj.getData());
+                    return;
+                } else {
+                    throw new JobException("Failed to init source reader, error: " + responseObj.getData());
+                }
+            } catch (JobException jobex) {
+                log.warn("Failed to init source reader for job {}, response: {}", getJobId(), response);
+                throw jobex;
+            } catch (Exception e) {
+                log.warn("Failed to init source reader for job {}, response: {}", getJobId(), response);
+                throw new JobException("Failed to init source reader, cause: " + e.getMessage());
+            }
+        } catch (ExecutionException | InterruptedException ex) {
+            log.warn("Failed to request source reader init for job {}", getJobId(), ex);
+            throw new JobException(ex);
+        }
+    }
+
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index 2ca45ad247458d..b3302e2c78519c 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -27,6 +27,8 @@
 import org.apache.doris.job.cdc.request.JobBaseConfig;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -44,6 +46,18 @@ public class ClientController {
 
     @Autowired private PipelineCoordinator pipelineCoordinator;
 
+    /** Initialize the source reader */
+    @RequestMapping(path = "/api/initReader", method = RequestMethod.POST)
+    public Object initSourceReader(@RequestBody JobBaseConfig jobConfig) {
+        try {
+            Env.getCurrentEnv().getReader(jobConfig); // creates and caches the reader as a side effect
+            return RestResponse.success("Source reader initialized successfully");
+        } catch (Exception ex) {
+            LOG.error("Failed to create reader, jobId={}", jobConfig.getJobId(), ex);
+            return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
+        }
+    }
+
     /** Fetch source splits for snapshot */
     @RequestMapping(path = "/api/fetchSplits", method = RequestMethod.POST)
    public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) {
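For manual testing, the new endpoint can be exercised directly against the cdc_client process. A hedged sketch: the /api/initReader path and the jobId/sourceType/sourceProperties fields mirror the JobBaseConfig built by the FE above, while the host, port, and concrete payload values are assumptions for illustration.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class InitReaderProbe {
        public static void main(String[] args) throws Exception {
            // hypothetical cdc_client address; the FE normally reaches this via the BE brpc proxy
            String url = "http://127.0.0.1:9096/api/initReader";
            String payload = "{\"jobId\": 10001, \"sourceType\": \"POSTGRES\", \"sourceProperties\": {}}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(payload))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // a non-OK code in the response body is what the FE surfaces as
            // "Failed to init source reader"
            System.out.println(response.statusCode() + " -> " + response.body());
        }
    }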
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 541e3354828820..70ab3961acc3a1 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -172,7 +172,6 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         // build split
         Tuple2 splitFlag = createSourceSplit(offsetMeta, baseReq);
         split = splitFlag.f0;
-        closeBinlogReader();
         currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
         this.setCurrentSplitRecords(currentSplitRecords);
         this.setCurrentSplit(split);
@@ -616,6 +616,9 @@ public boolean isSnapshotSplit(SourceSplit split) {
     @Override
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
+        // Close the binlog reader after each read; otherwise the binlog client keeps a source connection occupied.
+        closeBinlogReader();
+        this.setCurrentReader(null);
     }
 
     private Map getTableSchemas(JobBaseConfig config) {
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 27fbf3be88b363..a3f14a953b6231 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -188,7 +188,5 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         Tuple2 splitFlag = createMySqlSplit(offsetMeta, baseReq);
         split = splitFlag.f0;
-        // reset binlog reader
-        closeBinlogReader();
         currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
         this.setCurrentSplitRecords(currentSplitRecords);
         this.setCurrentSplit(split);
@@ -718,6 +718,9 @@ public boolean isSnapshotSplit(SourceSplit split) {
     @Override
     public void finishSplitRecords() {
         this.setCurrentSplitRecords(null);
+        // Close the binlog reader after each read; otherwise the binlog client keeps a source connection occupied.
+        closeBinlogReader();
+        this.setCurrentReader(null);
     }
 
     @Override
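Both readers above move closeBinlogReader() from the start of readSplitRecords() to finishSplitRecords(): the binlog client holds a connection on the source database, so releasing it as soon as a batch is drained, rather than just before the next read begins, keeps the connection free between reads. A runnable sketch of that lifecycle, with a plain Object standing in for the real binlog client:

    public class BinlogLifecycleSketch {
        private Object binlogReader; // stand-in for the CDC binlog client

        Object readSplitRecords() {
            // before this patch, close happened here, so the previous read's
            // connection stayed occupied until the next read started
            if (binlogReader == null) {
                binlogReader = new Object();
                System.out.println("open binlog reader (occupies a source connection)");
            }
            return "records";
        }

        void finishSplitRecords() {
            // after this patch, the connection is released once the batch is consumed
            System.out.println("close binlog reader, free the connection");
            binlogReader = null;
        }

        public static void main(String[] args) {
            BinlogLifecycleSketch reader = new BinlogLifecycleSketch();
            System.out.println(reader.readSplitRecords());
            reader.finishSplitRecords();
        }
    }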
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 52c3674444b37c..53b648bf38afcb 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -107,6 +107,9 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
                         postgresDialect.getSlotName(), postgresDialect.getPluginName());
         // skip creating the replication slot when the slot exists.
         if (slotInfo != null) {
+            LOG.info(
+                    "The replication slot {} already exists, skip creating it.",
+                    postgresDialect.getSlotName());
             return;
         }
         PostgresReplicationConnection replicationConnection =
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
index 23f3ce7f6793f9..d16bc57e73ecbe 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy
@@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
         sql """FLUSH PRIVILEGES"""
     }
 
-    sleep(30000)
+    def jobSucceedTaskCnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+    log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt)
+
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("jobStatus: " + jobStatus)
+                    // the job must stay RUNNING and complete at least one more successful task
+                    jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && (jobStatus.get(0).get(1) as long) > (jobSucceedTaskCnt.get(0).get(0) as long)
+                }
+        )
+    } catch (Exception ex) {
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex
+    }
 
     // check incremental data
     qt_select """ SELECT * FROM ${tableName} order by name asc """
-
     sql """DROP USER IF EXISTS '${user}'"""
     sql """ DROP JOB IF EXISTS where jobname = '${jobName}'
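The mysql suite above replaces the fixed sleep(30000) with Awaitility polling: capture the current SucceedTaskCount, then wait until the job is still RUNNING and the count has grown. The same pattern in plain Java (the suites themselves are Groovy); fetchSucceedTaskCount() is a hypothetical stand-in for the jobs("type"="insert") query:

    import static org.awaitility.Awaitility.await;

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class WaitForTaskProgress {
        // stand-in for querying jobs("type"="insert"); grows on every poll here
        private static final AtomicLong FAKE_COUNTER = new AtomicLong(0);

        static long fetchSucceedTaskCount() {
            return FAKE_COUNTER.incrementAndGet();
        }

        public static void main(String[] args) {
            long baseline = fetchSucceedTaskCount();
            await().atMost(300, TimeUnit.SECONDS)
                    .pollInterval(1, TimeUnit.SECONDS)
                    .until(() -> fetchSucceedTaskCount() > baseline);
            System.out.println("job completed at least one new successful task");
        }
    }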
diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
index 682e575596e823..9c0cd6a464c8ca 100644
--- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
+++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy
@@ -68,42 +68,26 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
         sql """GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA ${pgSchema} TO ${newPgUser}"""
     }
 
-    // create job by new user
-    sql """CREATE JOB ${jobName}
+    test {
+        // create job by a new user that has no REPLICATION privilege yet
+        sql """CREATE JOB ${jobName}
             ON STREAMING
             FROM POSTGRES (
             "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
             "driver_url" = "${driver_url}",
             "driver_class" = "org.postgresql.Driver",
             "user" = "${newPgUser}",
             "password" = "${newPgPassword}",
             "database" = "${pgDB}",
             "schema" = "${pgSchema}",
             "include_tables" = "${tableName}",
             "offset" = "latest"
             )
             TO DATABASE ${currentDb} (
             "table.create.properties.replication_num" = "1"
             )
             """
-
-    // check job running
-    try {
-        Awaitility.await().atMost(300, SECONDS)
-                .pollInterval(1, SECONDS).until(
-                {
-                    def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
-                    log.info("jobStatus: " + jobStatus)
-                    // check job status
-                    jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
-                }
-        )
-    } catch (Exception ex){
-        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
-        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
-        log.info("show job: " + showjob)
-        log.info("show task: " + showtask)
-        throw ex;
+        exception "Failed to init source reader"
     }
 
     // grant replication to user
@@ -112,6 +96,25 @@
     }
 
+    // create the job again now that the user has the replication privilege
+    sql """CREATE JOB ${jobName}
+        ON STREAMING
+        FROM POSTGRES (
+            "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+            "driver_url" = "${driver_url}",
+            "driver_class" = "org.postgresql.Driver",
+            "user" = "${newPgUser}",
+            "password" = "${newPgPassword}",
+            "database" = "${pgDB}",
+            "schema" = "${pgSchema}",
+            "include_tables" = "${tableName}",
+            "offset" = "latest"
+        )
+        TO DATABASE ${currentDb} (
+            "table.create.properties.replication_num" = "1"
+        )
+        """
+
     Awaitility.await().atMost(300, SECONDS)
             .pollInterval(3, SECONDS).until(
             {
@@ -135,7 +138,14 @@
         sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) VALUES ('Doris',18);"""
     }
 
-    sleep(30000)
+    Awaitility.await().atMost(300, SECONDS)
+            .pollInterval(3, SECONDS).until(
+            {
+                def jobSucceedTaskCount = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
+                log.info("jobSucceedTaskCount: " + jobSucceedTaskCount)
+                jobSucceedTaskCount.size() == 1 && (jobSucceedTaskCount.get(0).get(0) as long) >= 2
+            }
+    )
 
     // check incremental data
     qt_select """ SELECT * FROM ${tableName} order by name asc """
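The postgres suite now expects CREATE JOB to fail fast with "Failed to init source reader" while the user lacks REPLICATION, because initReader eagerly creates the replication slot. The idempotence side of that change (the PostgresSourceReader hunk above skips creation when the slot already exists) can be checked by hand with plain JDBC; the connection details and slot name below are assumptions:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class SlotExistsCheck {
        public static void main(String[] args) throws Exception {
            // hypothetical connection details; requires the PostgreSQL JDBC driver on the classpath
            String url = "jdbc:postgresql://127.0.0.1:5432/postgres";
            try (Connection conn = DriverManager.getConnection(url, "postgres", "postgres");
                    PreparedStatement ps = conn.prepareStatement(
                            "SELECT slot_name, plugin FROM pg_replication_slots WHERE slot_name = ?")) {
                ps.setString(1, "doris_cdc_slot"); // hypothetical slot name
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        System.out.println("slot exists, init skips creating: " + rs.getString(1));
                    } else {
                        System.out.println("slot missing, initReader creates it exactly once");
                    }
                }
            }
        }
    }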