Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions tests/test_test_api_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,27 @@ def test_get_running_workflow():
assert response["variables"] == {"x": 1}


def test_get_running_steps_when_none_running():
# Arrange
utaa = UnitTestWorkflowAPIAdapter()
response = utaa.create_workflow(workflow_definition={"name": "blah"})
wfid = response["id"]
response = utaa.create_running_workflow(
user_id="dlister",
workflow_id=wfid,
project_id=TEST_PROJECT_ID,
variables={"x": 1},
)
rwfid = response["id"]

# Act
response, _ = utaa.get_running_steps(running_workflow_id=rwfid)

# Assert
assert response["count"] == 0
assert response["steps"] == []


def test_set_running_workflow_done_when_success():
# Arrange
utaa = UnitTestWorkflowAPIAdapter()
Expand Down
6 changes: 6 additions & 0 deletions tests/wapi_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ def get_running_workflow(
response["id"] = running_workflow_id
return response, 0

def get_running_steps(
self, *, running_workflow_id: str
) -> tuple[dict[str, Any], int]:
# Does nothing at the moment - this is used for the STOP logic.
return {"count": 0, "steps": []}, 0

def set_running_workflow_done(
self,
*,
Expand Down
18 changes: 17 additions & 1 deletion workflow/workflow_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,22 @@ def get_running_workflow(
# }
# If not present an empty dictionary should be returned.

@abstractmethod
def get_running_steps(
self, *, running_workflow_id: str
) -> tuple[dict[str, Any], int]:
"""Get a list of steps (their names) that are currently running for the
given RunningWorkflow Record"""
# Should return:
# {
# "count": 1,
# "steps": [
# {
# "name:": "step-1234"
# }
# ]
# }

@abstractmethod
def set_running_workflow_done(
self,
Expand Down Expand Up @@ -201,7 +217,7 @@ def get_running_workflow_step(
# can be expected in the response: -
#
# "prior_running_workflow_step": {
# "id": "r-worflkow-step-00000000-0000-0000-0000-000000000001",
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
# },

@abstractmethod
Expand Down
46 changes: 39 additions & 7 deletions workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,23 @@ def handle_message(self, msg: Message) -> None:

def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
"""WorkflowMessages signal the need to start (or stop) a workflow using its
'action' string field (one of 'START' or 'START').
'action' string field (one of 'START' or 'STOP').
The message contains a 'running_workflow' field that contains the UUID
of an existing RunningWorkflow record in the DM. Using this
we can locate the Workflow record and interrogate that to identify which
step (or steps) to launch (run) first."""
assert msg

_LOGGER.info("WorkflowMessage:\n%s", str(msg))
assert msg.action in ["START", "STOP"]
if msg.action not in ["START", "STOP"]:
_LOGGER.error("Ignoring unsupported action (%s)", msg.action)
return

r_wfid = msg.running_workflow
if msg.action == "START":
self._handle_workflow_start_message(r_wfid)
else:
# STOP is not implemented yet and probably not for some time.
# So just log and ignore for now!
_LOGGER.warning(
"Got STOP action for %s - but it's not implemented yet!", r_wfid
)
self._handle_workflow_stop_message(r_wfid)

def _handle_workflow_start_message(self, r_wfid: str) -> None:
"""Logic to handle a START message. This is the beginning of a new
Expand Down Expand Up @@ -142,6 +140,40 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
# and error, stopping it. There will be no Pod event as the launch has failed.
self._launch(rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)

def _handle_workflow_stop_message(self, r_wfid: str) -> None:
"""Logic to handle a STOP message."""
# Do nothing if the running workflow has already stopped.
rwf_response, _ = self._wapi_adapter.get_running_workflow(
running_workflow_id=r_wfid
)
_LOGGER.debug(
"API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response)
)
if not rwf_response:
_LOGGER.debug("Running workflow does not exist (%s)", r_wfid)
return
elif rwf_response["done"] is True:
_LOGGER.debug("Running workflow already stopped (%s)", r_wfid)
return

# For this version all we can do is check that no steps are running.
# If no steps are running we can safely mark the running workflow as stopped.
response, _ = self._wapi_adapter.get_running_steps(running_workflow_id=r_wfid)
_LOGGER.debug(
"API.get_running_steps(%s) returned: -\n%s", r_wfid, str(response)
)
if response:
if count := response["count"]:
msg: str = "1 step is" if count == 1 else f"{count} steps are"
_LOGGER.debug("Ignoring STOP for %s. %s still running", r_wfid, msg)
else:
self._wapi_adapter.set_running_workflow_done(
running_workflow_id=r_wfid,
success=False,
error_num=1,
error_msg="User stopped",
)

def _handle_pod_message(self, msg: PodMessage) -> None:
"""Handles a PodMessage. This is a message that signals the completion of a
prior step Job within an existing running workflow.
Expand Down