Skip to content

Commit 0920752

Browse files
authored
Merge pull request #132 from common-workflow-language/curoverse-cwltool-collect-output-checksum-flag
Curoverse cwltool collect output checksum flag
2 parents b6553a9 + dcf9183 commit 0920752

File tree

2 files changed

+27
-14
lines changed

2 files changed

+27
-14
lines changed

cwltool/draft2tool.py

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ def __init__(self, job, output_callback, cachebuilder, jobcache):
108108
def run(self, **kwargs):
109109
# type: (**Any) -> None
110110
self.output_callback(self.job.collect_output_ports(self.job.tool["outputs"],
111-
self.cachebuilder, self.outdir),
111+
self.cachebuilder,
112+
self.outdir,
113+
kwargs.get("compute_checksum", True)),
112114
"success")
113115

114116
# map files to assigned path inside a container. We need to also explicitly
@@ -337,12 +339,12 @@ def rm_pending_output_callback(output_callback, jobcachepending,
337339

338340
j.pathmapper = builder.pathmapper
339341
j.collect_outputs = partial(
340-
self.collect_output_ports, self.tool["outputs"], builder)
342+
self.collect_output_ports, self.tool["outputs"], builder, compute_checksum=kwargs.get("compute_checksum", True))
341343
j.output_callback = output_callback
342344

343345
yield j
344346

345-
def collect_output_ports(self, ports, builder, outdir):
347+
def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
346348
# type: (Set[Dict[str,Any]], Builder, str, bool) -> Dict[unicode, Union[unicode, List[Any], Dict[unicode, Any]]]
347349
try:
348350
ret = {} # type: Dict[unicode, Union[unicode, List[Any], Dict[unicode, Any]]]
@@ -364,7 +366,7 @@ def collect_output_ports(self, ports, builder, outdir):
364366
for port in ports:
365367
fragment = shortname(port["id"])
366368
try:
367-
ret[fragment] = self.collect_output(port, builder, outdir)
369+
ret[fragment] = self.collect_output(port, builder, outdir, compute_checksum)
368370
except Exception as e:
369371
_logger.debug(u"Error collecting output for parameter '%s'" % shortname(port["id"]), exc_info=e)
370372
raise WorkflowException(u"Error collecting output for parameter '%s': %s" % (shortname(port["id"]), e))
@@ -377,7 +379,7 @@ def collect_output_ports(self, ports, builder, outdir):
377379
except validate.ValidationException as e:
378380
raise WorkflowException("Error validating output record, " + str(e) + "\n in " + json.dumps(ret, indent=4))
379381

380-
def collect_output(self, schema, builder, outdir):
382+
def collect_output(self, schema, builder, outdir, compute_checksum=True):
381383
# type: (Dict[str,Any], Builder, str, bool) -> Union[Dict[unicode, Any], List[Union[Dict[unicode, Any], unicode]]]
382384
r = [] # type: List[Any]
383385
if "outputBinding" in schema:
@@ -410,17 +412,20 @@ def collect_output(self, schema, builder, outdir):
410412
if files["class"] == "Directory" and "listing" not in files:
411413
getListing(builder.fs_access, files)
412414
else:
413-
checksum = hashlib.sha1()
414415
with builder.fs_access.open(files["location"], "rb") as f:
415-
contents = f.read(CONTENT_LIMIT)
416+
contents = ""
417+
if binding.get("loadContents") or compute_checksum:
418+
contents = f.read(CONTENT_LIMIT)
416419
if binding.get("loadContents"):
417420
files["contents"] = contents
418-
filesize = 0
419-
while contents != "":
420-
checksum.update(contents)
421-
filesize += len(contents)
422-
contents = f.read(1024*1024)
423-
files["checksum"] = "sha1$%s" % checksum.hexdigest()
421+
if compute_checksum:
422+
checksum = hashlib.sha1()
423+
while contents != "":
424+
checksum.update(contents)
425+
contents = f.read(1024*1024)
426+
files["checksum"] = "sha1$%s" % checksum.hexdigest()
427+
f.seek(0, 2)
428+
filesize = f.tell()
424429
files["size"] = filesize
425430
if "format" in schema:
426431
files["format"] = builder.do_eval(schema["format"], context=files)
@@ -478,6 +483,6 @@ def collect_output(self, schema, builder, outdir):
478483
out = {}
479484
for f in schema["type"]["fields"]:
480485
out[shortname(f["name"])] = self.collect_output( # type: ignore
481-
f, builder, outdir)
486+
f, builder, outdir, compute_checksum)
482487
return out
483488
return r

cwltool/main.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,14 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
158158
help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
159159
"Default is 'stop.", default="stop")
160160

161+
exgroup = parser.add_mutually_exclusive_group()
162+
exgroup.add_argument("--compute-checksum", action="store_true", default=True,
163+
help="Compute checksum of contents while collecting outputs",
164+
dest="compute_checksum")
165+
exgroup.add_argument("--no-compute-checksum", action="store_false",
166+
help="Do not compute checksum of contents while collecting outputs",
167+
dest="compute_checksum")
168+
161169
parser.add_argument("workflow", type=str, nargs="?", default=None)
162170
parser.add_argument("job_order", nargs=argparse.REMAINDER)
163171

0 commit comments

Comments
 (0)