Skip to content

Commit fe6b2ea

Browse files
authored
Merge pull request #988 from common-workflow-language/less-deep-copy
Eliminate unnecessary deep copies of job order objects.
2 parents cd1ba3d + bfbba8c commit fe6b2ea

File tree

5 files changed

+28
-25
lines changed

5 files changed

+28
-25
lines changed

cwltool/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
from __future__ import absolute_import
2-
__author__ = 'peter.amstutz@curoverse.com'
2+
__author__ = 'pamstutz@veritasgenetics.com'

cwltool/argparser.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,9 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
270270
dest="ga4gh_tool_registries", default=[])
271271

272272
parser.add_argument("--on-error",
273-
help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
274-
"Default is 'stop'.", default="stop", choices=("stop", "continue"))
273+
help="Desired workflow behavior when a step fails. One of 'stop' (do not submit any more steps) or "
274+
"'continue' (may submit other steps that are not downstream from the error). Default is 'stop'.",
275+
default="stop", choices=("stop", "continue"))
275276

276277
exgroup = parser.add_mutually_exclusive_group()
277278
exgroup.add_argument("--compute-checksum", action="store_true", default=True,

cwltool/command_line_tool.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import tempfile
1313
import threading
1414
from functools import cmp_to_key, partial
15-
from typing import (Any, Callable, Dict, Generator, List, MutableMapping,
15+
from typing import (Any, Callable, Dict, Generator, List, Mapping, MutableMapping,
1616
MutableSequence, Optional, Set, Union, cast)
1717

1818
import shellescape
@@ -103,7 +103,7 @@ def run(self, runtimeContext): # type: (RuntimeContext) -> None
103103
self.output_callback({}, "permanentFail")
104104

105105
def job(self,
106-
job_order, # type: MutableMapping[Text, Text]
106+
job_order, # type: Mapping[Text, Text]
107107
output_callbacks, # type: Callable[[Any, Any], Any]
108108
runtimeContext # type: RuntimeContext
109109
):
@@ -276,7 +276,7 @@ def updatePathmap(self, outdir, pathmap, fn):
276276
self.updatePathmap(os.path.join(outdir, fn["basename"]), pathmap, ls)
277277

278278
def job(self,
279-
job_order, # type: MutableMapping[Text, Text]
279+
job_order, # type: Mapping[Text, Text]
280280
output_callbacks, # type: Callable[[Any, Any], Any]
281281
runtimeContext # RuntimeContext
282282
):
@@ -403,7 +403,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
403403
self.tool.get("id", ""),
404404
u" as part of %s" % runtimeContext.part_of
405405
if runtimeContext.part_of else "")
406-
_logger.debug(u"[job %s] %s", j.name, json_dumps(job_order,
406+
_logger.debug(u"[job %s] %s", j.name, json_dumps(builder.job,
407407
indent=4))
408408

409409
builder.pathmapper = self.make_path_mapper(

cwltool/process.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from collections import Iterable # pylint: disable=unused-import
1717
from io import open
1818
from typing import (Any, Callable, Dict, Generator, Iterator, List,
19-
MutableMapping, MutableSequence, Optional, Set, Tuple,
19+
Mapping, MutableMapping, MutableSequence, Optional, Set, Tuple,
2020
Union, cast)
2121

2222
from pkg_resources import resource_stream
@@ -363,7 +363,8 @@ def cleanIntermediate(output_dirs): # type: (Iterable[Text]) -> None
363363
def add_sizes(fsaccess, obj): # type: (StdFsAccess, Dict[Text, Any]) -> None
364364
if 'location' in obj:
365365
try:
366-
obj["size"] = fsaccess.size(obj["location"])
366+
if "size" not in obj:
367+
obj["size"] = fsaccess.size(obj["location"])
367368
except OSError:
368369
pass
369370
elif 'contents' in obj:
@@ -587,7 +588,7 @@ def __init__(self,
587588
var_spool_cwl_detector(self.tool)
588589

589590
def _init_job(self, joborder, runtime_context):
590-
# type: (MutableMapping[Text, Text], RuntimeContext) -> Builder
591+
# type: (Mapping[Text, Text], RuntimeContext) -> Builder
591592

592593
job = cast(Dict[Text, Union[Dict[Text, Any], List[Any], Text, None]],
593594
copy.deepcopy(joborder))
@@ -791,7 +792,7 @@ def visit(self, op): # type: (Callable[[MutableMapping[Text, Any]], None]) -> N
791792

792793
@abc.abstractmethod
793794
def job(self,
794-
job_order, # type: MutableMapping[Text, Text]
795+
job_order, # type: Mapping[Text, Text]
795796
output_callbacks, # type: Callable[[Any, Any], Any]
796797
runtimeContext # type: RuntimeContext
797798
): # type: (...) -> Generator[Any, None, None]

cwltool/workflow.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import tempfile
99
from collections import namedtuple
1010
from typing import (Any, Callable, Dict, Generator, Iterable, List,
11-
MutableMapping, MutableSequence, Optional, Tuple, Union)
11+
Mapping, MutableMapping, MutableSequence, Optional, Tuple, Union)
1212
from uuid import UUID # pylint: disable=unused-import
1313

1414
from ruamel.yaml.comments import CommentedMap
@@ -165,7 +165,7 @@ def object_from_state(state, # Dict[Text, WorkflowStateItem]
165165
return None
166166

167167
if inputobj.get(iid) is None and "default" in inp:
168-
inputobj[iid] = copy.deepcopy(inp["default"])
168+
inputobj[iid] = inp["default"]
169169

170170
if iid not in inputobj and ("valueFrom" in inp or incomplete):
171171
inputobj[iid] = None
@@ -189,7 +189,7 @@ def __init__(self, step):
189189
self.parent_wf = step.parent_wf
190190

191191
def job(self,
192-
joborder, # type: MutableMapping[Text, Text]
192+
joborder, # type: Mapping[Text, Text]
193193
output_callback, # type: functools.partial[None]
194194
runtimeContext # type: RuntimeContext
195195
):
@@ -415,7 +415,7 @@ def run(self, runtimeContext):
415415
_logger.info(u"[%s] start", self.name)
416416

417417
def job(self,
418-
joborder, # type: MutableMapping[Text, Any]
418+
joborder, # type: Mapping[Text, Any]
419419
output_callback, # type: Callable[[Any, Any], Any]
420420
runtimeContext # type: RuntimeContext
421421
): # type: (...) -> Generator
@@ -434,10 +434,10 @@ def job(self,
434434
inp_id = shortname(inp["id"])
435435
if inp_id in joborder:
436436
self.state[inp["id"]] = WorkflowStateItem(
437-
inp, copy.deepcopy(joborder[inp_id]), "success")
437+
inp, joborder[inp_id], "success")
438438
elif "default" in inp:
439439
self.state[inp["id"]] = WorkflowStateItem(
440-
inp, copy.deepcopy(inp["default"]), "success")
440+
inp, inp["default"], "success")
441441
else:
442442
raise WorkflowException(
443443
u"Input '%s' not in input object and does not have a "
@@ -565,7 +565,7 @@ def make_workflow_step(self,
565565
return WorkflowStep(toolpath_object, pos, loadingContext, parentworkflowProv)
566566

567567
def job(self,
568-
job_order, # type: MutableMapping[Text, Text]
568+
job_order, # type: Mapping[Text, Text]
569569
output_callbacks, # type: Callable[[Any, Any], Any]
570570
runtimeContext # type: RuntimeContext
571571
): # type: (...) -> Generator[Any, None, None]
@@ -761,7 +761,7 @@ def receive_output(self, output_callback, jobout, processStatus):
761761
output_callback(output, processStatus)
762762

763763
def job(self,
764-
job_order, # type: MutableMapping[Text, Text]
764+
job_order, # type: Mapping[Text, Text]
765765
output_callbacks, # type: Callable[[Any, Any], Any]
766766
runtimeContext, # type: RuntimeContext
767767
): # type: (...) -> Generator[Any, None, None]
@@ -775,15 +775,16 @@ def job(self,
775775
self.prov_obj.start_process(
776776
process_name, datetime.datetime.now(),
777777
self.embedded_tool.provenance_object.workflow_run_uri)
778+
779+
step_input = {}
778780
for inp in self.tool["inputs"]:
779781
field = shortname(inp["id"])
780782
if not inp.get("not_connected"):
781-
job_order[field] = job_order[inp["id"]]
782-
del job_order[inp["id"]]
783+
step_input[field] = job_order[inp["id"]]
783784

784785
try:
785786
for tool in self.embedded_tool.job(
786-
job_order,
787+
step_input,
787788
functools.partial(self.receive_output, output_callbacks),
788789
runtimeContext):
789790
yield tool
@@ -878,7 +879,7 @@ def dotproduct_scatter(process, # type: WorkflowJobStep
878879

879880
steps = []
880881
for index in range(0, jobl):
881-
sjobo = copy.deepcopy(joborder)
882+
sjobo = copy.copy(joborder)
882883
for key in scatter_keys:
883884
sjobo[key] = joborder[key][index]
884885

@@ -909,7 +910,7 @@ def nested_crossproduct_scatter(process, # type: WorkflowJobStep
909910

910911
steps = []
911912
for index in range(0, jobl):
912-
sjob = copy.deepcopy(joborder)
913+
sjob = copy.copy(joborder)
913914
sjob[scatter_key] = joborder[scatter_key][index]
914915

915916
if len(scatter_keys) == 1:
@@ -967,7 +968,7 @@ def _flat_crossproduct_scatter(process, # type: WorkflowJobStep
967968
steps = []
968969
put = startindex
969970
for index in range(0, jobl):
970-
sjob = copy.deepcopy(joborder)
971+
sjob = copy.copy(joborder)
971972
sjob[scatter_key] = joborder[scatter_key][index]
972973

973974
if len(scatter_keys) == 1:

0 commit comments

Comments
 (0)