
Commit 0f1be0c

code, type cleanup
1 parent 40ba7d2 commit 0f1be0c

File tree: 5 files changed, +76 −79 lines

cwltool/builder.py

Lines changed: 2 additions & 2 deletions
@@ -113,7 +113,7 @@ def __init__(self,
              hints=None,             # type: List[Dict[Text, Any]]
              timeout=None,           # type: float
              debug=False,            # type: bool
-             resources=None,         # type: Dict[Text, int]
+             resources=None,         # type: Dict[str, int]
              js_console=False,       # type: bool
              mutation_manager=None,  # type: Optional[MutationManager]
              formatgraph=None,       # type: Optional[Graph]
@@ -154,7 +154,7 @@ def __init__(self,
         self.tmpdir = tmpdir

         if resources is None:
-            self.resources = {}  # type: Dict[Text, int]
+            self.resources = {}  # type: Dict[str, int]
         else:
             self.resources = resources
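Background for the Dict[Text, int] → Dict[str, int] changes throughout this commit (a hedged aside, not part of the diff): typing.Text is the unicode-text alias (unicode on Python 2, str on Python 3), while plain str is enough for the fixed ASCII resource keys cwltool uses. A minimal, standalone sketch with illustrative values:

    from typing import Dict, Text

    resources = {"cores": 1, "ram": 1024}  # type: Dict[str, int]   # fixed ASCII keys
    paths = {u"outdir": u"/tmp/out"}       # type: Dict[Text, Text]  # user-facing text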

cwltool/context.py

Lines changed: 4 additions & 2 deletions
@@ -1,5 +1,5 @@
 import copy
-import threading
+import threading  # pylint: disable=unused-import

 from .utils import DEFAULT_TMP_PREFIX
 from .stdfsaccess import StdFsAccess
@@ -70,6 +70,8 @@ def copy(self):
 class RuntimeContext(ContextBase):
     def __init__(self, kwargs=None):
         # type: (Optional[Dict[str, Any]]) -> None
+        select_resources_callable = Callable[  # pylint: disable=unused-variable
+            [Dict[str, int], RuntimeContext], Dict[str, int]]
         self.user_space_docker_cmd = ""  # type: Text
         self.secret_store = None  # type: Optional[SecretStore]
         self.no_read_only = False  # type: bool
@@ -109,7 +111,7 @@ def __init__(self, kwargs=None):
         self.docker_stagedir = ""  # type: Text
         self.js_console = False  # type: bool
         self.job_script_provider = None  # type: Optional[DependenciesConfiguration]
-        self.select_resources = None  # type: Optional[Callable[[Dict[Text, int], RuntimeContext], Dict[Text, int]]]
+        self.select_resources = None  # type: Optional[select_resources_callable]
         self.eval_timeout = 20  # type: float
         self.postScatterEval = None  # type: Optional[Callable[[Dict[Text, Any]], Dict[Text, Any]]]
         self.on_error = "stop"  # type: Text
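The new select_resources_callable alias keeps the comment-style annotation on self.select_resources short. A minimal, self-contained sketch of the same idea (illustrative names and a simplified signature, not cwltool code):

    from typing import Callable, Dict, Optional

    # Alias for a callback that maps a resource request to a granted allocation.
    ResourceSelector = Callable[[Dict[str, int]], Dict[str, int]]

    class Context(object):
        def __init__(self):
            # type: () -> None
            self.select_resources = None  # type: Optional[ResourceSelector]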

cwltool/executors.py

Lines changed: 66 additions & 57 deletions
@@ -1,3 +1,5 @@
+# -*- coding: utf-8 -*-
+""" Single and multi-threaded executors."""
 import os
 import tempfile
 import threading
@@ -46,33 +48,33 @@ def run_jobs(self,
                  process,           # type: Process
                  job_order_object,  # type: Dict[Text, Any]
                  logger,
-                 runtimeContext     # type: RuntimeContext
+                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None
         """ Execute the jobs for the given Process. """
         pass

     def execute(self,
                 process,           # type: Process
                 job_order_object,  # type: Dict[Text, Any]
-                runtimeContext,    # type: RuntimeContext
+                runtime_context,   # type: RuntimeContext
                 logger=_logger,
                ):  # type: (...) -> Tuple[Optional[Dict[Text, Any]], Text]
         """ Execute the process. """

-        if not runtimeContext.basedir:
+        if not runtime_context.basedir:
             raise WorkflowException("Must provide 'basedir' in runtimeContext")

         finaloutdir = None  # Type: Optional[Text]
-        original_outdir = runtimeContext.outdir
+        original_outdir = runtime_context.outdir
         if isinstance(original_outdir, string_types):
             finaloutdir = os.path.abspath(original_outdir)
-        runtimeContext = runtimeContext.copy()
-        runtimeContext.outdir = tempfile.mkdtemp(
-            prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))
-        self.output_dirs.add(runtimeContext.outdir)
-        runtimeContext.mutation_manager = MutationManager()
-        runtimeContext.toplevel = True
-        runtimeContext.workflow_eval_lock = threading.Condition(threading.RLock())
+        runtime_context = runtime_context.copy()
+        runtime_context.outdir = tempfile.mkdtemp(
+            prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))
+        self.output_dirs.add(runtime_context.outdir)
+        runtime_context.mutation_manager = MutationManager()
+        runtime_context.toplevel = True
+        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

         job_reqs = None
         if "cwl:requirements" in job_order_object:
@@ -84,20 +86,20 @@ def execute(self,
             for req in job_reqs:
                 process.requirements.append(req)

-        self.run_jobs(process, job_order_object, logger, runtimeContext)
+        self.run_jobs(process, job_order_object, logger, runtime_context)

         if self.final_output and self.final_output[0] and finaloutdir:
             self.final_output[0] = relocateOutputs(
                 self.final_output[0], finaloutdir, self.output_dirs,
-                runtimeContext.move_outputs, runtimeContext.make_fs_access(""),
-                getdefault(runtimeContext.compute_checksum, True))
+                runtime_context.move_outputs, runtime_context.make_fs_access(""),
+                getdefault(runtime_context.compute_checksum, True))

-        if runtimeContext.rm_tmpdir:
+        if runtime_context.rm_tmpdir:
             cleanIntermediate(self.output_dirs)

         if self.final_output and self.final_status:

-            if runtimeContext.research_obj is not None and \
+            if runtime_context.research_obj is not None and \
                     isinstance(process, (JobBase, Process, WorkflowJobStep,
                                          WorkflowJob)) and process.parent_wf:
                 process_run_id = None
@@ -118,45 +120,46 @@ def run_jobs(self,
                  process,           # type: Process
                  job_order_object,  # type: Dict[Text, Any]
                  logger,
-                 runtimeContext     # type: RuntimeContext
+                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None

         process_run_id = None  # type: Optional[str]
         reference_locations = {}  # type: Dict[Text,Text]

         # define provenance profile for single commandline tool
         if not isinstance(process, Workflow) \
-                and runtimeContext.research_obj is not None:
-            orcid = runtimeContext.orcid
-            full_name = runtimeContext.cwl_full_name
+                and runtime_context.research_obj is not None:
+            orcid = runtime_context.orcid
+            full_name = runtime_context.cwl_full_name
             process.provenance_object = CreateProvProfile(
-                runtimeContext.research_obj, orcid, full_name)
+                runtime_context.research_obj, orcid, full_name)
             process.parent_wf = process.provenance_object
-        jobiter = process.job(job_order_object, self.output_callback, runtimeContext)
+        jobiter = process.job(job_order_object, self.output_callback,
+                              runtime_context)

         try:
             for job in jobiter:
                 if job:
-                    if runtimeContext.builder is not None:
-                        job.builder = runtimeContext.builder
+                    if runtime_context.builder is not None:
+                        job.builder = runtime_context.builder
                     if job.outdir:
                         self.output_dirs.add(job.outdir)
-                    if runtimeContext.research_obj is not None:
+                    if runtime_context.research_obj is not None:
                         if not isinstance(process, Workflow):
-                            runtimeContext.prov_obj = process.provenance_object
+                            runtime_context.prov_obj = process.provenance_object
                         else:
-                            runtimeContext.prov_obj = job.prov_obj
-                        assert runtimeContext.prov_obj
+                            runtime_context.prov_obj = job.prov_obj
+                        assert runtime_context.prov_obj
                         process_run_id, reference_locations = \
-                            runtimeContext.prov_obj.evaluate(
-                                process, job, job_order_object,
-                                runtimeContext.make_fs_access,
-                                runtimeContext)
-                        runtimeContext = runtimeContext.copy()
-                        runtimeContext.process_run_id = process_run_id
-                        runtimeContext.reference_locations = \
+                            runtime_context.prov_obj.evaluate(
+                                process, job, job_order_object,
+                                runtime_context.make_fs_access,
+                                runtime_context)
+                        runtime_context = runtime_context.copy()
+                        runtime_context.process_run_id = process_run_id
+                        runtime_context.reference_locations = \
                             reference_locations
-                    job.run(runtimeContext)
+                    job.run(runtime_context)
                 else:
                     logger.error("Workflow cannot make any more progress.")
                     break
@@ -182,20 +185,24 @@ def __init__(self):  # type: () -> None
         self.exceptions = []  # type: List[WorkflowException]
         self.pending_jobs = []  # type: List[JobBase]

-        self.max_ram = psutil.virtual_memory().total / 2**20
+        self.max_ram = psutil.virtual_memory().available / 2**20
         self.max_cores = psutil.cpu_count()
         self.allocated_ram = 0
         self.allocated_cores = 0

-    def select_resources(self, request, runtimeContext):
-        result = {}
+    def select_resources(self, request, runtime_context):  # pylint: disable=unused-argument
+        # type: (Dict[str, int], RuntimeContext) -> Dict[str, int]
+        """ Naïve check for available cpu cores and memory. """
+        result = {}  # type: Dict[str, int]
         maxrsc = {
             "cores": self.max_cores,
             "ram": self.max_ram
         }
         for rsc in ("cores", "ram"):
             if request[rsc+"Min"] > maxrsc[rsc]:
-                raise WorkflowException("Requested at least %d %s but only %d available", request[rsc+"Min"], rsc, maxrsc[rsc])
+                raise WorkflowException(
+                    "Requested at least %d %s but only %d available" %
+                    (request[rsc+"Min"], rsc, maxrsc[rsc]))
             if request[rsc+"Max"] < maxrsc[rsc]:
                 result[rsc] = request[rsc+"Max"]
             else:
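A hedged usage sketch of the clamping logic above (standalone, illustrative numbers only, not part of the diff): each resource is granted its requested maximum when the host can supply it, and the job is rejected when even the minimum cannot be met.

    # Illustrative request/host values; the real request comes from evalResources().
    request = {"coresMin": 1, "coresMax": 2, "ramMin": 1024, "ramMax": 2048}
    machine = {"cores": 4, "ram": 4096}   # assumed host capacity (cores / MiB)

    selection = {}
    for rsc in ("cores", "ram"):
        if request[rsc + "Min"] > machine[rsc]:
            raise RuntimeError("Requested at least %d %s but only %d available" %
                               (request[rsc + "Min"], rsc, machine[rsc]))
        # Grant the requested maximum when it fits, otherwise everything available.
        selection[rsc] = min(request[rsc + "Max"], machine[rsc])

    print(selection)   # cores -> 2, ram -> 2048 for this example

The WorkflowException change in this hunk also fixes a real bug: the old call passed logger-style arguments, which an exception constructor does not interpolate, so the message would have kept its literal %d/%s placeholders; the new code formats the string before raising.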
@@ -204,8 +211,8 @@ def select_resources(self, request, runtimeContext):
         return result

     def run_job(self,
-                job, # type: JobBase
-                runtimeContext # type: RuntimeContext
+                job,             # type: JobBase
+                runtime_context  # type: RuntimeContext
                ):  # type: (...) -> None
         """ Execute a single Job in a seperate thread. """

@@ -216,28 +223,28 @@ def run_job(self,
         job = self.pending_jobs[0]
         if isinstance(job, JobBase):
             if ((self.allocated_ram + job.builder.resources["ram"]) > self.max_ram or
-                (self.allocated_cores + job.builder.resources["cores"]) > self.max_cores):
+                    (self.allocated_cores + job.builder.resources["cores"]) > self.max_cores):
                 return

         self.pending_jobs.pop(0)

         def runner():
             """ Job running thread. """
             try:
-                job.run(runtimeContext)
+                job.run(runtime_context)
             except WorkflowException as err:
                 _logger.exception("Got workflow error")
                 self.exceptions.append(err)
-            except Exception as err:
+            except Exception as err:  # pylint: disable=broad-except
                 _logger.exception("Got workflow error")
                 self.exceptions.append(WorkflowException(Text(err)))
             finally:
-                with runtimeContext.workflow_eval_lock:
+                with runtime_context.workflow_eval_lock:
                     self.threads.remove(thread)
                     if isinstance(job, JobBase):
                         self.allocated_ram -= job.builder.resources["ram"]
                         self.allocated_cores -= job.builder.resources["cores"]
-                    runtimeContext.workflow_eval_lock.notifyAll()
+                    runtime_context.workflow_eval_lock.notifyAll()

         thread = threading.Thread(target=runner)
         thread.daemon = True
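The runner() thread's finally block always notifies workflow_eval_lock so the scheduler can wake up, release the finished job's RAM and cores, and dispatch more work. A standalone sketch of that Condition pattern (illustrative, not cwltool code):

    import threading

    lock = threading.Condition(threading.RLock())
    finished = []

    def worker(name):
        try:
            finished.append(name)   # stand-in for job.run(runtime_context)
        finally:
            with lock:
                lock.notifyAll()    # wake whoever is wait()ing on this lock

    with lock:
        thread = threading.Thread(target=worker, args=("job-1",))
        thread.daemon = True
        thread.start()
        while not finished:
            lock.wait()             # lock is released while waiting, re-acquired after
    print(finished)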
@@ -258,30 +265,32 @@ def run_jobs(self,
                  process,           # type: Process
                  job_order_object,  # type: Dict[Text, Any]
                  logger,
-                 runtimeContext     # type: RuntimeContext
+                 runtime_context    # type: RuntimeContext
                 ):  # type: (...) -> None

-        jobiter = process.job(job_order_object, self.output_callback, runtimeContext)
+        jobiter = process.job(job_order_object, self.output_callback,
+                              runtime_context)

-        if runtimeContext.workflow_eval_lock is None:
-            raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")
+        if runtime_context.workflow_eval_lock is None:
+            raise WorkflowException(
+                "runtimeContext.workflow_eval_lock must not be None")

-        runtimeContext.workflow_eval_lock.acquire()
+        runtime_context.workflow_eval_lock.acquire()
         for job in jobiter:
             if job is not None:
-                if runtimeContext.builder is not None:
-                    job.builder = runtimeContext.builder
+                if runtime_context.builder is not None:
+                    job.builder = runtime_context.builder
                 if job.outdir:
                     self.output_dirs.add(job.outdir)

-                self.run_job(job, runtimeContext)
+                self.run_job(job, runtime_context)

             if job is None:
                 if self.threads:
-                    self.wait_for_next_completion(runtimeContext)
+                    self.wait_for_next_completion(runtime_context)
                 else:
                     logger.error("Workflow cannot make any more progress.")
                     break

         while self.threads:
-            self.wait_for_next_completion(runtimeContext)
+            self.wait_for_next_completion(runtime_context)

cwltool/expression.py

Lines changed: 2 additions & 2 deletions
@@ -253,7 +253,7 @@ def do_eval(ex,  # type: Union[Text, Dict]
             requirements,  # type: List[Dict[Text, Any]]
             outdir,        # type: Optional[Text]
             tmpdir,        # type: Optional[Text]
-            resources,     # type: Dict[Text, int]
+            resources,     # type: Dict[str, int]
             context=None,  # type: Any
             timeout=None,  # type: float
             force_docker_pull=False,  # type: bool
@@ -262,7 +262,7 @@ def do_eval(ex,  # type: Union[Text, Dict]
             strip_whitespace=True  # type: bool
            ):  # type: (...) -> Any

-    runtime = copy.copy(resources)  # type: Dict[Text, Any]
+    runtime = copy.copy(resources)  # type: Dict[str, Any]
     runtime["tmpdir"] = docker_windows_path_adjust(tmpdir)
     runtime["outdir"] = docker_windows_path_adjust(outdir)
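The runtime mapping built here starts as a copy of the int-valued resources dict and then gains path entries, which is why its value type widens to Any; in cwltool it feeds the runtime object that CWL expressions can reference. A minimal sketch with illustrative values (paths are assumptions, not real defaults):

    import copy
    from typing import Any, Dict

    resources = {"cores": 1, "ram": 1024}   # type: Dict[str, int]
    runtime = copy.copy(resources)          # type: Dict[str, Any]
    runtime["tmpdir"] = "/tmp/job-tmp"      # illustrative path
    runtime["outdir"] = "/tmp/job-out"      # illustrative path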

cwltool/process.py

Lines changed: 2 additions & 16 deletions
@@ -571,20 +571,6 @@ def __init__(self,

     def _init_job(self, joborder, runtimeContext):
         # type: (Dict[Text, Text], RuntimeContext) -> Builder
-        """
-        kwargs:
-
-        use_container: do/don't use Docker when DockerRequirement hint provided
-        make_fs_access: make an FsAccess() object with given basedir
-        docker_outdir: output directory inside docker for this job
-        docker_tmpdir: tmpdir inside docker for this job
-        docker_stagedir: stagedir inside docker for this job
-        outdir: outdir on host for this job
-        tmpdir: tmpdir on host for this job
-        stagedir: stagedir on host for this job
-        select_resources: callback to select compute resources
-        tmp_outdir_prefix: Path prefix for intermediate output directories
-        """

         job = cast(Dict[Text, Union[Dict[Text, Any], List,
                                     Text]], copy.deepcopy(joborder))
@@ -717,7 +703,7 @@ def _init_job(self, joborder, runtimeContext):
         return builder

     def evalResources(self, builder, runtimeContext):
-        # type: (Builder, RuntimeContext) -> Dict[Text, int]
+        # type: (Builder, RuntimeContext) -> Dict[str, int]
         resourceReq, _ = self.get_requirement("ResourceRequirement")
         if resourceReq is None:
             resourceReq = {}
@@ -730,7 +716,7 @@ def evalResources(self, builder, runtimeContext):
             "tmpdirMax": 1024,
             "outdirMin": 1024,
             "outdirMax": 1024
-        }  # type: Dict[Text, int]
+        }  # type: Dict[str, int]
         for a in ("cores", "ram", "tmpdir", "outdir"):
             mn = None
             mx = None
