Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tests/wapi_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"""

import os
from http import HTTPStatus
from multiprocessing import Lock
from pickle import Pickler, Unpickler
from typing import Any
Expand Down Expand Up @@ -385,4 +386,4 @@ def realise_outputs(
) -> tuple[dict[str, Any], int]:
del running_workflow_step_id
del outputs
return {}, 0
return {}, HTTPStatus.OK
27 changes: 21 additions & 6 deletions workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import logging
import sys
from http import HTTPStatus
from typing import Any, Dict, Optional

from decoder.decoder import TextEncoding, decode
Expand Down Expand Up @@ -259,26 +260,40 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
# Project directory, while also marking the Step as DONE (successfully).
# We pass the outputs to the DM via a call to the API adapter's realise_outputs().
# In return it copies (links) these files to the Project directory.
#
# We then inspect the Workflow to determine the next step.

wfid = rwf_response["workflow"]["id"]
assert wfid
wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid)
_LOGGER.debug("API.get_workflow(%s) returned: -\n%s", wfid, str(wf_response))

error_num: int | None = None
error_msg: str | None = None
if output_values := get_workflow_output_values_for_step(wf_response, step_name):
# Got some output values
# Inform the DM so it can link them to the Project directory
self._wapi_adapter.realise_outputs(
response, status_code = self._wapi_adapter.realise_outputs(
running_workflow_step_id=r_wfsid,
outputs=output_values,
)
if status_code != HTTPStatus.OK:
error_num = status_code
error_msg = (
response["error"]
if "error" in response
else "Undisclosed error when realising outputs"
)

# Now we can mark this step as DONE...
if error_num is not None:
# The job was successful but linking outputs (back to the Project directory)
# appears to have failed.
self._set_step_error(step_name, r_wfid, r_wfsid, error_num, error_msg)
return

# We then inspect the Workflow to determine the next step.
self._wapi_adapter.set_running_workflow_step_done(
running_workflow_step_id=r_wfsid,
success=True,
success=error_num is None,
error_num=error_num,
error_msg=error_msg,
)

# We have the step from the Instance that's just finished,
Expand Down