
Commit a500e8f

Refactor to use loading / runtime context instead of kwargs (#788)
* Major refactor to get rid of kwargs and use well defined context objects.
* Unwind incorrect changes to Process object constructor interface.
* Remove get_feature, use get_requirement everywhere.
1 parent 886a6ac commit a500e8f

19 files changed: +557 -570 lines

README.rst

Lines changed: 3 additions & 3 deletions
@@ -541,13 +541,13 @@ executor
   A toplevel workflow execution loop, should synchronously execute a process
   object and return an output object.
 
-makeTool
+construct_tool_object
   ::
 
-    makeTool(toolpath_object, **kwargs)
+    construct_tool_object(toolpath_object, **kwargs)
     (Dict[Text, Any], **Any) -> Process
 
-  Construct a Process object from a document.
+  Hook to construct a Process object (eg CommandLineTool) object from a document.
 
 selectResources
   ::
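For embedders writing their own executor, this renamed hook is where to control which Process subclass gets built. A minimal sketch against the signature documented above; MyCommandLineTool and the default_make_tool fallback are illustrative names, not part of this commit:

    def construct_tool_object(toolpath_object, **kwargs):
        # type: (Dict[Text, Any], **Any) -> Process
        # Dispatch on the CWL "class" field; anything not specialized falls back
        # to an assumed default factory.
        if toolpath_object.get("class") == "CommandLineTool":
            return MyCommandLineTool(toolpath_object, **kwargs)  # hypothetical subclass
        return default_make_tool(toolpath_object, **kwargs)  # assumed default factory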

cwltool/builder.py

Lines changed: 23 additions & 7 deletions
@@ -3,7 +3,7 @@
 import copy
 import logging
 from typing import (Any, Callable, Dict, List,  # pylint: disable=unused-import
-                    Optional, Set, Text, Type, Union)
+                    Optional, Set, Text, Type, Union, Tuple)
 
 from rdflib import Graph, URIRef  # pylint: disable=unused-import
 from rdflib.namespace import OWL, RDFS
@@ -20,7 +20,7 @@
 from .pathmapper import (PathMapper,  # pylint: disable=unused-import
                          get_listing, normalizeFilesDirs, visit_class)
 from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
-from .utils import (aslist, docker_windows_path_adjust, get_feature,
+from .utils import (aslist, docker_windows_path_adjust,
                     json_dumps, onWindows)
 
 CONTENT_LIMIT = 64 * 1024
@@ -85,7 +85,23 @@ def check_format(actual_file,  # type: Union[Dict[Text, Any], List, Text]
                 u"File has an incompatible format: {}".format(
                     json_dumps(afile, indent=4)))
 
-class Builder(object):
+class HasReqsHints(object):
+    def __init__(self):
+        self.requirements = []  # List[Dict[Text, Any]]
+        self.hints = []  # List[Dict[Text, Any]]
+
+    def get_requirement(self,
+                        feature  # type: Text
+                       ):  # type: (...) -> Tuple[Optional[Any], Optional[bool]]
+        for item in reversed(self.requirements):
+            if item["class"] == feature:
+                return (item, True)
+        for item in reversed(self.hints):
+            if item["class"] == feature:
+                return (item, False)
+        return (None, None)
+
+class Builder(HasReqsHints):
     def __init__(self,
                  job,  # type: Dict[Text, Union[Dict[Text, Any], List, Text]]
                  files,  # type: List[Dict[Text, Text]]
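The new HasReqsHints mixin replaces the standalone get_feature helper. A quick illustration with made-up values: requirements are searched before hints, and the second element of the returned tuple records which list matched.

    obj = HasReqsHints()
    obj.hints = [{"class": "DockerRequirement", "dockerPull": "debian:stretch"}]

    req, is_required = obj.get_requirement("DockerRequirement")
    # req is the hint dict above; is_required is False because it came from hints

    missing, flag = obj.get_requirement("ResourceRequirement")
    # both values are None when the feature is neither a requirement nor a hint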
@@ -96,9 +112,9 @@ def __init__(self,
                  hints,  # type: List[Dict[Text, Any]]
                  timeout,  # type: float
                  debug,  # type: bool
-                 resources,  # type: Dict[Text, Union[int, Text, None]]
+                 resources,  # type: Dict[Text, int]
                  js_console,  # type: bool
-                 mutation_manager,  # type: MutationManager
+                 mutation_manager,  # type: Optional[MutationManager]
                  formatgraph,  # type: Optional[Graph]
                  make_fs_access,  # type: Type[StdFsAccess]
                  fs_access,  # type: StdFsAccess
@@ -107,7 +123,7 @@
                  outdir,  # type: Text
                  tmpdir,  # type: Text
                  stagedir,  # type: Text
-                 job_script_provider,  # type: Optional[Any]
+                 job_script_provider  # type: Optional[Any]
                 ):  # type: (...) -> None
         self.names = names
         self.schemaDefs = schemaDefs
@@ -299,7 +315,7 @@ def tostr(self, value):  # type: (Any) -> Text
             raise WorkflowException(u"%s object missing \"path\": %s" % (value["class"], value))
 
         # Path adjust for windows file path when passing to docker, docker accepts unix like path only
-        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
         if onWindows() and docker_req is not None:
             # docker_req is none only when there is no dockerRequirement
             # mentioned in hints and Requirement

cwltool/command_line_tool.py

Lines changed: 48 additions & 60 deletions
@@ -40,6 +40,7 @@
 from .utils import (aslist, convert_pathsep_to_unix,
                     docker_windows_path_adjust, json_dumps, onWindows,
                     windows_default_container_id)
+from .context import LoadingContext, RuntimeContext, getdefault
 
 ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
 ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")  # Accept anything
@@ -80,25 +81,23 @@ def __init__(self,
         self.tmpdir = tmpdir
         self.script = script
 
-    def run(self, **kwargs):  # type: (**Any) -> None
+    def run(self, runtimeContext):  # type: (RuntimeContext) -> None
         try:
             ev = self.builder.do_eval(self.script)
             normalizeFilesDirs(ev)
             self.output_callback(ev, "success")
         except Exception as e:
             _logger.warning(u"Failed to evaluate expression:\n%s",
-                            e, exc_info=kwargs.get('debug'))
+                            e, exc_info=runtimeContext.debug)
             self.output_callback({}, "permanentFail")
 
     def job(self,
             job_order,  # type: Dict[Text, Text]
             output_callbacks,  # type: Callable[[Any, Any], Any]
-            mutation_manager,  # type: MutationManager
-            basedir,  # type: Text
-            **kwargs  # type: Any
+            runtimeContext  # type: RuntimeContext
           ):
         # type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None]
-        builder = self._init_job(job_order, mutation_manager, basedir, **kwargs)
+        builder = self._init_job(job_order, runtimeContext)
 
         yield ExpressionTool.ExpressionJob(
             builder, self.tool["expression"], output_callbacks,
@@ -169,13 +168,13 @@ def __init__(self, job, output_callback, cachebuilder, jobcache):
         self.cachebuilder = cachebuilder
         self.outdir = jobcache
 
-    def run(self, **kwargs):
-        # type: (**Any) -> None
+    def run(self, runtimeContext):
+        # type: (RuntimeContext) -> None
         self.output_callback(self.job.collect_output_ports(
             self.job.tool["outputs"],
             self.cachebuilder,
             self.outdir,
-            kwargs.get("compute_checksum", True)), "success")
+            getdefault(runtimeContext.compute_checksum, True)), "success")
 
 
 def check_adjust(builder, file_o):
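getdefault comes from the new .context module and its body is not shown in this diff; judging from call sites such as getdefault(runtimeContext.compute_checksum, True), it plausibly behaves as a None-coalescing helper. A sketch under that assumption:

    def getdefault(val, default):
        # type: (Any, Any) -> Any
        # Fall back to `default` only when the context attribute is unset (None).
        return default if val is None else val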
@@ -216,33 +215,29 @@ def make_path_mapper(reffiles, stagedir, basedir, separateDirs=True):
 OutputPorts = Dict[Text, Union[None, Text, List[Union[Dict[Text, Any], Text]], Dict[Text, Any]]]
 
 class CommandLineTool(Process):
-    def __init__(self, toolpath_object, eval_timeout, debug, js_console,
-                 force_docker_pull, job_script_provider, **kwargs):
-        # type: (Dict[Text, Any], float, bool, bool, bool, Optional[DependenciesConfiguration], **Any) -> None
+    def __init__(self, toolpath_object, loadingContext):
+        # type: (Dict[Text, Any], LoadingContext) -> None
         super(CommandLineTool, self).__init__(
-            toolpath_object, eval_timeout, debug, js_console,
-            force_docker_pull, job_script_provider, **kwargs)
-        self.find_default_container = kwargs.get("find_default_container", None)
+            toolpath_object, loadingContext)
 
     def make_job_runner(self,
-                        use_container=True,  # type: Optional[bool]
-                        **kwargs  # type: Any
+                        runtimeContext  # type: RuntimeContext
                        ):  # type: (...) -> Type[JobBase]
         dockerReq, _ = self.get_requirement("DockerRequirement")
-        if not dockerReq and use_container:
-            if self.find_default_container:
-                default_container = self.find_default_container(self)
+        if not dockerReq and runtimeContext.use_container:
+            if runtimeContext.find_default_container:
+                default_container = runtimeContext.find_default_container(self)
                 if default_container:
                     self.requirements.insert(0, {
                         "class": "DockerRequirement",
                         "dockerPull": default_container
                     })
                     dockerReq = self.requirements[0]
-                    if default_container == windows_default_container_id and use_container and onWindows():
+                    if default_container == windows_default_container_id and runtimeContext.use_container and onWindows():
                         _logger.warning(DEFAULT_CONTAINER_MSG % (windows_default_container_id, windows_default_container_id))
 
-        if dockerReq and use_container:
-            if kwargs.get('singularity'):
+        if dockerReq and runtimeContext.use_container:
+            if runtimeContext.singularity:
                 return SingularityCommandLineJob
             else:
                 return DockerCommandLineJob
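Runner selection is now driven entirely by fields on the runtime context. A caller-side sketch, assuming a bare RuntimeContext() constructor and a CommandLineTool instance named tool; only the attribute names read above (use_container, singularity, find_default_container) are taken from the diff:

    runtime_context = RuntimeContext()
    runtime_context.use_container = True   # allow containerized execution
    runtime_context.singularity = False    # prefer Docker over Singularity
    runtime_context.find_default_container = (
        lambda tool: "docker.io/debian:stable-slim")  # used only without a DockerRequirement

    runner_class = tool.make_job_runner(runtime_context)
    # DockerCommandLineJob here; SingularityCommandLineJob if singularity were True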
@@ -268,9 +263,7 @@ def updatePathmap(self, outdir, pathmap, fn):
     def job(self,
             job_order,  # type: Dict[Text, Text]
             output_callbacks,  # type: Callable[[Any, Any], Any]
-            mutation_manager,  # type: MutationManager
-            basedir,  # type: Text
-            **kwargs  # type: Any
+            runtimeContext  # RuntimeContext
           ):
         # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
@@ -281,15 +274,15 @@ def job(self,
         workReuse = self.get_requirement(require_prefix+"WorkReuse")[0]
         enableReuse = workReuse.get("enableReuse", True) if workReuse else True
 
-        jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))
-        if kwargs.get("cachedir") and enableReuse:
-            cacheargs = kwargs.copy()
-            cacheargs["outdir"] = "/out"
-            cacheargs["tmpdir"] = "/tmp"
-            cacheargs["stagedir"] = "/stage"
-            cachebuilder = self._init_job(job_order, mutation_manager, basedir, **cacheargs)
+        jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job")))
+        if runtimeContext.cachedir and enableReuse:
+            cachecontext = runtimeContext.copy()
+            cachecontext.outdir = "/out"
+            cachecontext.tmpdir = "/tmp"
+            cachecontext.stagedir = "/stage"
+            cachebuilder = self._init_job(job_order, cachecontext)
             cachebuilder.pathmapper = PathMapper(cachebuilder.files,
-                                                 basedir,
+                                                 runtimeContext.basedir,
                                                  cachebuilder.stagedir,
                                                  separateDirs=False)
             _check_adjust = partial(check_adjust, cachebuilder)
@@ -298,10 +291,10 @@ def job(self,
 
             cmdline = flatten(list(map(cachebuilder.generate_arg, cachebuilder.bindings)))
             (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
-            if docker_req and kwargs.get("use_container"):
+            if docker_req and runtimeContext.use_container:
                 dockerimg = docker_req.get("dockerImageId") or docker_req.get("dockerPull")
-            elif kwargs.get("default_container", None) is not None and kwargs.get("use_container"):
-                dockerimg = kwargs.get("default_container")
+            elif runtimeContext.default_container is not None and runtimeContext.use_container:
+                dockerimg = runtimeContext.default_container
             else:
                 dockerimg = None
 
@@ -340,12 +333,12 @@ def job(self,
             _logger.debug("[job %s] keydictstr is %s -> %s", jobname,
                           keydictstr, cachekey)
 
-            jobcache = os.path.join(kwargs["cachedir"], cachekey)
+            jobcache = os.path.join(runtimeContext.cachedir, cachekey)
             jobcachepending = jobcache + ".pending"
 
             if os.path.isdir(jobcache) and not os.path.isfile(jobcachepending):
-                if docker_req and kwargs.get("use_container"):
-                    cachebuilder.outdir = kwargs.get("docker_outdir") or "/var/spool/cwl"
+                if docker_req and runtimeContext.use_container:
+                    cachebuilder.outdir = runtimeContext.docker_outdir or "/var/spool/cwl"
                 else:
                     cachebuilder.outdir = jobcache
 
@@ -356,7 +349,8 @@ def job(self,
                 _logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)
                 shutil.rmtree(jobcache, True)
                 os.makedirs(jobcache)
-                kwargs["outdir"] = jobcache
+                runtimeContext = runtimeContext.copy()
+                runtimeContext.outdir = jobcache
                 open(jobcachepending, "w").close()
 
                 def rm_pending_output_callback(output_callbacks, jobcachepending,
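Note the copy-before-mutate pattern in the hunk above: when caching redirects outdir, the context is cloned first so the caller's RuntimeContext is never modified behind its back. The same idea as a standalone sketch (copy() is assumed to return an independent per-run clone):

    def with_outdir(runtime_context, outdir):
        # Give one job its own output directory without touching the shared context.
        ctx = runtime_context.copy()
        ctx.outdir = outdir
        return ctx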
@@ -367,18 +361,12 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
 
                 output_callbacks = partial(
                     rm_pending_output_callback, output_callbacks, jobcachepending)
-        # output_callbacks = cast(
-        #    Callable[..., Any],  # known bug in mypy
-        #    # https://github.com/python/mypy/issues/797
-        #    partial(rm_pending_output_callback, output_callbacks,
-        #        jobcachepending))
 
-
-        builder = self._init_job(job_order, mutation_manager, basedir, **kwargs)
+        builder = self._init_job(job_order, runtimeContext)
 
         reffiles = copy.deepcopy(builder.files)
 
-        j = self.make_job_runner(**kwargs)(
+        j = self.make_job_runner(runtimeContext)(
             builder, builder.job, make_path_mapper, self.requirements,
             self.hints, jobname)
         j.successCodes = self.tool.get("successCodes")
@@ -391,13 +379,13 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
         _logger.debug(u"[job %s] initializing from %s%s",
                       j.name,
                       self.tool.get("id", ""),
-                      u" as part of %s" % kwargs["part_of"]
-                      if "part_of" in kwargs else "")
+                      u" as part of %s" % runtimeContext.part_of
+                      if runtimeContext.part_of else "")
         _logger.debug(u"[job %s] %s", j.name, json_dumps(job_order,
                                                          indent=4))
 
         builder.pathmapper = make_path_mapper(
-            reffiles, builder.stagedir, basedir, kwargs.get("separateDirs", True))
+            reffiles, builder.stagedir, runtimeContext.basedir, True)
         builder.requirements = j.requirements
 
         _check_adjust = partial(check_adjust, builder)
@@ -475,12 +463,12 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
                           json_dumps(builder.bindings, indent=4))
 
         dockerReq = self.get_requirement("DockerRequirement")[0]
-        if dockerReq and kwargs.get("use_container"):
-            out_prefix = kwargs.get("tmp_outdir_prefix", 'tmp')
-            j.outdir = kwargs.get("outdir") or \
+        if dockerReq and runtimeContext.use_container:
+            out_prefix = getdefault(runtimeContext.tmp_outdir_prefix, 'tmp')
+            j.outdir = runtimeContext.outdir or \
                 tempfile.mkdtemp(prefix=out_prefix)  # type: ignore
-            tmpdir_prefix = kwargs.get('tmpdir_prefix', 'tmp')
-            j.tmpdir = kwargs.get("tmpdir") or \
+            tmpdir_prefix = getdefault(runtimeContext.tmpdir_prefix, 'tmp')
+            j.tmpdir = runtimeContext.tmpdir or \
                 tempfile.mkdtemp(prefix=tmpdir_prefix)  # type: ignore
             j.stagedir = tempfile.mkdtemp(prefix=tmpdir_prefix)
         else:
@@ -494,8 +482,8 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
                 j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
         normalizeFilesDirs(j.generatefiles)
 
-        readers = {}
-        muts = set()
+        readers = {}  # type: Dict[Text, Any]
+        muts = set()  # type: Set[Text]
 
         if builder.mutation_manager:
             def register_mut(f):
@@ -558,7 +546,7 @@ def register_reader(f):
         j.pathmapper = builder.pathmapper
         j.collect_outputs = partial(
             self.collect_output_ports, self.tool["outputs"], builder,
-            compute_checksum=kwargs.get("compute_checksum", True),
+            compute_checksum=getdefault(runtimeContext.compute_checksum, True),
             jobname=jobname,
             readers=readers)
         j.output_callback = output_callbacks
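Taken together, the calling convention for job() collapses from loose keyword arguments into a single context object. A hedged sketch of a driver loop under the new interface; tool, job_order and output_callback are placeholders, and any attribute spellings beyond those visible in the diff are assumptions:

    runtime_context = RuntimeContext()
    runtime_context.basedir = "/data/run"
    runtime_context.use_container = True
    runtime_context.cachedir = "/data/cache"  # enables the caching path shown above

    # Old style (pre-commit):
    #   tool.job(job_order, output_callback, mutation_manager, basedir,
    #            use_container=True, cachedir="/data/cache", ...)
    for job in tool.job(job_order, output_callback, runtime_context):
        if job is not None:
            job.run(runtime_context)  # run() now receives the same context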
