Skip to content

Commit 572e865

Browse files
AlexTatemr-c
authored andcommitted
Adding inputs for new "kill" mode of the on-error parameter
1 parent c3c92eb commit 572e865

File tree

3 files changed

+10
-5
lines changed

3 files changed

+10
-5
lines changed

cwltool/argparser.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -596,10 +596,11 @@ def arg_parser() -> argparse.ArgumentParser:
596596
parser.add_argument(
597597
"--on-error",
598598
help="Desired workflow behavior when a step fails. One of 'stop' (do "
599-
"not submit any more steps) or 'continue' (may submit other steps that "
600-
"are not downstream from the error). Default is 'stop'.",
599+
"not submit any more steps), 'continue' (may submit other steps that "
600+
"are not downstream from the error), or kill (same as stop, but also "
601+
"terminates running jobs in the active step(s)). Default is 'stop'.",
601602
default="stop",
602-
choices=("stop", "continue"),
603+
choices=("stop", "continue", "kill"),
603604
)
604605

605606
checkgroup = parser.add_mutually_exclusive_group()

cwltool/context.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None:
189189
self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None
190190
self.validate_only: bool = False
191191
self.validate_stdout: Optional["SupportsWrite[str]"] = None
192+
self.kill_switch = threading.Event()
192193
super().__init__(kwargs)
193194
if self.tmp_outdir_prefix == "":
194195
self.tmp_outdir_prefix = self.tmpdir_prefix

cwltool/job.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ def stderr_stdout_log_path(
327327
env=env,
328328
cwd=self.outdir,
329329
make_job_dir=lambda: runtimeContext.create_outdir(),
330+
kill_switch=runtimeContext.kill_switch,
330331
job_script_contents=job_script_contents,
331332
timelimit=self.timelimit,
332333
name=self.name,
@@ -492,8 +493,8 @@ def prepare_environment(
492493
# Set on ourselves
493494
self.environment = env
494495

495-
def process_monitor(self, sproc: "subprocess.Popen[str]") -> None:
496-
"""Watch a process, logging its max memory usage."""
496+
def process_monitor(self, sproc: "subprocess.Popen[str]", kill_switch: threading.Event) -> None:
497+
"""Watch a process, logging its max memory usage or terminating it if kill_switch is activated."""
497498
monitor = psutil.Process(sproc.pid)
498499
# Value must be list rather than integer to utilise pass-by-reference in python
499500
memory_usage: MutableSequence[Optional[int]] = [None]
@@ -835,6 +836,7 @@ def docker_monitor(
835836
cleanup_cidfile: bool,
836837
docker_exe: str,
837838
process: "subprocess.Popen[str]",
839+
kill_switch: threading.Event,
838840
) -> None:
839841
"""Record memory usage of the running Docker container."""
840842
# Todo: consider switching to `docker create` / `docker start`
@@ -911,6 +913,7 @@ def _job_popen(
911913
env: Mapping[str, str],
912914
cwd: str,
913915
make_job_dir: Callable[[], str],
916+
kill_switch: threading.Event,
914917
job_script_contents: Optional[str] = None,
915918
timelimit: Optional[int] = None,
916919
name: Optional[str] = None,

0 commit comments

Comments
 (0)