Skip to content

Commit 1597838

Browse files
AlexTatemr-c
authored andcommitted
Adding inputs for new "kill" mode of the on-error parameter
1 parent dd2e1ba commit 1597838

File tree

3 files changed

+10
-5
lines changed

3 files changed

+10
-5
lines changed

cwltool/argparser.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -607,10 +607,11 @@ def arg_parser() -> argparse.ArgumentParser:
607607
parser.add_argument(
608608
"--on-error",
609609
help="Desired workflow behavior when a step fails. One of 'stop' (do "
610-
"not submit any more steps) or 'continue' (may submit other steps that "
611-
"are not downstream from the error). Default is 'stop'.",
610+
"not submit any more steps), 'continue' (may submit other steps that "
611+
"are not downstream from the error), or kill (same as stop, but also "
612+
"terminates running jobs in the active step(s)). Default is 'stop'.",
612613
default="stop",
613-
choices=("stop", "continue"),
614+
choices=("stop", "continue", "kill"),
614615
)
615616

616617
checkgroup = parser.add_mutually_exclusive_group()

cwltool/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ def __init__(self, kwargs: Optional[Dict[str, Any]] = None) -> None:
200200
self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None
201201
self.validate_only: bool = False
202202
self.validate_stdout: Optional[Union[IO[bytes], TextIO, IO[str]]] = None
203+
self.kill_switch = threading.Event()
203204
super().__init__(kwargs)
204205
if self.tmp_outdir_prefix == "":
205206
self.tmp_outdir_prefix = self.tmpdir_prefix

cwltool/job.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ def stderr_stdout_log_path(
341341
env=env,
342342
cwd=self.outdir,
343343
make_job_dir=lambda: runtimeContext.create_outdir(),
344+
kill_switch=runtimeContext.kill_switch,
344345
job_script_contents=job_script_contents,
345346
timelimit=self.timelimit,
346347
name=self.name,
@@ -506,8 +507,8 @@ def prepare_environment(
506507
# Set on ourselves
507508
self.environment = env
508509

509-
def process_monitor(self, sproc: "subprocess.Popen[str]") -> None:
510-
"""Watch a process, logging its max memory usage."""
510+
def process_monitor(self, sproc: "subprocess.Popen[str]", kill_switch: threading.Event) -> None:
511+
"""Watch a process, logging its max memory usage or terminating it if kill_switch is activated."""
511512
monitor = psutil.Process(sproc.pid)
512513
# Value must be list rather than integer to utilise pass-by-reference in python
513514
memory_usage: MutableSequence[Optional[int]] = [None]
@@ -849,6 +850,7 @@ def docker_monitor(
849850
cleanup_cidfile: bool,
850851
docker_exe: str,
851852
process: "subprocess.Popen[str]",
853+
kill_switch: threading.Event,
852854
) -> None:
853855
"""Record memory usage of the running Docker container."""
854856
# Todo: consider switching to `docker create` / `docker start`
@@ -925,6 +927,7 @@ def _job_popen(
925927
env: Mapping[str, str],
926928
cwd: str,
927929
make_job_dir: Callable[[], str],
930+
kill_switch: threading.Event,
928931
job_script_contents: Optional[str] = None,
929932
timelimit: Optional[int] = None,
930933
name: Optional[str] = None,

0 commit comments

Comments
 (0)