Skip to content

Commit 71113cb

Browse files
authored
Content loading (#1315)
* Adjusting loadContents and entry behavior
* Improve InitialWorkDir logic to handle more values in listing and entry
* Make sure IWD listing is normalized before adding to pathmapper.
* Update 1.2.0-dev4 schema.
1 parent 5a29b07 commit 71113cb

File tree

6 files changed

+292
-124
lines changed

6 files changed

+292
-124
lines changed

cwltool/builder.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ def content_limit_respected_read_bytes(f): # type: (IO[bytes]) -> bytes
5353
contents = f.read(CONTENT_LIMIT + 1)
5454
if len(contents) > CONTENT_LIMIT:
5555
raise WorkflowException(
56-
"loadContents handling encountered buffer that is exceeds maximum lenght of %d bytes"
57-
% CONTENT_LIMIT
56+
"file is too large, loadContents limited to %d bytes" % CONTENT_LIMIT
5857
)
5958
return contents
6059

@@ -380,11 +379,26 @@ def _capture_files(f: CWLObjectType) -> CWLObjectType:
380379
if schema["type"] == "File":
381380
datum = cast(CWLObjectType, datum)
382381
self.files.append(datum)
383-
if (binding and binding.get("loadContents")) or schema.get(
384-
"loadContents"
385-
):
386-
with self.fs_access.open(cast(str, datum["location"]), "rb") as f2:
387-
datum["contents"] = content_limit_respected_read(f2)
382+
383+
loadContents_sourceline = (
384+
None
385+
) # type: Union[None, MutableMapping[str, Union[str, List[int]]], CWLObjectType]
386+
if binding and binding.get("loadContents"):
387+
loadContents_sourceline = binding
388+
elif schema.get("loadContents"):
389+
loadContents_sourceline = schema
390+
391+
if loadContents_sourceline and loadContents_sourceline["loadContents"]:
392+
with SourceLine(
393+
loadContents_sourceline, "loadContents", WorkflowException
394+
):
395+
try:
396+
with self.fs_access.open(
397+
cast(str, datum["location"]), "rb"
398+
) as f2:
399+
datum["contents"] = content_limit_respected_read(f2)
400+
except Exception as e:
401+
raise Exception("Reading %s\n%s" % (datum["location"], e))
388402

389403
if "secondaryFiles" in schema:
390404
if "secondaryFiles" not in datum:

cwltool/command_line_tool.py

Lines changed: 161 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,166 @@ def updatePathmap(
461461
os.path.join(outdir, cast(str, fn["basename"])), pathmap, ls
462462
)
463463

464+
def _initialworkdir(self, j: JobBase, builder: Builder) -> None:
    """Stage the InitialWorkDirRequirement listing for job ``j``.

    Looks up ``InitialWorkDirRequirement`` through ``self.get_requirement``
    and returns immediately when it is absent. Otherwise the ``listing``
    field is evaluated with ``builder.do_eval`` and flattened into a list of
    File / Directory objects, which is then:

    * passed through ``normalizeFilesDirs``,
    * stored in ``j.generatefiles["listing"]``,
    * registered entry by entry via ``self.updatePathmap``.

    Finally ``check_adjust`` is applied to every File / Directory object
    reachable from ``builder.files`` and ``builder.bindings``.

    :param j: job whose ``generatefiles["listing"]`` is populated.
    :param builder: supplies ``do_eval``, ``outdir``, ``pathmapper``,
        ``files`` and ``bindings``.
    :raises: the error built by ``SourceLine(..., WorkflowException).makeError``
        when a listing item is not a record, a string-valued Dirent has no
        ``entryname``, ``entryname`` does not evaluate to a string,
        ``entryname`` is combined with an ``entry`` that yields a list of
        File / Directory objects, or an item is not a File or Directory
        after processing.
    """
    initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement")
    if initialWorkdir is None:
        return

    ls = []  # type: List[CWLObjectType]
    if isinstance(initialWorkdir["listing"], str):
        # "listing" is just a string (must be an expression) so
        # just evaluate it and use the result as if it was in
        # listing
        ls = cast(List[CWLObjectType], builder.do_eval(initialWorkdir["listing"]))
    else:
        # "listing" is an array of either expressions or Dirent so
        # evaluate each item
        for t in cast(
            MutableSequence[Union[str, CWLObjectType]], initialWorkdir["listing"],
        ):
            if isinstance(t, Mapping) and "entry" in t:
                # Dirent
                entry = builder.do_eval(
                    cast(str, t["entry"]), strip_whitespace=False
                )
                if entry is None:
                    continue

                if isinstance(entry, MutableSequence):
                    # Nested list.  If it is a list of File or
                    # Directory objects, add it to the
                    # file list, otherwise JSON serialize it.
                    filelist = all(
                        isinstance(e, MutableMapping)
                        and e.get("class") in ("File", "Directory")
                        for e in entry
                    )

                    if filelist:
                        if "entryname" in t:
                            raise SourceLine(
                                t, "entryname", WorkflowException
                            ).makeError(
                                "'entryname' is invalid when 'entry' returns list of File or Directory"
                            )
                        for e in entry:
                            ec = cast(CWLObjectType, e)
                            # Key must be "writable": that is the name
                            # used everywhere else in this method.  The
                            # previous "writeable" spelling meant the
                            # Dirent's flag was silently dropped.
                            ec["writable"] = t.get("writable", False)
                        ls.extend(cast(List[CWLObjectType], entry))
                        continue

                et = {}  # type: CWLObjectType
                if isinstance(entry, Mapping) and entry.get("class") in (
                    "File",
                    "Directory",
                ):
                    et["entry"] = entry
                else:
                    # Anything that is not a File/Directory object becomes
                    # file contents: strings verbatim, everything else as
                    # JSON.
                    et["entry"] = (
                        entry
                        if isinstance(entry, str)
                        else json_dumps(entry, sort_keys=True)
                    )

                if "entryname" in t:
                    en = builder.do_eval(cast(str, t["entryname"]))
                    if not isinstance(en, str):
                        raise SourceLine(
                            t, "entryname", WorkflowException
                        ).makeError("'entryname' must be a string")
                    et["entryname"] = en
                else:
                    et["entryname"] = None
                et["writable"] = t.get("writable", False)
                ls.append(et)
            else:
                # Expression, must return a Dirent, File, Directory
                # or array of such.
                initwd_item = builder.do_eval(t)
                if not initwd_item:
                    continue
                if isinstance(initwd_item, MutableSequence):
                    ls.extend(cast(List[CWLObjectType], initwd_item))
                else:
                    ls.append(cast(CWLObjectType, initwd_item))

    # Second pass: collapse every Dirent into the File/Directory it describes.
    for i, t2 in enumerate(ls):
        if not isinstance(t2, Mapping):
            raise SourceLine(
                initialWorkdir, "listing", WorkflowException
            ).makeError(
                "Entry at index %s of listing is not a record, was %s"
                % (i, type(t2))
            )

        if "entry" not in t2:
            # Already a plain File/Directory (validated in the last pass).
            continue

        # Dirent
        if isinstance(t2["entry"], str):
            # Use .get(): a Dirent produced by an expression may omit the
            # "entryname" key entirely, which must yield the same
            # diagnostic as an empty one rather than a bare KeyError.
            if not t2.get("entryname"):
                raise SourceLine(
                    initialWorkdir, "listing", WorkflowException
                ).makeError("Entry at index %s of listing missing entryname" % (i))
            ls[i] = {
                "class": "File",
                "basename": t2["entryname"],
                "contents": t2["entry"],
                "writable": t2.get("writable"),
            }
            continue

        if not isinstance(t2["entry"], Mapping):
            raise SourceLine(
                initialWorkdir, "listing", WorkflowException
            ).makeError(
                "Entry at index %s of listing is not a record, was %s"
                % (i, type(t2["entry"]))
            )

        if t2["entry"].get("class") not in ("File", "Directory"):
            raise SourceLine(
                initialWorkdir, "listing", WorkflowException
            ).makeError(
                "Entry at index %s of listing is not a File or Directory object, was %s"
                % (i, t2)
            )

        if t2.get("entryname") or t2.get("writable"):
            # Copy before editing so the evaluated entry object itself is
            # not modified when renaming it or marking it writable.
            t2 = copy.deepcopy(t2)
            t2entry = cast(CWLObjectType, t2["entry"])
            if t2.get("entryname"):
                t2entry["basename"] = t2["entryname"]
            t2entry["writable"] = t2.get("writable")

        ls[i] = cast(CWLObjectType, t2["entry"])

    for i, t3 in enumerate(ls):
        if t3.get("class") not in ("File", "Directory"):
            # Check that every item is a File or Directory object now.
            # Report the offending item (t3), not the leftover loop
            # variable from the previous pass.
            raise SourceLine(
                initialWorkdir, "listing", WorkflowException
            ).makeError(
                "Entry at index %s of listing is not a Dirent, File or Directory object, was %s"
                % (i, t3)
            )

    normalizeFilesDirs(ls)

    j.generatefiles["listing"] = ls
    for entry in ls:
        self.updatePathmap(
            builder.outdir, cast(PathMapper, builder.pathmapper), entry
        )

    visit_class(
        [builder.files, builder.bindings],
        ("File", "Directory"),
        partial(check_adjust, builder),
    )
623+
464624
def job(
465625
self,
466626
job_order: CWLObjectType,
@@ -666,64 +826,7 @@ def update_status_output_callback(
666826
[builder.files, builder.bindings], ("File", "Directory"), _check_adjust
667827
)
668828

669-
initialWorkdir, _ = self.get_requirement("InitialWorkDirRequirement")
670-
if initialWorkdir is not None:
671-
ls = [] # type: List[CWLObjectType]
672-
if isinstance(initialWorkdir["listing"], str):
673-
ls = cast(
674-
List[CWLObjectType], builder.do_eval(initialWorkdir["listing"])
675-
)
676-
else:
677-
for t in cast(
678-
MutableSequence[Union[str, CWLObjectType]],
679-
initialWorkdir["listing"],
680-
):
681-
if isinstance(t, Mapping) and "entry" in t:
682-
entry_exp = builder.do_eval(
683-
cast(str, t["entry"]), strip_whitespace=False
684-
)
685-
for entry in aslist(entry_exp):
686-
et = {"entry": entry}
687-
if "entryname" in t:
688-
et["entryname"] = builder.do_eval(
689-
cast(str, t["entryname"])
690-
)
691-
else:
692-
et["entryname"] = None
693-
et["writable"] = t.get("writable", False)
694-
if et["entry"] is not None:
695-
ls.append(et)
696-
else:
697-
initwd_item = builder.do_eval(t)
698-
if not initwd_item:
699-
continue
700-
if isinstance(initwd_item, MutableSequence):
701-
ls.extend(cast(List[CWLObjectType], initwd_item))
702-
else:
703-
ls.append(cast(CWLObjectType, initwd_item))
704-
for i, t2 in enumerate(ls):
705-
if "entry" in t2:
706-
if isinstance(t2["entry"], str):
707-
ls[i] = {
708-
"class": "File",
709-
"basename": t2["entryname"],
710-
"contents": t2["entry"],
711-
"writable": t2.get("writable"),
712-
}
713-
else:
714-
if t2.get("entryname") or t2.get("writable"):
715-
t2 = copy.deepcopy(t2)
716-
t2entry = cast(CWLObjectType, t2["entry"])
717-
if t2.get("entryname"):
718-
t2entry["basename"] = t2["entryname"]
719-
t2entry["writable"] = t2.get("writable")
720-
ls[i] = cast(CWLObjectType, t2["entry"])
721-
j.generatefiles["listing"] = ls
722-
for entry in ls:
723-
self.updatePathmap(builder.outdir, builder.pathmapper, entry)
724-
visit_class(
725-
[builder.files, builder.bindings], ("File", "Directory"), _check_adjust
726-
)
829+
self._initialworkdir(j, builder)
727830

728831
if debug:
729832
_logger.debug(

0 commit comments

Comments
 (0)