@@ -108,12 +108,13 @@ public void run() throws JobException {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
log.info("start to run streaming multi task, offset is {}", runningOffset.toString());
sendWriteRequest();
}

private void sendWriteRequest() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
log.info("start to run streaming multi task {} in backend {}/{}, offset is {}",
taskId, backend.getId(), backend.getHost(), runningOffset.toString());
this.runningBackendId = backend.getId();
WriteRecordRequest params = buildRequestParams();
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
@@ -18,6 +18,7 @@
package org.apache.doris.job.offset.jdbc;

import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
@@ -430,6 +431,10 @@ public void splitChunks(List<String> createTbls) throws JobException {
this.remainingSplits = tableSplits.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
} else {
// The source reader is automatically initialized when the split is obtained.
// In latest mode, a separate init of the source reader is required.
initSourceReader();
}
}

@@ -490,6 +495,56 @@ private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode);
}

/**
* Source reader needs to be initialized here.
* For example, PG slots need to be created first;
* otherwise, conflicts will occur in multi-backend scenarios.
*/
private void initSourceReader() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/initReader")
.setParams(new Gson().toJson(requestParams)).build();
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
InternalService.PRequestCdcClientResult result = null;
try {
Future<PRequestCdcClientResult> future =
BackendServiceProxy.getInstance().requestCdcClient(address, request);
result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
log.warn("Failed to init job {} reader, {}", getJobId(), result.getStatus().getErrorMsgs(0));
throw new JobException(
"Failed to init source reader," + result.getStatus().getErrorMsgs(0) + ", response: "
+ result.getResponse());
}
String response = result.getResponse();
try {
ResponseBody<String> responseObj = objectMapper.readValue(
response,
new TypeReference<ResponseBody<String>>() {
}
);
if (responseObj.getCode() == RestApiStatusCode.OK.code) {
log.info("Init {} source reader successfully, response: {}", getJobId(), responseObj.getData());
return;
} else {
throw new JobException("Failed to init source reader, error: " + responseObj.getData());
}
} catch (JobException jobex) {
log.warn("Failed to init {} source reader, {}", getJobId(), response);
throw new JobException(jobex.getMessage());
} catch (Exception e) {
log.warn("Failed to init {} source reader, {}", getJobId(), response);
throw new JobException("Failed to init source reader, cause " + e.getMessage());
}
} catch (ExecutionException | InterruptedException ex) {
log.warn("init source reader: ", ex);
throw new JobException(ex);
}
}

public void cleanMeta(Long jobId) throws JobException {
// clean meta table
StreamingJobUtils.deleteJobMeta(jobId);
@@ -27,6 +27,8 @@
import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.request.WriteRecordRequest;

import org.apache.commons.lang3.exception.ExceptionUtils;

import java.util.List;

import org.slf4j.Logger;
@@ -44,6 +46,18 @@ public class ClientController {

@Autowired private PipelineCoordinator pipelineCoordinator;

/** init source reader */
@RequestMapping(path = "/api/initReader", method = RequestMethod.POST)
public Object initSourceReader(@RequestBody JobBaseConfig jobConfig) {
try {
SourceReader reader = Env.getCurrentEnv().getReader(jobConfig);
return RestResponse.success("Source reader initialized successfully");
} catch (Exception ex) {
LOG.error("Failed to create reader, jobId={}", jobConfig.getJobId(), ex);
return RestResponse.internalError(ExceptionUtils.getRootCauseMessage(ex));
}
}

/** Fetch source splits for snapshot */
@RequestMapping(path = "/api/fetchSplits", method = RequestMethod.POST)
public Object fetchSplits(@RequestBody FetchTableSplitsRequest ftsReq) {
@@ -172,7 +172,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
// build split
Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq);
split = splitFlag.f0;
closeBinlogReader();
// closeBinlogReader();
currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
this.setCurrentSplitRecords(currentSplitRecords);
this.setCurrentSplit(split);
@@ -616,6 +616,9 @@ public boolean isSnapshotSplit(SourceSplit split) {
@Override
public void finishSplitRecords() {
this.setCurrentSplitRecords(null);
// Close after each read; otherwise the binlog client will keep occupying the connection.
closeBinlogReader();
this.setCurrentReader(null);
}

private Map<TableId, TableChanges.TableChange> getTableSchemas(JobBaseConfig config) {
@@ -188,7 +188,7 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
split = splitFlag.f0;
// reset binlog reader
closeBinlogReader();
// closeBinlogReader();
currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
this.setCurrentSplitRecords(currentSplitRecords);
this.setCurrentSplit(split);
@@ -718,6 +718,9 @@ public boolean isSnapshotSplit(SourceSplit split) {
@Override
public void finishSplitRecords() {
this.setCurrentSplitRecords(null);
// Close after each read; otherwise the binlog client will keep occupying the connection.
closeBinlogReader();
this.setCurrentReader(null);
}

@Override
@@ -107,6 +107,9 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
postgresDialect.getSlotName(), postgresDialect.getPluginName());
// skip creating the replication slot when the slot exists.
if (slotInfo != null) {
LOG.info(
"The replication slot {} already exists, skip creating it.",
postgresDialect.getSlotName());
return;
}
PostgresReplicationConnection replicationConnection =
@@ -197,12 +197,30 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
sql """FLUSH PRIVILEGES"""
}

sleep(30000)
def jobSucceedTaskCnt = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobSucceedTaskCnt: " + jobSucceedTaskCnt)

try {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(1, SECONDS).until(
{
def jobStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobStatus: " + jobStatus)
// check that the job is running and the succeeded task count has increased
jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1) > jobSucceedTaskCnt.get(0).get(0)
}
)
} catch (Exception ex){
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
log.info("show job: " + showjob)
log.info("show task: " + showtask)
throw ex;
}

// check incremental data
qt_select """ SELECT * FROM ${tableName} order by name asc """


sql """DROP USER IF EXISTS '${user}'"""
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
@@ -68,8 +68,9 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
sql """GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA ${pgSchema} TO ${newPgUser}"""
}

// create job by new user
sql """CREATE JOB ${jobName}
test {
// create job by new user
sql """CREATE JOB ${jobName}
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
@@ -86,24 +87,7 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
"table.create.properties.replication_num" = "1"
)
"""

// check job running
try {
Awaitility.await().atMost(300, SECONDS)
.pollInterval(1, SECONDS).until(
{
def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
log.info("jobStatus: " + jobStatus)
// check job status
jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
}
)
} catch (Exception ex){
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
log.info("show job: " + showjob)
log.info("show task: " + showtask)
throw ex;
exception "Failed to init source reader"
}

// grant replication to user
@@ -112,6 +96,25 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
}


// create job by new user
sql """CREATE JOB ${jobName}
ON STREAMING
FROM POSTGRES (
"jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
"driver_url" = "${driver_url}",
"driver_class" = "org.postgresql.Driver",
"user" = "${newPgUser}",
"password" = "${newPgPassword}",
"database" = "${pgDB}",
"schema" = "${pgSchema}",
"include_tables" = "${tableName}",
"offset" = "latest"
)
TO DATABASE ${currentDb} (
"table.create.properties.replication_num" = "1"
)
"""

Awaitility.await().atMost(300, SECONDS)
.pollInterval(3, SECONDS).until(
{
@@ -135,7 +138,14 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
sql """INSERT INTO ${pgDB}.${pgSchema}.${tableName} (name,age) VALUES ('Doris',18);"""
}

sleep(30000)
Awaitility.await().atMost(300, SECONDS)
.pollInterval(3, SECONDS).until(
{
def jobSucceedTaskCount = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}'"""
log.info("jobSucceedTaskCount: " + jobSucceedTaskCount)
jobSucceedTaskCount.size() == 1 && jobSucceedTaskCount.get(0).get(0) >= '2'
}
)

// check incremental data
qt_select """ SELECT * FROM ${tableName} order by name asc """