|
25 | 25 |
|
26 | 26 |
|
27 | 27 | @dataclass
|
28 |
| -class Translation: |
29 |
| - """A source ("in_") to destination ("out") variable map.""" |
| 28 | +class Connector: |
| 29 | + """A connection - connexts a plumbing source variable ("in_") |
| 30 | + to destination variable ("out").""" |
30 | 31 |
|
31 | 32 | in_: str
|
32 | 33 | out: str
|
@@ -83,13 +84,13 @@ def get_description(definition: dict[str, Any]) -> str | None:
|
83 | 84 |
|
84 | 85 | def get_workflow_variable_names(definition: dict[str, Any]) -> set[str]:
|
85 | 86 | """Given a Workflow definition this function returns all the names of the
|
86 |
| - variables that need to be defined at the workflow level. These are the 'variables' |
87 |
| - used in every steps' variabale-mapping block. |
| 87 | + variables defined in steps that need to be defined at the workflow level. |
| 88 | + These are the 'variables' used in every step's 'plumbing' block. |
88 | 89 | """
|
89 | 90 | wf_variable_names: set[str] = set()
|
90 | 91 | steps: list[dict[str, Any]] = get_steps(definition)
|
91 | 92 | for step in steps:
|
92 |
| - if v_map := step.get("variable-mapping"): |
| 93 | + if v_map := step.get("plumbing"): |
93 | 94 | for v in v_map:
|
94 | 95 | if "from-workflow" in v:
|
95 | 96 | wf_variable_names.add(v["from-workflow"]["variable"])
|
@@ -126,40 +127,38 @@ def get_step_input_variable_names(
|
126 | 127 | return variable_names
|
127 | 128 |
|
128 | 129 |
|
129 |
| -def get_step_workflow_variable_mapping(*, step: dict[str, Any]) -> list[Translation]: |
| 130 | +def get_step_workflow_plumbing(*, step: dict[str, Any]) -> list[Connector]: |
130 | 131 | """Returns a list of workflow vaiable name to step variable name
|
131 | 132 | Translation objects for the given step."""
|
132 |
| - variable_mapping: list[Translation] = [] |
133 |
| - if "variable-mapping" in step: |
134 |
| - for v_map in step["variable-mapping"]: |
| 133 | + variable_mapping: list[Connector] = [] |
| 134 | + if "plumbing" in step: |
| 135 | + for v_map in step["plumbing"]: |
135 | 136 | if "from-workflow" in v_map:
|
136 | 137 | variable_mapping.append(
|
137 |
| - Translation( |
| 138 | + Connector( |
138 | 139 | in_=v_map["from-workflow"]["variable"], out=v_map["variable"]
|
139 | 140 | )
|
140 | 141 | )
|
141 | 142 | return variable_mapping
|
142 | 143 |
|
143 | 144 |
|
144 |
| -def get_step_prior_step_variable_mapping( |
145 |
| - *, step: dict[str, Any] |
146 |
| -) -> dict[str, list[Translation]]: |
| 145 | +def get_step_prior_step_plumbing(*, step: dict[str, Any]) -> dict[str, list[Connector]]: |
147 | 146 | """Returns list of Translation objects, indexed by prior step name,
|
148 | 147 | that identify source step (output) variable name to this step's (input)
|
149 | 148 | variable name."""
|
150 |
| - variable_mapping: dict[str, list[Translation]] = {} |
151 |
| - if "variable-mapping" in step: |
152 |
| - for v_map in step["variable-mapping"]: |
| 149 | + variable_mapping: dict[str, list[Connector]] = {} |
| 150 | + if "plumbing" in step: |
| 151 | + for v_map in step["plumbing"]: |
153 | 152 | if "from-step" in v_map:
|
154 | 153 | step_name = v_map["from-step"]["name"]
|
155 | 154 | step_variable = v_map["from-step"]["variable"]
|
156 | 155 | # Tuple is "from" -> "to"
|
157 | 156 | if step_name in variable_mapping:
|
158 | 157 | variable_mapping[step_name].append(
|
159 |
| - Translation(in_=step_variable, out=v_map["variable"]) |
| 158 | + Connector(in_=step_variable, out=v_map["variable"]) |
160 | 159 | )
|
161 | 160 | else:
|
162 | 161 | variable_mapping[step_name] = [
|
163 |
| - Translation(in_=step_variable, out=v_map["variable"]) |
| 162 | + Connector(in_=step_variable, out=v_map["variable"]) |
164 | 163 | ]
|
165 | 164 | return variable_mapping
|
0 commit comments