|
38 | 38 | WorkflowAPIAdapter,
|
39 | 39 | )
|
40 | 40 |
|
41 |
| -from .decoder import set_step_variables |
| 41 | +from .decoder import get_workflow_input_names_for_step, set_step_variables |
42 | 42 |
|
43 | 43 | _LOGGER: logging.Logger = logging.getLogger(__name__)
|
44 | 44 | _LOGGER.setLevel(logging.INFO)
|
@@ -138,7 +138,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
|
138 | 138 | # Launch the first step.
|
139 | 139 | # If there's a launch problem the step (and running workflow) will have
|
140 | 140 | # and error, stopping it. There will be no Pod event as the launch has failed.
|
141 |
| - self._launch(rwf=rwf_response, rwfs_id=r_wfsid, step=first_step) |
| 141 | + self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step) |
142 | 142 |
|
143 | 143 | def _handle_workflow_stop_message(self, r_wfid: str) -> None:
|
144 | 144 | """Logic to handle a STOP message."""
|
@@ -302,6 +302,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
|
302 | 302 | )
|
303 | 303 |
|
304 | 304 | self._launch(
|
| 305 | + wf=wf_response, |
305 | 306 | rwf=rwf_response,
|
306 | 307 | rwfs_id=r_wfsid,
|
307 | 308 | step=next_step,
|
@@ -479,6 +480,7 @@ def _validate_step_command(
|
479 | 480 | def _launch(
|
480 | 481 | self,
|
481 | 482 | *,
|
| 483 | + wf: dict[str, Any], |
482 | 484 | rwf: dict[str, Any],
|
483 | 485 | rwfs_id: str,
|
484 | 486 | step: dict[str, Any],
|
@@ -555,12 +557,19 @@ def _launch(
|
555 | 557 | prior_steps.append(step_response["id"])
|
556 | 558 |
|
557 | 559 | # We must also identify workflow inputs that are required by the step we are
|
558 |
| - # about to launch and pass those using: - |
| 560 | + # about to launch and pass those using a launch parameter. The launcher |
| 561 | + # will ensure these are copied into out instance directory before we are run. |
559 | 562 | #
|
560 | 563 | # 'running_workflow_step_inputs'
|
561 |
| - # A list of string pairs (input filename and output filename) |
| 564 | + # A list of string pairs (input/Project filename and output/Instance filename) |
562 | 565 | # (with relative paths if appropriate.
|
563 |
| - inputs: list[tuple[str, str]] | None = None |
| 566 | + inputs: list[tuple[str, str]] = [] |
| 567 | + for wf_input_name in get_workflow_input_names_for_step(wf, step_name): |
| 568 | + # The variable must be known. |
| 569 | + # It should have been checked by the time we get here! |
| 570 | + assert wf_input_name in variables |
| 571 | + # No name change of inputs in this version |
| 572 | + inputs.append((variables[wf_input_name], variables[wf_input_name])) |
564 | 573 |
|
565 | 574 | lp: LaunchParameters = LaunchParameters(
|
566 | 575 | project_id=project_id,
|
|
0 commit comments