diff --git a/pyproject.toml b/pyproject.toml
index c7f7655..f2ea162 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -51,6 +51,7 @@ warn_unused_ignores = true
 
 [tool.pylint]
 disable = [
+    "fixme",
     "R",
     "too-few-public-methods",
     "too-many-arguments",
diff --git a/tests/test_decoder.py b/tests/test_decoder.py
index c2468b6..a57ab3e 100644
--- a/tests/test_decoder.py
+++ b/tests/test_decoder.py
@@ -255,3 +255,77 @@ def test_set_variables_from_options_for_step_for_simnple_python_molprops_with_op
     assert "value" in new_variables
     assert new_variables["name"] == "propertyName"
     assert new_variables["value"] == "propertyValue"
+
+
+def test_get_workflow_inputs_for_step_with_name_step1():
+    # Arrange
+
+    # Act
+    inputs = decoder.get_workflow_input_names_for_step(
+        _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step1"
+    )
+
+    # Assert
+    assert len(inputs) == 1
+    assert "candidateMolecules" in inputs
+
+
+def test_get_workflow_inputs_for_step_with_name_step2():
+    # Arrange
+
+    # Act
+    inputs = decoder.get_workflow_input_names_for_step(
+        _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step2"
+    )
+
+    # Assert
+    assert not inputs
+
+
+def test_get_workflow_inputs_for_step_with_unknown_step_name():
+    # Arrange
+
+    # Act
+    inputs = decoder.get_workflow_input_names_for_step(
+        _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "unknown"
+    )
+
+    # Assert
+    assert not inputs
+
+
+def test_get_workflow_outputs_for_step_with_name_step1():
+    # Arrange
+
+    # Act
+    outputs = decoder.get_workflow_output_values_for_step(
+        _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step1"
+    )
+
+    # Assert
+    assert not outputs
+
+
+def test_get_workflow_outputs_for_step_with_name_step2():
+    # Arrange
+
+    # Act
+    outputs = decoder.get_workflow_output_values_for_step(
+        _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "step2"
+    )
+
+    # Assert
+    assert len(outputs) == 1
+    assert "clustered-molecules.smi" in outputs
+
+
+def test_get_workflow_outputs_for_step_with_unknown_step_name():
+    # Arrange
+
+    # Act
+    outputs = decoder.get_workflow_output_values_for_step(
+        _SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW, "unknown"
+    )
+
+    # Assert
+    assert not outputs
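These tests depend on the `_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW` fixture, which is defined elsewhere in the test module and is not part of this diff. A minimal sketch of the shape the assertions imply, consistent with the decoder functions added below (the Job-side input key and name are invented for illustration):

```python
# Hypothetical sketch of the (unshown) fixture; only the fields the new
# decoder functions inspect are included, and "inputFile" is an invented name.
_SIMPLE_PYTHON_MOLPROPS_WITH_OPTIONS_WORKFLOW = {
    "steps": [
        {
            "name": "step1",
            "inputs": [
                {"input": "inputFile", "from": {"workflow-input": "candidateMolecules"}},
            ],
        },
        {"name": "step2"},
    ],
    "variable-mapping": {
        "outputs": [
            {"as": "clustered-molecules.smi", "from": {"step": "step2"}},
        ],
    },
}
```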
running_workflow_step_id="r-workflow-step-00000000-0000-0000-0000-000000000001", + outputs=["a.txt"], + ) + + # Assert + assert not response diff --git a/tests/wapi_adapter.py b/tests/wapi_adapter.py index bb51af8..7f73187 100644 --- a/tests/wapi_adapter.py +++ b/tests/wapi_adapter.py @@ -178,6 +178,25 @@ def get_running_workflow_step( response["id"] = running_workflow_step_id return response, 0 + def get_running_workflow_step_by_name( + self, *, name: str, running_workflow_id: str + ) -> dict[str, Any]: + UnitTestWorkflowAPIAdapter.lock.acquire() + with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file: + running_workflow_step = Unpickler(pickle_file).load() + UnitTestWorkflowAPIAdapter.lock.release() + + print(f"name={name} running_workflow_id={running_workflow_id}") + for rwfs_id, record in running_workflow_step.items(): + print(f"rwfs_id={rwfs_id} record={record}") + if record["running_workflow"]["id"] != running_workflow_id: + continue + if record["name"] == name: + response = record + response["id"] = rwfs_id + return response, 0 + return {}, 0 + def set_running_workflow_step_variables( self, *, @@ -362,8 +381,8 @@ def get_running_workflow_steps(self, *, running_workflow_id: str) -> dict[str, A return {"count": len(steps), "running_workflow_steps": steps} def realise_outputs( - self, *, running_workflow_step_id: str, variables: dict[str, str] + self, *, running_workflow_step_id: str, outputs: list[str, str] ) -> tuple[dict[str, Any], int]: del running_workflow_step_id - del variables + del outputs return {}, 0 diff --git a/workflow/decoder.py b/workflow/decoder.py index 652e54c..5aeb8c6 100644 --- a/workflow/decoder.py +++ b/workflow/decoder.py @@ -82,6 +82,47 @@ def get_variable_names(definition: dict[str, Any]) -> list[str]: return wf_variable_names +def get_workflow_input_names_for_step( + definition: dict[str, Any], name: str +) -> list[str]: + """Given a Workflow definition and a step name we return a list of workflow + input variable names the step expects. To do this we iterate through the step's + inputs to find those that are declared 'from->workflow-input'. + + To get the input (a filename) the caller simply looks these names up + in the variable map.""" + inputs: list[str] = [] + for step in definition.get("steps", {}): + if step["name"] == name and "inputs" in step: + # Find all the workflow inputs. + # This gives us the name of the workflow input variable + # and the name of the step input (Job) variable. + inputs.extend( + step_input["from"]["workflow-input"] + for step_input in step["inputs"] + if "from" in step_input and "workflow-input" in step_input["from"] + ) + return inputs + + +def get_workflow_output_values_for_step( + definition: dict[str, Any], name: str +) -> list[str]: + """Given a Workflow definition and a step name we return a list of workflow + out variable names the step creates. 
diff --git a/workflow/decoder.py b/workflow/decoder.py
index 652e54c..5aeb8c6 100644
--- a/workflow/decoder.py
+++ b/workflow/decoder.py
@@ -82,6 +82,47 @@ def get_variable_names(definition: dict[str, Any]) -> list[str]:
     return wf_variable_names
 
 
+def get_workflow_input_names_for_step(
+    definition: dict[str, Any], name: str
+) -> list[str]:
+    """Given a Workflow definition and a step name we return a list of workflow
+    input variable names the step expects. To do this we iterate through the step's
+    inputs to find those that are declared 'from->workflow-input'.
+
+    To get the input (a filename) the caller simply looks these names up
+    in the variable map."""
+    inputs: list[str] = []
+    for step in definition.get("steps", []):
+        if step["name"] == name and "inputs" in step:
+            # Find all the workflow inputs.
+            # This gives us the name of the workflow input variable
+            # and the name of the step input (Job) variable.
+            inputs.extend(
+                step_input["from"]["workflow-input"]
+                for step_input in step["inputs"]
+                if "from" in step_input and "workflow-input" in step_input["from"]
+            )
+    return inputs
+
+
+def get_workflow_output_values_for_step(
+    definition: dict[str, Any], name: str
+) -> list[str]:
+    """Given a Workflow definition and a step name we return a list of workflow
+    output variable names the step creates. To do this we iterate through the
+    workflow's outputs to find those that are declared 'from' our step."""
+    wf_outputs = definition.get("variable-mapping", {}).get("outputs", [])
+    outputs: list[str] = []
+    outputs.extend(
+        output["as"]
+        for output in wf_outputs
+        if "from" in output
+        and "step" in output["from"]
+        and output["from"]["step"] == name
+    )
+    return outputs
+
+
 def set_variables_from_options_for_step(
     definition: dict[str, Any], variables: dict[str, Any], step_name: str
 ) -> dict[str, Any]:
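A short, runnable illustration of the two new functions; the definition here is a stripped-down, hypothetical example rather than a real workflow:

```python
from workflow.decoder import (
    get_workflow_input_names_for_step,
    get_workflow_output_values_for_step,
)

# A minimal, hypothetical definition exercising both functions.
definition = {
    "steps": [
        {"name": "step1", "inputs": [{"from": {"workflow-input": "candidateMolecules"}}]},
        {"name": "step2"},
    ],
    "variable-mapping": {
        "outputs": [{"as": "clustered-molecules.smi", "from": {"step": "step2"}}],
    },
}

assert get_workflow_input_names_for_step(definition, "step1") == ["candidateMolecules"]
assert get_workflow_input_names_for_step(definition, "step2") == []
assert get_workflow_output_values_for_step(definition, "step2") == ["clustered-molecules.smi"]
assert get_workflow_output_values_for_step(definition, "unknown") == []
```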
diff --git a/workflow/workflow_abc.py b/workflow/workflow_abc.py
index 2a3db46..2e7c263 100644
--- a/workflow/workflow_abc.py
+++ b/workflow/workflow_abc.py
@@ -242,6 +242,37 @@ def get_running_workflow_step(
     #   "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
     # },
+    @abstractmethod
+    def get_running_workflow_step_by_name(
+        self, *, name: str, running_workflow_id: str
+    ) -> tuple[dict[str, Any], int]:
+        """Get a RunningWorkflowStep Record given a step name
+        (and its RunningWorkflow ID)."""
+        # Should return:
+        # {
+        #   "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
+        #   "name": "step-1234",
+        #   "done": False,
+        #   "success": False,
+        #   "error_num": 0,
+        #   "error_msg": "",
+        #   "variables": {
+        #     "x": 1,
+        #     "y": 2,
+        #   },
+        #   "running_workflow": {
+        #     "id": "r-workflow-00000000-0000-0000-0000-000000000001",
+        #   },
+        # }
+        # If not present an empty dictionary should be returned.
+        #
+        # For steps that are not the first in a workflow the following field
+        # can be expected in the response: -
+        #
+        # "prior_running_workflow_step": {
+        #   "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
+        # },
+
     @abstractmethod
     def set_running_workflow_step_variables(
         self,
         *,
@@ -312,12 +343,11 @@ def get_job(
 
     @abstractmethod
     def realise_outputs(
-        self, *, running_workflow_step_id: str, outputs: list[tuple[str, str]]
+        self, *, running_workflow_step_id: str, outputs: list[str]
     ) -> tuple[dict[str, Any], int]:
-        """Copy (link) the step's files as outputs into the Project directory,
-        while also renaming the file. A step ID is provided, along with a list of
-        outputs (files in the instance directory) and the required counterpart file
-        in the Project directory."""
+        """Copy (link) the step's files as outputs into the Project directory.
+        A step ID is provided, along with a list of outputs
+        (files that will be in the step's instance directory)."""
         # Should return an empty map or:
         # {
         #   "error": "",
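The abstract method only documents the contract, so the following is a hedged sketch of what a filesystem-backed `realise_outputs()` might do. The directory parameters and helper name are invented here; they are not the DataManager's real API:

```python
import os
from typing import Any


def realise_outputs_sketch(
    instance_dir: str, project_dir: str, outputs: list[str]
) -> tuple[dict[str, Any], int]:
    """Copy (link) each step output from the instance directory into the
    Project directory, mirroring the contract described above."""
    for output in outputs:
        src = os.path.join(instance_dir, output)
        if not os.path.isfile(src):
            return {"error": f"Missing output '{output}'"}, 1
        os.link(src, os.path.join(project_dir, output))
    return {}, 0
```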
diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py
index 15b409a..8d0e35a 100644
--- a/workflow/workflow_engine.py
+++ b/workflow/workflow_engine.py
@@ -38,7 +38,11 @@
     WorkflowAPIAdapter,
 )
 
-from .decoder import set_step_variables
+from .decoder import (
+    get_workflow_input_names_for_step,
+    get_workflow_output_values_for_step,
+    set_step_variables,
+)
 
 _LOGGER: logging.Logger = logging.getLogger(__name__)
 _LOGGER.setLevel(logging.INFO)
@@ -138,7 +142,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
         # Launch the first step.
         # If there's a launch problem the step (and running workflow) will have
         # and error, stopping it. There will be no Pod event as the launch has failed.
-        self._launch(rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)
+        self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)
 
     def _handle_workflow_stop_message(self, r_wfid: str) -> None:
         """Logic to handle a STOP message."""
@@ -250,19 +254,33 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
             self._set_step_error(step_name, r_wfid, r_wfsid, exit_code, "Job failed")
             return
 
-        # If we get here the prior step completed successfully
-        # and so we can mark the Step as DOne (successfully),
-        # and then inspect the Workflow to determine the next step.
+        # If we get here the prior step completed successfully and we can decide
+        # whether the step has outputs (files) that need to be written to the
+        # Project directory, while also marking the Step as DONE (successfully).
+        # We pass the outputs to the DM via a call to the API adapter's realise_outputs().
+        # In return it copies (links) these files to the Project directory.
+        #
+        # We then inspect the Workflow to determine the next step.
 
-        self._wapi_adapter.set_running_workflow_step_done(
-            running_workflow_step_id=r_wfsid,
-            success=True,
-        )
         wfid = rwf_response["workflow"]["id"]
         assert wfid
         wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid)
         _LOGGER.debug("API.get_workflow(%s) returned: -\n%s", wfid, str(wf_response))
 
+        if output_values := get_workflow_output_values_for_step(wf_response, step_name):
+            # Got some output values.
+            # Inform the DM so it can link them to the Project directory.
+            self._wapi_adapter.realise_outputs(
+                running_workflow_step_id=r_wfsid,
+                outputs=output_values,
+            )
+
+        # Now we can mark this step as DONE...
+        self._wapi_adapter.set_running_workflow_step_done(
+            running_workflow_step_id=r_wfsid,
+            success=True,
+        )
+
         # We have the step from the Instance that's just finished,
         # so we can use that to find the next step in the Workflow definition.
         # (using the name of the completed step step as an index).
@@ -296,6 +314,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
         )
 
         self._launch(
+            wf=wf_response,
             rwf=rwf_response,
             rwfs_id=r_wfsid,
             step=next_step,
@@ -319,11 +338,18 @@ def _validate_step_command(
         *,
         running_workflow_step_id: str,
         step: dict[str, Any],
+        workflow_steps: list[dict[str, Any]],
+        our_step_index: int,
         running_workflow_variables: dict[str, Any] | None = None,
     ) -> str | dict[str, Any]:
         """Returns an error message if the command isn't valid.
         Without a message we return all the variables that were (successfully)
-        applied to the command."""
+        applied to the command.
+
+        We are also given a list of steps in workflow_steps and our position in
+        the list with our_step_index."""
+        assert our_step_index >= 0
+
         # We get the Job from the step specification, which must contain
         # the keys "collection", "job", and "version". Here we assume that
         # the workflow definition has passed the RUN-level validation
@@ -380,52 +406,46 @@ def _validate_step_command(
         if running_workflow_variables:
             all_variables |= running_workflow_variables
 
-        # This gives all the running workflow and step-specific variables.
-        # Now we have to inspect the workflow step 'inputs' (and 'options')
-        # and see if there are further variables that need constructing
-        # and then adding (merging) into the 'all_variables' dictionary.
-
-        wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
-            running_workflow_step_id=running_workflow_step_id,
-        )
-
         # We must always process the current step's variables
         _LOGGER.debug("Validating step %s (%s)", step, running_workflow_step_id)
         inputs = step.get("inputs", [])
         outputs = step.get("outputs", [])
         previous_step_outputs = []
-        our_index: int = wf_step_data["caller_step_index"]
-        assert our_index >= 0
         _LOGGER.debug(
-            "We are at workflow step index %d (%s)", our_index, running_workflow_step_id
+            "We are at workflow step index %d (%s)",
+            our_step_index,
+            running_workflow_step_id,
         )
-        if our_index > 0:
+        if our_step_index > 0:
             # resolve all previous steps
             previous_step_names = set()
             for inp in inputs:
                 if step_name := inp["from"].get("step", None):
                     previous_step_names.add(step_name)
-            for step in wf_step_data["steps"]:
+            for step in workflow_steps:
                 if step["name"] in previous_step_names:
                     previous_step_outputs.extend(step.get("outputs", []))
 
         _LOGGER.debug(
             "Index %s (%s) workflow_variables=%s",
-            our_index,
+            our_step_index,
             running_workflow_step_id,
             all_variables,
         )
         _LOGGER.debug(
-            "Index %s (%s) inputs=%s", our_index, running_workflow_step_id, inputs
+            "Index %s (%s) inputs=%s", our_step_index, running_workflow_step_id, inputs
         )
         _LOGGER.debug(
-            "Index %s (%s) outputs=%s", our_index, running_workflow_step_id, outputs
+            "Index %s (%s) outputs=%s",
+            our_step_index,
+            running_workflow_step_id,
+            outputs,
         )
         _LOGGER.debug(
             "Index %s (%s) previous_step_outputs=%s",
-            our_index,
+            our_step_index,
             running_workflow_step_id,
             previous_step_outputs,
         )
@@ -452,7 +472,7 @@ def _validate_step_command(
             all_variables |= step_vars
         _LOGGER.debug(
             "Index %s (%s) all_variables=%s",
-            our_index,
+            our_step_index,
             running_workflow_step_id,
             all_variables,
         )
@@ -472,6 +492,7 @@ def _validate_step_command(
     def _launch(
         self,
         *,
+        wf: dict[str, Any],
        rwf: dict[str, Any],
         rwfs_id: str,
         step: dict[str, Any],
@@ -481,6 +502,15 @@ def _launch(
 
         _LOGGER.info("Validating step command: %s (step=%s)...", rwf_id, step_name)
 
+        # Get step data - importantly, giving us the sequence of steps in the response.
+        # Steps will be in wf_step_data["steps"] and our position in the list
+        # is wf_step_data["caller_step_index"].
+        wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
+            running_workflow_step_id=rwfs_id,
+        )
+        assert wf_step_data["caller_step_index"] >= 0
+        our_step_index: int = wf_step_data["caller_step_index"]
+
         # Now check the step command can be executed
         # (by trying to decoding the Job command).
         #
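The launch logic above leans on `get_workflow_steps_driving_this_step()` for the step sequence. A hypothetical response shape, with invented values, consistent with how `wf_step_data` is used here and in the unit-test adapter's tests:

```python
# Hypothetical response; only "caller_step_index" and "steps" are used by
# the engine, and the shape of a step's "outputs" entries is assumed.
wf_step_data = {
    "caller_step_index": 1,  # our position in "steps" (asserted >= 0 above)
    "steps": [
        {"name": "step-1", "outputs": ["results.smi"]},
        {"name": "step-2"},
    ],
}
```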
@@ -491,6 +521,8 @@ def _launch(
         error_or_variables: str | dict[str, Any] = self._validate_step_command(
             running_workflow_step_id=rwfs_id,
             step=step,
+            workflow_steps=wf_step_data["steps"],
+            our_step_index=our_step_index,
             running_workflow_variables=running_workflow_variables,
         )
         if isinstance(error_or_variables, str):
@@ -514,6 +546,43 @@ def _launch(
             variables,
         )
 
+        # When we launch a step we need to identify all the prior steps in the workflow,
+        # those we depend on. The DataManager will then link their outputs to
+        # our instance directory. For simple workflows there is only one prior step,
+        # and it's the one immediately prior to this one.
+        #
+        # We put all the prior step IDs in: -
+        #   'running_workflow_step_prior_steps'
+        #   A list of step UUID strings.
+        #
+        # In this 'simple' linear implementation that is simply the immediately
+        # preceding step.
+        prior_steps: list[str] = []
+        if our_step_index > 0:
+            # We need the step ID of the prior step.
+            prior_step_name: str = wf_step_data["steps"][our_step_index - 1]["name"]
+            step_response, _ = self._wapi_adapter.get_running_workflow_step_by_name(
+                name=prior_step_name,
+                running_workflow_id=rwf_id,
+            )
+            assert "id" in step_response
+            prior_steps.append(step_response["id"])
+
+        # We must also identify workflow inputs that are required by the step we are
+        # about to launch and pass those using a launch parameter. The launcher
+        # will ensure these are copied into our instance directory before we are run.
+        #
+        #   'running_workflow_step_inputs'
+        #   A list of string pairs (input/Project filename and output/Instance filename)
+        #   (with relative paths if appropriate).
+        inputs: list[tuple[str, str]] = []
+        for wf_input_name in get_workflow_input_names_for_step(wf, step_name):
+            # The variable must be known.
+            # It should have been checked by the time we get here!
+            assert wf_input_name in variables
+            # No name change of inputs in this version.
+            inputs.append((variables[wf_input_name], variables[wf_input_name]))
+
         lp: LaunchParameters = LaunchParameters(
             project_id=project_id,
             name=step_name,
@@ -524,6 +593,8 @@ def _launch(
             specification_variables=variables,
             running_workflow_id=rwf_id,
             running_workflow_step_id=rwfs_id,
+            running_workflow_step_prior_steps=prior_steps,
+            running_workflow_step_inputs=inputs,
         )
         lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
         if lr.error_num:
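A small worked example of the input pairing above, under the same no-rename behaviour (the file name is invented): if the variable map resolves the step's only workflow-input, the launcher receives one identical (Project file, Instance file) pair.

```python
# Hypothetical variable map for a step with one workflow-input:
variables = {"candidateMolecules": "input-molecules.smi"}

# The loop above then produces a single pair, unchanged because inputs
# are not renamed in this version:
inputs = [("input-molecules.smi", "input-molecules.smi")]
```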