diff --git a/tests/wapi_adapter.py b/tests/wapi_adapter.py index 7f73187..0f9915f 100644 --- a/tests/wapi_adapter.py +++ b/tests/wapi_adapter.py @@ -16,6 +16,7 @@ """ import os +from http import HTTPStatus from multiprocessing import Lock from pickle import Pickler, Unpickler from typing import Any @@ -385,4 +386,4 @@ def realise_outputs( ) -> tuple[dict[str, Any], int]: del running_workflow_step_id del outputs - return {}, 0 + return {}, HTTPStatus.OK diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 85f57b4..e1061cc 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -24,6 +24,7 @@ import logging import sys +from http import HTTPStatus from typing import Any, Dict, Optional from decoder.decoder import TextEncoding, decode @@ -259,26 +260,40 @@ def _handle_pod_message(self, msg: PodMessage) -> None: # Project directory, while also marking the Step as DONE (successfully). # We pass the outputs to the DM via a call to the API adapter's realise_outputs(). # In return it copies (links) these files to the Project directory. - # - # We then inspect the Workflow to determine the next step. - wfid = rwf_response["workflow"]["id"] assert wfid wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid) _LOGGER.debug("API.get_workflow(%s) returned: -\n%s", wfid, str(wf_response)) + error_num: int | None = None + error_msg: str | None = None if output_values := get_workflow_output_values_for_step(wf_response, step_name): # Got some output values # Inform the DM so it can link them to the Project directory - self._wapi_adapter.realise_outputs( + response, status_code = self._wapi_adapter.realise_outputs( running_workflow_step_id=r_wfsid, outputs=output_values, ) + if status_code != HTTPStatus.OK: + error_num = status_code + error_msg = ( + response["error"] + if "error" in response + else "Undisclosed error when realising outputs" + ) - # Now we can mark this step as DONE... + if error_num is not None: + # The job was successful but linking outputs (back to the Project directory) + # appears to have failed. + self._set_step_error(step_name, r_wfid, r_wfsid, error_num, error_msg) + return + + # We then inspect the Workflow to determine the next step. self._wapi_adapter.set_running_workflow_step_done( running_workflow_step_id=r_wfsid, - success=True, + success=error_num is None, + error_num=error_num, + error_msg=error_msg, ) # We have the step from the Instance that's just finished,