diff --git a/tests/job-definitions/job-definitions.yaml b/tests/job-definitions/job-definitions.yaml index 2cb957d..85de046 100644 --- a/tests/job-definitions/job-definitions.yaml +++ b/tests/job-definitions/job-definitions.yaml @@ -116,3 +116,11 @@ jobs: type: string default: clustered.sdf pattern: "^[A-Za-z0-9_/\\.\\-]+\\.(smi|sdf)$" + + rdkit-molprops: + command: >- + addcol.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }} + + cluster-butina: + command: >- + addcol.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} --name {{ name }} --value {{ value }} diff --git a/tests/jobs/addcol.py b/tests/jobs/addcol.py new file mode 100644 index 0000000..33831a3 --- /dev/null +++ b/tests/jobs/addcol.py @@ -0,0 +1,16 @@ +import argparse + +parser = argparse.ArgumentParser( + prog="addcol", + description="Takes a SMILES string and writes it to a file", +) +parser.add_argument("-i", "--inputFile", required=True) +parser.add_argument("-o", "--outputFile", required=True) +parser.add_argument("-n", "--name", required=True) +parser.add_argument("-v", "--value", required=True) +args = parser.parse_args() + +with open(args.inputFile, "rt", encoding="utf8") as input_file: + content = input_file.read() + with open(args.outputFile, "wt", encoding="utf8") as output_file: + output_file.write(content) diff --git a/tests/test_decoder.py b/tests/test_decoder.py index 8ea528b..4274f4d 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -23,6 +23,13 @@ _SHORTCUT_EXAMPLE_1_WORKFLOW: Dict[str, Any] = yaml.safe_load(workflow_file) assert _SHORTCUT_EXAMPLE_1_WORKFLOW +_SIMPLE_PYTHON_MOLPROPS_WORKFLOW_FILE: str = os.path.join( + os.path.dirname(__file__), "workflow-definitions", "simple-python-molprops.yaml" +) +with open(_SIMPLE_PYTHON_MOLPROPS_WORKFLOW_FILE, "r", encoding="utf8") as workflow_file: + _SIMPLE_PYTHON_MOLPROPS_WORKFLOW: Dict[str, Any] = yaml.safe_load(workflow_file) +assert _SIMPLE_PYTHON_MOLPROPS_WORKFLOW + def test_validate_minimal(): # Arrange @@ -78,3 +85,56 @@ def test_validate_shortcut_example_1(): # Assert assert error is None + + +def test_validate_python_simple_molprops(): + # Arrange + + # Act + error = decoder.validate_schema(_SIMPLE_PYTHON_MOLPROPS_WORKFLOW) + + # Assert + assert error is None + + +def test_get_workflow_variables(): + # Arrange + + # Act + wf_variables = decoder.get_variable_names(_SIMPLE_PYTHON_MOLPROPS_WORKFLOW) + + # Assert + assert len(wf_variables) == 1 + assert "candidateMolecules" in wf_variables + + +def test_get_workflow_description(): + # Arrange + + # Act + description = decoder.get_description(_SIMPLE_PYTHON_MOLPROPS_WORKFLOW) + + # Assert + assert description == "A simple python experimental workflow" + + +def test_get_workflow_name(): + # Arrange + + # Act + name = decoder.get_name(_SIMPLE_PYTHON_MOLPROPS_WORKFLOW) + + # Assert + assert name == "python-workflow" + + +def test_get_workflow_steps(): + # Arrange + + # Act + steps = decoder.get_steps(_SIMPLE_PYTHON_MOLPROPS_WORKFLOW) + + # Assert + assert len(steps) == 2 + assert steps[0]["name"] == "step1" + assert steps[1]["name"] == "step2" diff --git a/tests/test_workflow_engine_examples.py b/tests/test_workflow_engine_examples.py index 106da5c..375f206 100644 --- a/tests/test_workflow_engine_examples.py +++ b/tests/test_workflow_engine_examples.py @@ -214,3 +214,34 @@ def test_workflow_engine_shortcut_example_1(basic_engine): # This test should generate a file in the simulated project directory assert project_file_exists(output_file_a) assert project_file_exists(output_file_b) + + +@pytest.mark.skip(reason="The engine does not currently create the required variables") +def test_workflow_engine_simple_python_molprops(basic_engine): + # Arrange + da, md = basic_engine + # Make sure files that should be generated by the test + # do not exist before we run the test. + output_file_a = "a.sdf" + assert not project_file_exists(output_file_a) + output_file_b = "b.sdf" + assert not project_file_exists(output_file_b) + + # Act + r_wfid = start_workflow( + md, da, "simple-python-molprops", {"candidateMolecules": "C"} + ) + + # Assert + wait_for_workflow(da, r_wfid) + # Additional, detailed checks... + # Check we only have one RunningWorkflowStep, and it succeeded + response = da.get_running_workflow_steps(running_workflow_id=r_wfid) + assert response["count"] == 2 + assert response["running_workflow_steps"][0]["done"] + assert response["running_workflow_steps"][0]["success"] + assert response["running_workflow_steps"][1]["done"] + assert response["running_workflow_steps"][1]["success"] + # This test should generate a file in the simulated project directory + assert project_file_exists(output_file_a) + assert project_file_exists(output_file_b) diff --git a/tests/test_workflow_validator_for_create_level.py b/tests/test_workflow_validator_for_create_level.py index f42555c..6a3889a 100644 --- a/tests/test_workflow_validator_for_create_level.py +++ b/tests/test_workflow_validator_for_create_level.py @@ -102,3 +102,23 @@ def test_validate_shortcut_example_1(): # Assert assert error.error_num == 0 assert error.error_msg is None + + +def test_validate_simple_python_molprops(): + # Arrange + workflow_file: str = os.path.join( + os.path.dirname(__file__), "workflow-definitions", "simple-python-molprops.yaml" + ) + with open(workflow_file, "r", encoding="utf8") as workflow_file: + workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader) + assert workflow + + # Act + error = WorkflowValidator.validate( + level=ValidationLevel.CREATE, + workflow_definition=workflow, + ) + + # Assert + assert error.error_num == 0 + assert error.error_msg is None diff --git a/tests/workflow-definitions/simple-python-molprops.yaml b/tests/workflow-definitions/simple-python-molprops.yaml new file mode 100644 index 0000000..a4da0aa --- /dev/null +++ b/tests/workflow-definitions/simple-python-molprops.yaml @@ -0,0 +1,57 @@ +--- +kind: DataManagerWorkflow +kind-version: "2024.1" +name: python-workflow +description: A simple python experimental workflow +variables: + inputs: + - name: candidateMolecules + type: squonk/x-smiles + outputs: + - name: clusteredMolecules + from: + - step: step2 + output: outputFile + as: clustered-molecules.smi + +steps: +- name: step1 + description: Add column 1 + specification: >- + { + "collection": "workflow-engine-unit-test-jobs", + "job": "rdkit-molprops", + "version": "1.0.0", + "variables": { + "name": "col1", + "value": 123 + } + } + inputs: + - input: inputFile + from: + workflow-input: candidateMolecules + outputs: + - output: outputFile + as: __step1__out.smi + +- name: step2 + Description: Add column 2 + specification: >- + { + "collection": "workflow-engine-unit-test-jobs", + "job": "cluster-butina", + "version":"1.0.0", + "variables": { + "name":"col2", + "value":"999" + } + } + inputs: + - input: inputFile + from: + step: step1 + output: outputFile + outputs: + - output: outputFile + as: __step2__out.smi diff --git a/workflow/decoder.py b/workflow/decoder.py index 0cdebc5..8a8aba8 100644 --- a/workflow/decoder.py +++ b/workflow/decoder.py @@ -52,11 +52,24 @@ def get_steps(definition: dict[str, Any]) -> list[dict[str, Any]]: return response -def get_workflow_name(definition: dict[str, Any]) -> str: +def get_name(definition: dict[str, Any]) -> str: """Given a Workflow definition this function returns its name.""" return str(definition.get("name", "")) -def get_workflow_description(definition: dict[str, Any]) -> str | None: +def get_description(definition: dict[str, Any]) -> str | None: """Given a Workflow definition this function returns its description (if it has one).""" return definition.get("description") + + +def get_variable_names(definition: dict[str, Any]) -> list[str]: + """Given a Workflow definition this function returns all the names of the + variables defined at the workflow level.""" + wf_variable_names: set[str] = set() + variables: dict[str, Any] | None = definition.get("variables") + if variables: + for input_variable in variables.get("inputs", []): + name: str = input_variable["name"] + assert name not in wf_variable_names + wf_variable_names.add(name) + return list(wf_variable_names) diff --git a/workflow/workflow-schema.yaml b/workflow/workflow-schema.yaml index d61ffa3..8bccf99 100644 --- a/workflow/workflow-schema.yaml +++ b/workflow/workflow-schema.yaml @@ -27,6 +27,13 @@ properties: type: array items: $ref: "#/definitions/step" + variables: + type: object + properties: + inputs: + type: array + items: + $ref: "#/definitions/workflow-input-parameter" required: - kind - kind-version @@ -46,13 +53,27 @@ definitions: A value compatible with Kubernetes variables to allow it to be used ins Pod Label - variable-name: + parameter-name: type: string - pattern: ^[a-zA-Z_][a-zA-Z0-9_]{,63}$ - description: >- - A Job/Step variable name, as used in the Data Manager Job Specification + pattern: ^[a-zA-Z_][a-zA-Z0-9_-]*$ - # Declaration of a step from anotehr step + workflow-input-parameter: + type: object + properties: + name: + $ref: '#/definitions/parameter-name' + + # Declaration of a value from a workflow input (variable) + from-workflow-input: + type: object + additionalProperties: false + properties: + workflow-input: + $ref: '#/definitions/parameter-name' + required: + - workflow-input + + # Declaration of a value from another step from-step-output: type: object additionalProperties: false @@ -60,7 +81,7 @@ definitions: step: $ref: '#/definitions/rfc1035-label-name' output: - $ref: '#/definitions/variable-name' + $ref: '#/definitions/parameter-name' required: - step - output @@ -71,22 +92,33 @@ definitions: additionalProperties: false properties: input: - $ref: '#/definitions/variable-name' + $ref: '#/definitions/parameter-name' from: $ref: '#/definitions/from-step-output' required: - input + step-input-from-workflow: + type: object + additionalProperties: false + properties: + input: + $ref: '#/definitions/parameter-name' + from: + $ref: '#/definitions/from-workflow-input' + required: + - input + # A Step output (with an 'as' - a declared value) step-output-as: type: object additionalProperties: false properties: output: - $ref: '#/definitions/variable-name' + $ref: '#/definitions/parameter-name' as: type: string - description: The value to set the variable to + description: The value to set the parameter to required: - output - as @@ -105,12 +137,13 @@ definitions: inputs: type: array items: - oneOf: + anyOf: - $ref: "#/definitions/step-input-from-step" + - $ref: "#/definitions/step-input-from-workflow" outputs: type: array items: - oneOf: + anyOf: - $ref: "#/definitions/step-output-as" required: - name diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index d7c7f43..e12ed06 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -139,7 +139,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None: # Launch the first step. # If there's a launch problem the step (and running workflow) will have # and error, stopping it. There will be no Pod event as the launch has failed. - self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step) + self._launch(rwf=rwf_response, rwfs_id=r_wfsid, step=first_step) def _handle_pod_message(self, msg: PodMessage) -> None: """Handles a PodMessage. This is a message that signals the completion of a @@ -263,7 +263,6 @@ def _handle_pod_message(self, msg: PodMessage) -> None: ) self._launch( - wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=next_step, @@ -286,7 +285,6 @@ def _validate_step_command( self, *, step: dict[str, Any], - workflow_variables: dict[str, Any] | None, running_workflow_variables: dict[str, Any] | None = None, ) -> str | dict[str, Any]: """Returns an error message if the command isn't valid. @@ -328,7 +326,7 @@ def _validate_step_command( # 'decode()' method to do this. It returns a tuple (str and boolean). # If the boolean is True then the command can be compiled # (i.e. it has no missing variables) and the launcher should not complain - # about the command (as we'll pass the same variables to it). + # about the command (as we'll pass the same variables to it. # If the returned boolean is False then we can expect the returned str # to contain an error message. # @@ -337,17 +335,17 @@ def _validate_step_command( # (in descending order of priority) from... # # 1. The RunningWorkflow - # 2. The Workflow - # 3. The Job Specification + # 2. The Job Step Specification Variables # # If variable 'x' is defined in all three then the RunningWorkflow's # value must be used. + # Get any variables from the step specification. all_variables = step_spec.pop("variables") if "variables" in step_spec else {} - if workflow_variables: - all_variables |= workflow_variables + # Merge running workflow variables on top of these if running_workflow_variables: all_variables |= running_workflow_variables + message, success = decode( job["command"], all_variables, "command", TextEncoding.JINJA2_3_0 ) @@ -356,7 +354,6 @@ def _validate_step_command( def _launch( self, *, - wf: dict[str, Any], rwf: dict[str, Any], rwfs_id: str, step: dict[str, Any], @@ -366,12 +363,15 @@ def _launch( _LOGGER.info("Validating step command: %s (step=%s)...", rwf_id, step_name) - # Now check the step command can be executed (by decoding it) - workflow_variables: dict[str, Any] | None = wf.get("variables") + # Now check the step command can be executed + # (by trying to decoding the Job command). + # + # We pass in the workflow variables (these are provided by the user + # when the workflow is run. All workflow variables will be present in the + # running workflow record) running_workflow_variables: dict[str, Any] | None = rwf.get("variables") error_or_variables: str | dict[str, Any] = self._validate_step_command( step=step, - workflow_variables=workflow_variables, running_workflow_variables=running_workflow_variables, ) if isinstance(error_or_variables, str):