diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index 375f206..2a6a535 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -16,6 +16,11 @@ from tests.message_queue import UnitTestMessageQueue from tests.wapi_adapter import UnitTestWorkflowAPIAdapter from workflow.workflow_engine import WorkflowEngine +from workflow.workflow_validator import ( + ValidationLevel, + ValidationResult, + WorkflowValidator, +) @pytest.fixture @@ -55,8 +60,9 @@ def start_workflow( # To start a workflow we need to: # 1. Load and create a Workflow Definition - # 2. Create a Running Workflow record - # 3. Send a Workflow START message + # 2. Validate the workflow for running + # 3. Create a Running Workflow record + # 4. Send a Workflow START message # # 1. workflow_path = os.path.join( @@ -68,6 +74,13 @@ def start_workflow( wf_response = da.create_workflow(workflow_definition=wf_definition) print(f"Created workflow definition {wf_response}") # 2. + vr_result: ValidationResult = WorkflowValidator.validate( + workflow_definition=wf_definition, + variables=variables, + level=ValidationLevel.RUN, + ) + assert vr_result.error_num == 0 + # 3. response = da.create_running_workflow( user_id="dlister", workflow_id=wf_response["id"], diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index e12ed06..bc7c285 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -330,22 +330,28 @@ def _validate_step_command( # If the returned boolean is False then we can expect the returned str # to contain an error message. # - # Remember that variables can exist in the specification too! # The full set of step variables can be obtained - # (in descending order of priority) from... + # (in ascending order of priority) from... # - # 1. The RunningWorkflow - # 2. The Job Step Specification Variables + # 1. The Job Step Specification + # 2. The RunningWorkflow # # If variable 'x' is defined in all three then the RunningWorkflow's # value must be used. - # Get any variables from the step specification. + # 1. Get any variables from the step specification. all_variables = step_spec.pop("variables") if "variables" in step_spec else {} - # Merge running workflow variables on top of these + # 2. Merge running workflow variables on top of these if running_workflow_variables: all_variables |= running_workflow_variables + # This gives all the running workflow and step-specific variables. + # Now we have to inspect the workflow step 'inputs' (and 'options') + # and see if there are further variables that need constructing + # and then adding (merging) into the 'all_variables' dictionary. + # + # TBD + message, success = decode( job["command"], all_variables, "command", TextEncoding.JINJA2_3_0 ) diff --git a/workflow/workflow_validator.py b/workflow/workflow_validator.py index 8040fd2..0ea38cf 100644 --- a/workflow/workflow_validator.py +++ b/workflow/workflow_validator.py @@ -5,7 +5,7 @@ from enum import Enum from typing import Any -from .decoder import validate_schema +from .decoder import get_variable_names, validate_schema class ValidationLevel(Enum): @@ -39,27 +39,29 @@ def validate( *, level: ValidationLevel, workflow_definition: dict[str, Any], - workflow_inputs: dict[str, Any] | None = None, + variables: dict[str, Any] | None = None, ) -> ValidationResult: """Validates the workflow definition (and inputs) based on the provided 'level'.""" assert level in ValidationLevel assert isinstance(workflow_definition, dict) - if workflow_inputs: - assert isinstance(workflow_inputs, dict) + if variables: + assert isinstance(variables, dict) - # ALl levels require a schema validation + # ALl levels need to pass schema validation if error := validate_schema(workflow_definition): return ValidationResult(error_num=1, error_msg=[error]) + # Now level-specific validation... if level == ValidationLevel.RUN: run_level_result: ValidationResult = WorkflowValidator._validate_run_level( workflow_definition=workflow_definition, - workflow_inputs=workflow_inputs, + variables=variables, ) if run_level_result.error_num: return run_level_result + # OK if we get here return _VALIDATION_SUCCESS @classmethod @@ -67,10 +69,9 @@ def _validate_run_level( cls, *, workflow_definition: dict[str, Any], - workflow_inputs: dict[str, Any] | None = None, + variables: dict[str, Any] | None = None, ) -> ValidationResult: assert workflow_definition - del workflow_inputs # RUN level requires that each step specification is a valid JSON string. # and contains properties for 'collection', 'job', and 'version'. @@ -104,4 +105,16 @@ def _validate_run_level( error_msg=[f"Specification is missing: {', '.join(missing_keys)}"], ) + # We must have values for all the inputs defined in the workflow. + wf_variables: list[str] = get_variable_names(workflow_definition) + missing_values: list[str] = [] + for wf_variable in wf_variables: + if not variables or wf_variable not in variables: + missing_values.append(wf_variable) + if missing_values: + return ValidationResult( + error_num=3, + error_msg=[f"Missing input values for: {', '.join(missing_values)}"], + ) + return _VALIDATION_SUCCESS