Skip to content

Commit beaa3f4

Browse files
AlexTatemr-c
authored andcommitted
This commit brings kill switch initialization and monitoring to the TaskQueue. This helps to better synchronize the kill switch event and avoid adding/executing tasks after the switch has been set.
This approach is tighter than my previous draft, but a race condition still exists where a task might be started after the kill switch has been set and announced. If this happens then the leaked job's monitor function will kill it and the subprocess' lifespan will be a maximum of the monitor's timer interval (currently 1 second). So when this rare event happens, the console output will be potentially confusing since it will show a new job starting after the kill switch has been announced.
1 parent 0a00582 commit beaa3f4

File tree

2 files changed

+14
-4
lines changed

2 files changed

+14
-4
lines changed

cwltool/executors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ def run_jobs(
439439
logger: logging.Logger,
440440
runtime_context: RuntimeContext,
441441
) -> None:
442-
self.taskqueue: TaskQueue = TaskQueue(threading.Lock(), int(math.ceil(self.max_cores)))
442+
self.taskqueue: TaskQueue = TaskQueue(threading.Lock(), int(math.ceil(self.max_cores)), runtime_context)
443443
try:
444444
jobiter = process.job(job_order_object, self.output_callback, runtime_context)
445445

cwltool/task_queue.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing import Callable, Optional
99

1010
from .errors import WorkflowKillSwitch
11+
from .context import RuntimeContext
1112
from .loghandler import _logger
1213

1314

@@ -34,7 +35,7 @@ class TaskQueue:
3435
in_flight: int = 0
3536
"""The number of tasks in the queue."""
3637

37-
def __init__(self, lock: threading.Lock, thread_count: int):
38+
def __init__(self, lock: threading.Lock, thread_count: int, runtime_context: RuntimeContext):
3839
"""Create a new task queue using the specified lock and number of threads."""
3940
self.thread_count = thread_count
4041
self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue(
@@ -44,6 +45,11 @@ def __init__(self, lock: threading.Lock, thread_count: int):
4445
self.lock = lock
4546
self.error: Optional[BaseException] = None
4647

48+
if runtime_context.kill_switch is None:
49+
self.kill_switch = runtime_context.kill_switch = threading.Event()
50+
else:
51+
self.kill_switch = runtime_context.kill_switch
52+
4753
for _r in range(0, self.thread_count):
4854
t = threading.Thread(target=self._task_queue_func)
4955
self.task_queue_threads.append(t)
@@ -52,11 +58,12 @@ def __init__(self, lock: threading.Lock, thread_count: int):
5258
def _task_queue_func(self) -> None:
5359
while True:
5460
task = self.task_queue.get()
55-
if task is None:
61+
if task is None or self.kill_switch.is_set():
5662
return
5763
try:
5864
task()
5965
except WorkflowKillSwitch:
66+
self.kill_switch.set()
6067
self.drain()
6168
break
6269
except BaseException as e: # noqa: B036
@@ -96,7 +103,10 @@ def add(
96103
try:
97104
if unlock is not None:
98105
unlock.release()
99-
if check_done is not None and check_done.is_set():
106+
if (
107+
(check_done is not None and check_done.is_set())
108+
or self.kill_switch.is_set()
109+
):
100110
with self.lock:
101111
self.in_flight -= 1
102112
return

0 commit comments

Comments
 (0)