Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions tests/test_workflow_engine_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
from tests.message_queue import UnitTestMessageQueue
from tests.wapi_adapter import UnitTestWorkflowAPIAdapter
from workflow.workflow_engine import WorkflowEngine
from workflow.workflow_validator import (
ValidationLevel,
ValidationResult,
WorkflowValidator,
)


@pytest.fixture
Expand Down Expand Up @@ -55,8 +60,9 @@ def start_workflow(

# To start a workflow we need to:
# 1. Load and create a Workflow Definition
# 2. Create a Running Workflow record
# 3. Send a Workflow START message
# 2. Validate the workflow for running
# 3. Create a Running Workflow record
# 4. Send a Workflow START message
#
# 1.
workflow_path = os.path.join(
Expand All @@ -68,6 +74,13 @@ def start_workflow(
wf_response = da.create_workflow(workflow_definition=wf_definition)
print(f"Created workflow definition {wf_response}")
# 2.
vr_result: ValidationResult = WorkflowValidator.validate(
workflow_definition=wf_definition,
variables=variables,
level=ValidationLevel.RUN,
)
assert vr_result.error_num == 0
# 3.
response = da.create_running_workflow(
user_id="dlister",
workflow_id=wf_response["id"],
Expand Down
18 changes: 12 additions & 6 deletions workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,22 +330,28 @@ def _validate_step_command(
# If the returned boolean is False then we can expect the returned str
# to contain an error message.
#
# Remember that variables can exist in the specification too!
# The full set of step variables can be obtained
# (in descending order of priority) from...
# (in ascending order of priority) from...
#
# 1. The RunningWorkflow
# 2. The Job Step Specification Variables
# 1. The Job Step Specification
# 2. The RunningWorkflow
#
# If variable 'x' is defined in all three then the RunningWorkflow's
# value must be used.

# Get any variables from the step specification.
# 1. Get any variables from the step specification.
all_variables = step_spec.pop("variables") if "variables" in step_spec else {}
# Merge running workflow variables on top of these
# 2. Merge running workflow variables on top of these
if running_workflow_variables:
all_variables |= running_workflow_variables

# This gives all the running workflow and step-specific variables.
# Now we have to inspect the workflow step 'inputs' (and 'options')
# and see if there are further variables that need constructing
# and then adding (merging) into the 'all_variables' dictionary.
#
# TBD

message, success = decode(
job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
)
Expand Down
29 changes: 21 additions & 8 deletions workflow/workflow_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from enum import Enum
from typing import Any

from .decoder import validate_schema
from .decoder import get_variable_names, validate_schema


class ValidationLevel(Enum):
Expand Down Expand Up @@ -39,38 +39,39 @@ def validate(
*,
level: ValidationLevel,
workflow_definition: dict[str, Any],
workflow_inputs: dict[str, Any] | None = None,
variables: dict[str, Any] | None = None,
) -> ValidationResult:
"""Validates the workflow definition (and inputs)
based on the provided 'level'."""
assert level in ValidationLevel
assert isinstance(workflow_definition, dict)
if workflow_inputs:
assert isinstance(workflow_inputs, dict)
if variables:
assert isinstance(variables, dict)

# ALl levels require a schema validation
# ALl levels need to pass schema validation
if error := validate_schema(workflow_definition):
return ValidationResult(error_num=1, error_msg=[error])

# Now level-specific validation...
if level == ValidationLevel.RUN:
run_level_result: ValidationResult = WorkflowValidator._validate_run_level(
workflow_definition=workflow_definition,
workflow_inputs=workflow_inputs,
variables=variables,
)
if run_level_result.error_num:
return run_level_result

# OK if we get here
return _VALIDATION_SUCCESS

@classmethod
def _validate_run_level(
cls,
*,
workflow_definition: dict[str, Any],
workflow_inputs: dict[str, Any] | None = None,
variables: dict[str, Any] | None = None,
) -> ValidationResult:
assert workflow_definition
del workflow_inputs

# RUN level requires that each step specification is a valid JSON string.
# and contains properties for 'collection', 'job', and 'version'.
Expand Down Expand Up @@ -104,4 +105,16 @@ def _validate_run_level(
error_msg=[f"Specification is missing: {', '.join(missing_keys)}"],
)

# We must have values for all the inputs defined in the workflow.
wf_variables: list[str] = get_variable_names(workflow_definition)
missing_values: list[str] = []
for wf_variable in wf_variables:
if not variables or wf_variable not in variables:
missing_values.append(wf_variable)
if missing_values:
return ValidationResult(
error_num=3,
error_msg=[f"Missing input values for: {', '.join(missing_values)}"],
)

return _VALIDATION_SUCCESS