Skip to content

Commit 003d387

Browse files
committed
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Streamis into dev-0.2.3-log-collector
2 parents ee23a76 + 8dab38c commit 003d387

File tree

1 file changed

+107
-36
lines changed
  • streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api

1 file changed

+107
-36
lines changed

streamis-jobmanager/streamis-jobmanager-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/restful/api/JobRestfulApi.java

Lines changed: 107 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.commons.collections.CollectionUtils;
4040
import org.apache.commons.lang.exception.ExceptionUtils;
4141
import org.apache.commons.lang3.StringUtils;
42+
import org.apache.linkis.common.utils.Utils;
4243
import org.apache.linkis.httpclient.dws.DWSHttpClient;
4344
import org.apache.linkis.server.Message;
4445
import org.apache.linkis.server.security.SecurityFilter;
@@ -50,7 +51,6 @@
5051

5152
import javax.annotation.Resource;
5253
import javax.servlet.http.HttpServletRequest;
53-
import java.io.IOException;
5454
import java.util.*;
5555
import java.util.function.Function;
5656
import java.util.stream.Collectors;
@@ -194,44 +194,107 @@ public Message killJob(HttpServletRequest req,
194194

195195
@RequestMapping(path = "/details", method = RequestMethod.GET)
196196
public Message detailsJob(HttpServletRequest req, @RequestParam(value = "jobId", required = false) Long jobId,
197-
@RequestParam(value = "version", required = false) String version) throws JobException {
197+
@RequestParam(value = "version", required = false) String version) throws JobException, JsonProcessingException {
198198
if (jobId == null) {
199199
JobExceptionManager.createException(30301, "jobId");
200200
}
201-
// TODO This is just sample data, waiting to be completed. We have planned it for a later release; welcome all partners to join us to realize this powerful feature.
201+
202+
StreamJob streamJob = streamJobService.getJobById(jobId);
203+
if(streamJob == null) {
204+
return Message.error("not exists job " + jobId);
205+
}
206+
FlinkJobInfo flinkJobInfo = streamTaskService.getTaskJobInfo(jobId,version);
202207
JobDetailsVo jobDetailsVO = new JobDetailsVo();
203208
List<JobDetailsVo.DataNumberDTO> dataNumberDTOS = new ArrayList<>();
204-
JobDetailsVo.DataNumberDTO dataNumberDTO = new JobDetailsVo.DataNumberDTO();
205-
dataNumberDTO.setDataName("kafka topic");
206-
dataNumberDTO.setDataNumber(109345);
207-
dataNumberDTOS.add(dataNumberDTO);
208-
209209
List<JobDetailsVo.LoadConditionDTO> loadConditionDTOs = new ArrayList<>();
210-
JobDetailsVo.LoadConditionDTO loadConditionDTO = new JobDetailsVo.LoadConditionDTO();
211-
loadConditionDTO.setType("jobManager");
212-
loadConditionDTO.setHost("localhost");
213-
loadConditionDTO.setMemory("1.5");
214-
loadConditionDTO.setTotalMemory("2.0");
215-
loadConditionDTO.setGcLastTime("2020-08-01");
216-
loadConditionDTO.setGcLastConsume("1");
217-
loadConditionDTO.setGcTotalTime("2min");
218-
loadConditionDTOs.add(loadConditionDTO);
219-
220210
List<JobDetailsVo.RealTimeTrafficDTO> realTimeTrafficDTOS = new ArrayList<>();
221-
JobDetailsVo.RealTimeTrafficDTO realTimeTrafficDTO = new JobDetailsVo.RealTimeTrafficDTO();
222-
realTimeTrafficDTO.setSourceKey("kafka topic");
223-
realTimeTrafficDTO.setSourceSpeed("100 Records/S");
224-
realTimeTrafficDTO.setTransformKey("transform");
225-
realTimeTrafficDTO.setSinkKey("hbase key");
226-
realTimeTrafficDTO.setSinkSpeed("10 Records/S");
227-
realTimeTrafficDTOS.add(realTimeTrafficDTO);
228-
211+
JobStateInfo[] jobStateInfos = flinkJobInfo.getJobStates();
212+
if(JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES().getValue().contains(streamJob.getJobType()) ||
213+
(jobStateInfos == null || jobStateInfos.length == 0)) {
214+
// TODO This is just sample data, waiting to be completed. We have planned it for a later release; welcome all partners to join us to realize this powerful feature.
215+
JobDetailsVo.DataNumberDTO dataNumberDTO = new JobDetailsVo.DataNumberDTO();
216+
dataNumberDTO.setDataName("kafka topic");
217+
dataNumberDTO.setDataNumber(109345);
218+
dataNumberDTOS.add(dataNumberDTO);
219+
220+
JobDetailsVo.LoadConditionDTO loadConditionDTO = new JobDetailsVo.LoadConditionDTO();
221+
loadConditionDTO.setType("jobManager");
222+
loadConditionDTO.setHost("localhost");
223+
loadConditionDTO.setMemory("1.5");
224+
loadConditionDTO.setTotalMemory("2.0");
225+
loadConditionDTO.setGcLastTime("2020-08-01");
226+
loadConditionDTO.setGcLastConsume("1");
227+
loadConditionDTO.setGcTotalTime("2min");
228+
loadConditionDTOs.add(loadConditionDTO);
229+
230+
JobDetailsVo.RealTimeTrafficDTO realTimeTrafficDTO = new JobDetailsVo.RealTimeTrafficDTO();
231+
realTimeTrafficDTO.setSourceKey("kafka topic");
232+
realTimeTrafficDTO.setSourceSpeed("100 Records/S");
233+
realTimeTrafficDTO.setTransformKey("transform");
234+
realTimeTrafficDTO.setSinkKey("hbase key");
235+
realTimeTrafficDTO.setSinkSpeed("10 Records/S");
236+
realTimeTrafficDTOS.add(realTimeTrafficDTO);
229237

230-
jobDetailsVO.setLinkisJobInfo(streamTaskService.getTaskJobInfo(jobId,version));
238+
} else {
239+
String metricsStr = jobStateInfos[0].getLocation();
240+
Map<String, Object> metricsMap = DWSHttpClient.jacksonJson().readValue(metricsStr, Map.class);
241+
JobDetailsVo.DataNumberDTO dataNumberDTO = new JobDetailsVo.DataNumberDTO();
242+
dataNumberDTO.setDataName("waitingBatchs");
243+
dataNumberDTO.setDataNumber(Math.toIntExact((Long) metricsMap.get("waitingBatchs")));
244+
dataNumberDTOS.add(dataNumberDTO);
245+
List<Map<String, Object>> executors = (List<Map<String, Object>>) metricsMap.get("executors");
246+
if(executors != null && !executors.isEmpty()) {
247+
executors.forEach(map -> {
248+
JobDetailsVo.LoadConditionDTO loadConditionDTO = new JobDetailsVo.LoadConditionDTO();
249+
loadConditionDTO.setType((String) map.get("type"));
250+
loadConditionDTO.setHost((String) map.get("host"));
251+
loadConditionDTO.setMemory((String) map.get("memory"));
252+
loadConditionDTO.setTotalMemory((String) map.get("totalMemory"));
253+
loadConditionDTO.setGcLastTime((String) map.get("gcLastTime"));
254+
loadConditionDTO.setGcLastConsume((String) map.get("gcLastConsume"));
255+
loadConditionDTO.setGcTotalTime((String) map.get("gcTotalTime"));
256+
loadConditionDTOs.add(loadConditionDTO);
257+
});
258+
} else {
259+
JobDetailsVo.LoadConditionDTO loadConditionDTO = new JobDetailsVo.LoadConditionDTO();
260+
loadConditionDTO.setType("sparkAppMaster");
261+
loadConditionDTO.setHost("<Unknown>");
262+
loadConditionDTO.setMemory("<Unknown>");
263+
loadConditionDTO.setTotalMemory("<Unknown>");
264+
loadConditionDTO.setGcLastTime("<Unknown>");
265+
loadConditionDTO.setGcLastConsume("<Unknown>");
266+
loadConditionDTO.setGcTotalTime("<Unknown>");
267+
loadConditionDTOs.add(loadConditionDTO);
268+
}
269+
JobDetailsVo.RealTimeTrafficDTO realTimeTrafficDTO = new JobDetailsVo.RealTimeTrafficDTO();
270+
List<Map<String, Object>> batchMetrics = (List<Map<String, Object>>) metricsMap.get("batchMetrics");
271+
if(batchMetrics != null && !batchMetrics.isEmpty()) {
272+
batchMetrics.stream().max(Comparator.comparing(map -> String.valueOf(map.get("batchTime")))).ifPresent(batchMetric -> {
273+
realTimeTrafficDTO.setSourceKey((String) metricsMap.getOrDefault("source", "<Unknown>"));
274+
realTimeTrafficDTO.setSourceSpeed(batchMetric.get("inputRecords") + " Records");
275+
realTimeTrafficDTO.setTransformKey("processing");
276+
realTimeTrafficDTO.setSinkKey((String) metricsMap.getOrDefault("sink", "<Unknown>"));
277+
String totalDelay = "<Unknown>";
278+
if(batchMetric.containsKey("totalDelay") && batchMetric.get("totalDelay") != null) {
279+
totalDelay = Utils.msDurationToString((long) batchMetric.get("totalDelay")) + " totalDelay";
280+
} else if(batchMetric.containsKey("taskExecuteTime") && batchMetric.get("taskExecuteTime") != null) {
281+
totalDelay = Utils.msDurationToString((long) batchMetric.get("taskExecuteTime")) + " executeTime(Last Batch)";
282+
}
283+
realTimeTrafficDTO.setSinkSpeed(totalDelay);
284+
});
285+
} else {
286+
realTimeTrafficDTO.setSourceKey("<Unknown Source>");
287+
realTimeTrafficDTO.setSourceSpeed("<Unknown> Records/S");
288+
realTimeTrafficDTO.setTransformKey("<Unknown Transform>");
289+
realTimeTrafficDTO.setSinkKey("<Unknown Sink>");
290+
realTimeTrafficDTO.setSinkSpeed("<Unknown> Records/S");
291+
}
292+
realTimeTrafficDTOS.add(realTimeTrafficDTO);
293+
}
231294
jobDetailsVO.setDataNumber(dataNumberDTOS);
232295
jobDetailsVO.setLoadCondition(loadConditionDTOs);
233296
jobDetailsVO.setRealTimeTraffic(realTimeTrafficDTOS);
234-
297+
jobDetailsVO.setLinkisJobInfo(flinkJobInfo);
235298
return Message.ok().data("details", jobDetailsVO);
236299
}
237300

@@ -276,12 +339,7 @@ public Message addTask(HttpServletRequest req,
276339
}
277340
// 如果存在正在运行的,先将其停止掉
278341
StreamTask streamTask = streamTaskService.getLatestTaskByJobId(streamJobs.get(0).getId());
279-
if(streamTask != null && JobConf.isRunning(streamTask.getStatus())) {
280-
LOG.warn("Streamis Job {} exists running task, update its status from Running to stopped at first.", jobName);
281-
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
282-
streamTask.setErrDesc("stopped by App's new task.");
283-
streamTaskService.updateTask(streamTask);
284-
} else if(streamTask == null) {
342+
if(streamTask == null) {
285343
// 这里取个巧,从该工程该用户有权限的Job中找到一个Flink的历史作业,作为这个Spark Streaming作业的jobId和jobInfo
286344
// 替换掉JobInfo中的 yarn 信息,这样我们前端就可以在不修改任何逻辑的情况下正常展示Spark Streaming作业了
287345
PageInfo<QueryJobListVo> jobList = streamJobService.getByProList(streamJobs.get(0).getProjectName(), username, null, null, null);
@@ -306,6 +364,19 @@ public Message addTask(HttpServletRequest req,
306364
if(streamTask == null) {
307365
return Message.error("no Flink task has been executed, the register to Streamis cannot be succeeded.");
308366
}
367+
} else {
368+
if(JobConf.isRunning(streamTask.getStatus())) {
369+
LOG.warn("Streamis Job {} exists running task, update its status from Running to stopped at first.", jobName);
370+
streamTask.setStatus((Integer) JobConf.FLINK_JOB_STATUS_STOPPED().getValue());
371+
streamTask.setErrDesc("stopped by App's new task.");
372+
streamTaskService.updateTask(streamTask);
373+
}
374+
StreamTask newStreamTask = streamTaskService.createTask(streamJobs.get(0).getId(), (Integer) JobConf.FLINK_JOB_STATUS_RUNNING().getValue(), username);
375+
streamTask.setId(newStreamTask.getId());
376+
streamTask.setVersion(newStreamTask.getVersion());
377+
streamTask.setErrDesc("");
378+
streamTask.setStatus(newStreamTask.getStatus());
379+
streamTask.setSubmitUser(username);
309380
}
310381
streamTask.setStartTime(new Date());
311382
streamTask.setLastUpdateTime(new Date());
@@ -314,7 +385,7 @@ public Message addTask(HttpServletRequest req,
314385
flinkJobInfo.setApplicationId(appId);
315386
flinkJobInfo.setApplicationUrl(appUrl);
316387
flinkJobInfo.setName(jobName);
317-
flinkJobInfo.setStatus(JobConf.getStatusString((Integer) JobConf.FLINK_JOB_STATUS_RUNNING().getValue()));
388+
flinkJobInfo.setStatus(JobConf.getStatusString(finalStreamTask.getStatus()));
318389
StreamTaskUtils.refreshInfo(finalStreamTask, flinkJobInfo);
319390
streamTaskService.updateTask(finalStreamTask);
320391
LOG.info("Streamis Job {} has added a new task successfully.", jobName);
@@ -359,7 +430,7 @@ public Message updateTask(HttpServletRequest req,
359430
jobName, flinkJobInfo.getApplicationId(), appId);
360431
return Message.ok("the request appId is not equals to the running task appId " + flinkJobInfo.getApplicationId());
361432
}
362-
JobStateInfo jobStateInfo = new JobStateInfo();
433+
JobStateInfo jobStateInfo = new JobStateInfo();
363434
jobStateInfo.setTimestamp(System.currentTimeMillis());
364435
jobStateInfo.setLocation(metrics);
365436
flinkJobInfo.setJobStates(new JobStateInfo[]{jobStateInfo});

0 commit comments

Comments
 (0)