Skip to content

Commit 526f36f

Browse files
authored
Fix updater (#1051)
* Internally update and use v1.1 semantics
* Simplify interfaces of load_tool methods to make them less error-prone
* Get loadListing from parameter schema, not inputBinding.
* Don't accidentally relocate symlinked files
* Fix loadListing on output parameter schema.
1 parent 4c2667e commit 526f36f

23 files changed

+369
-290
lines changed

cwltool/builder.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,7 @@ def __init__(self,
135135
loadListing, # type: Text
136136
outdir, # type: Text
137137
tmpdir, # type: Text
138-
stagedir, # type: Text
139-
cwl_version, # type: Text
138+
stagedir # type: Text
140139
): # type: (...) -> None
141140

142141
self.job = job
@@ -146,7 +145,6 @@ def __init__(self,
146145
self.names = names
147146
self.requirements = requirements
148147
self.hints = hints
149-
self.cwl_version = cwl_version
150148
self.resources = resources
151149
self.mutation_manager = mutation_manager
152150
self.formatgraph = formatgraph
@@ -291,14 +289,10 @@ def _capture_files(f):
291289
if "secondaryFiles" not in datum:
292290
datum["secondaryFiles"] = []
293291
for sf in aslist(schema["secondaryFiles"]):
294-
sf_required = True
295-
if isinstance(sf, MutableMapping) and "pattern" in sf and self.cwl_version in ['v1.1.0-dev1']:
296-
if 'required' in sf:
297-
sf_required = self.do_eval(sf['required'], context=datum)
298-
elif isinstance(sf, string_types):
299-
sf = {"pattern": sf}
292+
if 'required' in sf:
293+
sf_required = self.do_eval(sf['required'], context=datum)
300294
else:
301-
raise validate.ValidationException("Not a secondary file definition: %s" % sf)
295+
sf_required = True
302296

303297
if "$(" in sf["pattern"] or "${" in sf["pattern"]:
304298
sfpath = self.do_eval(sf["pattern"], context=datum)
@@ -318,7 +312,7 @@ def _capture_files(f):
318312
sf_location = datum["location"][0:datum["location"].rindex("/")+1]+sfname
319313
if isinstance(sfname, MutableMapping):
320314
datum["secondaryFiles"].append(sfname)
321-
elif discover_secondaryFiles and os.path.exists(uri_file_path(sf_location)):
315+
elif discover_secondaryFiles and self.fs_access.exists(sf_location):
322316
datum["secondaryFiles"].append({
323317
"location": sf_location,
324318
"basename": sfname,
@@ -341,7 +335,7 @@ def _capture_files(f):
341335
visit_class(datum.get("secondaryFiles", []), ("File", "Directory"), _capture_files)
342336

343337
if schema["type"] == "Directory":
344-
ll = self.loadListing or (binding and binding.get("loadListing"))
338+
ll = schema.get("loadListing") or self.loadListing
345339
if ll and ll != "no_listing":
346340
get_listing(self.fs_access, datum, (ll == "deep_listing"))
347341
self.files.append(datum)

cwltool/checker.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ def _rec_fields(rec): # type: (MutableMapping[Text, Any]) -> MutableMapping[Tex
126126
return False
127127
return True
128128

129+
def missing_subset(fullset, subset):
    # type: (List, List) -> List
    """Return the items of *subset* that do not appear in *fullset*.

    The result keeps the order (and any duplicates) of *subset*.  An empty
    list means every item of *subset* is present in *fullset*.

    Membership is tested with ``in`` on the list itself rather than on a
    set, so items only need to support equality, not hashing.
    """
    return [item for item in subset if item not in fullset]
136+
129137
def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs, param_to_step):
130138
# type: (List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], Dict[Text, Dict[Text, Any]]) -> None
131139
"""Check if all source and sink types of a workflow are compatible before run time.
@@ -152,19 +160,18 @@ def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs,
152160
src = warning.src
153161
sink = warning.sink
154162
linkMerge = warning.linkMerge
155-
if sink.get("secondaryFiles") and sorted(
156-
sink.get("secondaryFiles", [])) != sorted(src.get("secondaryFiles", [])):
157-
msg1 = "Sink '%s'" % (shortname(sink["id"]))
158-
msg2 = SourceLine(sink.get("_tool_entry", sink), "secondaryFiles").makeError(
159-
"expects secondaryFiles: %s but" % (sink.get("secondaryFiles")))
160-
if "secondaryFiles" in src:
161-
msg3 = SourceLine(src, "secondaryFiles").makeError(
162-
"source '%s' has secondaryFiles %s." % (shortname(src["id"]), src.get("secondaryFiles")))
163-
else:
164-
msg3 = SourceLine(src, "id").makeError(
165-
"source '%s' does not include secondaryFiles." % (shortname(src["id"])))
166-
msg4 = SourceLine(src, "id").makeError("To fix, add secondaryFiles: %s to definition of '%s'." % (sink.get("secondaryFiles"), shortname(src["id"])))
167-
msg = SourceLine(sink).makeError("%s\n%s" % (msg1, bullets([msg2, msg3, msg4], " ")))
163+
sinksf = sorted([p["pattern"] for p in sink.get("secondaryFiles", []) if p.get("required", True)])
164+
srcsf = sorted([p["pattern"] for p in src.get("secondaryFiles", [])])
165+
# Every secondaryFile required by the sink should be declared
166+
# by the source
167+
missing = missing_subset(srcsf, sinksf)
168+
if missing:
169+
msg1 = "Parameter '%s' requires secondaryFiles %s but" % (shortname(sink["id"]), missing)
170+
msg3 = SourceLine(src, "id").makeError(
171+
"source '%s' does not provide those secondaryFiles." % (shortname(src["id"])))
172+
msg4 = SourceLine(src.get("_tool_entry", src), "secondaryFiles").makeError("To resolve, add missing secondaryFiles patterns to definition of '%s' or" % (shortname(src["id"])))
173+
msg5 = SourceLine(sink.get("_tool_entry", sink), "secondaryFiles").makeError("mark missing secondaryFiles in definition of '%s' as optional." % shortname(sink["id"]))
174+
msg = SourceLine(sink).makeError("%s\n%s" % (msg1, bullets([msg3, msg4, msg5], " ")))
168175
elif sink.get("not_connected"):
169176
msg = SourceLine(sink, "type").makeError(
170177
"'%s' is not an input parameter of %s, expected %s"

cwltool/command_line_tool.py

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,7 @@ def job(self,
292292
):
293293
# type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
294294

295-
require_prefix = ""
296-
if self.metadata["cwlVersion"] == "v1.0":
297-
require_prefix = "http://commonwl.org/cwltool#"
298-
299-
workReuse, _ = self.get_requirement(require_prefix + "WorkReuse")
295+
workReuse, _ = self.get_requirement("WorkReuse")
300296
enableReuse = workReuse.get("enableReuse", True) if workReuse else True
301297

302298
jobname = uniquename(runtimeContext.name or shortname(self.tool.get("id", "job")))
@@ -518,10 +514,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
518514
j.tmpdir = builder.tmpdir
519515
j.stagedir = builder.stagedir
520516

521-
inplaceUpdateReq, _ = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement")
522-
if not inplaceUpdateReq and self.metadata["cwlVersion"] == "v1.1.0-dev1":
523-
inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement")
524-
517+
inplaceUpdateReq, _ = self.get_requirement("InplaceUpdateRequirement")
525518
if inplaceUpdateReq is not None:
526519
j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
527520
normalizeFilesDirs(j.generatefiles)
@@ -553,16 +546,14 @@ def register_reader(f):
553546
adjustDirObjs(builder.files, register_reader)
554547
adjustDirObjs(builder.bindings, register_reader)
555548

556-
timelimit, _ = self.get_requirement(require_prefix + "TimeLimit")
549+
timelimit, _ = self.get_requirement("TimeLimit")
557550
if timelimit is not None:
558551
with SourceLine(timelimit, "timelimit", validate.ValidationException, debug):
559552
j.timelimit = builder.do_eval(timelimit["timelimit"])
560553
if not isinstance(j.timelimit, int) or j.timelimit < 0:
561554
raise Exception("timelimit must be an integer >= 0, got: %s" % j.timelimit)
562555

563-
if self.metadata["cwlVersion"] == "v1.0":
564-
j.networkaccess = True
565-
networkaccess, _ = self.get_requirement(require_prefix + "NetworkAccess")
556+
networkaccess, _ = self.get_requirement("NetworkAccess")
566557
if networkaccess is not None:
567558
with SourceLine(networkaccess, "networkAccess", validate.ValidationException, debug):
568559
j.networkaccess = builder.do_eval(networkaccess["networkAccess"])
@@ -711,7 +702,7 @@ def collect_output(self,
711702
rfile = files.copy()
712703
revmap(rfile)
713704
if files["class"] == "Directory":
714-
ll = builder.loadListing or (binding and binding.get("loadListing"))
705+
ll = schema.get("loadListing") or builder.loadListing
715706
if ll and ll != "no_listing":
716707
get_listing(fs_access, files, (ll == "deep_listing"))
717708
else:
@@ -762,34 +753,22 @@ def collect_output(self,
762753
primary.setdefault("secondaryFiles", [])
763754
pathprefix = primary["path"][0:primary["path"].rindex("/")+1]
764755
for sf in aslist(schema["secondaryFiles"]):
765-
if isinstance(sf, MutableMapping) and 'pattern' in sf:
766-
if 'required' in sf:
767-
sf_required = sf['required']
768-
else:
769-
sf_required = False
770-
sf = sf['pattern']
756+
if 'required' in sf:
757+
sf_required = builder.do_eval(sf['required'], context=primary)
771758
else:
772759
sf_required = False
773760

774-
if isinstance(sf, MutableMapping) or "$(" in sf or "${" in sf:
775-
sfpath = builder.do_eval(sf, context=primary)
776-
subst = False
761+
if "$(" in sf["pattern"] or "${" in sf["pattern"]:
762+
sfpath = builder.do_eval(sf["pattern"], context=primary)
777763
else:
778-
if sf.endswith('?') and \
779-
self.metadata['cwlVersion'] in ['v1.1.0-dev1']:
780-
sf_required = False
781-
sf = sf[:-1]
782-
sfpath = sf
783-
subst = True
764+
sfpath = substitute(primary["basename"], sf["pattern"])
765+
784766
for sfitem in aslist(sfpath):
785767
if not sfitem:
786768
continue
787769
if isinstance(sfitem, string_types):
788-
if subst:
789-
sfitem = {"path": substitute(primary["path"], sfitem)}
790-
else:
791-
sfitem = {"path": pathprefix+sfitem}
792-
if not os.path.exists(sfitem['path']) and sf_required:
770+
sfitem = {"path": pathprefix+sfitem}
771+
if not fs_access.exists(sfitem['path']) and sf_required:
793772
raise WorkflowException(
794773
"Missing required secondary file '%s'" % (
795774
sfitem["path"]))

cwltool/context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ def __init__(self, kwargs=None):
6464
self.host_provenance = False # type: bool
6565
self.user_provenance = False # type: bool
6666
self.prov_obj = None # type: Optional[ProvenanceProfile]
67+
self.do_update = True # type: bool
68+
self.jobdefaults = None # type: Optional[MutableMapping[Text, Any]]
6769

6870
super(LoadingContext, self).__init__(kwargs)
6971

cwltool/executors.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def execute(self,
7777

7878
job_reqs = None
7979
if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
80-
if process.metadata["cwlVersion"] == 'v1.0':
80+
if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
8181
raise WorkflowException(
8282
"`cwl:requirements` in the input object is not part of CWL "
8383
"v1.0. You can adjust to use `cwltool:overrides` instead; or you "
@@ -87,7 +87,7 @@ def execute(self,
8787
elif ("cwl:defaults" in process.metadata
8888
and "https://w3id.org/cwl/cwl#requirements"
8989
in process.metadata["cwl:defaults"]):
90-
if process.metadata["cwlVersion"] == 'v1.0':
90+
if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
9191
raise WorkflowException(
9292
"`cwl:requirements` in the input object is not part of CWL "
9393
"v1.0. You can adjust to use `cwltool:overrides` instead; or you "

0 commit comments

Comments
 (0)