Skip to content

Commit 69e39c2

Browse files
authored
Merge pull request #155 from common-workflow-language/fsaccess-refactor
Refactor fs_access abstraction
2 parents 6ccef90 + 431c8fb commit 69e39c2

File tree

4 files changed

+26
-17
lines changed

4 files changed

+26
-17
lines changed

cwltool/cwltest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ def run_test(args, i, t): # type: (argparse.Namespace, Any, Dict[str,str]) -> i
123123
_logger.error(u"""Test failed: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
124124
_logger.error(outstr)
125125
_logger.error(u"Parse error %s", str(e))
126+
except KeyboardInterrupt:
127+
_logger.error(u"""Test interrupted: %s""", " ".join([pipes.quote(tc) for tc in test_command]))
128+
return 1
126129

127130
failed = False
128131

cwltool/draft2tool.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -365,16 +365,17 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
365365
# type: (Set[Dict[str,Any]], Builder, str) -> Dict[unicode, Union[unicode, List[Any], Dict[unicode, Any]]]
366366
try:
367367
ret = {} # type: Dict[unicode, Union[unicode, List[Any], Dict[unicode, Any]]]
368-
custom_output = builder.fs_access.join(outdir, "cwl.output.json")
369-
if builder.fs_access.exists(custom_output):
370-
with builder.fs_access.open(custom_output, "r") as f:
368+
fs_access = builder.make_fs_access(outdir)
369+
custom_output = fs_access.join(outdir, "cwl.output.json")
370+
if fs_access.exists(custom_output):
371+
with fs_access.open(custom_output, "r") as f:
371372
ret = json.load(f)
372373
_logger.debug(u"Raw output from %s: %s", custom_output, json.dumps(ret, indent=4))
373374
else:
374375
for port in ports:
375376
fragment = shortname(port["id"])
376377
try:
377-
ret[fragment] = self.collect_output(port, builder, outdir, compute_checksum)
378+
ret[fragment] = self.collect_output(port, builder, outdir, fs_access, compute_checksum=compute_checksum)
378379
except Exception as e:
379380
_logger.debug(u"Error collecting output for parameter '%s'" % shortname(port["id"]), exc_info=e)
380381
raise WorkflowException(u"Error collecting output for parameter '%s': %s" % (shortname(port["id"]), e))
@@ -388,14 +389,14 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
388389
adjustDirObjs(ret, remove_path)
389390
normalizeFilesDirs(ret)
390391
if compute_checksum:
391-
adjustFileObjs(ret, partial(compute_checksums, builder.fs_access))
392+
adjustFileObjs(ret, partial(compute_checksums, fs_access))
392393

393394
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
394395
return ret if ret is not None else {}
395396
except validate.ValidationException as e:
396397
raise WorkflowException("Error validating output record, " + str(e) + "\n in " + json.dumps(ret, indent=4))
397398

398-
def collect_output(self, schema, builder, outdir, compute_checksum=True):
399+
def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=True):
399400
# type: (Dict[str,Any], Builder, str) -> Union[Dict[unicode, Any], List[Union[Dict[unicode, Any], unicode]]]
400401
r = [] # type: List[Any]
401402
if "outputBinding" in schema:
@@ -419,16 +420,16 @@ def collect_output(self, schema, builder, outdir, compute_checksum=True):
419420
raise WorkflowException("glob patterns must not start with '/'")
420421
try:
421422
r.extend([{"location": g,
422-
"class": "File" if builder.fs_access.isfile(g) else "Directory"}
423-
for g in builder.fs_access.glob(builder.fs_access.join(outdir, gb))])
423+
"class": "File" if fs_access.isfile(g) else "Directory"}
424+
for g in fs_access.glob(fs_access.join(outdir, gb))])
424425
except (OSError, IOError) as e:
425426
_logger.warn(str(e))
426427

427428
for files in r:
428429
if files["class"] == "Directory" and "listing" not in files:
429-
getListing(builder.fs_access, files)
430+
getListing(fs_access, files)
430431
else:
431-
with builder.fs_access.open(files["location"], "rb") as f:
432+
with fs_access.open(files["location"], "rb") as f:
432433
contents = ""
433434
if binding.get("loadContents") or compute_checksum:
434435
contents = f.read(CONTENT_LIMIT)
@@ -488,7 +489,7 @@ def collect_output(self, schema, builder, outdir, compute_checksum=True):
488489
sfpath = {"location": substitute(primary["location"], sf), "class": "File"}
489490

490491
for sfitem in aslist(sfpath):
491-
if builder.fs_access.exists(sfitem["location"]):
492+
if fs_access.exists(sfitem["location"]):
492493
primary["secondaryFiles"].append(sfitem)
493494

494495
if not r and optional:
@@ -499,6 +500,6 @@ def collect_output(self, schema, builder, outdir, compute_checksum=True):
499500
out = {}
500501
for f in schema["type"]["fields"]:
501502
out[shortname(f["name"])] = self.collect_output( # type: ignore
502-
f, builder, outdir, compute_checksum)
503+
f, builder, outdir, fs_access, compute_checksum=compute_checksum)
503504
return out
504505
return r

cwltool/main.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,8 @@ def generate_parser(toolparser, tool, namemap):
351351
return toolparser
352352

353353

354-
def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False, stdout=sys.stdout):
354+
def load_job_order(args, t, stdin, print_input_deps=False, relative_deps=False,
355+
stdout=sys.stdout, make_fs_access=None):
355356
# type: (argparse.Namespace, Process, IO[Any], bool, bool, IO[Any]) -> Union[int,Tuple[Dict[str,Any],str]]
356357

357358
job_order_object = None
@@ -439,7 +440,7 @@ def pathToLoc(p):
439440
adjustFileObjs(job_order_object, pathToLoc)
440441
normalizeFilesDirs(job_order_object)
441442
adjustDirObjs(job_order_object, cast(Callable[..., Any],
442-
functools.partial(getListing, StdFsAccess(input_basedir))))
443+
functools.partial(getListing, make_fs_access(input_basedir))))
443444

444445
if "cwl:tool" in job_order_object:
445446
del job_order_object["cwl:tool"]
@@ -572,7 +573,8 @@ def main(argsl=None,
572573
stdout=sys.stdout,
573574
stderr=sys.stderr,
574575
versionfunc=versionstring,
575-
job_order_object=None):
576+
job_order_object=None,
577+
make_fs_access=StdFsAccess):
576578
# type: (List[str], argparse.Namespace, Callable[..., Union[str, Dict[str, str]]], Callable[..., Process], Callable[[Dict[str, int]], Dict[str, int]], IO[Any], IO[Any], IO[Any], Callable[[], unicode], Union[int, Tuple[Dict[str, Any], str]]) -> int
577579

578580
_logger.removeHandler(defaultStreamHandler)
@@ -695,7 +697,8 @@ def main(argsl=None,
695697
job_order_object = load_job_order(args, tool, stdin,
696698
print_input_deps=args.print_input_deps,
697699
relative_deps=args.relative_deps,
698-
stdout=stdout)
700+
stdout=stdout,
701+
make_fs_access=make_fs_access)
699702

700703
if isinstance(job_order_object, int):
701704
return job_order_object
@@ -714,6 +717,7 @@ def main(argsl=None,
714717
out = executor(tool, job_order_object[0],
715718
makeTool=makeTool,
716719
select_resources=selectResources,
720+
make_fs_access=make_fs_access,
717721
**vars(args))
718722

719723
# This is the workflow output, it needs to be written

cwltool/process.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,8 @@ def _init_job(self, joborder, **kwargs):
428428
builder.tmpdir = os.path.realpath(kwargs.get("tmpdir") or tempfile.mkdtemp())
429429
builder.stagedir = os.path.realpath(kwargs.get("stagedir") or tempfile.mkdtemp())
430430

431-
builder.fs_access = kwargs.get("fs_access") or StdFsAccess(kwargs["basedir"])
431+
builder.make_fs_access = kwargs.get("make_fs_access") or StdFsAccess
432+
builder.fs_access = builder.make_fs_access(kwargs["basedir"])
432433

433434
if self.formatgraph:
434435
for i in self.tool["inputs"]:

0 commit comments

Comments
 (0)