Skip to content

Commit abc4c3f

Browse files
committed
JobBase._execute() previously skipped some important post-job actions when exiting due to the kill switch. Those actions have been placed under a finally block so that they are executed by both the "switching" job and the "responding" jobs.
However, some of these post-job actions added a lot of redundant and unhelpful terminal output when handling jobs killed DUE TO the kill switch. The verbose output obscured the error's cause, which isn't helpful. Two new process statuses have been added in order to better handle the event: - indeterminate: a default value for processStatus. - killed: the job was killed due to the kill switch being set. This approach also means that partial outputs aren't collected from jobs that have been killed.
1 parent b302eca commit abc4c3f

File tree

3 files changed

+77
-71
lines changed

3 files changed

+77
-71
lines changed

cwltool/job.py

Lines changed: 55 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ def _execute(
298298
"{}".format(runtimeContext)
299299
)
300300
outputs: CWLObjectType = {}
301+
processStatus = "indeterminate"
301302
try:
302303
stdin_path = None
303304
if self.stdin is not None:
@@ -369,6 +370,7 @@ def stderr_stdout_log_path(
369370

370371
if processStatus != "success":
371372
if runtimeContext.kill_switch.is_set():
373+
processStatus = "killed"
372374
return
373375
elif rcode < 0:
374376
_logger.warning(
@@ -412,62 +414,64 @@ def stderr_stdout_log_path(
412414
_logger.error("[job %s] Job error:\n%s", self.name, str(err))
413415
processStatus = "permanentFail"
414416
except WorkflowKillSwitch:
417+
processStatus = "permanentFail"
415418
raise
416419
except Exception:
417420
_logger.exception("Exception while running job")
418421
processStatus = "permanentFail"
419-
if (
420-
runtimeContext.research_obj is not None
421-
and self.prov_obj is not None
422-
and runtimeContext.process_run_id is not None
423-
):
424-
# creating entities for the outputs produced by each step (in the provenance document)
425-
self.prov_obj.record_process_end(
426-
str(self.name),
427-
runtimeContext.process_run_id,
428-
outputs,
429-
datetime.datetime.now(),
430-
)
431-
if processStatus != "success":
432-
_logger.warning("[job %s] completed %s", self.name, processStatus)
433-
else:
434-
_logger.info("[job %s] completed %s", self.name, processStatus)
435-
436-
if _logger.isEnabledFor(logging.DEBUG):
437-
_logger.debug("[job %s] outputs %s", self.name, json_dumps(outputs, indent=4))
438-
439-
if self.generatemapper is not None and runtimeContext.secret_store is not None:
440-
# Delete any runtime-generated files containing secrets.
441-
for _, p in self.generatemapper.items():
442-
if p.type == "CreateFile":
443-
if runtimeContext.secret_store.has_secret(p.resolved):
444-
host_outdir = self.outdir
445-
container_outdir = self.builder.outdir
446-
host_outdir_tgt = p.target
447-
if p.target.startswith(container_outdir + "/"):
448-
host_outdir_tgt = os.path.join(
449-
host_outdir, p.target[len(container_outdir) + 1 :]
450-
)
451-
os.remove(host_outdir_tgt)
452-
453-
if runtimeContext.workflow_eval_lock is None:
454-
raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")
455-
456-
if self.output_callback:
457-
with runtimeContext.workflow_eval_lock:
458-
self.output_callback(outputs, processStatus)
459-
460-
if runtimeContext.rm_tmpdir and self.stagedir is not None and os.path.exists(self.stagedir):
461-
_logger.debug(
462-
"[job %s] Removing input staging directory %s",
463-
self.name,
464-
self.stagedir,
465-
)
466-
shutil.rmtree(self.stagedir, True)
422+
finally:
423+
if (
424+
runtimeContext.research_obj is not None
425+
and self.prov_obj is not None
426+
and runtimeContext.process_run_id is not None
427+
):
428+
# creating entities for the outputs produced by each step (in the provenance document)
429+
self.prov_obj.record_process_end(
430+
str(self.name),
431+
runtimeContext.process_run_id,
432+
outputs,
433+
datetime.datetime.now(),
434+
)
435+
if processStatus != "success":
436+
_logger.warning("[job %s] completed %s", self.name, processStatus)
437+
else:
438+
_logger.info("[job %s] completed %s", self.name, processStatus)
439+
440+
if _logger.isEnabledFor(logging.DEBUG):
441+
_logger.debug("[job %s] outputs %s", self.name, json_dumps(outputs, indent=4))
442+
443+
if self.generatemapper is not None and runtimeContext.secret_store is not None:
444+
# Delete any runtime-generated files containing secrets.
445+
for _, p in self.generatemapper.items():
446+
if p.type == "CreateFile":
447+
if runtimeContext.secret_store.has_secret(p.resolved):
448+
host_outdir = self.outdir
449+
container_outdir = self.builder.outdir
450+
host_outdir_tgt = p.target
451+
if p.target.startswith(container_outdir + "/"):
452+
host_outdir_tgt = os.path.join(
453+
host_outdir, p.target[len(container_outdir) + 1 :]
454+
)
455+
os.remove(host_outdir_tgt)
456+
457+
if runtimeContext.workflow_eval_lock is None:
458+
raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")
459+
460+
if self.output_callback:
461+
with runtimeContext.workflow_eval_lock:
462+
self.output_callback(outputs, processStatus)
463+
464+
if runtimeContext.rm_tmpdir and self.stagedir is not None and os.path.exists(self.stagedir):
465+
_logger.debug(
466+
"[job %s] Removing input staging directory %s",
467+
self.name,
468+
self.stagedir,
469+
)
470+
shutil.rmtree(self.stagedir, True)
467471

468-
if runtimeContext.rm_tmpdir:
469-
_logger.debug("[job %s] Removing temporary directory %s", self.name, self.tmpdir)
470-
shutil.rmtree(self.tmpdir, True)
472+
if runtimeContext.rm_tmpdir:
473+
_logger.debug("[job %s] Removing temporary directory %s", self.name, self.tmpdir)
474+
shutil.rmtree(self.tmpdir, True)
471475

472476
@abstractmethod
473477
def _required_env(self) -> Dict[str, str]:

cwltool/workflow.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,13 @@ def receive_output(
409409
processStatus: str,
410410
) -> None:
411411
output = {}
412-
for i in self.tool["outputs"]:
413-
field = shortname(i["id"])
414-
if field in jobout:
415-
output[i["id"]] = jobout[field]
416-
else:
417-
processStatus = "permanentFail"
412+
if processStatus != "killed":
413+
for i in self.tool["outputs"]:
414+
field = shortname(i["id"])
415+
if field in jobout:
416+
output[i["id"]] = jobout[field]
417+
else:
418+
processStatus = "permanentFail"
418419
output_callback(output, processStatus)
419420

420421
def job(

cwltool/workflow_job.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -552,24 +552,25 @@ def receive_output(
552552
jobout: CWLObjectType,
553553
processStatus: str,
554554
) -> None:
555-
for i in outputparms:
556-
if "id" in i:
557-
iid = cast(str, i["id"])
558-
if iid in jobout:
559-
self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus)
560-
else:
561-
_logger.error("[%s] Output is missing expected field %s", step.name, iid)
562-
processStatus = "permanentFail"
563555
if _logger.isEnabledFor(logging.DEBUG):
564556
_logger.debug("[%s] produced output %s", step.name, json_dumps(jobout, indent=4))
557+
if processStatus != "killed":
558+
for i in outputparms:
559+
if "id" in i:
560+
iid = cast(str, i["id"])
561+
if iid in jobout:
562+
self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus)
563+
else:
564+
_logger.error("[%s] Output is missing expected field %s", step.name, iid)
565+
processStatus = "permanentFail"
565566

566-
if processStatus not in ("success", "skipped"):
567-
if self.processStatus != "permanentFail":
568-
self.processStatus = processStatus
567+
if processStatus not in ("success", "skipped"):
568+
if self.processStatus != "permanentFail":
569+
self.processStatus = processStatus
569570

570-
_logger.warning("[%s] completed %s", step.name, processStatus)
571-
else:
572-
_logger.info("[%s] completed %s", step.name, processStatus)
571+
_logger.warning("[%s] completed %s", step.name, processStatus)
572+
else:
573+
_logger.info("[%s] completed %s", step.name, processStatus)
573574

574575
step.completed = True
575576
# Release the iterable related to this step to

0 commit comments

Comments
 (0)