Skip to content

Commit b9fe02e

Browse files
author
Alan Christie
committed
feat: Launcher now uses API adapter (and better API adapter & tests)
1 parent 0b3f7de commit b9fe02e

File tree

6 files changed

+176
-33
lines changed

6 files changed

+176
-33
lines changed

tests/api_adapter.py

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
_JOB_DEFINITIONS: Dict[str, Any] = yaml.load(jd_file, Loader=yaml.FullLoader)
1414
assert _JOB_DEFINITIONS
1515

16+
_INSTANCE_ID_FORMAT: str = "instance-00000000-0000-0000-0000-{id:012d}"
1617
_WORKFLOW_DEFINITION_ID_FORMAT: str = "workflow-00000000-0000-0000-0000-{id:012d}"
1718
_RUNNING_WORKFLOW_ID_FORMAT: str = "r-workflow-00000000-0000-0000-0000-{id:012d}"
1819
_RUNNING_WORKFLOW_STEP_ID_FORMAT: str = (
@@ -31,8 +32,9 @@ def __init__(self):
3132
self._workflow_definitions: Dict[str, Dict[str, Any]] = {}
3233
self._running_workflow: Dict[str, Dict[str, Any]] = {}
3334
self._running_workflow_steps: Dict[str, Dict[str, Any]] = {}
35+
self._instances: Dict[str, Dict[str, Any]] = {}
3436

35-
def save_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
37+
def create_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
3638
next_id: int = len(self._workflow_definitions) + 1
3739
workflow_definition_id: str = _WORKFLOW_DEFINITION_ID_FORMAT.format(id=next_id)
3840
self._workflow_definitions[workflow_definition_id] = workflow_definition
@@ -60,17 +62,20 @@ def create_running_workflow(self, *, workflow_definition_id: str) -> str:
6062
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
6163
if running_workflow_id not in self._running_workflow:
6264
return {}
63-
return {"running-workflow": self._running_workflow[running_workflow_id]}
65+
return {"running_workflow": self._running_workflow[running_workflow_id]}
6466

65-
def create_running_workflow_step(self, *, running_workflow_id: str) -> str:
67+
def create_running_workflow_step(
68+
self, *, running_workflow_id: str, step: str
69+
) -> str:
6670
next_id: int = len(self._running_workflow_steps) + 1
6771
running_workflow_step_id: str = _RUNNING_WORKFLOW_STEP_ID_FORMAT.format(
6872
id=next_id
6973
)
7074
record = {
75+
"step": step,
7176
"done": False,
7277
"success": False,
73-
"running-workflow": running_workflow_id,
78+
"running_workflow": running_workflow_id,
7479
}
7580
self._running_workflow_steps[running_workflow_step_id] = record
7681
return {"id": running_workflow_step_id}
@@ -80,17 +85,35 @@ def get_running_workflow_step(
8085
) -> Dict[str, Any]:
8186
if running_workflow_step_id not in self._running_workflow_steps:
8287
return {}
83-
return {"id": self._running_workflow_steps[running_workflow_step_id]}
88+
return {
89+
"running_workflow_step": self._running_workflow_steps[
90+
running_workflow_step_id
91+
]
92+
}
8493

8594
def get_running_workflow_steps(
8695
self, *, running_workflow_id: str
8796
) -> List[Dict[str, Any]]:
8897
steps = []
8998
for key, value in self._running_workflow_steps.items():
90-
if value["running-workflow"] == running_workflow_id:
91-
item = {"running-workflow-step": value, "id": key}
99+
if value["running_workflow"] == running_workflow_id:
100+
item = {"running_workflow_step": value, "id": key}
92101
steps.append(item)
93-
return {"count": len(steps), "running-workflow-steps": steps}
102+
return {"count": len(steps), "running_workflow_steps": steps}
103+
104+
def create_instance(self, *, running_workflow_step_id: str) -> Dict[str, Any]:
105+
next_id: int = len(self._instances) + 1
106+
instance_id: str = _INSTANCE_ID_FORMAT.format(id=next_id)
107+
record = {
108+
"running_workflow_step": running_workflow_step_id,
109+
}
110+
self._instances[instance_id] = record
111+
return {"instance_id": running_workflow_step_id}
112+
113+
def get_instance(self, *, instance_id: str) -> Dict[str, Any]:
114+
if instance_id not in self._instances:
115+
return {}
116+
return {self._instances[instance_id]}
94117

95118
def get_job(
96119
self, *, collection: str, job: str, version: str

tests/instance_launcher.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
77

8+
from tests.api_adapter import UnitTestAPIAdapter
89
from tests.message_dispatcher import UnitTestMessageDispatcher
910
from workflow.workflow_abc import InstanceLauncher, LaunchResult
1011

@@ -18,9 +19,12 @@ class UnitTestInstanceLauncher(InstanceLauncher):
1819
'end of instance' PodMessage (to the WorkflowEngine).
1920
"""
2021

21-
def __init__(self, msg_dispatcher: UnitTestMessageDispatcher):
22+
def __init__(
23+
self, api_adapter: UnitTestAPIAdapter, msg_dispatcher: UnitTestMessageDispatcher
24+
):
2225
super().__init__()
2326

27+
self._api_adapter = api_adapter
2428
self._msg_dispatcher = msg_dispatcher
2529

2630
def launch(

tests/test_test_api_adapter.py

Lines changed: 95 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,21 @@ def test_get_unknown_workflow():
3333
assert wfd == {}
3434

3535

36-
def test_save_workflow():
36+
def test_create_workflow():
3737
# Arrange
3838
utaa = UnitTestAPIAdapter()
3939

4040
# Act
41-
wfid = utaa.save_workflow(workflow_definition={"name": "blah"})
41+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
4242

4343
# Assert
44-
assert wfid == {"id": "workflow-00000000-0000-0000-0000-000000000001"}
44+
assert response["id"] == "workflow-00000000-0000-0000-0000-000000000001"
4545

4646

4747
def test_get_workflow():
4848
# Arrange
4949
utaa = UnitTestAPIAdapter()
50-
response = utaa.save_workflow(workflow_definition={"name": "blah"})
50+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
5151
wfid = response["id"]
5252

5353
# Act
@@ -60,11 +60,101 @@ def test_get_workflow():
6060
def test_get_workflow_by_name():
6161
# Arrange
6262
utaa = UnitTestAPIAdapter()
63-
_ = utaa.save_workflow(workflow_definition={"name": "blah"})
63+
_ = utaa.create_workflow(workflow_definition={"name": "blah"})
6464

6565
# Act
6666
response = utaa.get_workflow_by_name(name="blah", version="1.0.0")
6767

6868
# Assert
6969
assert response["workflow"]["name"] == "blah"
7070
assert "id" in response
71+
72+
73+
def test_create_running_workflow():
74+
# Arrange
75+
utaa = UnitTestAPIAdapter()
76+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
77+
78+
# Act
79+
response = utaa.create_running_workflow(workflow_definition_id=response["id"])
80+
81+
# Assert
82+
assert response["id"] == "r-workflow-00000000-0000-0000-0000-000000000001"
83+
84+
85+
def test_get_running_workflow():
86+
# Arrange
87+
utaa = UnitTestAPIAdapter()
88+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
89+
wfid = response["id"]
90+
response = utaa.create_running_workflow(workflow_definition_id=wfid)
91+
rwfid = response["id"]
92+
93+
# Act
94+
response = utaa.get_running_workflow(running_workflow_id=rwfid)
95+
96+
# Assert
97+
rwf = response["running_workflow"]
98+
assert not rwf["done"]
99+
assert rwf["workflow"] == wfid
100+
101+
102+
def test_create_running_workflow_step():
103+
# Arrange
104+
utaa = UnitTestAPIAdapter()
105+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
106+
response = utaa.create_running_workflow(workflow_definition_id=response["id"])
107+
108+
# Act
109+
response = utaa.create_running_workflow_step(
110+
running_workflow_id=response["id"], step="step-1"
111+
)
112+
113+
# Assert
114+
assert response["id"] == "r-workflow-step-00000000-0000-0000-0000-000000000001"
115+
116+
117+
def test_get_running_workflow_step():
118+
# Arrange
119+
utaa = UnitTestAPIAdapter()
120+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
121+
wfid = response["id"]
122+
response = utaa.create_running_workflow(workflow_definition_id=wfid)
123+
rwfid = response["id"]
124+
response = utaa.create_running_workflow_step(
125+
running_workflow_id=rwfid, step="step-1"
126+
)
127+
rwfsid = response["id"]
128+
129+
# Act
130+
response = utaa.get_running_workflow_step(running_workflow_step_id=rwfsid)
131+
132+
# Assert
133+
rwfs = response["running_workflow_step"]
134+
assert rwfs["step"] == "step-1"
135+
assert not rwfs["done"]
136+
assert rwfs["running_workflow"] == rwfid
137+
138+
139+
def test_get_running_workflow_steps():
140+
# Arrange
141+
utaa = UnitTestAPIAdapter()
142+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
143+
wfid = response["id"]
144+
response = utaa.create_running_workflow(workflow_definition_id=wfid)
145+
rwfid = response["id"]
146+
response = utaa.create_running_workflow_step(
147+
running_workflow_id=rwfid, step="step-1"
148+
)
149+
rwfsid = response["id"]
150+
151+
# Act
152+
response = utaa.get_running_workflow_steps(running_workflow_id=rwfid)
153+
154+
# Assert
155+
assert response["count"] == 1
156+
rwfs = response["running_workflow_steps"][0]
157+
assert rwfs["id"] == rwfsid
158+
assert rwfs["running_workflow_step"]["step"] == "step-1"
159+
assert not rwfs["running_workflow_step"]["done"]
160+
assert rwfs["running_workflow_step"]["running_workflow"] == rwfid

tests/test_test_instance_launcher.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,20 @@
44

55
pytestmark = pytest.mark.unit
66

7+
from tests.api_adapter import UnitTestAPIAdapter
78
from tests.instance_launcher import UnitTestInstanceLauncher
89
from tests.message_dispatcher import UnitTestMessageDispatcher
910
from tests.message_queue import UnitTestMessageQueue
1011

1112

1213
@pytest.fixture
1314
def basic_launcher():
14-
utmq = UnitTestMessageQueue()
15-
utmd = UnitTestMessageDispatcher(msg_queue=utmq)
16-
return UnitTestInstanceLauncher(msg_dispatcher=utmd)
15+
api_adapter = UnitTestAPIAdapter()
16+
message_queue = UnitTestMessageQueue()
17+
message_dispatcher = UnitTestMessageDispatcher(msg_queue=message_queue)
18+
return UnitTestInstanceLauncher(
19+
api_adapter=api_adapter, msg_dispatcher=message_dispatcher
20+
)
1721

1822

1923
def test_get_nop_job(basic_launcher):

tests/test_workflow_engine_with_examples.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ def basic_engine():
2020
api_adapter = UnitTestAPIAdapter()
2121
message_queue = UnitTestMessageQueue()
2222
message_dispatcher = UnitTestMessageDispatcher(msg_queue=message_queue)
23-
instance_launcher = UnitTestInstanceLauncher(msg_dispatcher=message_dispatcher)
23+
instance_launcher = UnitTestInstanceLauncher(
24+
api_adapter=api_adapter, msg_dispatcher=message_dispatcher
25+
)
2426
return [
2527
api_adapter,
2628
message_dispatcher,
@@ -34,7 +36,7 @@ def test_workflow_engine_with_example_1(basic_engine):
3436
# LOAD THE EXAMPLE-1 WORKFLOW DEFINITION INTO THE DATABASE
3537
# TODO
3638
# SIMULATE THE API CREATION OF A RUNNING WORKFLOW FROM THE WORKFLOW
37-
wfid = da.save_workflow(workflow_definition={"name": "example-1"})
39+
wfid = da.create_workflow(workflow_definition={"name": "example-1"})
3840
assert wfid
3941
response = da.create_running_workflow(workflow_definition_id=wfid)
4042
r_wfid = response["id"]
@@ -57,7 +59,7 @@ def test_workflow_engine_with_example_1(basic_engine):
5759
r_wf = None
5860
while not done:
5961
response = da.get_running_workflow(running_workflow_id=r_wfid)
60-
r_wf = response["running-workflow"]
62+
r_wf = response["running_workflow"]
6163
if r_wf["done"]:
6264
done = True
6365
else:
@@ -73,6 +75,6 @@ def test_workflow_engine_with_example_1(basic_engine):
7375
response = da.get_running_workflow_steps(running_workflow_id=r_wfid)
7476
# TODO - The following should not be zero but the implementation does not set it yet
7577
assert response["count"] == 0
76-
for step in response["running-workflow-steps"]:
77-
assert step["running-workflow-step"]["done"]
78-
assert step["running-workflow-step"]["success"]
78+
for step in response["running_workflow_steps"]:
79+
assert step["running_workflow_step"]["done"]
80+
assert step["running_workflow_step"]["success"]

workflow/workflow_abc.py

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ class APIAdapter(ABC):
6161
records returning dictionary (API-like) responses."""
6262

6363
@abstractmethod
64-
def save_workflow(
64+
def create_workflow(
6565
self,
6666
*,
6767
workflow_definition: Dict[str, Any],
68-
) -> str:
69-
"""Save a Workflow, getting an ID in return"""
68+
) -> Dict[str, Any]:
69+
"""Create a Workflow, getting an ID in return"""
7070
# Should return:
7171
# {
7272
# "id": "workflow-00000000-0000-0000-0000-000000000001",
@@ -103,7 +103,7 @@ def create_running_workflow(
103103
self,
104104
*,
105105
workflow_definition_id: str,
106-
) -> str:
106+
) -> Dict[str, Any]:
107107
"""Create a RunningWorkflow Record (from a Workflow)"""
108108
# Should return:
109109
# {
@@ -115,7 +115,7 @@ def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
115115
"""Get a RunningWorkflow Record"""
116116
# Should return:
117117
# {
118-
# "running-workflow": {
118+
# "running_workflow": {
119119
# "done": False,
120120
# "success": false,
121121
# "workflow": "workflow-000",
@@ -127,7 +127,7 @@ def create_running_workflow_step(
127127
self,
128128
*,
129129
running_workflow_id: str,
130-
) -> str:
130+
) -> Dict[str, Any]:
131131
"""Create a RunningWorkflowStep Record (from a RunningWorkflow)"""
132132
# Should return:
133133
# {
@@ -141,10 +141,11 @@ def get_running_workflow_step(
141141
"""Get a RunningWorkflowStep Record"""
142142
# Should return:
143143
# {
144-
# "running-workflow-step": {
144+
# "running_workflow_step": {
145+
# "step:": "step-1234",
145146
# "done": False,
146147
# "success": false,
147-
# "running-workflow": "r-workflow-000",
148+
# "running_workflow": "r-workflow-000",
148149
# },
149150
# }
150151

@@ -156,10 +157,11 @@ def get_running_workflow_steps(
156157
# Should return:
157158
# {
158159
# "count": 1,
159-
# "running-workflow-steps": [
160+
# "running_workflow_steps": [
160161
# {
161162
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
162-
# "running-workflow-step": {
163+
# "running_workflow_step": {
164+
# "step:": "step-1234",
163165
# "done": False,
164166
# "success": false,
165167
# "workflow": "workflow-000",
@@ -168,6 +170,24 @@ def get_running_workflow_steps(
168170
# ]
169171
# }
170172

173+
@abstractmethod
174+
def create_instance(self, running_workflow_step_id: str) -> Dict[str, Any]:
175+
"""Create an Instance Record (for a RunningWorkflowStep)"""
176+
# Should return:
177+
# {
178+
# "instance_id": "instance-00000000-0000-0000-0000-000000000001",
179+
# "task_id": "task-00000000-0000-0000-0000-000000000001",
180+
# }
181+
182+
@abstractmethod
183+
def get_instance(self, *, instance_id: str) -> Dict[str, Any]:
184+
"""Get an Instance Record"""
185+
# Should return:
186+
# {
187+
# "running_workflow_step": "r-workflow-step-00000000-0000-0000-0000-000000000001",
188+
# [...],
189+
# }
190+
171191
@abstractmethod
172192
def get_job(
173193
self,

0 commit comments

Comments
 (0)