Skip to content

Commit 01dc49d

Browse files
author
gelxiogong
committed
fix high available execute
1 parent 19d66a2 commit 01dc49d

File tree

2 files changed

+21
-8
lines changed
  • streamis-jobmanager
    • streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service
    • streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api

2 files changed

+21
-8
lines changed

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/DefaultStreamJobInspectService.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ class DefaultStreamJobInspectService extends StreamJobInspectService with Loggin
173173
case Some(source) =>
174174
inspectVo = SourceUtils.manageJobProjectFile(highAvailablePolicy,source)
175175
case None =>
176-
logger.warn("this job source is null")
176+
logger.warn("this job source is null")
177+
inspectVo.setHighAvailable(true)
178+
inspectVo.setMsg("用户直接从页面上传,job的source为空,跳过高可用检查")
177179
}
178180
inspectVo
179181
}

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/JobRestfulApi.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.webank.wedatasphere.streamis.jobmanager.exception.JobException;
2222
import com.webank.wedatasphere.streamis.jobmanager.exception.JobExceptionManager;
2323
import com.webank.wedatasphere.streamis.jobmanager.launcher.conf.JobConfKeyConstants;
24+
import com.webank.wedatasphere.streamis.jobmanager.launcher.dao.StreamJobConfMapper;
2425
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.JobInfo;
2526
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.conf.JobConf;
2627
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.manager.JobLaunchManager;
@@ -41,6 +42,7 @@
4142
import com.webank.wedatasphere.streamis.jobmanager.manager.service.StreamTaskService;
4243
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.entity.RealtimeLogEntity;
4344
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.entity.StreamisTransformJobContent;
45+
import com.webank.wedatasphere.streamis.jobmanager.manager.utils.SourceUtils;
4446
import com.webank.wedatasphere.streamis.jobmanager.manager.utils.StreamTaskUtils;
4547
import com.webank.wedatasphere.streamis.jobmanager.utils.RegularUtil;
4648
import com.webank.wedatasphere.streamis.jobmanager.vo.BulkUpdateLabel;
@@ -86,6 +88,8 @@ public class JobRestfulApi {
8688
@Autowired
8789
private DefaultStreamJobService defaultStreamJobService;
8890

91+
@Autowired
92+
private StreamJobConfMapper streamJobConfMapper;
8993
@Resource
9094
private JobLaunchManager<? extends JobInfo> jobLaunchManager;
9195

@@ -290,6 +294,7 @@ public Message executeInspect(HttpServletRequest req, @RequestParam(value = "job
290294
inspections = inspectResult.stream().map(JobInspectVo::getInspectName)
291295
.collect(Collectors.toList());
292296
} catch (Exception e){
297+
LOG.warn(e.getMessage());
293298
return Message.error("Fail to inspect job " + jobId + " of the execution(任务执行前检查失败), message: " + e.getMessage());
294299
}
295300

@@ -338,13 +343,19 @@ public Message executeJob(HttpServletRequest req, @RequestBody Map<String, Objec
338343
return Message.error("The system does not enable the detach feature ,detach job cannot start [" + jobId + "]");
339344
}
340345
StreamJobVersion jobVersion = this.defaultStreamJobService.getLatestJobVersion(jobId);
341-
String source = jobVersion.getSource();
342-
HashMap<Object, Object> sourceMap = new HashMap<>();
343-
sourceMap = BDPJettyServerHelper.gson().fromJson(source, HashMap.class);
344-
if (sourceMap.containsKey("isHighAvailable")) {
345-
if(!((Boolean) sourceMap.get("isHighAvailable"))){
346-
return Message.error("The master and backup cluster materials do not match, please check the material");
347-
}
346+
String highAvailablePolicy = streamJobConfMapper.getRawConfValue(jobId, "wds.streamis.app.highavailable.policy");
347+
JobHighAvailableVo inspectVo = new JobHighAvailableVo();
348+
Optional<String> sourceOption = Optional.ofNullable(jobVersion.getSource());
349+
if(sourceOption.isPresent()) {
350+
String source = sourceOption.get();
351+
inspectVo = SourceUtils.manageJobProjectFile(highAvailablePolicy, source);
352+
} else {
353+
LOG.warn("this job source is null");
354+
inspectVo.setHighAvailable(true);
355+
inspectVo.setMsg("用户直接从页面上传,job的source为空,跳过高可用检查");
356+
}
357+
if (!inspectVo.isHighAvailable()){
358+
return Message.error("The master and backup cluster materials do not match, please check the material");
348359
}
349360
try {
350361
streamTaskService.execute(jobId, 0L, userName);

0 commit comments

Comments
 (0)