Skip to content

Commit d41fe6b

Browse files
branch-4.0: [Fix](Streaming) fix get remote meta failed to pause streaming job #59760 (#59808)
Cherry-picked from #59760 Co-authored-by: wudi <[email protected]>
1 parent 31f0d7b commit d41fe6b

File tree

4 files changed

+35
-14
lines changed

4 files changed

+35
-14
lines changed

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
111111
@Getter
112112
@Setter
113113
@SerializedName("fr")
114-
protected FailureReason failureReason;
114+
protected volatile FailureReason failureReason;
115115
@Getter
116116
@Setter
117117
protected long latestAutoResumeTimestamp;
@@ -505,7 +505,7 @@ public List<AbstractStreamingTask> queryAllStreamTasks() {
505505
return tasks;
506506
}
507507

508-
protected void fetchMeta() {
508+
protected void fetchMeta() throws JobException {
509509
try {
510510
if (tvfType != null) {
511511
if (originTvfProps == null) {
@@ -516,10 +516,18 @@ protected void fetchMeta() {
516516
offsetProvider.fetchRemoteMeta(new HashMap<>());
517517
}
518518
} catch (Exception ex) {
519-
//todo: The job status = MANUAL_PAUSE_ERR, No need to set failureReason again
520519
log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
521-
failureReason = new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
522-
"Failed to fetch meta, " + ex.getMessage());
520+
if (this.getFailureReason() == null
521+
|| !InternalErrorCode.MANUAL_PAUSE_ERR.equals(this.getFailureReason().getCode())) {
522+
// When a job is manually paused, it does not need to be set again,
523+
// otherwise, it may be woken up by auto resume.
524+
this.setFailureReason(
525+
new FailureReason(InternalErrorCode.GET_REMOTE_DATA_ERROR,
526+
"Failed to fetch meta, " + ex.getMessage()));
527+
// If fetching meta fails, the job is paused
528+
// and auto resume will automatically wake it up.
529+
this.updateJobStatus(JobStatus.PAUSED);
530+
}
523531
}
524532
}
525533

fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.doris.job.cdc.request.JobBaseConfig;
2727

2828
import org.apache.commons.lang3.StringUtils;
29+
import org.apache.commons.lang3.exception.ExceptionUtils;
2930
import org.apache.flink.api.connector.source.SourceSplit;
3031
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
3132
import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
@@ -116,8 +117,8 @@ private void createSlotForGlobalStreamSplit(PostgresDialect postgresDialect) {
116117
} catch (Throwable t) {
117118
throw new CdcClientException(
118119
String.format(
119-
"Fail to get or create slot for global stream split, the slot name is %s. Due to: ",
120-
postgresDialect.getSlotName()),
120+
"Fail to get or create slot, the slot name is %s. Due to: %s ",
121+
postgresDialect.getSlotName(), ExceptionUtils.getRootCauseMessage(t)),
121122
t);
122123
}
123124
}

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,23 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern
173173
sql """INSERT INTO ${mysqlDb}.${tableName} (name,age) VALUES ('DorisTestPriv',28);"""
174174
}
175175

176-
sleep(20000)
177-
178-
def jobErrorMsg = sql """select ErrorMsg from jobs("type"="insert") where Name='${jobName}'"""
179-
log.info("jobErrorMsg: " + jobErrorMsg)
180-
assert jobErrorMsg.get(0).get(0).contains("Failed to fetch meta")
176+
try {
177+
Awaitility.await().atMost(300, SECONDS)
178+
.pollInterval(1, SECONDS).until(
179+
{
180+
def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
181+
log.info("jobStatus: " + jobStatus)
182+
// check job status
183+
jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
184+
}
185+
)
186+
} catch (Exception ex){
187+
def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
188+
def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
189+
log.info("show job: " + showjob)
190+
log.info("show task: " + showtask)
191+
throw ex;
192+
}
181193

182194
// grant binlog priv to mysqluser
183195
connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_priv.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ suite("test_streaming_postgres_job_priv", "p0,external,pg,external_docker,extern
9090
// check job running
9191
try {
9292
Awaitility.await().atMost(300, SECONDS)
93-
.pollInterval(3, SECONDS).until(
93+
.pollInterval(1, SECONDS).until(
9494
{
9595
def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
9696
log.info("jobStatus: " + jobStatus)
9797
// check job status
98-
jobStatus.size() == 1 && 'RUNNING' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
98+
jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta")
9999
}
100100
)
101101
} catch (Exception ex){

0 commit comments

Comments
 (0)