|
| 1 | +import json |
| 2 | +from collections import namedtuple |
| 3 | +import logging |
| 4 | + |
| 5 | +from typing import Any, Callable, Dict, Generator, Iterable, List, Text, Union, cast |
| 6 | +import six |
| 7 | + |
| 8 | +from schema_salad.sourceline import SourceLine, cmap |
| 9 | +import schema_salad.validate as validate |
| 10 | +from .process import shortname |
| 11 | +from .errors import WorkflowException |
| 12 | + |
| 13 | +_logger = logging.getLogger("cwltool") |
| 14 | + |
| 15 | +def _get_type(tp): |
| 16 | + # type: (Any) -> Any |
| 17 | + if isinstance(tp, dict): |
| 18 | + if tp.get("type") not in ("array", "record", "enum"): |
| 19 | + return tp["type"] |
| 20 | + return tp |
| 21 | + |
def check_types(srctype, sinktype, linkMerge, valueFrom):
    # type: (Any, Any, Text, Text) -> Text
    """Check if the source and sink types are "pass", "warning", or "exception".

    srctype: type of the source parameter
    sinktype: type of the sink parameter
    linkMerge: linkMerge method of the sink ("merge_nested",
        "merge_flattened", or None/empty)
    valueFrom: valueFrom expression of the sink, if any

    Returns "pass", "warning", or "exception".
    Raises WorkflowException for an unrecognized linkMerge value.
    """

    if valueFrom:
        # A valueFrom expression can transform the value arbitrarily, so no
        # static comparison is possible.
        return "pass"
    if not linkMerge:
        if can_assign_src_to_sink(srctype, sinktype, strict=True):
            return "pass"
        if can_assign_src_to_sink(srctype, sinktype, strict=False):
            return "warning"
        return "exception"
    if linkMerge == "merge_nested":
        # The merged value is an array whose items are the source values.
        return check_types({"items": _get_type(srctype), "type": "array"},
                           _get_type(sinktype), None, None)
    if linkMerge == "merge_flattened":
        return check_types(merge_flatten_type(_get_type(srctype)),
                           _get_type(sinktype), None, None)
    # Bug fix: error message previously read "enu_m" instead of "enum".
    raise WorkflowException(u"Unrecognized linkMerge enum '%s'" % linkMerge)
| 42 | + |
| 43 | + |
def merge_flatten_type(src):
    # type: (Any) -> Any
    """Return the merge flattened type of the source type."""
    if isinstance(src, list):
        # Flatten each alternative independently.
        return list(map(merge_flatten_type, src))
    already_array = isinstance(src, dict) and src.get("type") == "array"
    # An array stays as-is; anything else is wrapped into an array of itself.
    return src if already_array else {"items": src, "type": "array"}
| 55 | + |
| 56 | + |
def can_assign_src_to_sink(src, sink, strict=False):  # type: (Any, Any, bool) -> bool
    """Check for identical type specifications, ignoring extra keys like inputBinding.

    src: admissible source types
    sink: admissible sink types

    In non-strict comparison, at least one source type must match one sink type.
    In strict comparison, all source types must match at least one sink type.
    """

    # "Any" is compatible with everything, in either position.
    if src == "Any" or sink == "Any":
        return True
    if isinstance(src, dict) and isinstance(sink, dict):
        if src["type"] == "array" and sink["type"] == "array":
            # Arrays are compatible when their item types are compatible.
            return can_assign_src_to_sink(src["items"], sink["items"], strict)
        elif src["type"] == "record" and sink["type"] == "record":
            return _compare_records(src, sink, strict)
        elif src["type"] == "File" and sink["type"] == "File":
            # Files match, except in strict mode a sink secondaryFiles
            # pattern with no exact match on the source side is a failure.
            for sinksf in sink.get("secondaryFiles", []):
                if not [1 for srcsf in src.get("secondaryFiles", []) if sinksf == srcsf]:
                    if strict:
                        return False
            return True
        else:
            # Unwrap wrappers like {"type": "int", "inputBinding": ...} on
            # both sides and compare the inner types.
            return can_assign_src_to_sink(src["type"], sink["type"], strict)
    elif isinstance(src, list):
        if strict:
            # Every source alternative must be acceptable to the sink.
            # NOTE(review): strict is not forwarded into these recursive
            # calls, so nested comparisons fall back to non-strict mode —
            # confirm this is intended.
            for t in src:
                if not can_assign_src_to_sink(t, sink):
                    return False
            return True
        else:
            # At least one source alternative must be acceptable.
            for t in src:
                if can_assign_src_to_sink(t, sink):
                    return True
            return False
    elif isinstance(sink, list):
        # A single source type must match at least one sink alternative.
        for t in sink:
            if can_assign_src_to_sink(src, t):
                return True
        return False
    else:
        # Plain type names (e.g. "int", "File") compare by equality.
        return src == sink
| 100 | + |
| 101 | + |
def _compare_records(src, sink, strict=False):
    # type: (Dict[Text, Any], Dict[Text, Any], bool) -> bool
    """Compare two records, ensuring they have compatible fields.

    This handles normalizing record names, which will be relative to workflow
    step, so that they can be compared.
    """

    def _field_types(record):  # type: (Dict[Text, Any]) -> Dict[Text, Any]
        # Map each field's short (step-relative) name to its declared type.
        return {shortname(field["name"]): field["type"]
                for field in record["fields"]}

    src_fields = _field_types(src)
    sink_fields = _field_types(sink)
    for key in sink_fields:
        if sink_fields.get(key) is None:
            # A sink field with no type constrains nothing.
            continue
        if can_assign_src_to_sink(
                src_fields.get(key, "null"), sink_fields.get(key, "null"), strict):
            continue
        _logger.info("Record comparison failure for %s and %s\n"
                     "Did not match fields for %s: %s and %s" %
                     (src["name"], sink["name"], key, src_fields.get(key),
                      sink_fields.get(key)))
        return False
    return True
| 129 | + |
def _report_src_sink(src, sink, linkMerge, verb):
    # type: (Dict[Text, Any], Dict[Text, Any], Text, Text) -> Text
    """Build the common part of a source/sink incompatibility message.

    verb is "may be" for warnings and "is" for hard incompatibilities.
    """
    msg = SourceLine(src, "type").makeError(
        "Source '%s' of type %s %s incompatible"
        % (shortname(src["id"]), json.dumps(src["type"]), verb)) + "\n" + \
        SourceLine(sink, "type").makeError(
            " with sink '%s' of type %s"
            % (shortname(sink["id"]), json.dumps(sink["type"])))
    if linkMerge:
        msg += "\n" + SourceLine(sink).makeError(
            " source has linkMerge method %s" % linkMerge)
    return msg


def static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs):
    # type: (List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]], List[Dict[Text, Any]]) -> None
    """Check if all source and sink types of a workflow are compatible before run time.

    Logs one warning per possibly-incompatible connection and raises
    validate.ValidationException if any connection is definitely
    incompatible or a required step input has no source, default, or
    valueFrom.
    """

    # source parameters: workflow_inputs and step_outputs
    # sink parameters: step_inputs and workflow_outputs

    # make a dictionary of source parameters, indexed by the "id" field
    src_dict = {parm["id"]: parm for parm in workflow_inputs + step_outputs}

    step_inputs_val = check_all_types(src_dict, step_inputs, "source")
    workflow_outputs_val = check_all_types(src_dict, workflow_outputs, "outputSource")

    warnings = step_inputs_val["warning"] + workflow_outputs_val["warning"]
    exceptions = step_inputs_val["exception"] + workflow_outputs_val["exception"]

    warning_msgs = []
    exception_msgs = []
    for warning in warnings:
        src = warning.src
        sink = warning.sink
        msg = _report_src_sink(src, sink, warning.linkMerge, "may be")
        # Mismatched secondaryFiles are a common reason a File connection is
        # only "possibly" compatible; spell the difference out.
        if sink.get("secondaryFiles") and sorted(sink.get("secondaryFiles", [])) != sorted(src.get("secondaryFiles", [])):
            msg += "\n" + SourceLine(sink.get("_tool_entry", warning.sink), "secondaryFiles").makeError(
                " sink '%s' expects secondaryFiles %s" % (shortname(sink["id"]), sink.get("secondaryFiles")))
            msg += "\n" + SourceLine(src.get("_tool_entry", warning.src), "secondaryFiles").makeError(
                " source '%s' has secondaryFiles %s" % (shortname(src["id"]), src.get("secondaryFiles")))
        warning_msgs.append(msg)
    for exception in exceptions:
        exception_msgs.append(
            _report_src_sink(exception.src, exception.sink, exception.linkMerge, "is"))

    # Every step input that cannot be null must be fed by something.
    for sink in step_inputs:
        if ('null' != sink["type"] and 'null' not in sink["type"]
                and "source" not in sink and "default" not in sink and "valueFrom" not in sink):
            exception_msgs.append(SourceLine(sink).makeError(
                "Required parameter '%s' does not have source, default, or valueFrom expression"
                % shortname(sink["id"])))

    if warning_msgs:
        _logger.warning("Workflow checker warning:\n%s" % "\n".join(warning_msgs))
    if exception_msgs:
        # Bug fix: gate the raise on the collected messages, not on the
        # type-check exceptions alone — previously a required parameter
        # with no source/default/valueFrom produced a message but never
        # raised, so the broken workflow passed the static check.
        raise validate.ValidationException("\n".join(exception_msgs))
| 197 | + |
| 198 | + |
# One source->sink connection plus the linkMerge method (if any) that
# applies to it; collected by check_all_types for warning/exception reports.
SrcSink = namedtuple("SrcSink", ["src", "sink", "linkMerge"])
| 200 | + |
def check_all_types(src_dict, sinks, sourceField):
    # type: (Dict[Text, Any], List[Dict[Text, Any]], Text) -> Dict[Text, List[SrcSink]]
    """Given a list of sinks, check if their types match with the types of their sources.

    sourceField is either "source" or "outputSource".
    """

    validation = {"warning": [], "exception": []}  # type: Dict[Text, List[SrcSink]]
    for sink in sinks:
        if sourceField not in sink:
            # Unconnected sinks have nothing to check here.
            continue
        valueFrom = sink.get("valueFrom")
        source_spec = sink[sourceField]
        if isinstance(source_spec, list):
            srcs_of_sink = [src_dict[src_id] for src_id in source_spec]
            # Multiple sources default to merge_nested unless overridden.
            linkMerge = sink.get(
                "linkMerge", "merge_nested" if len(source_spec) > 1 else None)
        else:
            srcs_of_sink = [src_dict[source_spec]]
            linkMerge = None
        for src in srcs_of_sink:
            verdict = check_types(src, sink, linkMerge, valueFrom)
            if verdict in ("warning", "exception"):
                validation[verdict].append(SrcSink(src, sink, linkMerge))
    return validation
0 commit comments