Skip to content

Commit 7ad7834

Browse files
committed
additional thread safety
1 parent 0f1be0c commit 7ad7834

File tree

1 file changed

+26
-19
lines changed

1 file changed

+26
-19
lines changed

cwltool/executors.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ def __init__(self): # type: () -> None
184184
self.threads = set() # type: Set[threading.Thread]
185185
self.exceptions = [] # type: List[WorkflowException]
186186
self.pending_jobs = [] # type: List[JobBase]
187+
self.pending_jobs_lock = threading.Lock()
187188

188189
self.max_ram = psutil.virtual_memory().available / 2**20
189190
self.max_cores = psutil.cpu_count()
@@ -217,43 +218,48 @@ def run_job(self,
217218
""" Execute a single Job in a seperate thread. """
218219

219220
if job is not None:
220-
self.pending_jobs.append(job)
221+
with self.pending_jobs_lock:
222+
self.pending_jobs.append(job)
221223

222224
while self.pending_jobs:
223-
job = self.pending_jobs[0]
224-
if isinstance(job, JobBase):
225-
if ((self.allocated_ram + job.builder.resources["ram"]) > self.max_ram or
226-
(self.allocated_cores + job.builder.resources["cores"]) > self.max_cores):
227-
return
228-
229-
self.pending_jobs.pop(0)
230-
231-
def runner():
225+
with self.pending_jobs_lock:
226+
job = self.pending_jobs[0]
227+
if isinstance(job, JobBase):
228+
if ((self.allocated_ram + job.builder.resources["ram"])
229+
> self.max_ram or
230+
(self.allocated_cores + job.builder.resources["cores"])
231+
> self.max_cores):
232+
return
233+
self.pending_jobs.remove(job)
234+
235+
def runner(my_job, my_runtime_context):
232236
""" Job running thread. """
233237
try:
234-
job.run(runtime_context)
238+
my_job.run(my_runtime_context)
235239
except WorkflowException as err:
236240
_logger.exception("Got workflow error")
237241
self.exceptions.append(err)
238242
except Exception as err: # pylint: disable=broad-except
239243
_logger.exception("Got workflow error")
240244
self.exceptions.append(WorkflowException(Text(err)))
241245
finally:
242-
with runtime_context.workflow_eval_lock:
243-
self.threads.remove(thread)
244-
if isinstance(job, JobBase):
245-
self.allocated_ram -= job.builder.resources["ram"]
246-
self.allocated_cores -= job.builder.resources["cores"]
247-
runtime_context.workflow_eval_lock.notifyAll()
248-
249-
thread = threading.Thread(target=runner)
246+
with my_runtime_context.workflow_eval_lock:
247+
self.threads.remove(threading.current_thread())
248+
if isinstance(my_job, JobBase):
249+
self.allocated_ram -= my_job.builder.resources["ram"]
250+
self.allocated_cores -= my_job.builder.resources["cores"]
251+
my_runtime_context.workflow_eval_lock.notifyAll()
252+
253+
thread = threading.Thread(
254+
target=runner, args=(job, runtime_context))
250255
thread.daemon = True
251256
self.threads.add(thread)
252257
if isinstance(job, JobBase):
253258
self.allocated_ram += job.builder.resources["ram"]
254259
self.allocated_cores += job.builder.resources["cores"]
255260
thread.start()
256261

262+
257263
def wait_for_next_completion(self, runtimeContext): # type: (RuntimeContext) -> None
258264
""" Wait for jobs to finish. """
259265
if runtimeContext.workflow_eval_lock is not None:
@@ -294,3 +300,4 @@ def run_jobs(self,
294300

295301
while self.threads:
296302
self.wait_for_next_completion(runtime_context)
303+
runtime_context.workflow_eval_lock.release()

0 commit comments

Comments
 (0)