Skip to content

Commit c968555

Browse files
Merge pull request #9 from InformaticsMatters/prototype-work-alan
More work towards variables
2 parents 5860b72 + 96a2e7e commit c968555

File tree

3 files changed

+48
-16
lines changed

3 files changed

+48
-16
lines changed

tests/test_workflow_engine_examples.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
from tests.message_queue import UnitTestMessageQueue
1717
from tests.wapi_adapter import UnitTestWorkflowAPIAdapter
1818
from workflow.workflow_engine import WorkflowEngine
19+
from workflow.workflow_validator import (
20+
ValidationLevel,
21+
ValidationResult,
22+
WorkflowValidator,
23+
)
1924

2025

2126
@pytest.fixture
@@ -55,8 +60,9 @@ def start_workflow(
5560

5661
# To start a workflow we need to:
5762
# 1. Load and create a Workflow Definition
58-
# 2. Create a Running Workflow record
59-
# 3. Send a Workflow START message
63+
# 2. Validate the workflow for running
64+
# 3. Create a Running Workflow record
65+
# 4. Send a Workflow START message
6066
#
6167
# 1.
6268
workflow_path = os.path.join(
@@ -68,6 +74,13 @@ def start_workflow(
6874
wf_response = da.create_workflow(workflow_definition=wf_definition)
6975
print(f"Created workflow definition {wf_response}")
7076
# 2.
77+
vr_result: ValidationResult = WorkflowValidator.validate(
78+
workflow_definition=wf_definition,
79+
variables=variables,
80+
level=ValidationLevel.RUN,
81+
)
82+
assert vr_result.error_num == 0
83+
# 3.
7184
response = da.create_running_workflow(
7285
user_id="dlister",
7386
workflow_id=wf_response["id"],

workflow/workflow_engine.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -330,22 +330,28 @@ def _validate_step_command(
330330
# If the returned boolean is False then we can expect the returned str
331331
# to contain an error message.
332332
#
333-
# Remember that variables can exist in the specification too!
334333
# The full set of step variables can be obtained
335-
# (in descending order of priority) from...
334+
# (in ascending order of priority) from...
336335
#
337-
# 1. The RunningWorkflow
338-
# 2. The Job Step Specification Variables
336+
# 1. The Job Step Specification
337+
# 2. The RunningWorkflow
339338
#
340339
# If variable 'x' is defined in all three then the RunningWorkflow's
341340
# value must be used.
342341

343-
# Get any variables from the step specification.
342+
# 1. Get any variables from the step specification.
344343
all_variables = step_spec.pop("variables") if "variables" in step_spec else {}
345-
# Merge running workflow variables on top of these
344+
# 2. Merge running workflow variables on top of these
346345
if running_workflow_variables:
347346
all_variables |= running_workflow_variables
348347

348+
# This gives all the running workflow and step-specific variables.
349+
# Now we have to inspect the workflow step 'inputs' (and 'options')
350+
# and see if there are further variables that need constructing
351+
# and then adding (merging) into the 'all_variables' dictionary.
352+
#
353+
# TBD
354+
349355
message, success = decode(
350356
job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
351357
)

workflow/workflow_validator.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from enum import Enum
66
from typing import Any
77

8-
from .decoder import validate_schema
8+
from .decoder import get_variable_names, validate_schema
99

1010

1111
class ValidationLevel(Enum):
@@ -39,38 +39,39 @@ def validate(
3939
*,
4040
level: ValidationLevel,
4141
workflow_definition: dict[str, Any],
42-
workflow_inputs: dict[str, Any] | None = None,
42+
variables: dict[str, Any] | None = None,
4343
) -> ValidationResult:
4444
"""Validates the workflow definition (and inputs)
4545
based on the provided 'level'."""
4646
assert level in ValidationLevel
4747
assert isinstance(workflow_definition, dict)
48-
if workflow_inputs:
49-
assert isinstance(workflow_inputs, dict)
48+
if variables:
49+
assert isinstance(variables, dict)
5050

51-
# ALl levels require a schema validation
51+
# ALl levels need to pass schema validation
5252
if error := validate_schema(workflow_definition):
5353
return ValidationResult(error_num=1, error_msg=[error])
5454

55+
# Now level-specific validation...
5556
if level == ValidationLevel.RUN:
5657
run_level_result: ValidationResult = WorkflowValidator._validate_run_level(
5758
workflow_definition=workflow_definition,
58-
workflow_inputs=workflow_inputs,
59+
variables=variables,
5960
)
6061
if run_level_result.error_num:
6162
return run_level_result
6263

64+
# OK if we get here
6365
return _VALIDATION_SUCCESS
6466

6567
@classmethod
6668
def _validate_run_level(
6769
cls,
6870
*,
6971
workflow_definition: dict[str, Any],
70-
workflow_inputs: dict[str, Any] | None = None,
72+
variables: dict[str, Any] | None = None,
7173
) -> ValidationResult:
7274
assert workflow_definition
73-
del workflow_inputs
7475

7576
# RUN level requires that each step specification is a valid JSON string.
7677
# and contains properties for 'collection', 'job', and 'version'.
@@ -104,4 +105,16 @@ def _validate_run_level(
104105
error_msg=[f"Specification is missing: {', '.join(missing_keys)}"],
105106
)
106107

108+
# We must have values for all the inputs defined in the workflow.
109+
wf_variables: list[str] = get_variable_names(workflow_definition)
110+
missing_values: list[str] = []
111+
for wf_variable in wf_variables:
112+
if not variables or wf_variable not in variables:
113+
missing_values.append(wf_variable)
114+
if missing_values:
115+
return ValidationResult(
116+
error_num=3,
117+
error_msg=[f"Missing input values for: {', '.join(missing_values)}"],
118+
)
119+
107120
return _VALIDATION_SUCCESS

0 commit comments

Comments
 (0)