Skip to content

Commit 10c3259

Browse files
author
Alan Christie
committed
feat: Add outputs to step prepration response
1 parent a1d1fea commit 10c3259

File tree

4 files changed

+37
-52
lines changed

4 files changed

+37
-52
lines changed

workflow/decoder.py

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dataclasses import dataclass
88
from typing import Any
99

10+
import decoder.decoder as job_defintion_decoder
1011
import jsonschema
1112
import yaml
1213

@@ -82,6 +83,13 @@ def get_description(definition: dict[str, Any]) -> str | None:
8283
return definition.get("description")
8384

8485

86+
def is_workflow_output_variable(definition: dict[str, Any], variable_name: str) -> bool:
87+
"""True if the variable name is in the workflow variables outputs list."""
88+
# We can safely pass on the workflow defitnion as its
89+
# root-level 'variables' block complies with job-defintion variables.
90+
return variable_name in job_defintion_decoder.get_outputs(definition)
91+
92+
8593
def get_workflow_variable_names(definition: dict[str, Any]) -> set[str]:
8694
"""Given a Workflow definition this function returns all the names of the
8795
variables defined in steps that need to be defined at the workflow level.
@@ -97,40 +105,6 @@ def get_workflow_variable_names(definition: dict[str, Any]) -> set[str]:
97105
return wf_variable_names
98106

99107

100-
def get_step_output_variable_names(
101-
definition: dict[str, Any], step_name: str
102-
) -> list[str]:
103-
"""Given a Workflow definition and a Step name this function returns all the names
104-
of the output variables defined at the Step level. These are the names
105-
of variables that have files assocaited with them that need copying to
106-
the Project directory (from the Instance)."""
107-
variable_names: list[str] = []
108-
steps: list[dict[str, Any]] = get_steps(definition)
109-
for step in steps:
110-
if step["name"] == step_name and "plumbing" in step:
111-
for v_map in step["plumbing"]:
112-
if "to-project" in v_map:
113-
variable_names.append(v_map["variable"])
114-
return variable_names
115-
116-
117-
def get_step_input_variable_names(
118-
definition: dict[str, Any], step_name: str
119-
) -> list[str]:
120-
"""Given a Workflow definition and a Step name this function returns all the names
121-
of the input variables defined at the Step level. These are the names
122-
of variables that have files assocaited with them that need copying to
123-
the Instance directory (from the Project)."""
124-
variable_names: list[str] = []
125-
steps: list[dict[str, Any]] = get_steps(definition)
126-
for step in steps:
127-
if step["name"] == step_name and "plumbing" in step:
128-
for v_map in step["plumbing"]:
129-
if "from-project" in v_map:
130-
variable_names.append(v_map["variable"])
131-
return variable_names
132-
133-
134108
def get_step_workflow_variable_connections(
135109
*, step_definition: dict[str, Any]
136110
) -> list[Connector]:

workflow/workflow_abc.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ class LaunchParameters:
5858
# These are required so that the step can access project files.
5959
# It is a set project-relative filenames (or directroies).
6060
step_project_inputs: set[str] | None = None
61+
# A set of step instance files that are expected to be hard-linked
62+
# into the surrounding Project directory.
63+
# It is a set instance-relative filenames (or directroies).
64+
step_project_outputs: set[str] | None = None
6165
# The application ID (a custom resource name)
6266
# used to identify the 'type' of Instance to create.
6367
# For DM Jobs this will be 'datamanagerjobs.squonk.it'

workflow/workflow_engine.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
get_step_predefined_variable_connections,
4747
get_step_prior_step_connections,
4848
get_step_workflow_variable_connections,
49+
is_workflow_output_variable,
4950
)
5051

5152
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -56,12 +57,13 @@
5657
@dataclass
5758
class StepPreparationResponse:
5859
"""Step preparation response object. 'replicas' is +ve (non-zero) if a step
59-
can be launched - it's value indicates how many times. If a step can be launched
60+
can be launched - its value indicates how many times. If a step can be launched
6061
'variables' will not be None. If a parallel set of steps can take place
6162
(even just one) 'replica_variable' will be set and 'replica_values'
6263
will be a list containing a value for each step instance. If the step
6364
depends on a prior step the instance UUIDs of the steps will be listed
64-
in the 'dependent_instances' string list.
65+
in the 'dependent_instances' string list. If a step's outputs (files) are expected
66+
in the project directory they will be listed in 'outputs'.
6567
6668
If preparation fails 'error_num' wil be set, and 'error_msg'
6769
should contain something useful."""
@@ -71,6 +73,7 @@ class StepPreparationResponse:
7173
replica_variable: str | None = None
7274
replica_values: list[str] | None = None
7375
dependent_instances: set[str] | None = None
76+
outputs: set[str] | None = None
7477
error_num: int = 0
7578
error_msg: str | None = None
7679

@@ -479,6 +482,13 @@ def _prepare_step(
479482
# I think we can start this step,
480483
# so compile a set of variables for it.
481484

485+
# Outputs - a list of step files that are outputs,
486+
# and also designated as workflow outputs.
487+
# Any step can write files to the Projetc directory
488+
# but only job outputs that are also workflow outputs
489+
# are put in this list.
490+
outputs: set[str] = set()
491+
482492
# Start with any variables provided in the step's specification.
483493
# A map that we will add to (and maybe even over-write)...
484494
variables: dict[str, Any] = step_definition["specification"].get(
@@ -498,6 +508,8 @@ def _prepare_step(
498508
):
499509
assert connector.in_ in rwf_variables
500510
variables[connector.out] = rwf_variables[connector.in_]
511+
if is_workflow_output_variable(wf, connector.in_):
512+
outputs.add(rwf_variables[connector.in_])
501513

502514
# Process the step's "plumbing" relating to pre-defined variables.
503515
for connector in get_step_predefined_variable_connections(
@@ -607,6 +619,7 @@ def _prepare_step(
607619
replica_variable=iter_variable,
608620
replica_values=iter_values,
609621
dependent_instances=dependent_instances,
622+
outputs=outputs,
610623
)
611624

612625
def _launch(
@@ -623,6 +636,16 @@ def _launch(
623636
rwf_id: str = rwf["id"]
624637
project_id = rwf["project"]["id"]
625638

639+
_LOGGER.info("SPR.variable=%s", step_preparation_response.variables)
640+
_LOGGER.info(
641+
"SPR.replica_variable=%s", step_preparation_response.replica_variable
642+
)
643+
_LOGGER.info("SPR.replica_values=%s", step_preparation_response.replica_values)
644+
_LOGGER.info(
645+
"SPR.dependent_instances=%s", step_preparation_response.dependent_instances
646+
)
647+
_LOGGER.info("SPR.outputs=%s", step_preparation_response.outputs)
648+
626649
# Total replicas must be 1 or more
627650
total_replicas: int = step_preparation_response.replicas
628651
assert total_replicas >= 1

workflow/workflow_validator.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from typing import Any
66

77
from .decoder import (
8-
get_step_output_variable_names,
98
get_steps,
109
get_workflow_variable_names,
1110
validate_schema,
@@ -86,26 +85,11 @@ def _validate_tag_level(
8685
# and all the output variable names in the step are unique.
8786
duplicate_names: set[str] = set()
8887
all_step_names: set[str] = set()
89-
variable_names: set[str] = set()
9088
for step in get_steps(workflow_definition):
9189
step_name: str = step["name"]
9290
if step_name not in duplicate_names and step_name in all_step_names:
9391
duplicate_names.add(step_name)
9492
all_step_names.add(step_name)
95-
# Are output variable names unique?
96-
variable_names.clear()
97-
step_variables: list[str] = get_step_output_variable_names(
98-
workflow_definition, step_name
99-
)
100-
for step_variable in step_variables:
101-
if step_variable in variable_names:
102-
return ValidationResult(
103-
error_num=3,
104-
error_msg=[
105-
f"Duplicate step output variable: {step_variable} (step={step_name})"
106-
],
107-
)
108-
variable_names.add(step_variable)
10993
if duplicate_names:
11094
return ValidationResult(
11195
error_num=2,

0 commit comments

Comments
 (0)