diff --git a/tests/test_test_api_adapter.py b/tests/test_test_api_adapter.py index 9f23559..2af21d1 100644 --- a/tests/test_test_api_adapter.py +++ b/tests/test_test_api_adapter.py @@ -95,6 +95,27 @@ def test_get_running_workflow(): assert response["variables"] == {"x": 1} +def test_get_running_steps_when_none_running(): + # Arrange + utaa = UnitTestWorkflowAPIAdapter() + response = utaa.create_workflow(workflow_definition={"name": "blah"}) + wfid = response["id"] + response = utaa.create_running_workflow( + user_id="dlister", + workflow_id=wfid, + project_id=TEST_PROJECT_ID, + variables={"x": 1}, + ) + rwfid = response["id"] + + # Act + response, _ = utaa.get_running_steps(running_workflow_id=rwfid) + + # Assert + assert response["count"] == 0 + assert response["steps"] == [] + + def test_set_running_workflow_done_when_success(): # Arrange utaa = UnitTestWorkflowAPIAdapter() diff --git a/tests/wapi_adapter.py b/tests/wapi_adapter.py index 4fa260a..aaa633d 100644 --- a/tests/wapi_adapter.py +++ b/tests/wapi_adapter.py @@ -102,6 +102,12 @@ def get_running_workflow( response["id"] = running_workflow_id return response, 0 + def get_running_steps( + self, *, running_workflow_id: str + ) -> tuple[dict[str, Any], int]: + # Does nothing at the moment - this is used for the STOP logic. + return {"count": 0, "steps": []}, 0 + def set_running_workflow_done( self, *, diff --git a/workflow/workflow_abc.py b/workflow/workflow_abc.py index d276219..1d31d93 100644 --- a/workflow/workflow_abc.py +++ b/workflow/workflow_abc.py @@ -149,6 +149,22 @@ def get_running_workflow( # } # If not present an empty dictionary should be returned. + @abstractmethod + def get_running_steps( + self, *, running_workflow_id: str + ) -> tuple[dict[str, Any], int]: + """Get a list of steps (their names) that are currently running for the + given RunningWorkflow Record""" + # Should return: + # { + # "count": 1, + # "steps": [ + # { + # "name:": "step-1234" + # } + # ] + # } + @abstractmethod def set_running_workflow_done( self, @@ -201,7 +217,7 @@ def get_running_workflow_step( # can be expected in the response: - # # "prior_running_workflow_step": { - # "id": "r-worflkow-step-00000000-0000-0000-0000-000000000001", + # "id": "r-workflow-step-00000000-0000-0000-0000-000000000001", # }, @abstractmethod diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 3be7a42..15b409a 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -77,7 +77,7 @@ def handle_message(self, msg: Message) -> None: def _handle_workflow_message(self, msg: WorkflowMessage) -> None: """WorkflowMessages signal the need to start (or stop) a workflow using its - 'action' string field (one of 'START' or 'START'). + 'action' string field (one of 'START' or 'STOP'). The message contains a 'running_workflow' field that contains the UUID of an existing RunningWorkflow record in the DM. Using this we can locate the Workflow record and interrogate that to identify which @@ -85,17 +85,15 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None: assert msg _LOGGER.info("WorkflowMessage:\n%s", str(msg)) - assert msg.action in ["START", "STOP"] + if msg.action not in ["START", "STOP"]: + _LOGGER.error("Ignoring unsupported action (%s)", msg.action) + return r_wfid = msg.running_workflow if msg.action == "START": self._handle_workflow_start_message(r_wfid) else: - # STOP is not implemented yet and probably not for some time. - # So just log and ignore for now! - _LOGGER.warning( - "Got STOP action for %s - but it's not implemented yet!", r_wfid - ) + self._handle_workflow_stop_message(r_wfid) def _handle_workflow_start_message(self, r_wfid: str) -> None: """Logic to handle a START message. This is the beginning of a new @@ -142,6 +140,40 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None: # and error, stopping it. There will be no Pod event as the launch has failed. self._launch(rwf=rwf_response, rwfs_id=r_wfsid, step=first_step) + def _handle_workflow_stop_message(self, r_wfid: str) -> None: + """Logic to handle a STOP message.""" + # Do nothing if the running workflow has already stopped. + rwf_response, _ = self._wapi_adapter.get_running_workflow( + running_workflow_id=r_wfid + ) + _LOGGER.debug( + "API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response) + ) + if not rwf_response: + _LOGGER.debug("Running workflow does not exist (%s)", r_wfid) + return + elif rwf_response["done"] is True: + _LOGGER.debug("Running workflow already stopped (%s)", r_wfid) + return + + # For this version all we can do is check that no steps are running. + # If no steps are running we can safely mark the running workflow as stopped. + response, _ = self._wapi_adapter.get_running_steps(running_workflow_id=r_wfid) + _LOGGER.debug( + "API.get_running_steps(%s) returned: -\n%s", r_wfid, str(response) + ) + if response: + if count := response["count"]: + msg: str = "1 step is" if count == 1 else f"{count} steps are" + _LOGGER.debug("Ignoring STOP for %s. %s still running", r_wfid, msg) + else: + self._wapi_adapter.set_running_workflow_done( + running_workflow_id=r_wfid, + success=False, + error_num=1, + error_msg="User stopped", + ) + def _handle_pod_message(self, msg: PodMessage) -> None: """Handles a PodMessage. This is a message that signals the completion of a prior step Job within an existing running workflow.