Skip to content

Commit a9a2ab6

Browse files
AlexTatemr-c
authored andcommitted
Propagation of the WorkflowKillSwitch exception stops once it reaches an executor. The workflow_eval_lock release had to be moved to the finally block in MultithreadedJobExecutor.run_jobs(). Otherwise, TaskQueue threads running MultithreadedJobExecutor._runner() will never join() because _runner() waits indefinitely for the workflow_eval_lock in its own finally block.
1 parent de73938 commit a9a2ab6

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

cwltool/executors.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from .context import RuntimeContext, getdefault
2121
from .cuda import cuda_version_and_device_count
2222
from .cwlprov.provenance_profile import ProvenanceProfile
23-
from .errors import WorkflowException
23+
from .errors import WorkflowException, WorkflowKillSwitch
2424
from .job import JobBase
2525
from .loghandler import _logger
2626
from .mutation import MutationManager
@@ -251,6 +251,11 @@ def run_jobs(
251251
WorkflowException,
252252
): # pylint: disable=try-except-raise
253253
raise
254+
except WorkflowKillSwitch as err:
255+
_logger.error(
256+
f"Workflow kill switch activated by [job {err.job_id}] "
257+
f"because on-error={runtime_context.on_error}"
258+
)
254259
except Exception as err:
255260
logger.exception("Got workflow error")
256261
raise WorkflowException(str(err)) from err
@@ -323,6 +328,11 @@ def _runner(
323328
except WorkflowException as err:
324329
_logger.exception(f"Got workflow error: {err}")
325330
self.exceptions.append(err)
331+
except WorkflowKillSwitch as err:
332+
_logger.error(
333+
f"Workflow kill switch activated by [job {err.job_id}] "
334+
f"because on-error={runtime_context.on_error}"
335+
)
326336
except Exception as err: # pylint: disable=broad-except
327337
_logger.exception(f"Got workflow error: {err}")
328338
self.exceptions.append(WorkflowException(str(err)))
@@ -457,9 +467,8 @@ def run_jobs(
457467
while self.taskqueue.in_flight > 0:
458468
self.wait_for_next_completion(runtime_context)
459469
self.run_job(None, runtime_context)
460-
461-
runtime_context.workflow_eval_lock.release()
462470
finally:
471+
runtime_context.workflow_eval_lock.release()
463472
self.taskqueue.drain()
464473
self.taskqueue.join()
465474

0 commit comments

Comments
 (0)