@@ -396,16 +396,27 @@ private RunStepResponse runIngester(RunStepResponse runStepResponse, Collection<
396396 stepStatusListeners .forEach ((StepStatusListener listener ) -> {
397397 listener .onStatusChange (runStepResponse .getJobId (), 0 , JobStatus .RUNNING_PREFIX + step , 0 , 0 , "starting step execution" );
398398 });
399- if ( !isStopped .get () && (uris == null || uris .size () == 0 )) {
399+
400+ if (uris == null || uris .size () == 0 ) {
401+ JsonNode jobDoc = null ;
402+ final String stepStatus ;
403+ if (isStopped .get ()) {
404+ stepStatus = JobStatus .CANCELED_PREFIX + step ;
405+ }
406+ else {
407+ stepStatus = JobStatus .COMPLETED_PREFIX + step ;
408+ }
409+
400410 stepStatusListeners .forEach ((StepStatusListener listener ) -> {
401- listener .onStatusChange (runStepResponse .getJobId (), 100 , JobStatus .COMPLETED_PREFIX + step , 0 , 0 , "provided file path returned 0 items" );
411+ listener .onStatusChange (runStepResponse .getJobId (), 100 , stepStatus , 0 , 0 ,
412+ (stepStatus .contains (JobStatus .COMPLETED_PREFIX ) ? "provided file path returned 0 items" : "job was stopped" ));
402413 });
403414 stepFinishedListeners .forEach ((StepFinishedListener ::onStepFinished ));
404415 runStepResponse .setCounts (0 ,0 ,0 ,0 ,0 );
405- runStepResponse .withStatus (JobStatus . COMPLETED_PREFIX + step );
406- JsonNode jobDoc ;
416+ runStepResponse .withStatus (stepStatus );
417+
407418 try {
408- jobDoc = jobDocManager .postJobs (jobId , JobStatus .COMPLETED_PREFIX + step , step , step , runStepResponse );
419+ jobDoc = jobDocManager .postJobs (jobId , stepStatus , step , stepStatus . contains ( JobStatus .COMPLETED_PREFIX ) ? step : null , runStepResponse );
409420 }
410421 catch (Exception e ) {
411422 throw e ;
0 commit comments