2222import com .webank .wedatasphere .streamis .jobmanager .exception .JobExceptionManager ;
2323import com .webank .wedatasphere .streamis .jobmanager .launcher .job .JobInfo ;
2424import com .webank .wedatasphere .streamis .jobmanager .launcher .job .manager .JobLaunchManager ;
25+ import com .webank .wedatasphere .streamis .jobmanager .launcher .job .state .JobStateInfo ;
2526import com .webank .wedatasphere .streamis .jobmanager .launcher .linkis .entity .LogRequestPayload ;
2627import com .webank .wedatasphere .streamis .jobmanager .launcher .linkis .job .FlinkJobInfo ;
2728import com .webank .wedatasphere .streamis .jobmanager .manager .conf .JobConf ;
5152import javax .servlet .http .HttpServletRequest ;
5253import java .io .IOException ;
5354import java .util .*;
55+ import java .util .function .Function ;
5456import java .util .stream .Collectors ;
5557
5658@ RequestMapping (path = "/streamis/streamJobManager/job" )
@@ -277,6 +279,7 @@ public Message addTask(HttpServletRequest req,
277279 if (streamTask != null && JobConf .isRunning (streamTask .getStatus ())) {
278280 LOG .warn ("Streamis Job {} exists running task, update its status from Running to stopped at first." , jobName );
279281 streamTask .setStatus ((Integer ) JobConf .FLINK_JOB_STATUS_STOPPED ().getValue ());
282+ streamTask .setErrDesc ("stopped by App's new task." );
280283 streamTaskService .updateTask (streamTask );
281284 } else if (streamTask == null ) {
282285 // 这里取个巧,从该工程该用户有权限的Job中找到一个Flink的历史作业,作为这个Spark Streaming作业的jobId和jobInfo
@@ -306,21 +309,65 @@ public Message addTask(HttpServletRequest req,
306309 }
307310 streamTask .setStartTime (new Date ());
308311 streamTask .setLastUpdateTime (new Date ());
312+ StreamTask finalStreamTask = streamTask ;
313+ return withFlinkJobInfo (jobName , streamTask .getLinkisJobInfo (), flinkJobInfo -> {
314+ flinkJobInfo .setApplicationId (appId );
315+ flinkJobInfo .setApplicationUrl (appUrl );
316+ flinkJobInfo .setName (jobName );
317+ flinkJobInfo .setStatus (JobConf .getStatusString ((Integer ) JobConf .FLINK_JOB_STATUS_RUNNING ().getValue ()));
318+ StreamTaskUtils .refreshInfo (finalStreamTask , flinkJobInfo );
319+ streamTaskService .updateTask (finalStreamTask );
320+ LOG .info ("Streamis Job {} has added a new task successfully." , jobName );
321+ return Message .ok ();
322+ });
323+ }
324+
325+ private Message withFlinkJobInfo (String jobName , String flinkJobInfoStr , Function <FlinkJobInfo , Message > flinkJobInfoFunction ) {
309326 FlinkJobInfo flinkJobInfo ;
310327 try {
311- flinkJobInfo = DWSHttpClient .jacksonJson ().readValue (streamTask . getLinkisJobInfo () , FlinkJobInfo .class );
328+ flinkJobInfo = DWSHttpClient .jacksonJson ().readValue (flinkJobInfoStr , FlinkJobInfo .class );
312329 } catch (JsonProcessingException e ) {
313- LOG .error ("Job {} deserialize the jobInfo from history Job failed!" , jobName , e );
314- return Message .error ("Deserialize the jobInfo from history Job failed!" );
315- }
316- flinkJobInfo .setApplicationId (appId );
317- flinkJobInfo .setApplicationUrl (appUrl );
318- flinkJobInfo .setName (jobName );
319- flinkJobInfo .setStatus (JobConf .getStatusString ((Integer ) JobConf .FLINK_JOB_STATUS_RUNNING ().getValue ()));
320- StreamTaskUtils .refreshInfo (streamTask , flinkJobInfo );
321- streamTaskService .updateTask (streamTask );
322- LOG .info ("Streamis Job {} has added a new task successfully." , jobName );
323- return Message .ok ();
330+ LOG .error ("Job {} deserialize the flinkJobInfo string to object failed!" , jobName , e );
331+ return Message .error ("Deserialize the flinkJobInfo string to object failed!" );
332+ }
333+ return flinkJobInfoFunction .apply (flinkJobInfo );
334+ }
335+
336+ @ RequestMapping (path = "/updateTask" , method = RequestMethod .GET )
337+ public Message updateTask (HttpServletRequest req ,
338+ @ RequestParam (value = "jobName" ) String jobName ,
339+ @ RequestParam (value = "appId" ) String appId ,
340+ @ RequestParam (value = "metrics" ) String metrics ) {
341+ String username = SecurityFilter .getLoginUsername (req );
342+ LOG .info ("User {} try to update task for Streamis job {} with appId: {}." , username , jobName , appId );
343+ List <StreamJob > streamJobs = streamJobService .getJobByName (jobName );
344+ if (CollectionUtils .isEmpty (streamJobs )) {
345+ return Message .error ("Not exits Streamis job " + jobName );
346+ } else if (streamJobs .size () > 1 ) {
347+ return Message .error ("Too many Streamis Job named " + jobName + ", we cannot distinguish between them." );
348+ } else if (!"spark.jar" .equals (streamJobs .get (0 ).getJobType ())) {
349+ return Message .error ("Only spark.jar Job support to update task." );
350+ }
351+ StreamTask streamTask = streamTaskService .getLatestTaskByJobId (streamJobs .get (0 ).getId ());
352+ if (streamTask == null ) {
353+ LOG .warn ("Job {} is not exists running task, ignore to update its metrics." , jobName );
354+ return Message .ok ("not exists running task, ignore it." );
355+ }
356+ return withFlinkJobInfo (jobName , streamTask .getLinkisJobInfo (), flinkJobInfo -> {
357+ if (!flinkJobInfo .getApplicationId ().equals (appId )) {
358+ LOG .warn ("Job {} with running task <appId: {}> is not equals to the request appId: {}, ignore to update its metrics." ,
359+ jobName , flinkJobInfo .getApplicationId (), appId );
360+ return Message .ok ("the request appId is not equals to the running task appId " + flinkJobInfo .getApplicationId ());
361+ }
362+ JobStateInfo jobStateInfo = new JobStateInfo ();
363+ jobStateInfo .setTimestamp (System .currentTimeMillis ());
364+ jobStateInfo .setLocation (metrics );
365+ flinkJobInfo .setJobStates (new JobStateInfo []{jobStateInfo });
366+ StreamTaskUtils .refreshInfo (streamTask , flinkJobInfo );
367+ streamTaskService .updateTask (streamTask );
368+ LOG .info ("Streamis Job {} has updated the task metrics successfully." , jobName );
369+ return Message .ok ();
370+ });
324371 }
325372
326373 @ RequestMapping (path = "/stopTask" , method = RequestMethod .GET )
@@ -345,13 +392,23 @@ public Message stopTask(HttpServletRequest req,
345392 // 如果存在正在运行的,将其停止掉
346393 StreamTask streamTask = streamTaskService .getLatestTaskByJobId (streamJobs .get (0 ).getId ());
347394 if (streamTask != null && JobConf .isRunning (streamTask .getStatus ())) {
348- LOG .warn ("Streamis Job {} is exists running task, update its status to stopped." , jobName );
349- streamTask .setStatus ((Integer ) JobConf .FLINK_JOB_STATUS_STOPPED ().getValue ());
350- streamTaskService .updateTask (streamTask );
395+ return withFlinkJobInfo (jobName , streamTask .getLinkisJobInfo (), flinkJobInfo -> {
396+ if (flinkJobInfo .getApplicationId ().equals (appId )) {
397+ LOG .warn ("Streamis Job {} is exists running task, update its status to stopped." , jobName );
398+ streamTask .setStatus ((Integer ) JobConf .FLINK_JOB_STATUS_STOPPED ().getValue ());
399+ streamTask .setErrDesc ("stopped by App itself." );
400+ streamTaskService .updateTask (streamTask );
401+ return Message .ok ();
402+ } else {
403+ LOG .warn ("Job {} with running task <appId: {}> is not equals to the request appId: {}, ignore to stop it." ,
404+ jobName , flinkJobInfo .getApplicationId (), appId );
405+ return Message .ok ("the request appId is not equals to the running task appId " + flinkJobInfo .getApplicationId ());
406+ }
407+ });
351408 } else {
352409 LOG .warn ("Streamis Job {} is not exists running task, ignore to stop it." , jobName );
410+ return Message .ok ();
353411 }
354- return Message .ok ();
355412 }
356413
357414 @ RequestMapping (path = "/progress" , method = RequestMethod .GET )
@@ -411,6 +468,12 @@ public Message getLog(HttpServletRequest req,
411468 logType = StringUtils .isBlank (logType ) ? "client" : logType ;
412469 String username = SecurityFilter .getLoginUsername (req );
413470 StreamJob streamJob = this .streamJobService .getJobById (jobId );
471+ if (streamJob == null ) {
472+ return Message .error ("not exists job " + jobId );
473+ } else if (!JobConf .SUPPORTED_MANAGEMENT_JOB_TYPES ().getValue ().contains (streamJob .getJobType ()) &&
474+ "client" .equals (logType )) {
475+ return Message .error ("Job " + streamJob .getName () + " is not supported to get client logs." );
476+ }
414477 if (!streamJobService .hasPermission (streamJob , username ) &&
415478 !this .privilegeService .hasAccessPrivilege (req , streamJob .getProjectName ())) {
416479 return Message .error ("Have no permission to fetch logs from StreamJob [" + jobId + "]" );
0 commit comments