Skip to content

Commit 65a5d79

Browse files
Merge pull request #24 from InformaticsMatters/sc-3453
Initial support for STOP [sc-3453]
2 parents 11c2c3c + c1bf90e commit 65a5d79

File tree

4 files changed

+83
-8
lines changed

4 files changed

+83
-8
lines changed

tests/test_test_api_adapter.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,27 @@ def test_get_running_workflow():
9595
assert response["variables"] == {"x": 1}
9696

9797

98+
def test_get_running_steps_when_none_running():
99+
# Arrange
100+
utaa = UnitTestWorkflowAPIAdapter()
101+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
102+
wfid = response["id"]
103+
response = utaa.create_running_workflow(
104+
user_id="dlister",
105+
workflow_id=wfid,
106+
project_id=TEST_PROJECT_ID,
107+
variables={"x": 1},
108+
)
109+
rwfid = response["id"]
110+
111+
# Act
112+
response, _ = utaa.get_running_steps(running_workflow_id=rwfid)
113+
114+
# Assert
115+
assert response["count"] == 0
116+
assert response["steps"] == []
117+
118+
98119
def test_set_running_workflow_done_when_success():
99120
# Arrange
100121
utaa = UnitTestWorkflowAPIAdapter()

tests/wapi_adapter.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,12 @@ def get_running_workflow(
102102
response["id"] = running_workflow_id
103103
return response, 0
104104

105+
def get_running_steps(
106+
self, *, running_workflow_id: str
107+
) -> tuple[dict[str, Any], int]:
108+
# Does nothing at the moment - this is used for the STOP logic.
109+
return {"count": 0, "steps": []}, 0
110+
105111
def set_running_workflow_done(
106112
self,
107113
*,

workflow/workflow_abc.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,22 @@ def get_running_workflow(
149149
# }
150150
# If not present an empty dictionary should be returned.
151151

152+
@abstractmethod
153+
def get_running_steps(
154+
self, *, running_workflow_id: str
155+
) -> tuple[dict[str, Any], int]:
156+
"""Get a list of steps (their names) that are currently running for the
157+
given RunningWorkflow Record"""
158+
# Should return:
159+
# {
160+
# "count": 1,
161+
# "steps": [
162+
# {
163+
# "name:": "step-1234"
164+
# }
165+
# ]
166+
# }
167+
152168
@abstractmethod
153169
def set_running_workflow_done(
154170
self,
@@ -201,7 +217,7 @@ def get_running_workflow_step(
201217
# can be expected in the response: -
202218
#
203219
# "prior_running_workflow_step": {
204-
# "id": "r-worflkow-step-00000000-0000-0000-0000-000000000001",
220+
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
205221
# },
206222

207223
@abstractmethod

workflow/workflow_engine.py

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,25 +77,23 @@ def handle_message(self, msg: Message) -> None:
7777

7878
def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
7979
"""WorkflowMessages signal the need to start (or stop) a workflow using its
80-
'action' string field (one of 'START' or 'START').
80+
'action' string field (one of 'START' or 'STOP').
8181
The message contains a 'running_workflow' field that contains the UUID
8282
of an existing RunningWorkflow record in the DM. Using this
8383
we can locate the Workflow record and interrogate that to identify which
8484
step (or steps) to launch (run) first."""
8585
assert msg
8686

8787
_LOGGER.info("WorkflowMessage:\n%s", str(msg))
88-
assert msg.action in ["START", "STOP"]
88+
if msg.action not in ["START", "STOP"]:
89+
_LOGGER.error("Ignoring unsupported action (%s)", msg.action)
90+
return
8991

9092
r_wfid = msg.running_workflow
9193
if msg.action == "START":
9294
self._handle_workflow_start_message(r_wfid)
9395
else:
94-
# STOP is not implemented yet and probably not for some time.
95-
# So just log and ignore for now!
96-
_LOGGER.warning(
97-
"Got STOP action for %s - but it's not implemented yet!", r_wfid
98-
)
96+
self._handle_workflow_stop_message(r_wfid)
9997

10098
def _handle_workflow_start_message(self, r_wfid: str) -> None:
10199
"""Logic to handle a START message. This is the beginning of a new
@@ -142,6 +140,40 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
142140
# and error, stopping it. There will be no Pod event as the launch has failed.
143141
self._launch(rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)
144142

143+
def _handle_workflow_stop_message(self, r_wfid: str) -> None:
144+
"""Logic to handle a STOP message."""
145+
# Do nothing if the running workflow has already stopped.
146+
rwf_response, _ = self._wapi_adapter.get_running_workflow(
147+
running_workflow_id=r_wfid
148+
)
149+
_LOGGER.debug(
150+
"API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response)
151+
)
152+
if not rwf_response:
153+
_LOGGER.debug("Running workflow does not exist (%s)", r_wfid)
154+
return
155+
elif rwf_response["done"] is True:
156+
_LOGGER.debug("Running workflow already stopped (%s)", r_wfid)
157+
return
158+
159+
# For this version all we can do is check that no steps are running.
160+
# If no steps are running we can safely mark the running workflow as stopped.
161+
response, _ = self._wapi_adapter.get_running_steps(running_workflow_id=r_wfid)
162+
_LOGGER.debug(
163+
"API.get_running_steps(%s) returned: -\n%s", r_wfid, str(response)
164+
)
165+
if response:
166+
if count := response["count"]:
167+
msg: str = "1 step is" if count == 1 else f"{count} steps are"
168+
_LOGGER.debug("Ignoring STOP for %s. %s still running", r_wfid, msg)
169+
else:
170+
self._wapi_adapter.set_running_workflow_done(
171+
running_workflow_id=r_wfid,
172+
success=False,
173+
error_num=1,
174+
error_msg="User stopped",
175+
)
176+
145177
def _handle_pod_message(self, msg: PodMessage) -> None:
146178
"""Handles a PodMessage. This is a message that signals the completion of a
147179
prior step Job within an existing running workflow.

0 commit comments

Comments
 (0)