Skip to content

Commit 45e9692

Browse files
author
Peter Amstutz
committed
Correct locking for multiprocess executor.
1 parent 2a8af96 commit 45e9692

File tree

4 files changed

+20
-9
lines changed

4 files changed

+20
-9
lines changed

cwltool/context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import copy
2+
import threading
3+
24
from .utils import DEFAULT_TMP_PREFIX
35
from .stdfsaccess import StdFsAccess
46
from typing import (Any, Callable, Dict, # pylint: disable=unused-import
@@ -108,6 +110,8 @@ def __init__(self, kwargs=None):
108110
self.cidfile_dir = None
109111
self.cidfile_prefix = None
110112

113+
self.workflow_eval_lock = None # type: threading.Condition
114+
111115
super(RuntimeContext, self).__init__(kwargs)
112116

113117

cwltool/docker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,4 +339,7 @@ def create_runtime(self, env, runtimeContext):
339339
for t, v in self.environment.items():
340340
runtime.append(u"--env=%s=%s" % (t, v))
341341

342+
runtime.append("--cpu-count=%d" % self.builder.resources.cores)
343+
runtime.append("--memory=%dm" % self.builder.resources.ram)
344+
342345
return runtime

cwltool/executors.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ def execute(self,
6969
self.output_dirs.add(runtimeContext.outdir)
7070
runtimeContext.mutation_manager = MutationManager()
7171
runtimeContext.toplevel = True
72+
runtimeContext.workflow_eval_lock = threading.Condition(threading.RLock())
7273

7374
job_reqs = None
7475
if "cwl:requirements" in job_order_object:
@@ -148,15 +149,19 @@ def runner():
148149
self.exceptions.append(err)
149150
except Exception as err:
150151
self.exceptions.append(WorkflowException(Text(err)))
151-
self.threads.remove(thread)
152+
finally:
153+
with runtimeContext.workflow_eval_lock:
154+
self.threads.remove(thread)
155+
runtimeContext.notifyAll()
152156

153157
thread = threading.Thread(target=runner)
154158
thread.daemon = True
155159
self.threads.add(thread)
156160
thread.start()
157161

158-
def wait_for_next_completion(self): # type: () -> None
159-
""" Check for exceptions while waiting for the jobs to finish. """
162+
def wait_for_next_completion(self, runtimeContext): # type: () -> None
163+
""" Wait for jobs to finish. """
164+
runtimeContext.workflow_eval_lock.wait()
160165
if self.exceptions:
161166
raise self.exceptions[0]
162167

@@ -169,19 +174,20 @@ def run_jobs(self,
169174

170175
jobiter = process.job(job_order_object, self.output_callback, runtimeContext)
171176

177+
runtimeContext.workflow_eval_lock.acquire()
172178
for job in jobiter:
173-
if job:
179+
if job is not None:
174180
if runtimeContext.builder is not None:
175181
job.builder = runtimeContext.builder
176182
if job.outdir:
177183
self.output_dirs.add(job.outdir)
178184
self.run_job(job, runtimeContext)
179185
else:
180186
if self.threads:
181-
self.wait_for_next_completion()
187+
self.wait_for_next_completion(runtimeContext)
182188
else:
183189
logger.error("Workflow cannot make any more progress.")
184190
break
185191

186192
while self.threads:
187-
self.wait_for_next_completion()
193+
self.wait_for_next_completion(runtimeContext)

cwltool/job.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333

3434
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
3535

36-
job_output_lock = Lock()
37-
3836
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
3937

4038
SHELL_COMMAND_TEMPLATE = """#!/bin/bash
@@ -326,7 +324,7 @@ def _execute(self,
326324
host_outdir, p.target[len(container_outdir)+1:])
327325
os.remove(host_outdir_tgt)
328326

329-
with job_output_lock:
327+
with runtimeContext.workflow_eval_lock:
330328
self.output_callback(outputs, processStatus)
331329

332330
if self.stagedir and os.path.exists(self.stagedir):

0 commit comments

Comments
 (0)