Skip to content

Commit ae3e6cc

Browse files
author
Alan Christie
committed
style: A much improved database adapter ABC
1 parent a607716 commit ae3e6cc

File tree

4 files changed

+163
-25
lines changed

4 files changed

+163
-25
lines changed

tests/database_adapter.py

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import os
2-
from typing import Any, Dict, Optional
2+
from typing import Any, Dict, List, Optional
33

44
import yaml
55

@@ -14,6 +14,10 @@
1414
assert _JOB_DEFINITIONS
1515

1616
_WORKFLOW_DEFINITION_ID_FORMAT: str = "workflow-00000000-0000-0000-0000-{id:012d}"
17+
_RUNNING_WORKFLOW_ID_FORMAT: str = "r-workflow-00000000-0000-0000-0000-{id:012d}"
18+
_RUNNING_WORKFLOW_STEP_ID_FORMAT: str = (
19+
"r-workflow-step-00000000-0000-0000-0000-{id:012d}"
20+
)
1721

1822

1923
class UnitTestDatabaseAdapter(DatabaseAdapter):
@@ -25,23 +29,72 @@ def __init__(self):
2529
super().__init__()
2630
# A map of workflow definitions, keyed by workflow definition ID.
2731
self._workflow_definitions: Dict[str, Dict[str, Any]] = {}
32+
self._running_workflow: Dict[str, Dict[str, Any]] = {}
33+
self._running_workflow_steps: Dict[str, Dict[str, Any]] = {}
2834

2935
def save_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
3036
next_id: int = len(self._workflow_definitions) + 1
3137
workflow_definition_id: str = _WORKFLOW_DEFINITION_ID_FORMAT.format(id=next_id)
3238
self._workflow_definitions[workflow_definition_id] = workflow_definition
33-
return workflow_definition_id
39+
return {"id": workflow_definition_id}
3440

35-
def get_workflow(self, *, workflow_definition_id: str) -> Optional[Dict[str, Any]]:
36-
return self._workflow_definitions.get(workflow_definition_id, None)
41+
def get_workflow(self, *, workflow_definition_id: str) -> Dict[str, Any]:
42+
if workflow_definition_id not in self._workflow_definitions:
43+
return {}
44+
return {"workflow": self._workflow_definitions[workflow_definition_id]}
3745

38-
def get_workflow_by_name(
39-
self, *, name: str, version: str
40-
) -> Optional[Dict[str, Any]]:
41-
return next(
42-
(wd for wd in self._workflow_definitions.values() if wd["name"] == name),
43-
None,
46+
def get_workflow_by_name(self, *, name: str, version: str) -> Dict[str, Any]:
47+
item = {}
48+
for wfid, value in self._workflow_definitions.items():
49+
if value["name"] == name:
50+
item = {"id": wfid, "workflow": value}
51+
return item
52+
53+
def create_running_workflow(self, *, workflow_definition_id: str) -> str:
54+
next_id: int = len(self._running_workflow) + 1
55+
running_workflow_id: str = _RUNNING_WORKFLOW_ID_FORMAT.format(id=next_id)
56+
record = {"done": False, "success": False, "workflow": workflow_definition_id}
57+
self._running_workflow[running_workflow_id] = record
58+
return {"id": running_workflow_id}
59+
60+
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
61+
if running_workflow_id not in self._running_workflow:
62+
return {}
63+
return {"running-workflow": self._running_workflow[running_workflow_id]}
64+
65+
def create_running_workflow_step(self, *, running_workflow_id: str) -> str:
66+
next_id: int = len(self._running_workflow_steps) + 1
67+
running_workflow_step_id: str = _RUNNING_WORKFLOW_STEP_ID_FORMAT.format(
68+
id=next_id
4469
)
70+
record = {
71+
"done": False,
72+
"success": False,
73+
"running-workflow": running_workflow_id,
74+
}
75+
self._running_workflow_steps[running_workflow_step_id] = record
76+
return {"id": running_workflow_step_id}
77+
78+
def get_running_workflow_step(
79+
self, *, running_workflow_step_id: str
80+
) -> Dict[str, Any]:
81+
if running_workflow_step_id not in self._running_workflow_steps:
82+
return {}
83+
return {"id": self._running_workflow_steps[running_workflow_step_id]}
84+
85+
def get_running_workflow_steps(
86+
self, *, running_workflow_id: str
87+
) -> List[Dict[str, Any]]:
88+
steps = []
89+
for key, value in self._running_workflow_steps.items():
90+
if value["running-workflow"] == running_workflow_id:
91+
item = {"running-workflow-step": value, "id": key}
92+
steps.append(item)
93+
count = len(steps)
94+
response = {"count": len(steps)}
95+
if count:
96+
response["running-workflow-steps"] = steps
97+
return response
4598

4699
def get_job(
47100
self, *, collection: str, job: str, version: str

tests/test_test_database_adapter.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ def test_get_unknown_workflow():
2525
utda = UnitTestDatabaseAdapter()
2626

2727
# Act
28-
wfid = utda.get_workflow(
28+
wfd = utda.get_workflow(
2929
workflow_definition_id="workflow-00000000-0000-0000-0000-000000000001"
3030
)
3131

3232
# Assert
33-
assert wfid is None
33+
assert wfd == {}
3434

3535

3636
def test_save_workflow():
@@ -41,28 +41,30 @@ def test_save_workflow():
4141
wfid = utda.save_workflow(workflow_definition={"name": "blah"})
4242

4343
# Assert
44-
assert wfid == "workflow-00000000-0000-0000-0000-000000000001"
44+
assert wfid == {"id": "workflow-00000000-0000-0000-0000-000000000001"}
4545

4646

4747
def test_get_workflow():
4848
# Arrange
4949
utda = UnitTestDatabaseAdapter()
50-
wfid = utda.save_workflow(workflow_definition={"name": "blah"})
50+
response = utda.save_workflow(workflow_definition={"name": "blah"})
51+
wfid = response["id"]
5152

5253
# Act
5354
wf = utda.get_workflow(workflow_definition_id=wfid)
5455

5556
# Assert
56-
assert wf == {"name": "blah"}
57+
assert wf["workflow"]["name"] == "blah"
5758

5859

5960
def test_get_workflow_by_name():
6061
# Arrange
6162
utda = UnitTestDatabaseAdapter()
62-
wfid = utda.save_workflow(workflow_definition={"name": "blah"})
63+
_ = utda.save_workflow(workflow_definition={"name": "blah"})
6364

6465
# Act
65-
wf = utda.get_workflow_by_name(name="blah", version="1.0.0")
66+
response = utda.get_workflow_by_name(name="blah", version="1.0.0")
6667

6768
# Assert
68-
assert wf == {"name": "blah"}
69+
assert response["workflow"]["name"] == "blah"
70+
assert "id" in response

workflow/workflow_abc.py

Lines changed: 88 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from abc import ABC, abstractmethod
66
from dataclasses import dataclass
7-
from typing import Any, Callable, Dict, Optional
7+
from typing import Any, Callable, Dict, List, Optional
88

99
from google.protobuf.message import Message
1010
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
@@ -70,23 +70,106 @@ def save_workflow(
7070
workflow_definition: Dict[str, Any],
7171
) -> str:
7272
"""Save a Workflow, getting an ID in return"""
73+
# Should return:
74+
# {
75+
# "id": "workflow-00000000-0000-0000-0000-000000000001",
76+
# }
7377

7478
@abstractmethod
7579
def get_workflow(
7680
self,
7781
*,
7882
workflow_definition_id: str,
79-
) -> Optional[Dict[str, Any]]:
80-
"""Get a Workflow by ID"""
83+
) -> Dict[str, Any]:
84+
"""Get a Workflow Record by ID."""
85+
# If present this should return:
86+
# {
87+
# "workflow": <workflow>,
88+
# }
8189

8290
@abstractmethod
8391
def get_workflow_by_name(
8492
self,
8593
*,
8694
name: str,
8795
version: str,
88-
) -> Optional[Dict[str, Any]]:
89-
"""Get a Workflow by name"""
96+
) -> Dict[str, Any]:
97+
"""Get a Workflow Record by name"""
98+
# If present this should return:
99+
# {
100+
# "id": "workflow-00000000-0000-0000-0000-000000000001",
101+
# "workflow": <workflow>,
102+
# }
103+
104+
@abstractmethod
105+
def create_running_workflow(
106+
self,
107+
*,
108+
workflow_definition_id: str,
109+
) -> str:
110+
"""Create a RunningWorkflow Record (from a Workflow)"""
111+
# Should return:
112+
# {
113+
# "id": "r-workflow-00000000-0000-0000-0000-000000000001",
114+
# }
115+
116+
@abstractmethod
117+
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
118+
"""Get a RunningWorkflow Record"""
119+
# Should return:
120+
# {
121+
# "running-workflow": {
122+
# "done": False,
123+
# "success": false,
124+
# "workflow": "workflow-000",
125+
# }
126+
# }
127+
128+
@abstractmethod
129+
def create_running_workflow_step(
130+
self,
131+
*,
132+
running_workflow_id: str,
133+
) -> str:
134+
"""Create a RunningWorkflowStep Record (from a RunningWorkflow)"""
135+
# Should return:
136+
# {
137+
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
138+
# }
139+
140+
@abstractmethod
141+
def get_running_workflow_step(
142+
self, *, running_workflow_step_id: str
143+
) -> Dict[str, Any]:
144+
"""Get a RunningWorkflowStep Record"""
145+
# Should return:
146+
# {
147+
# "running-workflow-step": {
148+
# "done": False,
149+
# "success": false,
150+
# "running-workflow": "r-workflow-000",
151+
# },
152+
# }
153+
154+
@abstractmethod
155+
def get_running_workflow_steps(
156+
self, *, running_workflow_id: str
157+
) -> List[Dict[str, Any]]:
158+
"""Gets all the RunningWorkflowStep Records (for a RunningWorkflow)"""
159+
# Should return:
160+
# {
161+
# "count": 1,
162+
# "running-workflow-steps": [
163+
# {
164+
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
165+
# "running-workflow-step": {
166+
# "done": False,
167+
# "success": false,
168+
# "workflow": "workflow-000",
169+
# }
170+
# ...
171+
# ]
172+
# }
90173

91174
@abstractmethod
92175
def get_job(

workflow/workflow_engine.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ class WorkflowEngine:
1515
def __init__(
1616
self,
1717
*,
18-
instance_launcher: InstanceLauncher,
1918
db_adapter: DatabaseAdapter,
19+
instance_launcher: InstanceLauncher,
2020
):
2121
# Keep the dependent objects
22-
self._instance_launcher = instance_launcher
2322
self._db_adapter = db_adapter
23+
self._instance_launcher = instance_launcher
2424

2525
def handle_message(self, msg: Message) -> None:
2626
"""Given a Pod Message, we use it to identify the Pod (Instance) exit code,

0 commit comments

Comments
 (0)