diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f58c27e..b18a3da 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -71,8 +71,9 @@ repos: name: pylint entry: pylint additional_dependencies: - - jinja2==3.0.3 - - jsonschema >= 3.2.0, < 4.0 + - jinja2 >= 3.1.3 + - jsonschema >= 4.21.1 - pyyaml >= 5.3.1, < 7.0 - - im-protobuf >= 7.0.0 + - im-protobuf >= 8.2.0 + - im-data-manager-job-decoder >= 2.1.0 files: ^workflow/.*\.py$ diff --git a/tests/job-definitions/job-definitions.yaml b/tests/job-definitions/job-definitions.yaml index 17b9335..2cb957d 100644 --- a/tests/job-definitions/job-definitions.yaml +++ b/tests/job-definitions/job-definitions.yaml @@ -85,15 +85,24 @@ jobs: shortcut-example-1-process-b: command: >- - shortcut-example-1-process-b.py --inputFile {{ inputFile }}--outputFile {{ outputFile }} + shortcut-example-1-process-b.py --inputFile {{ inputFile }} --outputFile {{ outputFile }} variables: + inputs: + type: object + required: + - inputFile + properties: + inputFile: + title: Molecules to pick from + mime-types: + - chemical/x-mdl-sdfile + type: file outputs: type: object properties: title: Output file mime-types: - chemical/x-mdl-sdfile - - squonk/x-smiles creates: '{{ outputFile }}' type: file options: diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 4594359..7ba43ea 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -27,6 +27,7 @@ import sys from typing import Any, Dict, Optional +from decoder.decoder import TextEncoding, decode from google.protobuf.message import Message from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage @@ -107,7 +108,6 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None: "API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response) ) assert "running_user" in rwf_response - launching_user_name: str 
= rwf_response["running_user"] # Now get the workflow definition (to get all the steps) wfid = rwf_response["workflow"]["id"] wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid) @@ -117,6 +117,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None: # and create a corresponding RunningWorkflowStep record... first_step: Dict[str, Any] = wf_response["steps"][0] first_step_name: str = first_step["name"] + # We need this even if the following goes wrong. response, _ = self._wapi_adapter.create_running_workflow_step( running_workflow_id=r_wfid, step=first_step_name, @@ -128,48 +129,9 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None: str(response), ) assert "id" in response - r_wfsid = response["id"] + r_wfsid: str = response["id"] - # The step's 'specification' is a string - pass it directly to the - # launcher along with any (optional) 'variables'. The launcher - # will apply the variables to step's Job command but we need to handle - # any launch problems. The validator should have checked to ensure that - # variable expansion will work, but we must prepare for the unexpected. 
- - project_id = rwf_response["project"]["id"] - variables: dict[str, Any] | None = rwf_response.get("variables") - - _LOGGER.info( - "Launching first step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s" - " (name=%s project=%s, variables=%s)", - r_wfid, - r_wfsid, - first_step_name, - rwf_response["name"], - project_id, - variables, - ) - - lp: LaunchParameters = LaunchParameters( - project_id=project_id, - name=first_step_name, - debug=rwf_response.get("debug"), - launching_user_name=launching_user_name, - launching_user_api_token=rwf_response["running_user_api_token"], - specification=json.loads(first_step["specification"]), - specification_variables=variables, - running_workflow_id=r_wfid, - running_workflow_step_id=r_wfsid, - ) - lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp) - if lr.error_num: - self._set_step_error( - first_step_name, r_wfid, r_wfsid, lr.error_num, lr.error_msg - ) - else: - _LOGGER.info( - "Launched first step '%s' (command=%s)", first_step_name, lr.command - ) + self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step) def _handle_pod_message(self, msg: PodMessage) -> None: """Handles a PodMessage. 
This is a message that signals the completion of a @@ -266,43 +228,22 @@ def _handle_pod_message(self, msg: PodMessage) -> None: running_workflow_id=r_wfid, step=next_step_name, ) + assert "id" in rwfs_response + r_wfsid = rwfs_response["id"] + assert r_wfsid _LOGGER.debug( "API.create_running_workflow_step(%s, %s) returned: -\n%s", r_wfid, next_step_name, str(response), ) - assert "id" in rwfs_response - new_r_wfsid: str = rwfs_response["id"] - project_id: str = rwf_response["project"]["id"] - variables: dict[str, Any] | None = rwf_response.get("variables") - lp: LaunchParameters = LaunchParameters( - project_id=project_id, - name=next_step_name, - debug=rwf_response.get("debug"), - launching_user_name=rwf_response["running_user"], - launching_user_api_token=rwf_response["running_user_api_token"], - specification=json.loads(next_step["specification"]), - specification_variables=variables, - running_workflow_id=r_wfid, - running_workflow_step_id=new_r_wfsid, + + self._launch( + wf=wf_response, + rwf=rwf_response, + rwfs_id=r_wfsid, + step=next_step, ) - lr = self._instance_launcher.launch(launch_parameters=lp) - # Handle a launch error? - if lr.error_num: - self._set_step_error( - next_step_name, - r_wfid, - new_r_wfsid, - lr.error_num, - lr.error_msg, - ) - else: - _LOGGER.info( - "Launched step: %s (command=%s)", - next_step["name"], - lr.command, - ) # Something was started (or there was a launch error). break @@ -317,6 +258,130 @@ def _handle_pod_message(self, msg: PodMessage) -> None: success=True, ) + def _validate_step_command( + self, + *, + step: dict[str, Any], + workflow_variables: dict[str, Any] | None, + running_workflow_variables: dict[str, Any] | None = None, + ) -> str | dict[str, Any]: + """Returns an error message if the command isn't valid. 
+ Without a message we return all the variables that were (successfully) + applied to the command.""" + # We get the Job from the step specification, which must contain + # the keys "collection", "job", and "version". Here we assume that + # the workflow definition has passed the RUN-level validation + # which means we can get these values. + step_spec: dict[str, Any] = json.loads(step["specification"]) + job_collection: str = step_spec["collection"] + job_job: str = step_spec["job"] + job_version: str = step_spec["version"] + job, _ = self._wapi_adapter.get_job( + collection=job_collection, job=job_job, version=job_version + ) + _LOGGER.debug( + "API.get_job(%s, %s, %s) returned: -\n%s", + job_collection, + job_job, + job_version, + str(job), + ) + + # The step's 'specification' is a string - pass it directly to the + # launcher along with any (optional) 'workflow variables'. The launcher + # will apply the variables to step's Job command but we need to handle + # any launch problems. The validator should have checked to ensure that + # variable expansion will work, but we must prepare for the unexpected. + # + # What the engine has to do here is make sure that the definition + # that's about to be launched has all its configuration requirements + # satisfied (inputs, outputs and options). Basically the + # command must be successfully rendered with what we have. + # + # To do this we give the command and our variables + # to the Job Decoder's 'decode()' method. It returns a tuple (str and boolean). + # If the boolean is True then the command has no undefined configuration + # and can be launched. If it is False then the returned str contains an + # error message. + # + # Remember that variables can exist in (ascending order of priority): - + # 1. The specification + # 2. The workflow + # 3. 
The RunningWorkflow + + all_variables: dict[str, Any] = {} + if "variables" in step_spec: + all_variables = step_spec.pop("variables") + if workflow_variables: + all_variables = all_variables | workflow_variables + if running_workflow_variables: + all_variables = all_variables | running_workflow_variables + message, success = decode( + job["command"], all_variables, "command", TextEncoding.JINJA2_3_0 + ) + + return all_variables if success else message + + def _launch( + self, + *, + wf: dict[str, Any], + rwf: dict[str, Any], + rwfs_id: str, + step: dict[str, Any], + ) -> None: + step_name: str = step["name"] + rwf_id: str = rwf["id"] + + _LOGGER.info("Validating step command: %s (step=%s)...", rwf_id, step_name) + + # Now check the step command can be executed (by decoding it) + workflow_variables: dict[str, Any] | None = wf.get("variables") + running_workflow_variables: dict[str, Any] | None = rwf.get("variables") + error_or_variables: str | dict[str, Any] = self._validate_step_command( + step=step, + workflow_variables=workflow_variables, + running_workflow_variables=running_workflow_variables, + ) + if isinstance(error_or_variables, str): + error_msg = error_or_variables + _LOGGER.warning( + "Step '%s' failed command validation (%s)", step_name, error_msg + ) + self._set_step_error(step_name, rwf_id, rwfs_id, 1, error_msg) + return + + project_id = rwf["project"]["id"] + variables: dict[str, Any] = error_or_variables + + _LOGGER.info( + "Launching step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s" + " (name=%s project=%s, variables=%s)", + rwf_id, + rwfs_id, + step_name, + rwf["name"], + project_id, + variables, + ) + + lp: LaunchParameters = LaunchParameters( + project_id=project_id, + name=step_name, + debug=rwf.get("debug"), + launching_user_name=rwf["running_user"], + launching_user_api_token=rwf["running_user_api_token"], + specification=json.loads(step["specification"]), + specification_variables=variables, + running_workflow_id=rwf_id, 
+ running_workflow_step_id=rwfs_id, + ) + lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp) + if lr.error_num: + self._set_step_error(step_name, rwf_id, rwfs_id, lr.error_num, lr.error_msg) + else: + _LOGGER.info("Launched step '%s' (command=%s)", step_name, lr.command) + def _set_step_error( self, step_name: str, diff --git a/workflow/workflow_validator.py b/workflow/workflow_validator.py index 0ca8386..db6f6e9 100644 --- a/workflow/workflow_validator.py +++ b/workflow/workflow_validator.py @@ -1,5 +1,6 @@ """The WorkflowEngine validation logic.""" +import json from dataclasses import dataclass from enum import Enum from typing import Any @@ -47,7 +48,59 @@ def validate( if workflow_inputs: assert isinstance(workflow_inputs, dict) + # All levels require a schema validation if error := validate_schema(workflow_definition): return ValidationResult(error_num=1, error_msg=[error]) + if level == ValidationLevel.RUN: + run_level_result: ValidationResult = WorkflowValidator._validate_run_level( + workflow_definition=workflow_definition, + workflow_inputs=workflow_inputs, + ) + if run_level_result.error_num: + return run_level_result + + return _VALIDATION_SUCCESS + + @classmethod + def _validate_run_level( + cls, + *, + workflow_definition: dict[str, Any], + workflow_inputs: dict[str, Any] | None = None, + ) -> ValidationResult: + assert workflow_definition + del workflow_inputs + + # RUN level requires that the specification is a valid JSON string + # and contains properties for 'collection', 'job', and 'version'. 
+ try: + specification = json.loads(workflow_definition["specification"]) + except json.decoder.JSONDecodeError as e: + return ValidationResult( + error_num=1, + error_msg=[ + f"Error decoding specification, which is not valid JSON: {e}" + ], + ) + except TypeError as e: + return ValidationResult( + error_num=2, + error_msg=[ + f"Error decoding specification, which is not valid JSON: {e}" + ], + ) + expected_keys: set[str] = {"collection", "job", "version"} + missing_keys: list[str] = [] + missing_keys.extend( + expected_key + for expected_key in expected_keys + if expected_key not in specification + ) + if missing_keys: + return ValidationResult( + error_num=2, + error_msg=[f"Specification is missing: {', '.join(missing_keys)}"], + ) + return _VALIDATION_SUCCESS