Skip to content

Commit e65f291

Browse files
committed
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Streamis into dev-0.2.3-log-collector
2 parents ab8d480 + aef48de commit e65f291

File tree

4 files changed

+133
-24
lines changed

4 files changed

+133
-24
lines changed

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/conf/JobConf.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,7 @@ object JobConf {
8484

8585
val TASK_SUBMIT_TIME_MAX: CommonVars[TimeType] = CommonVars("wds.streamis.task.submit.time.max", new TimeType("5m"))
8686

87+
val SUPPORTED_JOB_TYPES: CommonVars[String] = CommonVars("wds.streamis.supported.job.types", "flink.jar,flink.sql,spark.jar")
88+
89+
val SUPPORTED_MANAGEMENT_JOB_TYPES: CommonVars[String] = CommonVars("wds.streamis.management.supported.job.types", "flink.jar,flink.sql")
8790
}

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/DefaultStreamJobService.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import com.webank.wedatasphere.streamis.jobmanager.launcher.service.StreamJobCon
2424
import com.webank.wedatasphere.streamis.jobmanager.manager.alert.AlertLevel
2525
import com.webank.wedatasphere.streamis.jobmanager.manager.conf.JobConf
2626
import com.webank.wedatasphere.streamis.jobmanager.manager.dao.{StreamAlertMapper, StreamJobMapper, StreamTaskMapper}
27-
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.{QueryJobListVo, TaskCoreNumVo, VersionDetailVo}
2827
import com.webank.wedatasphere.streamis.jobmanager.manager.entity._
28+
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.{QueryJobListVo, TaskCoreNumVo, VersionDetailVo}
2929
import com.webank.wedatasphere.streamis.jobmanager.manager.exception.{JobCreateErrorException, JobFetchErrorException}
3030
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.JobContentParser
3131
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.entity.StreamisTransformJobContent
@@ -128,6 +128,9 @@ class DefaultStreamJobService extends StreamJobService with Logging {
128128
override def createStreamJob(metaJsonInfo: MetaJsonInfo, userName: String): StreamJobVersion = {
129129
if(StringUtils.isBlank(metaJsonInfo.getJobType))
130130
throw new JobCreateErrorException(30030, s"jobType is needed.")
131+
else if(!JobConf.SUPPORTED_JOB_TYPES.getValue.contains(metaJsonInfo.getJobType)) {
132+
throw new JobCreateErrorException(30030, s"jobType ${metaJsonInfo.getJobType} is not supported.")
133+
}
131134
if(metaJsonInfo.getJobContent == null || metaJsonInfo.getJobContent.isEmpty)
132135
throw new JobCreateErrorException(30030, s"jobContent is needed.")
133136
val job = streamJobMapper.getCurrentJob(metaJsonInfo.getProjectName, metaJsonInfo.getJobName)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.manager.transform.parser
2+
3+
import java.util
4+
5+
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.{StreamJob, StreamJobVersion, StreamJobVersionFiles, StreamisFile}
6+
import com.webank.wedatasphere.streamis.jobmanager.manager.exception.JobExecuteErrorException
7+
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.entity.{StreamisJarTransformJobContent, StreamisTransformJobContent}
8+
import org.apache.commons.lang.StringUtils
9+
import org.apache.linkis.common.utils.JsonUtils
10+
import org.apache.linkis.manager.label.entity.engine.RunType
11+
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
12+
import org.springframework.stereotype.Component
13+
14+
import scala.collection.JavaConverters._
15+
16+
/**
17+
*
18+
* @date 2022-10-19
19+
* @author enjoyyin
20+
* @since 0.5.0
21+
*/
22+
@Component
23+
class SparkJarJobContentParser extends AbstractJobContentParser {
24+
25+
override val jobType: String = "spark.jar"
26+
override val runType: RunType = RunType.JAR
27+
28+
override def parseTo(job: StreamJob, jobVersion: StreamJobVersion): StreamisTransformJobContent = {
29+
val createFile: String => StreamisFile = fileName => {
30+
val file = new StreamJobVersionFiles()
31+
file.setFileName(fileName)
32+
file.setCreateBy(job.getCreateBy)
33+
file.setCreateTime(job.getCreateTime)
34+
file.setJobId(job.getId)
35+
file.setJobVersionId(jobVersion.getId)
36+
file.setVersion(jobVersion.getVersion)
37+
file.setStorePath("<Unknown>")
38+
file.setStoreType("<Unknown>")
39+
file
40+
}
41+
val transformJobContent = new StreamisJarTransformJobContent
42+
val jobContent = JsonUtils.jackson.readValue(jobVersion.getJobContent, classOf[util.Map[String, Object]])
43+
jobContent.get("main.class.jar") match {
44+
case mainClassJar: String =>
45+
transformJobContent.setMainClassJar(createFile(mainClassJar))
46+
case _ => throw new JobExecuteErrorException(30500, "main.class.jar is needed.")
47+
}
48+
jobContent.get("main.class") match {
49+
case mainClass: String =>
50+
transformJobContent.setMainClass(mainClass)
51+
case _ => throw new JobExecuteErrorException(30500, "main.class is needed.")
52+
}
53+
jobContent.get("args") match {
54+
case args: util.List[String] =>
55+
transformJobContent.setArgs(args)
56+
case _ =>
57+
}
58+
jobContent.get("hdfs.jars") match {
59+
case hdfsJars: util.List[String] =>
60+
transformJobContent.setHdfsJars(hdfsJars)
61+
case _ =>
62+
}
63+
jobContent.get("dependency.jars") match {
64+
case dependencyJars: util.List[String] =>
65+
val parsedDependencyJars = dependencyJars.asScala.filter(StringUtils.isNotBlank).map(createFile).asJava
66+
transformJobContent.setDependencyJars(parsedDependencyJars)
67+
case _ =>
68+
}
69+
jobContent.get("resources") match {
70+
case resources: util.List[String] =>
71+
val parsedResources = resources.asScala.filter(StringUtils.isNotBlank).map(createFile).asJava
72+
transformJobContent.setResources(parsedResources)
73+
case _ =>
74+
}
75+
transformJobContent
76+
}
77+
78+
}

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/JobRestfulApi.java

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import javax.servlet.http.HttpServletRequest;
5252
import java.io.IOException;
5353
import java.util.*;
54+
import java.util.stream.Collectors;
5455

5556
@RequestMapping(path = "/streamis/streamJobManager/job")
5657
@RestController
@@ -77,7 +78,7 @@ public Message getJobList(HttpServletRequest req,
7778
@RequestParam(value = "projectName", required = false) String projectName,
7879
@RequestParam(value = "jobName", required = false) String jobName,
7980
@RequestParam(value = "jobStatus", required = false) Integer jobStatus,
80-
@RequestParam(value = "jobCreator", required = false) String jobCreator) throws JobException {
81+
@RequestParam(value = "jobCreator", required = false) String jobCreator) {
8182
String username = SecurityFilter.getLoginUsername(req);
8283
if(StringUtils.isBlank(projectName)){
8384
return Message.error("Project name cannot be empty(项目名不能为空,请指定)");
@@ -100,7 +101,7 @@ public Message getJobList(HttpServletRequest req,
100101
}
101102

102103
@RequestMapping(path = "/createOrUpdate", method = RequestMethod.POST)
103-
public Message createOrUpdate(HttpServletRequest req, @Validated @RequestBody MetaJsonInfo metaJsonInfo) throws Exception {
104+
public Message createOrUpdate(HttpServletRequest req, @Validated @RequestBody MetaJsonInfo metaJsonInfo) {
104105
String username = SecurityFilter.getLoginUsername(req);
105106
String projectName = metaJsonInfo.getProjectName();
106107
if (StringUtils.isBlank(projectName)){
@@ -142,6 +143,11 @@ public Message executeJob(HttpServletRequest req, @RequestBody Map<String, Objec
142143
long jobId = Long.parseLong(json.get("jobId").toString());
143144
LOG.info("{} try to execute job {}.", userName, jobId);
144145
StreamJob streamJob = this.streamJobService.getJobById(jobId);
146+
if(streamJob == null) {
147+
return Message.error("not exists job " + jobId);
148+
} else if(!JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES().getValue().contains(streamJob.getJobType())) {
149+
return Message.error("Job " + streamJob.getName() + " is not supported to execute.");
150+
}
145151
if (!streamJobService.hasPermission(streamJob, userName) &&
146152
!this.privilegeService.hasEditPrivilege(req, streamJob.getProjectName())) {
147153
return Message.error("Have no permission to execute StreamJob [" + jobId + "]");
@@ -166,6 +172,11 @@ public Message killJob(HttpServletRequest req,
166172
}
167173
LOG.info("{} try to kill job {}.", userName, jobId);
168174
StreamJob streamJob = this.streamJobService.getJobById(jobId);
175+
if(streamJob == null) {
176+
return Message.error("not exists job " + jobId);
177+
} else if(!JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES().getValue().contains(streamJob.getJobType())) {
178+
return Message.error("Job " + streamJob.getName() + " is not supported to stop.");
179+
}
169180
if (!streamJobService.hasPermission(streamJob, userName) &&
170181
!this.privilegeService.hasEditPrivilege(req, streamJob.getProjectName())) {
171182
return Message.error("Have no permission to kill/stop StreamJob [" + jobId + "]");
@@ -181,7 +192,7 @@ public Message killJob(HttpServletRequest req,
181192

182193
@RequestMapping(path = "/details", method = RequestMethod.GET)
183194
public Message detailsJob(HttpServletRequest req, @RequestParam(value = "jobId", required = false) Long jobId,
184-
@RequestParam(value = "version", required = false) String version) throws IOException, JobException {
195+
@RequestParam(value = "version", required = false) String version) throws JobException {
185196
if (jobId == null) {
186197
JobExceptionManager.createException(30301, "jobId");
187198
}
@@ -225,7 +236,7 @@ public Message detailsJob(HttpServletRequest req, @RequestParam(value = "jobId",
225236
@RequestMapping(path = "/execute/history", method = RequestMethod.GET)
226237
public Message executeHistoryJob(HttpServletRequest req,
227238
@RequestParam(value = "jobId", required = false) Long jobId,
228-
@RequestParam(value = "version", required = false) String version) throws IOException, JobException {
239+
@RequestParam(value = "version", required = false) String version) throws JobException {
229240
String username = SecurityFilter.getLoginUsername(req);
230241
if (jobId == null) {
231242
throw JobExceptionManager.createException(30301, "jobId");
@@ -264,29 +275,34 @@ public Message addTask(HttpServletRequest req,
264275
// 如果存在正在运行的,先将其停止掉
265276
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJobs.get(0).getId());
266277
if(streamTask != null && JobConf.isRunning(streamTask.getStatus())) {
267-
LOG.warn("Streamis Job {} exists running task, update its status to stopped at first.", jobName);
278+
LOG.warn("Streamis Job {} exists running task, update its status from Running to stopped at first.", jobName);
268279
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
269280
streamTaskService.updateTask(streamTask);
270-
} else {
281+
} else if(streamTask == null) {
271282
// 这里取个巧,从该工程该用户有权限的Job中找到一个Flink的历史作业,作为这个Spark Streaming作业的jobId和jobInfo
272283
// 替换掉JobInfo中的 yarn 信息,这样我们前端就可以在不修改任何逻辑的情况下正常展示Spark Streaming作业了
273284
PageInfo<QueryJobListVo> jobList = streamJobService.getByProList(streamJobs.get(0).getProjectName(), username, null, 0, null);
274-
Optional<QueryJobListVo> copyJob = jobList.getList().stream().min((job1, job2) -> {
275-
if (job1.getStatus() > 0) {
276-
return 0;
285+
List<QueryJobListVo> copyJobs = jobList.getList().stream().filter(job -> !job.getJobType().startsWith("spark") && job.getStatus() > 0)
286+
.collect(Collectors.toList());
287+
if(copyJobs.isEmpty()) {
288+
return Message.error("no Flink Job has submitted, the register to Streamis cannot be succeeded.");
289+
}
290+
int index = 0;
291+
while(streamTask == null && index < copyJobs.size()) {
292+
StreamTask copyTask = streamTaskService.getLatestTaskByJobId(copyJobs.get(index).getId());
293+
if(copyTask == null) {
294+
index ++;
277295
} else {
278-
return 1;
296+
LOG.warn("Streamis Job {} will bind the linkisJobInfo from history Flink Job {} with linkisJobId: {}, linkisJobInfo: {}.",
297+
jobName, copyJobs.get(index).getName(), copyTask.getLinkisJobId(), copyTask.getLinkisJobInfo());
298+
streamTask = streamTaskService.createTask(streamJobs.get(0).getId(), (Integer) JobConf.FLINK_JOB_STATUS_RUNNING().getValue(), username);
299+
streamTask.setLinkisJobId(copyTask.getLinkisJobId());
300+
streamTask.setLinkisJobInfo(copyTask.getLinkisJobInfo());
279301
}
280-
});
281-
if(!copyJob.isPresent()) {
282-
return Message.error("If no Flink Job has submitted, the register to Streamis cannot be succeeded.");
283302
}
284-
StreamTask copyTask = streamTaskService.getLatestTaskByJobId(copyJob.get().getId());
285-
LOG.warn("Streamis Job {} will bind the linkisJobInfo from history Flink Job {} with linkisJobId: {}, linkisJobInfo: {}.",
286-
jobName, copyJob.get().getName(), copyTask.getLinkisJobId(), copyTask.getLinkisJobInfo());
287-
streamTask = streamTaskService.createTask(streamJobs.get(0).getId(), (Integer) JobConf.FLINK_JOB_STATUS_RUNNING().getValue(), username);
288-
streamTask.setLinkisJobId(copyTask.getLinkisJobId());
289-
streamTask.setLinkisJobInfo(copyTask.getLinkisJobInfo());
303+
if(streamTask == null) {
304+
return Message.error("no Flink Job has submitted, the register to Streamis cannot be succeeded.");
305+
}
290306
}
291307
streamTask.setStartTime(new Date());
292308
streamTask.setLastUpdateTime(new Date());
@@ -320,11 +336,11 @@ public Message stopTask(HttpServletRequest req,
320336
} else if(streamJobs.size() > 1) {
321337
return Message.error("Too many Streamis Job named " + jobName + ", we cannot distinguish between them.");
322338
} else if(!"spark.jar".equals(streamJobs.get(0).getJobType())) {
323-
return Message.error("Only spark.jar Job support to add new tasks.");
339+
return Message.error("Only spark.jar Job support to stop task.");
324340
}
325341
if (!streamJobService.hasPermission(streamJobs.get(0), username) &&
326342
!this.privilegeService.hasEditPrivilege(req, streamJobs.get(0).getProjectName())) {
327-
return Message.error("Have no permission to add new task for StreamJob [" + jobName + "].");
343+
return Message.error("Have no permission to stop task for StreamJob [" + jobName + "].");
328344
}
329345
// 如果存在正在运行的,将其停止掉
330346
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJobs.get(0).getId());
@@ -340,12 +356,17 @@ public Message stopTask(HttpServletRequest req,
340356

341357
@RequestMapping(path = "/progress", method = RequestMethod.GET)
342358
public Message progressJob(HttpServletRequest req, @RequestParam(value = "jobId", required = false) Long jobId,
343-
@RequestParam(value = "version", required = false) String version) throws IOException, JobException {
359+
@RequestParam(value = "version", required = false) String version) throws JobException {
344360
String username = SecurityFilter.getLoginUsername(req);
345361
if (jobId == null) {
346362
throw JobExceptionManager.createException(30301, "jobId");
347363
}
348364
StreamJob streamJob = this.streamJobService.getJobById(jobId);
365+
if(streamJob == null) {
366+
return Message.error("not exists job " + jobId);
367+
} else if(!JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES().getValue().contains(streamJob.getJobType())) {
368+
return Message.error("Job " + streamJob.getName() + " is not supported to get progress.");
369+
}
349370
if (!streamJobService.hasPermission(streamJob, username) &&
350371
!this.privilegeService.hasAccessPrivilege(req, streamJob.getProjectName())) {
351372
return Message.error("Have no permission to view the progress of StreamJob [" + jobId + "]");
@@ -371,7 +392,6 @@ public Message uploadDetailsJob(HttpServletRequest req, @RequestParam(value = "j
371392
public Message getAlert(HttpServletRequest req, @RequestParam(value = "jobId", required = false) Long jobId,
372393
@RequestParam(value = "version", required = false) String version) {
373394
String username = SecurityFilter.getLoginUsername(req);
374-
375395
return Message.ok().data("list", streamJobService.getAlert(username, jobId, version));
376396
}
377397

@@ -436,6 +456,11 @@ public Message snapshot(@PathVariable("jobId")Long jobId, HttpServletRequest req
436456
try{
437457
String username = SecurityFilter.getLoginUsername(request);
438458
StreamJob streamJob = this.streamJobService.getJobById(jobId);
459+
if(streamJob == null) {
460+
return Message.error("not exists job " + jobId);
461+
} else if(!JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES().getValue().contains(streamJob.getJobType())) {
462+
return Message.error("Job " + streamJob.getName() + " is not supported to do snapshot.");
463+
}
439464
if (!streamJobService.hasPermission(streamJob, username) &&
440465
!this.privilegeService.hasEditPrivilege(request, streamJob.getProjectName())){
441466
return Message.error("Have no permission to do snapshot for StreamJob [" + jobId + "]");

0 commit comments

Comments
 (0)