Skip to content

Commit 7bb5665

Browse files
committed
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Streamis into dev-0.2.3-log-collector
2 parents 4c44089 + 2b57568 commit 7bb5665

File tree

7 files changed

+131
-12
lines changed

7 files changed

+131
-12
lines changed

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/dao/StreamJobMapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ List<QueryJobListVo> getJobLists(@Param("projectName") String projectName, @Para
3030

3131
StreamJob getJobById(@Param("jobId") Long jobId);
3232

33+
List<StreamJob> getJobByName(@Param("jobName") String jobName);
3334

3435
List<StreamJobVersion> getJobVersions(@Param("jobId") Long jobId);
3536

streamis-jobmanager/streamis-job-manager/streamis-job-manager-base/src/main/java/com/webank/wedatasphere/streamis/jobmanager/manager/dao/impl/StreamJobMapper.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@
8686
<!-- linkis_stream_job j,linkis_stream_job_version l WHERE j.id = l.job_id and j.id=#{jobId}-->
8787
</select>
8888

89+
<select id="getJobByName" resultType="com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob">
90+
SELECT <include refid="Job_Column"/> FROM
91+
linkis_stream_job WHERE name = #{jobName}
92+
</select>
93+
8994
<!-- <select id="getJobById" resultType="com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob">-->
9095
<!-- SELECT j.`id`,j.`name`,j.`status`,j.`submit_user`,j.`description`,j.`label`,l.`create_time`,l.`version` as currentVersion,j.`job_type` FROM-->
9196
<!-- linkis_stream_job j,linkis_stream_job_version l WHERE j.id = l.job_id and j.id=#{jobId} and l.`version` = #{version}-->

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/DefaultStreamJobService.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,26 @@ package com.webank.wedatasphere.streamis.jobmanager.manager.service
1717

1818
import java.util
1919
import java.util.Date
20+
2021
import com.github.pagehelper.PageInfo
2122
import com.webank.wedatasphere.streamis.jobmanager.launcher.conf.JobConfKeyConstants
2223
import com.webank.wedatasphere.streamis.jobmanager.launcher.service.StreamJobConfService
2324
import com.webank.wedatasphere.streamis.jobmanager.manager.alert.AlertLevel
24-
import org.apache.linkis.common.exception.ErrorException
25-
import org.apache.linkis.common.utils.Logging
2625
import com.webank.wedatasphere.streamis.jobmanager.manager.conf.JobConf
2726
import com.webank.wedatasphere.streamis.jobmanager.manager.dao.{StreamAlertMapper, StreamJobMapper, StreamTaskMapper}
2827
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.{QueryJobListVo, TaskCoreNumVo, VersionDetailVo}
29-
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.{MetaJsonInfo, StreamAlertRecord, StreamJob, StreamJobVersion, StreamJobVersionFiles}
28+
import com.webank.wedatasphere.streamis.jobmanager.manager.entity._
3029
import com.webank.wedatasphere.streamis.jobmanager.manager.exception.{JobCreateErrorException, JobFetchErrorException}
3130
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.JobContentParser
3231
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.entity.StreamisTransformJobContent
3332
import com.webank.wedatasphere.streamis.jobmanager.manager.util.{ReaderUtils, ZipHelper}
3433
import org.apache.commons.lang.StringUtils
34+
import org.apache.linkis.common.exception.ErrorException
35+
import org.apache.linkis.common.utils.Logging
3536
import org.springframework.beans.factory.annotation.Autowired
3637
import org.springframework.stereotype.Service
3738
import org.springframework.transaction.annotation.Transactional
3839

39-
import javax.annotation.Resource
4040
import scala.collection.JavaConverters._
4141

4242

@@ -60,6 +60,8 @@ class DefaultStreamJobService extends StreamJobService with Logging {
6060
this.streamJobMapper.getJobById(jobId)
6161
}
6262

63+
override def getJobByName(jobName: String): util.List[StreamJob] = streamJobMapper.getJobByName(jobName)
64+
6365
override def getByProList(projectName: String, userName: String, jobName: String, jobStatus: Integer, jobCreator: String): PageInfo[QueryJobListVo] = {
6466
val streamJobList = streamJobMapper.getJobLists(projectName, userName, jobName, jobStatus, jobCreator)
6567
if (streamJobList != null && !streamJobList.isEmpty) {

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/DefaultStreamTaskService.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,7 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
483483
}).asJava
484484
}
485485

486-
def getTask(jobId:Long, version: String): FlinkJobInfo ={
486+
def getTaskJobInfo(jobId:Long, version: String): FlinkJobInfo ={
487487
val str = streamTaskMapper.getTask(jobId, version)
488488
if (StringUtils.isBlank(str)) {
489489
return new FlinkJobInfo
@@ -524,6 +524,9 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
524524
}
525525
}
526526

527+
528+
override def getLatestTaskByJobId(jobId: Long): StreamTask = streamTaskMapper.getLatestByJobId(jobId)
529+
527530
/**
528531
* Create new task use the latest job version
529532
*
@@ -558,6 +561,8 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
558561
}
559562
}
560563

564+
override def updateTask(streamTask: StreamTask): Unit = streamTaskMapper.updateTask(streamTask)
565+
561566
/**
562567
* Just launch task by task id
563568
*

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/StreamJobService.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ trait StreamJobService {
1515

1616

1717
def getJobById(jobId: Long): StreamJob
18+
19+
def getJobByName(jobName: String): util.List[StreamJob]
20+
1821
/**
1922
* Page list query
2023
* @param projectName project name

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/StreamTaskService.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.entity.LogReq
2020
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.FlinkJobInfo
2121
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamTask
2222
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.{ExecResultVo, JobProgressVo, JobStatusVo, PauseResultVo, StreamTaskListVo}
23-
2423
import java.util
2524
import java.util.concurrent.Future
2625
/**
@@ -87,6 +86,8 @@ trait StreamTaskService {
8786
*/
8887
def launch(taskId: Long, execUser: String): Unit
8988

89+
def getLatestTaskByJobId(jobId: Long): StreamTask
90+
9091
/**
9192
* Create new task use the latest job version
9293
* @param jobId job id
@@ -95,6 +96,8 @@ trait StreamTaskService {
9596
*/
9697
def createTask(jobId: Long, status: Int, creator: String): StreamTask
9798

99+
def updateTask(streamTask: StreamTask): Unit
100+
98101
/**
99102
* Update the task status
100103
* @param jobId job id
@@ -146,7 +149,7 @@ trait StreamTaskService {
146149
* @param version version
147150
* @return
148151
*/
149-
def getTask(jobId: Long, version: String): FlinkJobInfo
152+
def getTaskJobInfo(jobId: Long, version: String): FlinkJobInfo
150153

151154

152155
def getStateInfo(taskId: Long): JobState

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/JobRestfulApi.java

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,30 @@
1515

1616
package com.webank.wedatasphere.streamis.jobmanager.restful.api;
1717

18+
import com.fasterxml.jackson.core.JsonProcessingException;
1819
import com.github.pagehelper.PageHelper;
1920
import com.github.pagehelper.PageInfo;
2021
import com.webank.wedatasphere.streamis.jobmanager.exception.JobException;
2122
import com.webank.wedatasphere.streamis.jobmanager.exception.JobExceptionManager;
2223
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.JobInfo;
2324
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.manager.JobLaunchManager;
2425
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.entity.LogRequestPayload;
26+
import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.FlinkJobInfo;
27+
import com.webank.wedatasphere.streamis.jobmanager.manager.conf.JobConf;
2528
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.MetaJsonInfo;
2629
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob;
2730
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJobVersion;
31+
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamTask;
2832
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.*;
2933
import com.webank.wedatasphere.streamis.jobmanager.manager.project.service.ProjectPrivilegeService;
3034
import com.webank.wedatasphere.streamis.jobmanager.manager.service.StreamJobService;
3135
import com.webank.wedatasphere.streamis.jobmanager.manager.service.StreamTaskService;
3236
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.entity.StreamisTransformJobContent;
37+
import com.webank.wedatasphere.streamis.jobmanager.manager.utils.StreamTaskUtils;
38+
import org.apache.commons.collections.CollectionUtils;
3339
import org.apache.commons.lang.exception.ExceptionUtils;
3440
import org.apache.commons.lang3.StringUtils;
41+
import org.apache.linkis.httpclient.dws.DWSHttpClient;
3542
import org.apache.linkis.server.Message;
3643
import org.apache.linkis.server.security.SecurityFilter;
3744
import org.slf4j.Logger;
@@ -43,10 +50,7 @@
4350
import javax.annotation.Resource;
4451
import javax.servlet.http.HttpServletRequest;
4552
import java.io.IOException;
46-
import java.util.ArrayList;
47-
import java.util.List;
48-
import java.util.Map;
49-
import java.util.Objects;
53+
import java.util.*;
5054

5155
@RequestMapping(path = "/streamis/streamJobManager/job")
5256
@RestController
@@ -210,7 +214,7 @@ public Message detailsJob(HttpServletRequest req, @RequestParam(value = "jobId",
210214
realTimeTrafficDTOS.add(realTimeTrafficDTO);
211215

212216

213-
jobDetailsVO.setLinkisJobInfo(streamTaskService.getTask(jobId,version));
217+
jobDetailsVO.setLinkisJobInfo(streamTaskService.getTaskJobInfo(jobId,version));
214218
jobDetailsVO.setDataNumber(dataNumberDTOS);
215219
jobDetailsVO.setLoadCondition(loadConditionDTOs);
216220
jobDetailsVO.setRealTimeTraffic(realTimeTrafficDTOS);
@@ -238,6 +242,102 @@ public Message executeHistoryJob(HttpServletRequest req,
238242
return Message.ok().data("details", details);
239243
}
240244

245+
@RequestMapping(path = "/addTask", method = RequestMethod.GET)
246+
public Message addTask(HttpServletRequest req,
247+
@RequestParam(value = "jobName") String jobName,
248+
@RequestParam(value = "appId") String appId,
249+
@RequestParam(value = "appUrl") String appUrl) {
250+
String username = SecurityFilter.getLoginUsername(req);
251+
LOG.info("User {} try to add a new task for Streamis job {} with appId: {}, appUrl: {}.", username, jobName, appId, appUrl);
252+
List<StreamJob> streamJobs = streamJobService.getJobByName(jobName);
253+
if(CollectionUtils.isEmpty(streamJobs)) {
254+
return Message.error("Not exits Streamis job " + jobName);
255+
} else if(streamJobs.size() > 1) {
256+
return Message.error("Too many Streamis Job named " + jobName + ", we cannot distinguish between them.");
257+
} else if(!"spark.jar".equals(streamJobs.get(0).getJobType())) {
258+
return Message.error("Only spark.jar Job support to add new tasks.");
259+
}
260+
if (!streamJobService.hasPermission(streamJobs.get(0), username) &&
261+
!this.privilegeService.hasEditPrivilege(req, streamJobs.get(0).getProjectName())) {
262+
return Message.error("Have no permission to add new task for StreamJob [" + jobName + "].");
263+
}
264+
// 如果存在正在运行的,先将其停止掉
265+
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJobs.get(0).getId());
266+
if(streamTask != null && JobConf.isRunning(streamTask.getStatus())) {
267+
LOG.warn("Streamis Job {} exists running task, update its status to stopped at first.", jobName);
268+
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
269+
streamTaskService.updateTask(streamTask);
270+
} else {
271+
// 这里取个巧,从该工程该用户有权限的Job中找到一个Flink的历史作业,作为这个Spark Streaming作业的jobId和jobInfo
272+
// 替换掉JobInfo中的 yarn 信息,这样我们前端就可以在不修改任何逻辑的情况下正常展示Spark Streaming作业了
273+
PageInfo<QueryJobListVo> jobList = streamJobService.getByProList(streamJobs.get(0).getProjectName(), username, null, 0, null);
274+
Optional<QueryJobListVo> copyJob = jobList.getList().stream().min((job1, job2) -> {
275+
if (job1.getStatus() > 0) {
276+
return 0;
277+
} else {
278+
return 1;
279+
}
280+
});
281+
if(!copyJob.isPresent()) {
282+
return Message.error("If no Flink Job has submitted, the register to Streamis cannot be succeeded.");
283+
}
284+
StreamTask copyTask = streamTaskService.getLatestTaskByJobId(copyJob.get().getId());
285+
LOG.warn("Streamis Job {} will bind the linkisJobInfo from history Flink Job {} with linkisJobId: {}, linkisJobInfo: {}.",
286+
jobName, copyJob.get().getName(), copyTask.getLinkisJobId(), copyTask.getLinkisJobInfo());
287+
streamTask = streamTaskService.createTask(streamJobs.get(0).getId(), (Integer) JobConf.FLINK_JOB_STATUS_RUNNING().getValue(), username);
288+
streamTask.setLinkisJobId(copyTask.getLinkisJobId());
289+
streamTask.setLinkisJobInfo(copyTask.getLinkisJobInfo());
290+
}
291+
streamTask.setStartTime(new Date());
292+
streamTask.setLastUpdateTime(new Date());
293+
FlinkJobInfo flinkJobInfo;
294+
try {
295+
flinkJobInfo = DWSHttpClient.jacksonJson().readValue(streamTask.getLinkisJobInfo(), FlinkJobInfo.class);
296+
} catch (JsonProcessingException e) {
297+
LOG.error("Job {} deserialize the jobInfo from history Job failed!", jobName, e);
298+
return Message.error("Deserialize the jobInfo from history Job failed!");
299+
}
300+
flinkJobInfo.setApplicationId(appId);
301+
flinkJobInfo.setApplicationUrl(appUrl);
302+
flinkJobInfo.setName(jobName);
303+
flinkJobInfo.setStatus(JobConf.getStatusString((Integer) JobConf.FLINK_JOB_STATUS_RUNNING().getValue()));
304+
StreamTaskUtils.refreshInfo(streamTask, flinkJobInfo);
305+
streamTaskService.updateTask(streamTask);
306+
LOG.info("Streamis Job {} has added a new task successfully.", jobName);
307+
return Message.ok();
308+
}
309+
310+
@RequestMapping(path = "/stopTask", method = RequestMethod.GET)
311+
public Message stopTask(HttpServletRequest req,
312+
@RequestParam(value = "jobName") String jobName,
313+
@RequestParam(value = "appId") String appId,
314+
@RequestParam(value = "appUrl") String appUrl) {
315+
String username = SecurityFilter.getLoginUsername(req);
316+
LOG.info("User {} try to stop task for Streamis job {} with appId: {}, appUrl: {}.", username, jobName, appId, appUrl);
317+
List<StreamJob> streamJobs = streamJobService.getJobByName(jobName);
318+
if(CollectionUtils.isEmpty(streamJobs)) {
319+
return Message.error("Not exits Streamis job " + jobName);
320+
} else if(streamJobs.size() > 1) {
321+
return Message.error("Too many Streamis Job named " + jobName + ", we cannot distinguish between them.");
322+
} else if(!"spark.jar".equals(streamJobs.get(0).getJobType())) {
323+
return Message.error("Only spark.jar Job support to add new tasks.");
324+
}
325+
if (!streamJobService.hasPermission(streamJobs.get(0), username) &&
326+
!this.privilegeService.hasEditPrivilege(req, streamJobs.get(0).getProjectName())) {
327+
return Message.error("Have no permission to add new task for StreamJob [" + jobName + "].");
328+
}
329+
// 如果存在正在运行的,将其停止掉
330+
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJobs.get(0).getId());
331+
if(streamTask != null && JobConf.isRunning(streamTask.getStatus())) {
332+
LOG.warn("Streamis Job {} is exists running task, update its status to stopped.", jobName);
333+
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
334+
streamTaskService.updateTask(streamTask);
335+
} else {
336+
LOG.warn("Streamis Job {} is not exists running task, ignore to stop it.", jobName);
337+
}
338+
return Message.ok();
339+
}
340+
241341
@RequestMapping(path = "/progress", method = RequestMethod.GET)
242342
public Message progressJob(HttpServletRequest req, @RequestParam(value = "jobId", required = false) Long jobId,
243343
@RequestParam(value = "version", required = false) String version) throws IOException, JobException {

0 commit comments

Comments
 (0)