Skip to content

Commit e998045

Browse files
author
Alan Christie
committed
fix: basic engine implementation
1 parent 6553b58 commit e998045

File tree

6 files changed

+198
-15
lines changed

6 files changed

+198
-15
lines changed

tests/api_adapter.py

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ def create_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
6868

6969
return {"id": workflow_definition_id}
7070

71-
def get_workflow(self, *, workflow_definition_id: str) -> Dict[str, Any]:
71+
def get_workflow(self, *, workflow_id: str) -> Dict[str, Any]:
7272
UnitTestAPIAdapter.mp_lock.acquire()
7373
with open(f"tests/{_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
7474
workflow = Unpickler(pickle_file).load()
7575
UnitTestAPIAdapter.mp_lock.release()
7676

77-
if workflow_definition_id not in workflow:
77+
if workflow_id not in workflow:
7878
return {}
79-
return {"workflow": workflow[workflow_definition_id]}
79+
return {"workflow": workflow[workflow_id]}
8080

8181
def get_workflow_by_name(self, *, name: str, version: str) -> Dict[str, Any]:
8282
UnitTestAPIAdapter.mp_lock.acquire()
@@ -106,6 +106,21 @@ def create_running_workflow(self, *, workflow_definition_id: str) -> str:
106106

107107
return {"id": running_workflow_id}
108108

109+
def set_running_workflow_done(
110+
self, *, running_workflow_id: str, success: bool
111+
) -> None:
112+
UnitTestAPIAdapter.mp_lock.acquire()
113+
with open(f"tests/{_RUNNING_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
114+
running_workflow = Unpickler(pickle_file).load()
115+
116+
assert running_workflow_id in running_workflow
117+
running_workflow[running_workflow_id]["done"] = True
118+
running_workflow[running_workflow_id]["success"] = success
119+
120+
with open(f"tests/{_RUNNING_WORKFLOW_PICKLE_FILE}", "wb") as pickle_file:
121+
Pickler(pickle_file).dump(running_workflow)
122+
UnitTestAPIAdapter.mp_lock.release()
123+
109124
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
110125
UnitTestAPIAdapter.mp_lock.acquire()
111126
with open(f"tests/{_RUNNING_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
@@ -155,6 +170,21 @@ def get_running_workflow_step(
155170
"running_workflow_step": running_workflow_step[running_workflow_step_id]
156171
}
157172

173+
def set_running_workflow_step_done(
174+
self, *, running_workflow_step_id: str, success: bool
175+
) -> None:
176+
UnitTestAPIAdapter.mp_lock.acquire()
177+
with open(f"tests/{_RUNNING_WORKFLOW_STEP_PICKLE_FILE}", "rb") as pickle_file:
178+
running_workflow_step = Unpickler(pickle_file).load()
179+
180+
assert running_workflow_step_id in running_workflow_step
181+
running_workflow_step[running_workflow_step_id]["done"] = True
182+
running_workflow_step[running_workflow_step_id]["success"] = success
183+
184+
with open(f"tests/{_RUNNING_WORKFLOW_STEP_PICKLE_FILE}", "wb") as pickle_file:
185+
Pickler(pickle_file).dump(running_workflow_step)
186+
UnitTestAPIAdapter.mp_lock.release()
187+
158188
def get_running_workflow_steps(
159189
self, *, running_workflow_id: str
160190
) -> List[Dict[str, Any]]:

tests/test_test_api_adapter.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ def test_get_unknown_workflow():
2525
utaa = UnitTestAPIAdapter()
2626

2727
# Act
28-
wfd = utaa.get_workflow(
29-
workflow_definition_id="workflow-00000000-0000-0000-0000-000000000001"
30-
)
28+
wfd = utaa.get_workflow(workflow_id="workflow-00000000-0000-0000-0000-000000000001")
3129

3230
# Assert
3331
assert wfd == {}
@@ -51,7 +49,7 @@ def test_get_workflow():
5149
wfid = response["id"]
5250

5351
# Act
54-
wf = utaa.get_workflow(workflow_definition_id=wfid)
52+
wf = utaa.get_workflow(workflow_id=wfid)
5553

5654
# Assert
5755
assert wf["workflow"]["name"] == "blah"

tests/test_workflow_engine_examples.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ def test_workflow_engine_with_two_step_nop(basic_engine):
9797
mq.join()
9898
print("Stopped")
9999
assert r_wf
100+
assert r_wf["done"]
100101
assert r_wf["success"]
101102
# Now check there are the right number of RunningWorkflowStep Records
102103
# (and they're all set to success/done)

tests/workflow-definitions/example-two-step-nop.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,16 @@ kind-version: "2024.1"
44
name: two-step-nop
55
steps:
66
- name: step-1
7-
specification: "{'collection': 'workflow-engine-unit-test-jobs', 'job': 'nop', 'version: '1.0.0'}"
7+
specification: >-
8+
{
9+
'collection': 'workflow-engine-unit-test-jobs',
10+
'job': 'nop',
11+
'version': '1.0.0'
12+
}
813
- name: step-2
9-
specification: "{'collection': 'workflow-engine-unit-test-jobs', 'job': 'nop', 'version: '1.0.0'}"
14+
specification: >-
15+
{
16+
'collection': 'workflow-engine-unit-test-jobs',
17+
'job': 'nop',
18+
'version': '1.0.0'
19+
}

workflow/workflow_abc.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def create_workflow(
6969
def get_workflow(
7070
self,
7171
*,
72-
workflow_definition_id: str,
72+
workflow_id: str,
7373
) -> Dict[str, Any]:
7474
"""Get a Workflow Record by ID."""
7575
# If present this should return:
@@ -103,6 +103,12 @@ def create_running_workflow(
103103
# "id": "r-workflow-00000000-0000-0000-0000-000000000001",
104104
# }
105105

106+
@abstractmethod
107+
def set_running_workflow_done(
108+
self, *, running_workflow_id: str, success: bool
109+
) -> None:
110+
"""Set the success value for a RunningWorkflow Record"""
111+
106112
@abstractmethod
107113
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
108114
"""Get a RunningWorkflow Record"""
@@ -120,6 +126,7 @@ def create_running_workflow_step(
120126
self,
121127
*,
122128
running_workflow_id: str,
129+
step: str,
123130
) -> Dict[str, Any]:
124131
"""Create a RunningWorkflowStep Record (from a RunningWorkflow)"""
125132
# Should return:
@@ -142,6 +149,12 @@ def get_running_workflow_step(
142149
# },
143150
# }
144151

152+
@abstractmethod
153+
def set_running_workflow_step_done(
154+
self, *, running_workflow_step_id: str, success: bool
155+
) -> None:
156+
"""Set the success value for a RunningWorkflowStep Record"""
157+
145158
@abstractmethod
146159
def get_running_workflow_steps(
147160
self, *, running_workflow_id: str

workflow/workflow_engine.py

Lines changed: 136 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
"""The WorkflowEngine execution logic.
22
"""
33

4+
import ast
45
import logging
56
import sys
7+
from typing import Any, Dict
68

79
from google.protobuf.message import Message
810
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
@@ -54,22 +56,151 @@ def handle_message(self, msg: Message) -> None:
5456
self._handle_workflow_message(msg)
5557

5658
def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
59+
"""Handles a WorkflowMessage. This is a message that signals a START or STOP
60+
of a workflow. On START we will load the workflow definition and run (launch)
61+
the first step."""
5762
assert msg
5863

64+
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
65+
# AND THE INSTANCE LAUNCHER FOR THE SIMPLEST OF WORKFLOWS: -
66+
# THE "TWO-STEP NOP".
67+
# THERE IS NWO SPECIFICATION MANIPULATION NEEDED FOR THIS EXAMPLE
68+
# THE STEPS HAVE NO INPUTS OR OUTPUTS.
69+
# THIS FUNCTION PROBABLY NEEDS TO BE A LOT MORE SOPHISTICATED!
70+
5971
_LOGGER.info("WE> WorkflowMessage:\n%s", str(msg))
6072
if msg.action == "START":
61-
_LOGGER.info("action=%s", msg.action)
62-
# Load the workflow definition and run the first step...
63-
rf = self._api_adapter.get_running_workflow(
73+
# Using the running workflow get the workflow definition
74+
response = self._api_adapter.get_running_workflow(
6475
running_workflow_id=msg.running_workflow
6576
)
66-
assert rf
67-
_LOGGER.info("RunningWorkflow: %s", rf)
77+
assert "running_workflow" in response
78+
running_workflow = response["running_workflow"]
79+
_LOGGER.info("RunningWorkflow: %s", running_workflow)
80+
workflow_id = running_workflow["workflow"]["id"]
81+
response = self._api_adapter.get_workflow(workflow_id=workflow_id)
82+
assert "workflow" in response
83+
workflow = response["workflow"]
84+
# Now find the first step
85+
# and create a RunningWorkflowStep record prior to launching the instance
86+
response = self._api_adapter.create_running_workflow_step(
87+
running_workflow_id=msg.running_workflow,
88+
step=workflow["steps"][0]["name"],
89+
)
90+
assert "id" in response
91+
running_workflow_step_id = response["id"]
92+
# The specification is a string here.
93+
# It needs to be a dictionary for the launch() method.
94+
step = workflow["steps"][0]
95+
step_specification: Dict[str, Any] = ast.literal_eval(step["specification"])
96+
self._instance_launcher.launch(
97+
project_id="project-000",
98+
workflow_id=workflow_id,
99+
running_workflow_step_id=running_workflow_step_id,
100+
workflow_definition=workflow,
101+
step_specification=step_specification,
102+
)
68103

69104
else:
70105
_LOGGER.info("action=%s", msg.action)
71106

72107
def _handle_pod_message(self, msg: PodMessage) -> None:
108+
"""Handles a PodMessage. This is a message that signals the completion of a
109+
step within a workflow. Steps run as "instances" and the Pod message
110+
identifies the Instance. Using the Instance record we can get the
111+
"running workflow step" and then identify the "running workflow" and the
112+
"workflow".
113+
114+
First thing is to adjust the workflow step with the step's success state and
115+
optional error code. If the step was successful we can find the next step
116+
and launch that, or consider the last step to have run and modify the
117+
running workflow record and set's it's success status."""
73118
assert msg
74119

120+
# The PodMessage has a 'instance', 'has_exit_code', and 'exit_code' values.
75121
_LOGGER.info("WE> PodMessage:\n%s", str(msg))
122+
123+
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
124+
# AND THE INSTANCE LAUNCHER FOR THE SIMPLEST OF WORKFLOWS: -
125+
# THE "TWO-STEP NOP".
126+
# THERE IS NWO SPECIFICATION MANIPULATION NEEDED FOR THIS EXAMPLE
127+
# THE STEPS HAVE NO INPUTS OR OUTPUTS.
128+
# THIS FUNCTION PROBABLY NEEDS TO BE A LOT MORE SOPHISTICATED!
129+
130+
# Ignore anything without an exit code.
131+
if not msg.has_exit_code:
132+
_LOGGER.warning("WE> PodMessage: No exit code")
133+
return
134+
135+
instance_id: str = msg.instance
136+
exit_code: int = msg.exit_code
137+
_LOGGER.info(
138+
"WE> PodMessage: instance=%s, exit_code=%d", instance_id, exit_code
139+
)
140+
141+
# Ignore instances without a running workflow step
142+
response = self._api_adapter.get_instance(instance_id=instance_id)
143+
if "running_workflow_step" not in response:
144+
_LOGGER.warning("WE> PodMessage: Without running_workflow_step")
145+
return
146+
running_workflow_step_id: str = response["running_workflow_step"]
147+
response = self._api_adapter.get_running_workflow_step(
148+
running_workflow_step_id=running_workflow_step_id
149+
)
150+
step_name: str = response["running_workflow_step"]["step"]
151+
152+
# Set the step as completed (successful or otherwise)
153+
success: bool = exit_code == 0
154+
self._api_adapter.set_running_workflow_step_done(
155+
running_workflow_step_id=running_workflow_step_id,
156+
success=success,
157+
)
158+
159+
# Get the step's running workflow and workflow IDs and records.
160+
running_workflow_id = response["running_workflow_step"]["running_workflow"]
161+
assert running_workflow_id
162+
response = self._api_adapter.get_running_workflow(
163+
running_workflow_id=running_workflow_id
164+
)
165+
workflow_id = response["running_workflow"]["workflow"]["id"]
166+
assert workflow_id
167+
response = self._api_adapter.get_workflow(workflow_id=workflow_id)
168+
workflow = response["workflow"]
169+
170+
end_of_workflow: bool = False
171+
if success:
172+
# Given the step for the instance just finished,
173+
# find the next step in the workflow and launch it.
174+
# If there are no more steps then the workflow is done
175+
# so we need to set the running workflow as done
176+
# and set it's success status too.
177+
for step in workflow["steps"]:
178+
if step["name"] == step_name:
179+
step_index = workflow["steps"].index(step)
180+
if step_index + 1 < len(workflow["steps"]):
181+
next_step = workflow["steps"][step_index + 1]
182+
response = self._api_adapter.create_running_workflow_step(
183+
running_workflow_id=running_workflow_id,
184+
step=next_step["name"],
185+
)
186+
assert "id" in response
187+
running_workflow_step_id = response["id"]
188+
step_specification: Dict[str, Any] = ast.literal_eval(
189+
next_step["specification"]
190+
)
191+
self._instance_launcher.launch(
192+
project_id="project-000",
193+
workflow_id=workflow_id,
194+
running_workflow_step_id=running_workflow_step_id,
195+
workflow_definition=workflow,
196+
step_specification=step_specification,
197+
)
198+
break
199+
else:
200+
end_of_workflow = True
201+
202+
if end_of_workflow:
203+
self._api_adapter.set_running_workflow_done(
204+
running_workflow_id=running_workflow_id,
205+
success=success,
206+
)

0 commit comments

Comments
 (0)