Skip to content

Commit 594c0e8

Browse files
author
Alan Christie
committed
dev: Engine now collects inputs and sets LP inputs & outputs
1 parent d37caf0 commit 594c0e8

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

workflow/decoder.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ def is_workflow_output_variable(definition: dict[str, Any], variable_name: str)
103103
return variable_name in job_definition_decoder.get_outputs(definition)
104104

105105

106+
def is_workflow_input_variable(definition: dict[str, Any], variable_name: str) -> bool:
107+
"""True if the variable name is in the workflow variables inputs list."""
108+
# We can safely pass on the workflow definition as its
109+
# root-level 'variables' block complies with job-definition variables.
110+
return variable_name in job_definition_decoder.get_inputs(definition)
111+
112+
106113
def get_workflow_variable_names(definition: dict[str, Any]) -> set[str]:
107114
"""Given a Workflow definition this function returns all the names of the
108115
variables defined in steps that need to be defined at the workflow level.

workflow/workflow_engine.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
get_step_predefined_variable_connections,
9393
get_step_prior_step_connections,
9494
get_step_workflow_variable_connections,
95+
is_workflow_input_variable,
9596
is_workflow_output_variable,
9697
)
9798

@@ -120,6 +121,7 @@ class StepPreparationResponse:
120121
replica_values: list[str] | None = None
121122
dependent_instances: set[str] | None = None
122123
outputs: set[str] | None = None
124+
inputs: set[str] | None = None
123125
error_num: int = 0
124126
error_msg: str | None = None
125127

@@ -545,6 +547,9 @@ def _prepare_step(
545547
# (and all the dependent instances have completed successfully).
546548
# We can now compile a set of variables for it.
547549

550+
# Outputs - a list of step files that are workflow inputs.
551+
# These are project files that are copied into the step instance.
552+
inputs: set[str] = set()
548553
# Outputs - a list of step files that are workflow outputs.
549554
# Any step can write files to the Project directory
550555
# but this only consists of job outputs that are also workflow outputs.
@@ -575,6 +580,8 @@ def _prepare_step(
575580
prime_variables[connector.out] = rwf_variables[connector.in_]
576581
if is_workflow_output_variable(wf, connector.in_):
577582
outputs.add(rwf_variables[connector.in_])
583+
elif is_workflow_input_variable(wf, connector.in_):
584+
inputs.add(rwf_variables[connector.in_])
578585

579586
# Add any pre-defined variables used in the step's "plumbing"
580587
for connector in get_step_predefined_variable_connections(
@@ -705,6 +712,7 @@ def _prepare_step(
705712
replica_values=iter_values,
706713
dependent_instances=dependent_instances,
707714
outputs=outputs,
715+
inputs=inputs,
708716
)
709717

710718
def _launch(
@@ -780,6 +788,8 @@ def _launch(
780788
step_replication_number=replica,
781789
total_number_of_replicas=total_replicas,
782790
step_dependent_instances=step_preparation_response.dependent_instances,
791+
step_project_inputs=step_preparation_response.inputs,
792+
step_project_outputs=step_preparation_response.outputs,
783793
)
784794
lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
785795
rwfs_id = lr.running_workflow_step_id

0 commit comments

Comments
 (0)