Skip to content

Commit 3d0359a

Browse files
Optimize the update task and stop task logic.
1 parent db0e298 commit 3d0359a

File tree

1 file changed

+27
-17
lines changed
  • streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api

1 file changed

+27
-17
lines changed

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/JobRestfulApi.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -175,19 +175,22 @@ public Message killJob(HttpServletRequest req,
175175
StreamJob streamJob = this.streamJobService.getJobById(jobId);
176176
if(streamJob == null) {
177177
return Message.error("not exists job " + jobId);
178-
} else if(!JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES().getValue().contains(streamJob.getJobType())) {
179-
return Message.error("Job " + streamJob.getName() + " is not supported to stop.");
180178
}
181179
if (!streamJobService.hasPermission(streamJob, userName) &&
182180
!this.privilegeService.hasEditPrivilege(req, streamJob.getProjectName())) {
183181
return Message.error("Have no permission to kill/stop StreamJob [" + jobId + "]");
184182
}
185-
try {
186-
PauseResultVo resultVo = streamTaskService.pause(jobId, 0L, userName, Objects.nonNull(snapshot)? snapshot : false);
187-
return snapshot? Message.ok().data("path", resultVo.getSnapshotPath()) : Message.ok();
188-
} catch (Exception e) {
189-
LOG.error("{} kill job {} failed!", userName, jobId, e);
190-
return Message.error(ExceptionUtils.getRootCauseMessage(e));
183+
if(JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES().getValue().contains(streamJob.getJobType())) {
184+
try {
185+
PauseResultVo resultVo = streamTaskService.pause(jobId, 0L, userName, Objects.nonNull(snapshot)? snapshot : false);
186+
return snapshot? Message.ok().data("path", resultVo.getSnapshotPath()) : Message.ok();
187+
} catch (Exception e) {
188+
LOG.error("{} kill job {} failed!", userName, jobId, e);
189+
return Message.error(ExceptionUtils.getRootCauseMessage(e));
190+
}
191+
} else {
192+
LOG.error("{} try to kill not-supported-management job {} with name {}.", userName, jobId, streamJob.getName());
193+
return tryStopTask(req, userName, streamJob, null);
191194
}
192195
}
193196

@@ -334,6 +337,9 @@ public Message updateTask(HttpServletRequest req,
334337
if(streamTask == null) {
335338
LOG.warn("Job {} is not exists running task, ignore to update its metrics.", jobName);
336339
return Message.ok("not exists running task, ignore it.");
340+
} else if(JobConf.isCompleted(streamTask.getStatus())) {
341+
LOG.warn("The task of job {} is completed, ignore to update its metrics.", jobName);
342+
return Message.ok("Task is completed, ignore to update its metrics.");
337343
}
338344
return withFlinkJobInfo(jobName, streamTask.getLinkisJobInfo(), flinkJobInfo -> {
339345
if(!flinkJobInfo.getApplicationId().equals(appId)) {
@@ -367,28 +373,32 @@ public Message stopTask(HttpServletRequest req,
367373
} else if(!"spark.jar".equals(streamJobs.get(0).getJobType())) {
368374
return Message.error("Only spark.jar Job support to stop task.");
369375
}
370-
if (!streamJobService.hasPermission(streamJobs.get(0), username) &&
371-
!this.privilegeService.hasEditPrivilege(req, streamJobs.get(0).getProjectName())) {
372-
return Message.error("Have no permission to stop task for StreamJob [" + jobName + "].");
376+
return tryStopTask(req, username, streamJobs.get(0), appId);
377+
}
378+
379+
private Message tryStopTask(HttpServletRequest req, String username, StreamJob streamJob, String appId) {
380+
if (!streamJobService.hasPermission(streamJob, username) &&
381+
!this.privilegeService.hasEditPrivilege(req, streamJob.getProjectName())) {
382+
return Message.error("Have no permission to stop task for StreamJob [" + streamJob.getName() + "].");
373383
}
374384
// 如果存在正在运行的,将其停止掉
375-
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJobs.get(0).getId());
385+
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJob.getId());
376386
if(streamTask != null && JobConf.isRunning(streamTask.getStatus())) {
377-
return withFlinkJobInfo(jobName, streamTask.getLinkisJobInfo(), flinkJobInfo -> {
378-
if(flinkJobInfo.getApplicationId().equals(appId)) {
379-
LOG.warn("Streamis Job {} is exists running task, update its status to stopped.", jobName);
387+
return withFlinkJobInfo(streamJob.getName(), streamTask.getLinkisJobInfo(), flinkJobInfo -> {
388+
if(appId == null || flinkJobInfo.getApplicationId().equals(appId)) {
389+
LOG.warn("Streamis Job {} is exists running task, update its status to stopped.", streamJob.getName());
380390
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
381391
streamTask.setErrDesc("stopped by App itself.");
382392
streamTaskService.updateTask(streamTask);
383393
return Message.ok();
384394
} else {
385395
LOG.warn("Job {} with running task <appId: {}> is not equals to the request appId: {}, ignore to stop it.",
386-
jobName, flinkJobInfo.getApplicationId(), appId);
396+
streamJob.getName(), flinkJobInfo.getApplicationId(), appId);
387397
return Message.ok("the request appId is not equals to the running task appId " + flinkJobInfo.getApplicationId());
388398
}
389399
});
390400
} else {
391-
LOG.warn("Streamis Job {} is not exists running task, ignore to stop it.", jobName);
401+
LOG.warn("Streamis Job {} is not exists running task, ignore to stop it.", streamJob.getName());
392402
return Message.ok();
393403
}
394404
}

0 commit comments

Comments
 (0)