Skip to content

Commit 8fd2300

Browse files
committed
match Process.job def in subtypes
1 parent 648c525 commit 8fd2300

File tree

3 files changed

+49
-27
lines changed

3 files changed

+49
-27
lines changed

cwltool/draft2tool.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,18 @@ def run(self, **kwargs): # type: (**Any) -> None
6060
e, exc_info=kwargs.get('debug'))
6161
self.output_callback({}, "permanentFail")
6262

63-
def job(self, joborder, output_callback, **kwargs):
64-
# type: (Dict[Text, Text], Callable[[Any, Any], Any], **Any) -> Generator[ExpressionTool.ExpressionJob, None, None]
65-
builder = self._init_job(joborder, **kwargs)
63+
def job(self,
64+
job_order, # type: Dict[Text, Text]
65+
output_callbacks, # type: Callable[[Any, Any], Any]
66+
**kwargs # type: Any
67+
):
68+
# type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None]
69+
builder = self._init_job(job_order, **kwargs)
6670

6771
j = ExpressionTool.ExpressionJob()
6872
j.builder = builder
6973
j.script = self.tool["expression"]
70-
j.output_callback = output_callback
74+
j.output_callback = output_callbacks
7175
j.requirements = self.requirements
7276
j.hints = self.hints
7377
j.outdir = None
@@ -165,8 +169,12 @@ def makePathMapper(self, reffiles, stagedir, **kwargs):
165169
dockerReq, _ = self.get_requirement("DockerRequirement")
166170
return PathMapper(reffiles, kwargs["basedir"], stagedir)
167171

168-
def job(self, joborder, output_callback, **kwargs):
169-
# type: (Dict[Text, Text], Callable[..., Any], **Any) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
172+
def job(self,
173+
job_order, # type: Dict[Text, Text]
174+
output_callbacks, # type: Callable[[Any, Any], Any]
175+
**kwargs # type: Any
176+
):
177+
# type: (...) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
170178

171179
jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))
172180

@@ -175,7 +183,7 @@ def job(self, joborder, output_callback, **kwargs):
175183
cacheargs["outdir"] = "/out"
176184
cacheargs["tmpdir"] = "/tmp"
177185
cacheargs["stagedir"] = "/stage"
178-
cachebuilder = self._init_job(joborder, **cacheargs)
186+
cachebuilder = self._init_job(job_order, **cacheargs)
179187
cachebuilder.pathmapper = PathMapper(cachebuilder.files,
180188
kwargs["basedir"],
181189
cachebuilder.stagedir,
@@ -222,7 +230,7 @@ def job(self, joborder, output_callback, **kwargs):
222230
cachebuilder.outdir = jobcache
223231

224232
_logger.info("[job %s] Using cached output in %s", jobname, jobcache)
225-
yield CallbackJob(self, output_callback, cachebuilder, jobcache)
233+
yield CallbackJob(self, output_callbacks, cachebuilder, jobcache)
226234
return
227235
else:
228236
_logger.info("[job %s] Output of job will be cached in %s", jobname, jobcache)
@@ -231,19 +239,19 @@ def job(self, joborder, output_callback, **kwargs):
231239
kwargs["outdir"] = jobcache
232240
open(jobcachepending, "w").close()
233241

234-
def rm_pending_output_callback(output_callback, jobcachepending,
242+
def rm_pending_output_callback(output_callbacks, jobcachepending,
235243
outputs, processStatus):
236244
if processStatus == "success":
237245
os.remove(jobcachepending)
238-
output_callback(outputs, processStatus)
246+
output_callbacks(outputs, processStatus)
239247

240-
output_callback = cast(
248+
output_callbacks = cast(
241249
Callable[..., Any], # known bug in mypy
242250
# https://github.com/python/mypy/issues/797
243-
partial(rm_pending_output_callback, output_callback,
251+
partial(rm_pending_output_callback, output_callbacks,
244252
jobcachepending))
245253

246-
builder = self._init_job(joborder, **kwargs)
254+
builder = self._init_job(job_order, **kwargs)
247255

248256
reffiles = copy.deepcopy(builder.files)
249257

@@ -265,7 +273,7 @@ def rm_pending_output_callback(output_callback, jobcachepending,
265273
j.name,
266274
self.tool.get("id", ""),
267275
u" as part of %s" % kwargs["part_of"] if "part_of" in kwargs else "")
268-
_logger.debug(u"[job %s] %s", j.name, json.dumps(joborder, indent=4))
276+
_logger.debug(u"[job %s] %s", j.name, json.dumps(job_order, indent=4))
269277

270278
builder.pathmapper = None
271279
make_path_mapper_kwargs = kwargs
@@ -377,7 +385,7 @@ def rm_pending_output_callback(output_callback, jobcachepending,
377385
j.collect_outputs = partial(
378386
self.collect_output_ports, self.tool["outputs"], builder,
379387
compute_checksum=kwargs.get("compute_checksum", True))
380-
j.output_callback = output_callback
388+
j.output_callback = output_callbacks
381389

382390
yield j
383391

cwltool/process.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -599,8 +599,12 @@ def visit(self, op): # type: (Callable[[Dict[Text, Any]], None]) -> None
599599
op(self.tool)
600600

601601
@abc.abstractmethod
602-
def job(self, job_order, output_callbacks, **kwargs):
603-
# type: (Dict[Text, Text], Callable[[Any, Any], Any], **Any) -> Generator[Any, None, None]
602+
def job(self,
603+
job_order, # type: Dict[Text, Text]
604+
output_callbacks, # type: Callable[[Any, Any], Any]
605+
**kwargs # type: Any
606+
):
607+
# type: (...) -> Generator[Any, None, None]
604608
return None
605609

606610

cwltool/workflow.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -436,15 +436,19 @@ def __init__(self, toolpath_object, **kwargs):
436436

437437
# TODO: statically validate data links instead of doing it at runtime.
438438

439-
def job(self, joborder, output_callback, **kwargs):
440-
# type: (Dict[Text, Text], Callable[[Any, Any], Any], **Any) -> Generator
441-
builder = self._init_job(joborder, **kwargs)
439+
def job(self,
440+
job_order, # type: Dict[Text, Text]
441+
output_callbacks, # type: Callable[[Any, Any], Any]
442+
**kwargs # type: Any
443+
):
444+
# type: (...) -> Generator[Any, None, None]
445+
builder = self._init_job(job_order, **kwargs)
442446
wj = WorkflowJob(self, **kwargs)
443447
yield wj
444448

445449
kwargs["part_of"] = u"workflow %s" % wj.name
446450

447-
for w in wj.job(builder.job, output_callback, **kwargs):
451+
for w in wj.job(builder.job, output_callbacks, **kwargs):
448452
yield w
449453

450454
def visit(self, op):
@@ -561,17 +565,23 @@ def receive_output(self, output_callback, jobout, processStatus):
561565
processStatus = "permanentFail"
562566
output_callback(output, processStatus)
563567

564-
def job(self, joborder, output_callback, **kwargs):
565-
# type: (Dict[Text, Any], Callable[...,Any], **Any) -> Generator
568+
def job(self,
569+
job_order, # type: Dict[Text, Text]
570+
output_callbacks, # type: Callable[[Any, Any], Any]
571+
**kwargs # type: Any
572+
):
573+
# type: (...) -> Generator[Any, None, None]
566574
for i in self.tool["inputs"]:
567575
p = i["id"]
568576
field = shortname(p)
569-
joborder[field] = joborder[i["id"]]
570-
del joborder[i["id"]]
577+
job_order[field] = job_order[i["id"]]
578+
del job_order[i["id"]]
571579

572580
try:
573-
for t in self.embedded_tool.job(joborder,
574-
functools.partial(self.receive_output, output_callback),
581+
for t in self.embedded_tool.job(job_order,
582+
functools.partial(
583+
self.receive_output,
584+
output_callbacks),
575585
**kwargs):
576586
yield t
577587
except WorkflowException:

0 commit comments

Comments
 (0)