Skip to content

Commit b52333d

Browse files
AlexTatemr-c
authored andcommitted
JobBase._execute() previously skipped some important post-job actions when exiting due to kill switch. Those actions have been placed under a finally block so that they are executed by both the "switching" job and the "responding" jobs.
However, some of these post actions added a lot of redundant and unhelpful terminal output when handling jobs killed DUE TO the kill switch. The verbose output obscured the error's cause which isn't helpful. Two new process statuses have been added in order to better handle the event: - indeterminant: a default value for processStatus. - killed: the job was killed due to the kill switch being set. This approach also means that partial outputs aren't collected from jobs that have been killed.
1 parent beaa3f4 commit b52333d

File tree

3 files changed

+77
-71
lines changed

3 files changed

+77
-71
lines changed

cwltool/job.py

Lines changed: 55 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ def _execute(
284284
"{}".format(runtimeContext)
285285
)
286286
outputs: CWLObjectType = {}
287+
processStatus = "indeterminate"
287288
try:
288289
stdin_path = None
289290
if self.stdin is not None:
@@ -355,6 +356,7 @@ def stderr_stdout_log_path(
355356

356357
if processStatus != "success":
357358
if runtimeContext.kill_switch.is_set():
359+
processStatus = "killed"
358360
return
359361
elif rcode < 0:
360362
_logger.warning(
@@ -398,62 +400,64 @@ def stderr_stdout_log_path(
398400
_logger.error("[job %s] Job error:\n%s", self.name, str(err))
399401
processStatus = "permanentFail"
400402
except WorkflowKillSwitch:
403+
processStatus = "permanentFail"
401404
raise
402405
except Exception:
403406
_logger.exception("Exception while running job")
404407
processStatus = "permanentFail"
405-
if (
406-
runtimeContext.research_obj is not None
407-
and self.prov_obj is not None
408-
and runtimeContext.process_run_id is not None
409-
):
410-
# creating entities for the outputs produced by each step (in the provenance document)
411-
self.prov_obj.record_process_end(
412-
str(self.name),
413-
runtimeContext.process_run_id,
414-
outputs,
415-
datetime.datetime.now(),
416-
)
417-
if processStatus != "success":
418-
_logger.warning("[job %s] completed %s", self.name, processStatus)
419-
else:
420-
_logger.info("[job %s] completed %s", self.name, processStatus)
421-
422-
if _logger.isEnabledFor(logging.DEBUG):
423-
_logger.debug("[job %s] outputs %s", self.name, json_dumps(outputs, indent=4))
424-
425-
if self.generatemapper is not None and runtimeContext.secret_store is not None:
426-
# Delete any runtime-generated files containing secrets.
427-
for _, p in self.generatemapper.items():
428-
if p.type == "CreateFile":
429-
if runtimeContext.secret_store.has_secret(p.resolved):
430-
host_outdir = self.outdir
431-
container_outdir = self.builder.outdir
432-
host_outdir_tgt = p.target
433-
if p.target.startswith(container_outdir + "/"):
434-
host_outdir_tgt = os.path.join(
435-
host_outdir, p.target[len(container_outdir) + 1 :]
436-
)
437-
os.remove(host_outdir_tgt)
438-
439-
if runtimeContext.workflow_eval_lock is None:
440-
raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")
441-
442-
if self.output_callback:
443-
with runtimeContext.workflow_eval_lock:
444-
self.output_callback(outputs, processStatus)
445-
446-
if runtimeContext.rm_tmpdir and self.stagedir is not None and os.path.exists(self.stagedir):
447-
_logger.debug(
448-
"[job %s] Removing input staging directory %s",
449-
self.name,
450-
self.stagedir,
451-
)
452-
shutil.rmtree(self.stagedir, True)
408+
finally:
409+
if (
410+
runtimeContext.research_obj is not None
411+
and self.prov_obj is not None
412+
and runtimeContext.process_run_id is not None
413+
):
414+
# creating entities for the outputs produced by each step (in the provenance document)
415+
self.prov_obj.record_process_end(
416+
str(self.name),
417+
runtimeContext.process_run_id,
418+
outputs,
419+
datetime.datetime.now(),
420+
)
421+
if processStatus != "success":
422+
_logger.warning("[job %s] completed %s", self.name, processStatus)
423+
else:
424+
_logger.info("[job %s] completed %s", self.name, processStatus)
425+
426+
if _logger.isEnabledFor(logging.DEBUG):
427+
_logger.debug("[job %s] outputs %s", self.name, json_dumps(outputs, indent=4))
428+
429+
if self.generatemapper is not None and runtimeContext.secret_store is not None:
430+
# Delete any runtime-generated files containing secrets.
431+
for _, p in self.generatemapper.items():
432+
if p.type == "CreateFile":
433+
if runtimeContext.secret_store.has_secret(p.resolved):
434+
host_outdir = self.outdir
435+
container_outdir = self.builder.outdir
436+
host_outdir_tgt = p.target
437+
if p.target.startswith(container_outdir + "/"):
438+
host_outdir_tgt = os.path.join(
439+
host_outdir, p.target[len(container_outdir) + 1 :]
440+
)
441+
os.remove(host_outdir_tgt)
442+
443+
if runtimeContext.workflow_eval_lock is None:
444+
raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")
445+
446+
if self.output_callback:
447+
with runtimeContext.workflow_eval_lock:
448+
self.output_callback(outputs, processStatus)
449+
450+
if runtimeContext.rm_tmpdir and self.stagedir is not None and os.path.exists(self.stagedir):
451+
_logger.debug(
452+
"[job %s] Removing input staging directory %s",
453+
self.name,
454+
self.stagedir,
455+
)
456+
shutil.rmtree(self.stagedir, True)
453457

454-
if runtimeContext.rm_tmpdir:
455-
_logger.debug("[job %s] Removing temporary directory %s", self.name, self.tmpdir)
456-
shutil.rmtree(self.tmpdir, True)
458+
if runtimeContext.rm_tmpdir:
459+
_logger.debug("[job %s] Removing temporary directory %s", self.name, self.tmpdir)
460+
shutil.rmtree(self.tmpdir, True)
457461

458462
@abstractmethod
459463
def _required_env(self) -> dict[str, str]:

cwltool/workflow.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -401,12 +401,13 @@ def receive_output(
401401
processStatus: str,
402402
) -> None:
403403
output = {}
404-
for i in self.tool["outputs"]:
405-
field = shortname(i["id"])
406-
if field in jobout:
407-
output[i["id"]] = jobout[field]
408-
else:
409-
processStatus = "permanentFail"
404+
if processStatus != "killed":
405+
for i in self.tool["outputs"]:
406+
field = shortname(i["id"])
407+
if field in jobout:
408+
output[i["id"]] = jobout[field]
409+
else:
410+
processStatus = "permanentFail"
410411
output_callback(output, processStatus)
411412

412413
def job(

cwltool/workflow_job.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -548,24 +548,25 @@ def receive_output(
548548
jobout: CWLObjectType,
549549
processStatus: str,
550550
) -> None:
551-
for i in outputparms:
552-
if "id" in i:
553-
iid = cast(str, i["id"])
554-
if iid in jobout:
555-
self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus)
556-
else:
557-
_logger.error("[%s] Output is missing expected field %s", step.name, iid)
558-
processStatus = "permanentFail"
559551
if _logger.isEnabledFor(logging.DEBUG):
560552
_logger.debug("[%s] produced output %s", step.name, json_dumps(jobout, indent=4))
553+
if processStatus != "killed":
554+
for i in outputparms:
555+
if "id" in i:
556+
iid = cast(str, i["id"])
557+
if iid in jobout:
558+
self.state[iid] = WorkflowStateItem(i, jobout[iid], processStatus)
559+
else:
560+
_logger.error("[%s] Output is missing expected field %s", step.name, iid)
561+
processStatus = "permanentFail"
561562

562-
if processStatus not in ("success", "skipped"):
563-
if self.processStatus != "permanentFail":
564-
self.processStatus = processStatus
563+
if processStatus not in ("success", "skipped"):
564+
if self.processStatus != "permanentFail":
565+
self.processStatus = processStatus
565566

566-
_logger.warning("[%s] completed %s", step.name, processStatus)
567-
else:
568-
_logger.info("[%s] completed %s", step.name, processStatus)
567+
_logger.warning("[%s] completed %s", step.name, processStatus)
568+
else:
569+
_logger.info("[%s] completed %s", step.name, processStatus)
569570

570571
step.completed = True
571572
# Release the iterable related to this step to

0 commit comments

Comments
 (0)