Commit 9a8e654

Merge pull request #800 from common-workflow-language/multiprocess
Multiprocess executor with resource accounting
2 parents 977962e + 7ad7834 commit 9a8e654

File tree

11 files changed: +174 additions, -93 deletions


cwltool/argparser.py

Lines changed: 1 addition & 3 deletions
@@ -22,9 +22,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Output directory, default current directory")
 
     parser.add_argument("--parallel", action="store_true", default=False,
-                        help="[experimental] Run jobs in parallel. "
-                             "Does not currently keep track of ResourceRequirements like the number of cores"
-                             "or memory and can overload this system")
+                        help="[experimental] Run jobs in parallel. ")
     envgroup = parser.add_mutually_exclusive_group()
     envgroup.add_argument("--preserve-environment", type=Text, action="append",
                           help="Preserve specific environment variable when "

cwltool/builder.py

Lines changed: 2 additions & 2 deletions
@@ -113,7 +113,7 @@ def __init__(self,
                 hints=None,             # type: List[Dict[Text, Any]]
                 timeout=None,           # type: float
                 debug=False,            # type: bool
-                resources=None,         # type: Dict[Text, int]
+                resources=None,         # type: Dict[str, int]
                 js_console=False,       # type: bool
                 mutation_manager=None,  # type: Optional[MutationManager]
                 formatgraph=None,       # type: Optional[Graph]
@@ -154,7 +154,7 @@ def __init__(self,
         self.tmpdir = tmpdir
 
         if resources is None:
-            self.resources = {}  # type: Dict[Text, int]
+            self.resources = {}  # type: Dict[str, int]
         else:
             self.resources = resources
 
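
For context, builder.resources is the dict these annotations describe: the evaluated ResourceRequirement values keyed by the CWL runtime fields. A purely illustrative example (key names assumed from the CWL spec; values made up):

    # Illustrative only -- actual values come from ResourceRequirement
    # and the executor's select_resources hook.
    resources = {
        "cores": 1,         # CPU cores granted to the job
        "ram": 1024,        # RAM in mebibytes (feeds docker --memory below)
        "outdirSize": 1024,
        "tmpdirSize": 1024,
    }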

cwltool/context.py

Lines changed: 6 additions & 1 deletion
@@ -1,4 +1,6 @@
 import copy
+import threading  # pylint: disable=unused-import
+
 from .utils import DEFAULT_TMP_PREFIX
 from .stdfsaccess import StdFsAccess
 from typing import (Any, Callable, Dict,  # pylint: disable=unused-import
@@ -68,6 +70,8 @@ def copy(self):
 class RuntimeContext(ContextBase):
     def __init__(self, kwargs=None):
         # type: (Optional[Dict[str, Any]]) -> None
+        select_resources_callable = Callable[  # pylint: disable=unused-variable
+            [Dict[str, int], RuntimeContext], Dict[str, int]]
         self.user_space_docker_cmd = ""  # type: Text
         self.secret_store = None  # type: Optional[SecretStore]
         self.no_read_only = False  # type: bool
@@ -107,7 +111,7 @@ def __init__(self, kwargs=None):
         self.docker_stagedir = ""  # type: Text
         self.js_console = False  # type: bool
         self.job_script_provider = None  # type: Optional[DependenciesConfiguration]
-        self.select_resources = None  # type: Optional[Callable[[Dict[Text, int]], Dict[Text, int]]]
+        self.select_resources = None  # type: Optional[select_resources_callable]
         self.eval_timeout = 20  # type: float
         self.postScatterEval = None  # type: Optional[Callable[[Dict[Text, Any]], Dict[Text, Any]]]
         self.on_error = "stop"  # type: Text
@@ -116,6 +120,7 @@ def __init__(self, kwargs=None):
         self.cidfile_dir = None
         self.cidfile_prefix = None
 
+        self.workflow_eval_lock = None  # type: Optional[threading.Condition]
         self.research_obj = None  # type: Optional[ResearchObject]
         self.orcid = None
         self.cwl_full_name = None
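
The new select_resources_callable alias documents the hook's shape: it receives the requested min/max values plus the RuntimeContext and returns the granted "cores" and "ram"; the new workflow_eval_lock is the threading.Condition the executors below wait on and notify. A minimal sketch of plugging in a custom policy (the policy itself is hypothetical, not part of this commit):

    from cwltool.context import RuntimeContext

    def grant_minimum(request, runtime_context):
        # Hypothetical policy: always grant the smallest allowed footprint.
        return {"cores": request["coresMin"], "ram": request["ramMin"]}

    runtime_context = RuntimeContext()
    runtime_context.select_resources = grant_minimum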

cwltool/docker.py

Lines changed: 2 additions & 0 deletions
@@ -339,4 +339,6 @@ def create_runtime(self, env, runtimeContext):
         for t, v in self.environment.items():
             runtime.append(u"--env=%s=%s" % (t, v))
 
+        runtime.append("--memory=%dm" % self.builder.resources["ram"])
+
         return runtime
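
The appended option caps the container at the RAM granted to the job; resources["ram"] is in mebibytes, matching docker's "m" suffix. A standalone sketch of the string being built (values illustrative):

    # With resources["ram"] == 1024 the generated argument is "--memory=1024m".
    resources = {"ram": 1024}
    memory_flag = "--memory=%dm" % resources["ram"]
    assert memory_flag == "--memory=1024m"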

cwltool/executors.py

Lines changed: 140 additions & 64 deletions
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+""" Single and multi-threaded executors."""
 import os
 import tempfile
 import threading
@@ -10,6 +12,8 @@
 import six
 from six import string_types
 
+import psutil
+
 from .builder import Builder  # pylint: disable=unused-import
 from .errors import WorkflowException
 from .loghandler import _logger
@@ -44,32 +48,33 @@ def run_jobs(self,
                  process,           # type: Process
                  job_order_object,  # type: Dict[Text, Any]
                  logger,
-                 runtimeContext     # type: RuntimeContext
+                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None
         """ Execute the jobs for the given Process. """
         pass
 
     def execute(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
-                runtimeContext,    # type: RuntimeContext
+                runtime_context,   # type: RuntimeContext
                 logger=_logger,
                ):  # type: (...) -> Tuple[Optional[Dict[Text, Any]], Text]
         """ Execute the process. """
 
-        if not runtimeContext.basedir:
+        if not runtime_context.basedir:
             raise WorkflowException("Must provide 'basedir' in runtimeContext")
 
         finaloutdir = None  # Type: Optional[Text]
-        original_outdir = runtimeContext.outdir
+        original_outdir = runtime_context.outdir
         if isinstance(original_outdir, string_types):
             finaloutdir = os.path.abspath(original_outdir)
-        runtimeContext = runtimeContext.copy()
-        runtimeContext.outdir = tempfile.mkdtemp(
-            prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))
-        self.output_dirs.add(runtimeContext.outdir)
-        runtimeContext.mutation_manager = MutationManager()
-        runtimeContext.toplevel = True
+        runtime_context = runtime_context.copy()
+        runtime_context.outdir = tempfile.mkdtemp(
+            prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))
+        self.output_dirs.add(runtime_context.outdir)
+        runtime_context.mutation_manager = MutationManager()
+        runtime_context.toplevel = True
+        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())
 
         job_reqs = None
         if "cwl:requirements" in job_order_object:
@@ -81,20 +86,20 @@ def execute(self,
             for req in job_reqs:
                 process.requirements.append(req)
 
-        self.run_jobs(process, job_order_object, logger, runtimeContext)
+        self.run_jobs(process, job_order_object, logger, runtime_context)
 
         if self.final_output and self.final_output[0] and finaloutdir:
             self.final_output[0] = relocateOutputs(
                 self.final_output[0], finaloutdir, self.output_dirs,
-                runtimeContext.move_outputs, runtimeContext.make_fs_access(""),
-                getdefault(runtimeContext.compute_checksum, True))
+                runtime_context.move_outputs, runtime_context.make_fs_access(""),
+                getdefault(runtime_context.compute_checksum, True))
 
-        if runtimeContext.rm_tmpdir:
+        if runtime_context.rm_tmpdir:
             cleanIntermediate(self.output_dirs)
 
         if self.final_output and self.final_status:
 
-            if runtimeContext.research_obj is not None and \
+            if runtime_context.research_obj is not None and \
                     isinstance(process, (JobBase, Process, WorkflowJobStep,
                                          WorkflowJob)) and process.parent_wf:
                 process_run_id = None
@@ -115,45 +120,46 @@ def run_jobs(self,
                  process,           # type: Process
                  job_order_object,  # type: Dict[Text, Any]
                  logger,
-                 runtimeContext     # type: RuntimeContext
+                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None
 
         process_run_id = None  # type: Optional[str]
         reference_locations = {}  # type: Dict[Text,Text]
 
         # define provenance profile for single commandline tool
         if not isinstance(process, Workflow) \
-                and runtimeContext.research_obj is not None:
-            orcid = runtimeContext.orcid
-            full_name = runtimeContext.cwl_full_name
+                and runtime_context.research_obj is not None:
+            orcid = runtime_context.orcid
+            full_name = runtime_context.cwl_full_name
             process.provenance_object = CreateProvProfile(
-                runtimeContext.research_obj, orcid, full_name)
+                runtime_context.research_obj, orcid, full_name)
             process.parent_wf = process.provenance_object
-        jobiter = process.job(job_order_object, self.output_callback, runtimeContext)
+        jobiter = process.job(job_order_object, self.output_callback,
+                              runtime_context)
 
         try:
             for job in jobiter:
                 if job:
-                    if runtimeContext.builder is not None:
-                        job.builder = runtimeContext.builder
+                    if runtime_context.builder is not None:
+                        job.builder = runtime_context.builder
                     if job.outdir:
                         self.output_dirs.add(job.outdir)
-                    if runtimeContext.research_obj is not None:
+                    if runtime_context.research_obj is not None:
                         if not isinstance(process, Workflow):
-                            runtimeContext.prov_obj = process.provenance_object
+                            runtime_context.prov_obj = process.provenance_object
                         else:
-                            runtimeContext.prov_obj = job.prov_obj
-                        assert runtimeContext.prov_obj
+                            runtime_context.prov_obj = job.prov_obj
+                        assert runtime_context.prov_obj
                         process_run_id, reference_locations = \
-                            runtimeContext.prov_obj.evaluate(
-                                process, job, job_order_object,
-                                runtimeContext.make_fs_access,
-                                runtimeContext)
-                        runtimeContext = runtimeContext.copy()
-                        runtimeContext.process_run_id = process_run_id
-                        runtimeContext.reference_locations = \
+                            runtime_context.prov_obj.evaluate(
+                                process, job, job_order_object,
+                                runtime_context.make_fs_access,
+                                runtime_context)
+                        runtime_context = runtime_context.copy()
+                        runtime_context.process_run_id = process_run_id
+                        runtime_context.reference_locations = \
                             reference_locations
-                    job.run(runtimeContext)
+                    job.run(runtime_context)
                 else:
                     logger.error("Workflow cannot make any more progress.")
                     break
@@ -168,60 +174,130 @@ class MultithreadedJobExecutor(JobExecutor):
     """
     Experimental multi-threaded CWL executor.
 
-    Can easily overload a system as it does not do resource accounting.
+    Does simple resource accounting, will not start a job unless it
+    has cores / ram available, but does not make any attempt to
+    optimize usage.
     """
+
     def __init__(self):  # type: () -> None
         super(MultithreadedJobExecutor, self).__init__()
         self.threads = set()  # type: Set[threading.Thread]
         self.exceptions = []  # type: List[WorkflowException]
+        self.pending_jobs = []  # type: List[JobBase]
+        self.pending_jobs_lock = threading.Lock()
+
+        self.max_ram = psutil.virtual_memory().available / 2**20
+        self.max_cores = psutil.cpu_count()
+        self.allocated_ram = 0
+        self.allocated_cores = 0
+
+    def select_resources(self, request, runtime_context):  # pylint: disable=unused-argument
+        # type: (Dict[str, int], RuntimeContext) -> Dict[str, int]
+        """ Naïve check for available cpu cores and memory. """
+        result = {}  # type: Dict[str, int]
+        maxrsc = {
+            "cores": self.max_cores,
+            "ram": self.max_ram
+        }
+        for rsc in ("cores", "ram"):
+            if request[rsc+"Min"] > maxrsc[rsc]:
+                raise WorkflowException(
+                    "Requested at least %d %s but only %d available" %
+                    (request[rsc+"Min"], rsc, maxrsc[rsc]))
+            if request[rsc+"Max"] < maxrsc[rsc]:
+                result[rsc] = request[rsc+"Max"]
+            else:
+                result[rsc] = maxrsc[rsc]
+
+        return result
 
     def run_job(self,
-                job,             # type: JobBase
-                runtimeContext   # type: RuntimeContext
+                job,              # type: JobBase
+                runtime_context   # type: RuntimeContext
                ):  # type: (...) -> None
         """ Execute a single Job in a seperate thread. """
-        def runner():
-            """ Job running thread. """
-            try:
-                job.run(runtimeContext)
-            except WorkflowException as err:
-                self.exceptions.append(err)
-            except Exception as err:
-                self.exceptions.append(WorkflowException(Text(err)))
-            self.threads.remove(thread)
-
-        thread = threading.Thread(target=runner)
-        thread.daemon = True
-        self.threads.add(thread)
-        thread.start()
-
-    def wait_for_next_completion(self):  # type: () -> None
-        """ Check for exceptions while waiting for the jobs to finish. """
+
+        if job is not None:
+            with self.pending_jobs_lock:
+                self.pending_jobs.append(job)
+
+        while self.pending_jobs:
+            with self.pending_jobs_lock:
+                job = self.pending_jobs[0]
+                if isinstance(job, JobBase):
+                    if ((self.allocated_ram + job.builder.resources["ram"])
+                            > self.max_ram or
+                            (self.allocated_cores + job.builder.resources["cores"])
+                            > self.max_cores):
+                        return
+                self.pending_jobs.remove(job)
+
+            def runner(my_job, my_runtime_context):
+                """ Job running thread. """
+                try:
+                    my_job.run(my_runtime_context)
+                except WorkflowException as err:
+                    _logger.exception("Got workflow error")
+                    self.exceptions.append(err)
+                except Exception as err:  # pylint: disable=broad-except
+                    _logger.exception("Got workflow error")
+                    self.exceptions.append(WorkflowException(Text(err)))
+                finally:
+                    with my_runtime_context.workflow_eval_lock:
+                        self.threads.remove(threading.current_thread())
+                        if isinstance(my_job, JobBase):
+                            self.allocated_ram -= my_job.builder.resources["ram"]
+                            self.allocated_cores -= my_job.builder.resources["cores"]
+                        my_runtime_context.workflow_eval_lock.notifyAll()
+
+            thread = threading.Thread(
+                target=runner, args=(job, runtime_context))
+            thread.daemon = True
+            self.threads.add(thread)
+            if isinstance(job, JobBase):
+                self.allocated_ram += job.builder.resources["ram"]
+                self.allocated_cores += job.builder.resources["cores"]
+            thread.start()
+
+
+    def wait_for_next_completion(self, runtimeContext):  # type: (RuntimeContext) -> None
+        """ Wait for jobs to finish. """
+        if runtimeContext.workflow_eval_lock is not None:
+            runtimeContext.workflow_eval_lock.wait()
         if self.exceptions:
             raise self.exceptions[0]
 
     def run_jobs(self,
                  process,           # type: Process
                  job_order_object,  # type: Dict[Text, Any]
                  logger,
-                 runtimeContext     # type: RuntimeContext
+                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None
 
-        jobiter = process.job(job_order_object, self.output_callback, runtimeContext)
+        jobiter = process.job(job_order_object, self.output_callback,
+                              runtime_context)
 
+        if runtime_context.workflow_eval_lock is None:
+            raise WorkflowException(
+                "runtimeContext.workflow_eval_lock must not be None")
+
+        runtime_context.workflow_eval_lock.acquire()
         for job in jobiter:
-            if job:
-                if runtimeContext.builder is not None:
-                    job.builder = runtimeContext.builder
+            if job is not None:
+                if runtime_context.builder is not None:
+                    job.builder = runtime_context.builder
                 if job.outdir:
                     self.output_dirs.add(job.outdir)
-                self.run_job(job, runtimeContext)
-            else:
+
+            self.run_job(job, runtime_context)
+
+            if job is None:
                 if self.threads:
-                    self.wait_for_next_completion()
+                    self.wait_for_next_completion(runtime_context)
                 else:
                     logger.error("Workflow cannot make any more progress.")
                     break
 
         while self.threads:
-            self.wait_for_next_completion()
+            self.wait_for_next_completion(runtime_context)
+        runtime_context.workflow_eval_lock.release()
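
To make the accounting easier to follow outside the diff, here is a standalone sketch of the same clamp-and-check pattern used by select_resources above (the free function is hypothetical; the psutil calls mirror the ones in __init__):

    import psutil

    def clamp_request(request, max_cores=None, max_ram=None):
        # Hypothetical standalone version of the logic in select_resources:
        # refuse requests whose minimum exceeds the machine, otherwise grant
        # the requested maximum capped at what is available.
        if max_cores is None:
            max_cores = psutil.cpu_count()
        if max_ram is None:
            max_ram = psutil.virtual_memory().available // 2**20  # MiB
        limits = {"cores": max_cores, "ram": max_ram}
        result = {}
        for rsc in ("cores", "ram"):
            if request[rsc + "Min"] > limits[rsc]:
                raise RuntimeError("Requested at least %d %s but only %d available"
                                   % (request[rsc + "Min"], rsc, limits[rsc]))
            result[rsc] = min(request[rsc + "Max"], limits[rsc])
        return result

    # e.g. on an 8-core machine with ample RAM:
    # clamp_request({"coresMin": 1, "coresMax": 4, "ramMin": 256, "ramMax": 4096})
    # -> {"cores": 4, "ram": 4096}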

cwltool/expression.py

Lines changed: 2 additions & 2 deletions
@@ -253,7 +253,7 @@ def do_eval(ex,  # type: Union[Text, Dict]
             requirements,             # type: List[Dict[Text, Any]]
             outdir,                   # type: Optional[Text]
             tmpdir,                   # type: Optional[Text]
-            resources,                # type: Dict[Text, int]
+            resources,                # type: Dict[str, int]
             context=None,             # type: Any
             timeout=None,             # type: float
             force_docker_pull=False,  # type: bool
@@ -262,7 +262,7 @@ def do_eval(ex,  # type: Union[Text, Dict]
             strip_whitespace=True     # type: bool
            ):  # type: (...) -> Any
 
-    runtime = copy.copy(resources)  # type: Dict[Text, Any]
+    runtime = copy.copy(resources)  # type: Dict[str, Any]
     runtime["tmpdir"] = docker_windows_path_adjust(tmpdir)
     runtime["outdir"] = docker_windows_path_adjust(outdir)
 
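
The runtime dict assembled here is what CWL expressions see as runtime (for example $(runtime.ram) or $(runtime.outdir)). A minimal sketch of the same construction with illustrative values (the paths are hypothetical; docker_windows_path_adjust only rewrites them on Windows):

    import copy

    resources = {"cores": 2, "ram": 2048}   # shaped like builder.resources
    runtime = copy.copy(resources)
    runtime["tmpdir"] = "/tmp/tmpabc123"    # hypothetical paths
    runtime["outdir"] = "/out/step1"
    # In a CWL expression, $(runtime.ram) now evaluates to 2048 for this job.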
