Skip to content

Commit 0a00582

Browse files
committed
Don't create runtime_context.kill_switch by default
So that the runtime_context object can still be pickled. Other cleanups
1 parent a9a2ab6 commit 0a00582

File tree

5 files changed

+20
-10
lines changed

5 files changed

+20
-10
lines changed

cwltool/context.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None:
172172
self.select_resources: Optional[select_resources_callable] = None
173173
self.eval_timeout: float = 60
174174
self.postScatterEval: Optional[Callable[[CWLObjectType], Optional[CWLObjectType]]] = None
175-
self.on_error: Union[Literal["stop"], Literal["continue"]] = "stop"
175+
self.on_error: Union[Literal["stop"], Literal["continue"], Literal["kill"]] = "stop"
176176
self.strict_memory_limit: bool = False
177177
self.strict_cpu_limit: bool = False
178178
self.cidfile_dir: Optional[str] = None
@@ -189,7 +189,7 @@ def __init__(self, kwargs: Optional[dict[str, Any]] = None) -> None:
189189
self.default_stderr: Optional[Union[IO[bytes], TextIO]] = None
190190
self.validate_only: bool = False
191191
self.validate_stdout: Optional["SupportsWrite[str]"] = None
192-
self.kill_switch = threading.Event()
192+
self.kill_switch: Optional[threading.Event] = None
193193
super().__init__(kwargs)
194194
if self.tmp_outdir_prefix == "":
195195
self.tmp_outdir_prefix = self.tmpdir_prefix

cwltool/errors.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ class ArgumentException(Exception):
2323
class WorkflowKillSwitch(Exception):
2424
"""When processStatus != "success" and on-error=kill, raise this exception."""
2525

26-
def __init__(self, job_id, rcode):
26+
def __init__(self, job_id: str, rcode: int) -> None:
27+
"""Record the job identifier and the error code."""
2728
self.job_id = job_id
2829
self.rcode = rcode
2930

30-
def __str__(self):
31-
return f'[job {self.job_id}] activated kill switch with return code {self.rcode}'
31+
def __str__(self) -> str:
32+
"""Represent this exception as a string."""
33+
return f"[job {self.job_id}] activated kill switch with return code {self.rcode}"

cwltool/executors.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,8 @@ def run_jobs(
468468
self.wait_for_next_completion(runtime_context)
469469
self.run_job(None, runtime_context)
470470
finally:
471-
runtime_context.workflow_eval_lock.release()
471+
if (lock := runtime_context.workflow_eval_lock) is not None:
472+
lock.release()
472473
self.taskqueue.drain()
473474
self.taskqueue.join()
474475

cwltool/job.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,9 @@ def _execute(
217217
runtime: list[str],
218218
env: MutableMapping[str, str],
219219
runtimeContext: RuntimeContext,
220-
monitor_function: Optional[Callable[["subprocess.Popen[str]"], None]] = None,
220+
monitor_function: Optional[
221+
Callable[["subprocess.Popen[str]", threading.Event], None]
222+
] = None,
221223
) -> None:
222224
"""Execute the tool, either directly or via script.
223225
@@ -319,6 +321,10 @@ def stderr_stdout_log_path(
319321
builder: Optional[Builder] = getattr(self, "builder", None)
320322
if builder is not None:
321323
job_script_contents = builder.build_job_script(commands)
324+
if runtimeContext.kill_switch is None:
325+
runtimeContext.kill_switch = kill_switch = threading.Event()
326+
else:
327+
kill_switch = runtimeContext.kill_switch
322328
rcode = _job_popen(
323329
commands,
324330
stdin_path=stdin_path,
@@ -327,7 +333,7 @@ def stderr_stdout_log_path(
327333
env=env,
328334
cwd=self.outdir,
329335
make_job_dir=lambda: runtimeContext.create_outdir(),
330-
kill_switch=runtimeContext.kill_switch,
336+
kill_switch=kill_switch,
331337
job_script_contents=job_script_contents,
332338
timelimit=self.timelimit,
333339
name=self.name,
@@ -533,7 +539,8 @@ def monitor_kill_switch() -> None:
533539
nonlocal ks_tm
534540
if kill_switch.is_set():
535541
_logger.error("[job %s] terminating by kill switch", self.name)
536-
if sproc.stdin: sproc.stdin.close()
542+
if sproc.stdin:
543+
sproc.stdin.close()
537544
sproc.terminate()
538545
else:
539546
ks_tm = Timer(interval=1, function=monitor_kill_switch)

cwltool/task_queue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import threading
88
from typing import Callable, Optional
99

10-
from .loghandler import _logger
1110
from .errors import WorkflowKillSwitch
11+
from .loghandler import _logger
1212

1313

1414
class TaskQueue:

0 commit comments

Comments
 (0)