1515
1616package com .webank .wedatasphere .streamis .jobmanager .manager .service
1717
18+ import java .util
19+ import java .util .concurrent .Future
20+ import java .util .{Calendar , function }
21+
1822import com .webank .wedatasphere .streamis .jobmanager .launcher .conf .JobConfKeyConstants
1923import com .webank .wedatasphere .streamis .jobmanager .launcher .dao .StreamJobConfMapper
2024import com .webank .wedatasphere .streamis .jobmanager .launcher .job .manager .JobLaunchManager
@@ -27,17 +31,18 @@ import com.webank.wedatasphere.streamis.jobmanager.manager.SpringContextHolder
2731import com .webank .wedatasphere .streamis .jobmanager .manager .conf .JobConf
2832import com .webank .wedatasphere .streamis .jobmanager .manager .conf .JobConf .FLINK_JOB_STATUS_FAILED
2933import com .webank .wedatasphere .streamis .jobmanager .manager .dao .{StreamJobMapper , StreamTaskMapper }
30- import com .webank .wedatasphere .streamis .jobmanager .manager .entity .StreamTask
31- import com .webank .wedatasphere .streamis .jobmanager .manager .entity .vo .{ ExecResultVo , JobProgressVo , JobStatusVo , PauseResultVo , ScheduleResultVo , StreamTaskListVo }
32- import com .webank .wedatasphere .streamis .jobmanager .manager .exception .{JobErrorException , JobExecuteErrorException , JobFetchErrorException , JobPauseErrorException , JobTaskErrorException }
34+ import com .webank .wedatasphere .streamis .jobmanager .manager .entity .vo . _
35+ import com .webank .wedatasphere .streamis .jobmanager .manager .entity .{ StreamJob , StreamTask }
36+ import com .webank .wedatasphere .streamis .jobmanager .manager .exception .{JobExecuteErrorException , JobFetchErrorException , JobPauseErrorException , JobTaskErrorException }
3337import com .webank .wedatasphere .streamis .jobmanager .manager .scheduler .FutureScheduler
3438import com .webank .wedatasphere .streamis .jobmanager .manager .scheduler .events .AbstractStreamisSchedulerEvent .StreamisEventInfo
35- import com .webank .wedatasphere .streamis .jobmanager .manager .scheduler .events .{ AbstractStreamisSchedulerEvent , StreamisPhaseInSchedulerEvent }
39+ import com .webank .wedatasphere .streamis .jobmanager .manager .scheduler .events .StreamisPhaseInSchedulerEvent
3640import com .webank .wedatasphere .streamis .jobmanager .manager .scheduler .events .StreamisPhaseInSchedulerEvent .ScheduleCommand
3741import com .webank .wedatasphere .streamis .jobmanager .manager .transform .exception .TransformFailedErrorException
38- import com .webank .wedatasphere .streamis .jobmanager .manager .transform .{StreamisTransformJobBuilder , Transform }
42+ import com .webank .wedatasphere .streamis .jobmanager .manager .transform .{StreamisTransformJobBuilder , TaskMetricsParser , Transform }
3943import com .webank .wedatasphere .streamis .jobmanager .manager .util .DateUtils
4044import com .webank .wedatasphere .streamis .jobmanager .manager .utils .StreamTaskUtils
45+ import javax .annotation .Resource
4146import org .apache .commons .lang .StringUtils
4247import org .apache .linkis .common .utils .{Logging , Utils }
4348import org .apache .linkis .httpclient .dws .DWSHttpClient
@@ -47,10 +52,6 @@ import org.springframework.beans.factory.annotation.Autowired
4752import org .springframework .stereotype .Service
4853import org .springframework .transaction .annotation .Transactional
4954
50- import java .util
51- import java .util .{Calendar , Date , function }
52- import java .util .concurrent .Future
53- import javax .annotation .Resource
5455import scala .collection .JavaConverters ._
5556
5657
@@ -60,6 +61,7 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
6061 @ Autowired private var streamTaskMapper : StreamTaskMapper = _
6162 @ Autowired private var streamJobMapper : StreamJobMapper = _
6263 @ Autowired private var streamisTransformJobBuilders : Array [StreamisTransformJobBuilder ] = _
64+ @ Autowired private var taskMetricsParser : Array [TaskMetricsParser ] = _
6365
6466 @ Resource
6567 private var jobLaunchManager : JobLaunchManager [_ <: JobInfo ] = _
@@ -721,4 +723,15 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
721723 }
722724 }
723725
726+ override def getJobDetailsVO (streamJob : StreamJob , version : String ): JobDetailsVo = {
727+ val flinkJobInfo = getTaskJobInfo(streamJob.getId, version)
728+ val jobStateInfos = flinkJobInfo.getJobStates
729+ val metricsStr = if (JobConf .SUPPORTED_MANAGEMENT_JOB_TYPES .getValue.contains(streamJob.getJobType)) null
730+ else if (jobStateInfos == null || jobStateInfos.length == 0 ) null
731+ else jobStateInfos(0 ).getLocation
732+ taskMetricsParser.find(_.canParse(streamJob)).map(_.parse(metricsStr)).filter { jobDetailsVO =>
733+ jobDetailsVO.setLinkisJobInfo(flinkJobInfo)
734+ true
735+ }.getOrElse(throw new JobFetchErrorException (30030 , s " Cannot find a TaskMetricsParser to parse job details. " ))
736+ }
724737}
0 commit comments