Skip to content

Commit 5f455a4

Browse files
author
Peter Amstutz
committed
The multiprocess scheduler that cwltool has always deserved.
1 parent 45e9692 commit 5f455a4

File tree

4 files changed

+77
-22
lines changed

4 files changed

+77
-22
lines changed

cwltool/executors.py

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import six
1111
from six import string_types
1212

13+
import psutil
14+
1315
from .builder import Builder # pylint: disable=unused-import
1416
from .errors import WorkflowException
1517
from .job import JobBase # pylint: disable=unused-import
@@ -129,35 +131,82 @@ class MultithreadedJobExecutor(JobExecutor):
129131
"""
130132
Experimental multi-threaded CWL executor.
131133
132-
Can easily overload a system as it does not do resource accounting.
134+
Does simple resource accounting: it will not start a job unless it
135+
has cores / ram available, but does not make any attempt to
136+
optimize usage.
133137
"""
138+
134139
def __init__(self):  # type: () -> None
    """Set up thread bookkeeping and the resource budget of this host."""
    super(MultithreadedJobExecutor, self).__init__()
    self.threads = set()  # type: Set[threading.Thread]
    self.exceptions = []  # type: List[WorkflowException]
    # Jobs handed to run_job() that have not been started yet, oldest first.
    self.pending_jobs = []

    # Total physical memory in whole MiB.  Floor division keeps this an
    # integer on both Python 2 and Python 3 (plain "/" yields a float on 3).
    self.max_ram = psutil.virtual_memory().total // 2**20
    # psutil.cpu_count() returns None when the count cannot be determined;
    # fall back to a single core so the ">" comparisons made against
    # max_cores stay well defined.
    self.max_cores = psutil.cpu_count() or 1
    # Resources currently reserved by started jobs.
    self.allocated_ram = 0
    self.allocated_cores = 0
149+
150+
def select_resources(self, request, builder):
    """Choose the resources to grant a job, capped at what this host has.

    ``request`` is indexed with ``"<name>Min"`` and ``"<name>Max"`` for
    each of ``cores``, ``ram``, ``tmpdir`` and ``outdir``.  ``builder``
    supplies the ``tmpdir`` and ``outdir`` paths whose free disk space is
    measured.

    Returns a dict with the keys ``"cores"``, ``"ram"``, ``"tmpdirSize"``
    and ``"outdirSize"``; each value is the requested maximum, or the
    available amount when the maximum is not below it.

    Raises WorkflowException when a requested minimum exceeds the amount
    available.
    """
    available = {
        "cores": self.max_cores,
        "ram": self.max_ram,
        # Free space on the filesystem holding each directory, scaled by
        # 2**20 (bytes -> MiB per the psutil documentation).
        "tmpdir": psutil.disk_usage(builder.tmpdir).free / 2**20,
        "outdir": psutil.disk_usage(builder.outdir).free / 2**20
    }

    result = {}
    for rsc in ("cores", "ram", "tmpdir", "outdir"):
        # The directory resources are reported under "<name>Size".
        key = rsc + "Size" if rsc.endswith("dir") else rsc
        if request[rsc+"Min"] > available[rsc]:
            # Interpolate the message here: exceptions do not apply
            # %-formatting to extra positional arguments.
            raise WorkflowException(
                "Requested at least %d %s but only %d available"
                % (request[rsc+"Min"], rsc, available[rsc]))
        # Argument order matters: on a tie min() returns its first
        # argument, i.e. the available amount.
        result[key] = min(available[rsc], request[rsc+"Max"])

    return result
138168

139169
def run_job(self,
            job,  # type: JobBase
            runtimeContext  # type: RuntimeContext
           ):  # type: (...) -> None
    """Queue a job and start pending jobs, each in a separate thread.

    ``job`` is appended to ``self.pending_jobs`` unless it is None; passing
    None only retries the jobs that are already queued.  Jobs are started
    strictly in queue order: as soon as the job at the head of the queue is
    a JobBase whose "ram" or "cores" reservation would exceed
    ``self.max_ram`` / ``self.max_cores``, the method returns and leaves it
    (and everything behind it) queued.  Queue entries that are not JobBase
    instances are started without any resource accounting.
    """

    def runner(started_job):
        """ Job running thread. """
        # The job arrives as an argument and the thread is looked up with
        # current_thread(), so each thread works on its own job.  Closing
        # over the loop variables below would let a later iteration rebind
        # them underneath a thread that is already running.
        try:
            started_job.run(runtimeContext)
        except WorkflowException as err:
            self.exceptions.append(err)
        except Exception as err:
            self.exceptions.append(WorkflowException(Text(err)))
        finally:
            with runtimeContext.workflow_eval_lock:
                self.threads.remove(threading.current_thread())
                if isinstance(started_job, JobBase):
                    # Give back what was reserved when the job was started.
                    self.allocated_ram -= started_job.builder.resources["ram"]
                    self.allocated_cores -= started_job.builder.resources["cores"]
                runtimeContext.workflow_eval_lock.notifyAll()

    if job is not None:
        self.pending_jobs.append(job)

    while self.pending_jobs:
        candidate = self.pending_jobs[0]
        accounted = isinstance(candidate, JobBase)
        if accounted:
            ram = candidate.builder.resources["ram"]
            cores = candidate.builder.resources["cores"]
            if ((self.allocated_ram + ram) > self.max_ram or
                    (self.allocated_cores + cores) > self.max_cores):
                # Not enough room yet; the job stays at the head of the queue.
                return

        self.pending_jobs.pop(0)

        thread = threading.Thread(target=runner, args=(candidate,))
        thread.daemon = True
        self.threads.add(thread)
        if accounted:
            # Reserve before starting so the next iteration sees the usage.
            self.allocated_ram += ram
            self.allocated_cores += cores
        thread.start()
161210

162211
def wait_for_next_completion(self, runtimeContext): # type: () -> None
163212
""" Wait for jobs to finish. """
@@ -181,8 +230,10 @@ def run_jobs(self,
181230
job.builder = runtimeContext.builder
182231
if job.outdir:
183232
self.output_dirs.add(job.outdir)
184-
self.run_job(job, runtimeContext)
185-
else:
233+
234+
self.run_job(job, runtimeContext)
235+
236+
if job is None:
186237
if self.threads:
187238
self.wait_for_next_completion(runtimeContext)
188239
else:

cwltool/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ def main(argsl=None, # type: List[str]
559559
if not executor:
560560
if args.parallel:
561561
executor = MultithreadedJobExecutor()
562+
runtimeContext.select_resources = executor.select_resources
562563
else:
563564
executor = SingleJobExecutor()
564565
assert executor is not None

cwltool/process.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,9 @@ def _init_job(self, joborder, runtimeContext):
719719
else: # PY2
720720
key = lambda dict: dict["position"]
721721
bindings.sort(key=key)
722-
builder.resources = self.evalResources(builder, runtimeContext)
722+
723+
if self.tool[u"class"] != 'Workflow':
724+
builder.resources = self.evalResources(builder, runtimeContext)
723725
return builder
724726

725727
def evalResources(self, builder, runtimeContext):
@@ -754,7 +756,7 @@ def evalResources(self, builder, runtimeContext):
754756
request[a + "Max"] = cast(int, mx)
755757

756758
if runtimeContext.select_resources:
757-
return runtimeContext.select_resources(request)
759+
return runtimeContext.select_resources(request, builder)
758760
else:
759761
return {
760762
"cores": request["coresMin"],

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
'typing >= 3.5.3',
5959
'mypy-extensions',
6060
'six >= 1.8.0',
61+
'psutil'
6162
],
6263
extras_require={
6364
':os.name=="posix"': ['subprocess32 >= 3.5.0'],

0 commit comments

Comments
 (0)