
Commit df32413

Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Streamis into dev-0.2.3-log-collector
2 parents 5eb36e6 + 9d31ce6

File tree

7 files changed: +218 -106 lines changed


streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/DefaultStreamTaskService.scala

Lines changed: 22 additions & 9 deletions
@@ -15,6 +15,10 @@
 
 package com.webank.wedatasphere.streamis.jobmanager.manager.service
 
+import java.util
+import java.util.concurrent.Future
+import java.util.{Calendar, function}
+
 import com.webank.wedatasphere.streamis.jobmanager.launcher.conf.JobConfKeyConstants
 import com.webank.wedatasphere.streamis.jobmanager.launcher.dao.StreamJobConfMapper
 import com.webank.wedatasphere.streamis.jobmanager.launcher.job.manager.JobLaunchManager
@@ -27,17 +31,18 @@ import com.webank.wedatasphere.streamis.jobmanager.manager.SpringContextHolder
 import com.webank.wedatasphere.streamis.jobmanager.manager.conf.JobConf
 import com.webank.wedatasphere.streamis.jobmanager.manager.conf.JobConf.FLINK_JOB_STATUS_FAILED
 import com.webank.wedatasphere.streamis.jobmanager.manager.dao.{StreamJobMapper, StreamTaskMapper}
-import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamTask
-import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.{ExecResultVo, JobProgressVo, JobStatusVo, PauseResultVo, ScheduleResultVo, StreamTaskListVo}
-import com.webank.wedatasphere.streamis.jobmanager.manager.exception.{JobErrorException, JobExecuteErrorException, JobFetchErrorException, JobPauseErrorException, JobTaskErrorException}
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo._
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.{StreamJob, StreamTask}
+import com.webank.wedatasphere.streamis.jobmanager.manager.exception.{JobExecuteErrorException, JobFetchErrorException, JobPauseErrorException, JobTaskErrorException}
 import com.webank.wedatasphere.streamis.jobmanager.manager.scheduler.FutureScheduler
 import com.webank.wedatasphere.streamis.jobmanager.manager.scheduler.events.AbstractStreamisSchedulerEvent.StreamisEventInfo
-import com.webank.wedatasphere.streamis.jobmanager.manager.scheduler.events.{AbstractStreamisSchedulerEvent, StreamisPhaseInSchedulerEvent}
+import com.webank.wedatasphere.streamis.jobmanager.manager.scheduler.events.StreamisPhaseInSchedulerEvent
 import com.webank.wedatasphere.streamis.jobmanager.manager.scheduler.events.StreamisPhaseInSchedulerEvent.ScheduleCommand
 import com.webank.wedatasphere.streamis.jobmanager.manager.transform.exception.TransformFailedErrorException
-import com.webank.wedatasphere.streamis.jobmanager.manager.transform.{StreamisTransformJobBuilder, Transform}
+import com.webank.wedatasphere.streamis.jobmanager.manager.transform.{StreamisTransformJobBuilder, TaskMetricsParser, Transform}
 import com.webank.wedatasphere.streamis.jobmanager.manager.util.DateUtils
 import com.webank.wedatasphere.streamis.jobmanager.manager.utils.StreamTaskUtils
+import javax.annotation.Resource
 import org.apache.commons.lang.StringUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.httpclient.dws.DWSHttpClient
@@ -47,10 +52,6 @@ import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Service
 import org.springframework.transaction.annotation.Transactional
 
-import java.util
-import java.util.{Calendar, Date, function}
-import java.util.concurrent.Future
-import javax.annotation.Resource
 import scala.collection.JavaConverters._
 
 
@@ -60,6 +61,7 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
   @Autowired private var streamTaskMapper:StreamTaskMapper=_
   @Autowired private var streamJobMapper:StreamJobMapper=_
   @Autowired private var streamisTransformJobBuilders: Array[StreamisTransformJobBuilder] = _
+  @Autowired private var taskMetricsParser: Array[TaskMetricsParser] = _
 
   @Resource
   private var jobLaunchManager: JobLaunchManager[_ <: JobInfo] = _
@@ -721,4 +723,15 @@ class DefaultStreamTaskService extends StreamTaskService with Logging{
     }
   }
 
+  override def getJobDetailsVO(streamJob: StreamJob, version: String): JobDetailsVo = {
+    val flinkJobInfo = getTaskJobInfo(streamJob.getId, version)
+    val jobStateInfos = flinkJobInfo.getJobStates
+    val metricsStr = if (JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES.getValue.contains(streamJob.getJobType)) null
+      else if(jobStateInfos == null || jobStateInfos.length == 0) null
+      else jobStateInfos(0).getLocation
+    taskMetricsParser.find(_.canParse(streamJob)).map(_.parse(metricsStr)).filter { jobDetailsVO =>
+      jobDetailsVO.setLinkisJobInfo(flinkJobInfo)
+      true
+    }.getOrElse(throw new JobFetchErrorException(30030, s"Cannot find a TaskMetricsParser to parse job details."))
+  }
 }
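
The new getJobDetailsVO picks a parser by scanning the Spring-injected Array[TaskMetricsParser] for the first one whose canParse accepts the job. A minimal standalone sketch of that dispatch; the TaskMetricsParserSelector object and parseMetrics name are hypothetical, invented only for illustration:

import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.JobDetailsVo
import com.webank.wedatasphere.streamis.jobmanager.manager.exception.JobFetchErrorException
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.TaskMetricsParser

object TaskMetricsParserSelector {

  // The first parser whose canParse accepts the job wins; otherwise fail fast,
  // mirroring the getOrElse branch in getJobDetailsVO above.
  def parseMetrics(parsers: Array[TaskMetricsParser],
                   streamJob: StreamJob,
                   metricsStr: String): JobDetailsVo =
    parsers.find(_.canParse(streamJob))
      .map(_.parse(metricsStr))
      .getOrElse(throw new JobFetchErrorException(30030,
        "Cannot find a TaskMetricsParser to parse job details."))
}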

streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/StreamTaskService.scala

Lines changed: 4 additions & 2 deletions
@@ -18,8 +18,8 @@ package com.webank.wedatasphere.streamis.jobmanager.manager.service
 import com.webank.wedatasphere.streamis.jobmanager.launcher.job.state.JobState
 import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.entity.LogRequestPayload
 import com.webank.wedatasphere.streamis.jobmanager.launcher.linkis.job.FlinkJobInfo
-import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamTask
-import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.{ExecResultVo, JobProgressVo, JobStatusVo, PauseResultVo, StreamTaskListVo}
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.{StreamJob, StreamTask}
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.{ExecResultVo, JobDetailsVo, JobProgressVo, JobStatusVo, PauseResultVo, StreamTaskListVo}
 import java.util
 import java.util.concurrent.Future
 /**
@@ -154,4 +154,6 @@ trait StreamTaskService {
 
   def getStateInfo(taskId: Long): JobState
 
+  def getJobDetailsVO(streamJob: StreamJob, version: String): JobDetailsVo
+
 }
com/webank/wedatasphere/streamis/jobmanager/manager/transform/TaskMetricsParser.scala (new file)

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package com.webank.wedatasphere.streamis.jobmanager.manager.transform
+
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.JobDetailsVo
+
+/**
+ *
+ * @date 2022-10-21
+ * @author enjoyyin
+ * @since 0.5.0
+ */
+trait TaskMetricsParser {
+
+  def canParse(streamJob: StreamJob): Boolean
+
+  def parse(metrics: String): JobDetailsVo
+
+}
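
Because DefaultStreamTaskService injects every Spring bean implementing this trait (Array[TaskMetricsParser]), a new engine type can plug in its own parser simply by registering a @Component. A minimal hypothetical implementation sketch; the class name and the "noop." job-type prefix are invented:

package com.webank.wedatasphere.streamis.jobmanager.manager.transform.parser

import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.JobDetailsVo
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.TaskMetricsParser
import org.springframework.stereotype.Component

@Component
class NoopTaskMetricsParser extends TaskMetricsParser {

  // Claim only a hypothetical "noop." job type so it never shadows the built-in parsers.
  override def canParse(streamJob: StreamJob): Boolean =
    streamJob.getJobType != null && streamJob.getJobType.startsWith("noop.")

  // Return an empty JobDetailsVo; a real parser would translate the metrics JSON here.
  override def parse(metrics: String): JobDetailsVo = new JobDetailsVo
}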
com/webank/wedatasphere/streamis/jobmanager/manager/transform/parser/AbstractTaskMetricsParser.scala (new file)

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+package com.webank.wedatasphere.streamis.jobmanager.manager.transform.parser
+
+import java.util
+
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.JobDetailsVo
+import com.webank.wedatasphere.streamis.jobmanager.manager.transform.TaskMetricsParser
+import org.apache.commons.lang3.StringUtils
+import org.apache.linkis.httpclient.dws.DWSHttpClient
+
+/**
+ *
+ * @date 2022-10-21
+ * @author enjoyyin
+ * @since 0.5.0
+ */
+trait AbstractTaskMetricsParser extends TaskMetricsParser {
+
+  override def parse(metrics: String): JobDetailsVo = {
+    val jobDetailsVO = new JobDetailsVo
+    val dataNumberDTOS = new util.ArrayList[JobDetailsVo.DataNumberDTO]
+    val loadConditionDTOs = new util.ArrayList[JobDetailsVo.LoadConditionDTO]
+    val realTimeTrafficDTOS = new util.ArrayList[JobDetailsVo.RealTimeTrafficDTO]
+    jobDetailsVO.setDataNumber(dataNumberDTOS)
+    jobDetailsVO.setLoadCondition(loadConditionDTOs)
+    jobDetailsVO.setRealTimeTraffic(realTimeTrafficDTOS)
+    val metricsMap = if(StringUtils.isNotBlank(metrics)) DWSHttpClient.jacksonJson.readValue(metrics, classOf[util.Map[String, Object]])
+      else new util.HashMap[String, Object](0)
+    parse(metricsMap, dataNumberDTOS, loadConditionDTOs, realTimeTrafficDTOS)
+    jobDetailsVO
+  }
+
+  protected def parse(metricsMap: util.Map[String, Object],
+                      dataNumberDTOS: util.List[JobDetailsVo.DataNumberDTO],
+                      loadConditionDTOs: util.List[JobDetailsVo.LoadConditionDTO],
+                      realTimeTrafficDTOS: util.List[JobDetailsVo.RealTimeTrafficDTO]): Unit
+
+}
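
AbstractTaskMetricsParser is a template method: it creates the JobDetailsVo, attaches the three DTO lists, deserializes the metrics JSON with Jackson (falling back to an empty map for blank input), and delegates the actual field mapping to the protected parse. A hypothetical subclass sketch showing how little a concrete parser has to supply; the class name, the "demo." prefix and the "recordCount" key are invented for illustration:

package com.webank.wedatasphere.streamis.jobmanager.manager.transform.parser

import java.util

import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.JobDetailsVo
import org.springframework.stereotype.Component

@Component
class DemoTaskMetricsParser extends AbstractTaskMetricsParser {

  override def canParse(streamJob: StreamJob): Boolean =
    streamJob.getJobType != null && streamJob.getJobType.startsWith("demo.")

  // Only fill the lists the template method has already wired into the returned JobDetailsVo.
  override protected def parse(metricsMap: util.Map[String, Object],
                               dataNumberDTOS: util.List[JobDetailsVo.DataNumberDTO],
                               loadConditionDTOs: util.List[JobDetailsVo.LoadConditionDTO],
                               realTimeTrafficDTOS: util.List[JobDetailsVo.RealTimeTrafficDTO]): Unit = {
    val dataNumber = new JobDetailsVo.DataNumberDTO
    dataNumber.setDataName("recordCount")
    dataNumber.setDataNumber(String.valueOf(metricsMap.getOrDefault("recordCount", "0")).toInt)
    dataNumberDTOS.add(dataNumber)
  }
}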
com/webank/wedatasphere/streamis/jobmanager/manager/transform/parser/FlinkTaskMetricsParser.scala (new file)

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+package com.webank.wedatasphere.streamis.jobmanager.manager.transform.parser
+
+import java.util
+
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.JobDetailsVo
+import org.springframework.stereotype.Component
+
+/**
+ *
+ * @date 2022-10-21
+ * @author enjoyyin
+ * @since 0.5.0
+ */
+@Component
+class FlinkTaskMetricsParser extends AbstractTaskMetricsParser {
+
+  override def canParse(streamJob: StreamJob): Boolean = streamJob.getJobType.startsWith("flink.")
+
+  override def parse(metricsMap: util.Map[String, Object],
+                     dataNumberDTOS: util.List[JobDetailsVo.DataNumberDTO],
+                     loadConditionDTOs: util.List[JobDetailsVo.LoadConditionDTO],
+                     realTimeTrafficDTOS: util.List[JobDetailsVo.RealTimeTrafficDTO]): Unit = {
+    // TODO This is just sample data, waiting to be completed. We have planned it for a later release; all partners are welcome to join us in realizing this powerful feature.
+    val dataNumberDTO = new JobDetailsVo.DataNumberDTO
+    dataNumberDTO.setDataName("kafka topic")
+    dataNumberDTO.setDataNumber(109345)
+    dataNumberDTOS.add(dataNumberDTO)
+
+    val loadConditionDTO = new JobDetailsVo.LoadConditionDTO
+    loadConditionDTO.setType("jobManager")
+    loadConditionDTO.setHost("localhost")
+    loadConditionDTO.setMemory("1.5")
+    loadConditionDTO.setTotalMemory("2.0")
+    loadConditionDTO.setGcLastTime("2020-08-01")
+    loadConditionDTO.setGcLastConsume("1")
+    loadConditionDTO.setGcTotalTime("2min")
+    loadConditionDTOs.add(loadConditionDTO)
+
+    val realTimeTrafficDTO = new JobDetailsVo.RealTimeTrafficDTO
+    realTimeTrafficDTO.setSourceKey("kafka topic")
+    realTimeTrafficDTO.setSourceSpeed("100 Records/S")
+    realTimeTrafficDTO.setTransformKey("transform")
+    realTimeTrafficDTO.setSinkKey("hbase key")
+    realTimeTrafficDTO.setSinkSpeed("10 Records/S")
+    realTimeTrafficDTOS.add(realTimeTrafficDTO)
+  }
+}
com/webank/wedatasphere/streamis/jobmanager/manager/transform/parser/SparkTaskMetricsParser.scala (new file)

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
+package com.webank.wedatasphere.streamis.jobmanager.manager.transform.parser
+
+import java.util
+
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob
+import com.webank.wedatasphere.streamis.jobmanager.manager.entity.vo.JobDetailsVo
+import org.apache.linkis.common.utils.Utils
+import org.springframework.stereotype.Component
+
+import scala.collection.JavaConverters._
+
+/**
+ *
+ * @date 2022-10-21
+ * @author enjoyyin
+ * @since 0.5.0
+ */
+@Component
+class SparkTaskMetricsParser extends AbstractTaskMetricsParser {
+
+  override protected def parse(metricsMap: util.Map[String, Object],
+                               dataNumberDTOS: util.List[JobDetailsVo.DataNumberDTO],
+                               loadConditionDTOs: util.List[JobDetailsVo.LoadConditionDTO],
+                               realTimeTrafficDTOS: util.List[JobDetailsVo.RealTimeTrafficDTO]): Unit = {
+    val addDataNumberDTO: String => Unit = key => {
+      val batch = new JobDetailsVo.DataNumberDTO
+      batch.setDataName(key)
+      batch.setDataNumber(metricsMap.get(key).toString.toInt)
+      dataNumberDTOS.add(batch)
+    }
+    addDataNumberDTO("waitingBatchs")
+    addDataNumberDTO("runningBatchs")
+    addDataNumberDTO("completedBatchs")
+    metricsMap.get("executors") match {
+      case executors: util.List[util.Map[String, AnyRef]] if !executors.isEmpty =>
+        executors.asScala.foreach { executor =>
+          val loadConditionDTO = new JobDetailsVo.LoadConditionDTO
+          loadConditionDTO.setType(executor.get("type").asInstanceOf[String])
+          loadConditionDTO.setHost(executor.get("host").asInstanceOf[String])
+          loadConditionDTO.setMemory(executor.get("memory").asInstanceOf[String])
+          loadConditionDTO.setTotalMemory(executor.get("totalMemory").asInstanceOf[String])
+          loadConditionDTO.setGcLastTime(executor.get("gcLastTime").asInstanceOf[String])
+          loadConditionDTO.setGcLastConsume(executor.get("gcLastConsume").asInstanceOf[String])
+          loadConditionDTO.setGcTotalTime(executor.get("gcTotalTime").asInstanceOf[String])
+          loadConditionDTOs.add(loadConditionDTO)
+        }
+      case _ =>
+        val loadConditionDTO = new JobDetailsVo.LoadConditionDTO
+        loadConditionDTO.setType("Driver")
+        loadConditionDTO.setHost("<Unknown>")
+        loadConditionDTO.setMemory("<Unknown>")
+        loadConditionDTO.setTotalMemory("<Unknown>")
+        loadConditionDTO.setGcLastTime("<Unknown>")
+        loadConditionDTO.setGcLastConsume("<Unknown>")
+        loadConditionDTO.setGcTotalTime("<Unknown>")
+        loadConditionDTOs.add(loadConditionDTO)
+    }
+    val realTimeTrafficDTO = new JobDetailsVo.RealTimeTrafficDTO
+    metricsMap.get("batchMetrics") match {
+      case batchMetrics: util.List[util.Map[String, Object]] if !batchMetrics.isEmpty =>
+        val batchMetric = batchMetrics.asScala.maxBy(_.get("batchTime").asInstanceOf[String])
+        realTimeTrafficDTO.setSourceKey(metricsMap.getOrDefault("source", "<Unknown>").asInstanceOf[String])
+        realTimeTrafficDTO.setSourceSpeed(batchMetric.get("inputRecords") + " Records")
+        realTimeTrafficDTO.setTransformKey("processing")
+        realTimeTrafficDTO.setSinkKey(metricsMap.getOrDefault("sink", "<Unknown>").asInstanceOf[String])
+        val sinkSpeed = if (batchMetric.containsKey("totalDelay") && batchMetric.get("totalDelay") != null)
+          Utils.msDurationToString(batchMetric.get("totalDelay").asInstanceOf[Long]) + " totalDelay"
+        else if (batchMetric.containsKey("taskExecuteTime") && batchMetric.get("taskExecuteTime") != null)
+          Utils.msDurationToString(batchMetric.get("taskExecuteTime").asInstanceOf[Long]) + " executeTime(Last Batch)"
+        else "<Unknown>"
+        realTimeTrafficDTO.setSinkSpeed(sinkSpeed)
+      case _ =>
+        realTimeTrafficDTO.setSourceKey("<Unknown Source>")
+        realTimeTrafficDTO.setSourceSpeed("<Unknown> Records/S")
+        realTimeTrafficDTO.setTransformKey("<Unknown Transform>")
+        realTimeTrafficDTO.setSinkKey("<Unknown Sink>")
+        realTimeTrafficDTO.setSinkSpeed("<Unknown> Records/S")
+    }
+    realTimeTrafficDTOS.add(realTimeTrafficDTO)
+  }
+
+  override def canParse(streamJob: StreamJob): Boolean = streamJob.getJobType.startsWith("spark.")
+}
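
For reference, a hedged usage sketch: a small, made-up metrics JSON that follows the keys this parser reads (waitingBatchs, runningBatchs, completedBatchs, source, sink, executors, batchMetrics), pushed through SparkTaskMetricsParser. All values are invented, and the snippet relies only on the API shown in this commit:

import com.webank.wedatasphere.streamis.jobmanager.manager.transform.parser.SparkTaskMetricsParser

object SparkTaskMetricsParserExample {
  def main(args: Array[String]): Unit = {
    val sampleMetrics =
      """{
        |  "waitingBatchs": 0,
        |  "runningBatchs": 1,
        |  "completedBatchs": 128,
        |  "source": "kafka topic demo_topic",
        |  "sink": "hbase table demo_table",
        |  "executors": [
        |    {"type": "Executor", "host": "host-1", "memory": "1.2g", "totalMemory": "2g",
        |     "gcLastTime": "2022-10-21 12:00:00", "gcLastConsume": "15ms", "gcTotalTime": "3s"}
        |  ],
        |  "batchMetrics": [
        |    {"batchTime": "2022-10-21 12:00:10", "inputRecords": 1024}
        |  ]
        |}""".stripMargin

    val jobDetails = new SparkTaskMetricsParser().parse(sampleMetrics)
    // jobDetails now carries three DataNumberDTOs (waiting/running/completed batches),
    // one LoadConditionDTO per executor entry, and one RealTimeTrafficDTO whose sink speed
    // falls back to "<Unknown>" because neither totalDelay nor taskExecuteTime is present.
    println(jobDetails)
  }
}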
