Skip to content

Commit e1a6067

Browse files
committed
Merge branch 'main' into simple-linear
2 parents 4e41436 + 6c2c4d0 commit e1a6067

27 files changed

+1404
-316
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
dist/
33
**/__pycache__/
44

5-
65
*~
76
\#*#
87
\#*
8+
9+
**/*.pickle
10+
tests/project-root/project-*/

poetry.lock

Lines changed: 211 additions & 108 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ packages = [
1414
[tool.poetry.dependencies]
1515
python = "^3.12"
1616
im-protobuf = "^7.1.0"
17+
im-data-manager-job-decoder = "^2.1.0"
1718
jsonschema = "^4.21.1"
1819
pyyaml = ">= 5.3.1, < 7.0"
1920

tests/api_adapter.py

Lines changed: 217 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,23 @@
1+
"""The UnitTest API Adapter.
2+
3+
This 'simulates' the sort of responses you can expect from the DM API/Model.
4+
It simulates a Database by using dictionaries that are pickled to (and unpickled from)
5+
the file system after acquiring a lock object. Pickling is used
6+
(because it's a simple built-in mechanism in Python) to persist data
7+
between processes in the multi-processing framework we run in because we ultimately need
8+
to simulate the multi-pod messaging-based framework of the DM.
9+
10+
A separate pickle file is used for each 'simulated' model table and the object
11+
initialiser resets all the pickle files (located in 'tests/pickle-files').
12+
13+
Job definitions are loaded (statically) from the content of the
14+
'tests/job-definitions/job-definitions.yaml' file and yielded through the 'get_job()'
15+
method.
16+
"""
17+
118
import os
19+
from multiprocessing import Lock
20+
from pickle import Pickler, Unpickler
221
from typing import Any, Dict, List, Optional
322

423
import yaml
@@ -13,62 +32,162 @@
1332
_JOB_DEFINITIONS: Dict[str, Any] = yaml.load(jd_file, Loader=yaml.FullLoader)
1433
assert _JOB_DEFINITIONS
1534

35+
# Table UUID formats
1636
_INSTANCE_ID_FORMAT: str = "instance-00000000-0000-0000-0000-{id:012d}"
37+
_TASK_ID_FORMAT: str = "task-00000000-0000-0000-0000-{id:012d}"
1738
_WORKFLOW_DEFINITION_ID_FORMAT: str = "workflow-00000000-0000-0000-0000-{id:012d}"
1839
_RUNNING_WORKFLOW_ID_FORMAT: str = "r-workflow-00000000-0000-0000-0000-{id:012d}"
1940
_RUNNING_WORKFLOW_STEP_ID_FORMAT: str = (
2041
"r-workflow-step-00000000-0000-0000-0000-{id:012d}"
2142
)
2243

44+
# Pickle files (for each 'Table')
45+
_PICKLE_DIRECTORY: str = "tests/pickle-files"
46+
_WORKFLOW_PICKLE_FILE: str = f"{_PICKLE_DIRECTORY}/workflow.pickle"
47+
_RUNNING_WORKFLOW_PICKLE_FILE: str = f"{_PICKLE_DIRECTORY}/running-workflow.pickle"
48+
_RUNNING_WORKFLOW_STEP_PICKLE_FILE: str = (
49+
f"{_PICKLE_DIRECTORY}/running-workflow-step.pickle"
50+
)
51+
_INSTANCE_PICKLE_FILE: str = f"{_PICKLE_DIRECTORY}/instance.pickle"
52+
_TASK_PICKLE_FILE: str = f"{_PICKLE_DIRECTORY}/task.pickle"
53+
2354

2455
class UnitTestAPIAdapter(APIAdapter):
2556
"""A minimal API adapter. It serves-up Job Definitions
2657
from the job-definitions/job-definitions.yaml file and provides basic
27-
(in-memory) storage for Workflow Definitions and related tables."""
58+
storage for Workflow Definitions and related tables.
59+
60+
Because the adapter is used by the multi-processing test suite, it uses both a lock
61+
and pickle files to store data, so that data can be shared between processes.
62+
"""
63+
64+
lock = Lock()
2865

2966
def __init__(self):
3067
super().__init__()
31-
# A map of workflow definitions, keyed by workflow definition ID.
32-
self._workflow_definitions: Dict[str, Dict[str, Any]] = {}
33-
self._running_workflow: Dict[str, Dict[str, Any]] = {}
34-
self._running_workflow_steps: Dict[str, Dict[str, Any]] = {}
35-
self._instances: Dict[str, Dict[str, Any]] = {}
36-
self._tasks: Dict[str, Dict[str, Any]] = {}
68+
# Safely initialise the pickle files
69+
UnitTestAPIAdapter.lock.acquire()
70+
if not os.path.exists(_PICKLE_DIRECTORY):
71+
os.makedirs(_PICKLE_DIRECTORY)
72+
for file in [
73+
_WORKFLOW_PICKLE_FILE,
74+
_RUNNING_WORKFLOW_PICKLE_FILE,
75+
_RUNNING_WORKFLOW_STEP_PICKLE_FILE,
76+
_INSTANCE_PICKLE_FILE,
77+
_TASK_PICKLE_FILE,
78+
]:
79+
with open(file, "wb") as pickle_file:
80+
Pickler(pickle_file).dump({})
81+
UnitTestAPIAdapter.lock.release()
3782

3883
def create_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
39-
next_id: int = len(self._workflow_definitions) + 1
84+
UnitTestAPIAdapter.lock.acquire()
85+
with open(_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
86+
workflow = Unpickler(pickle_file).load()
87+
88+
next_id: int = len(workflow) + 1
4089
workflow_definition_id: str = _WORKFLOW_DEFINITION_ID_FORMAT.format(id=next_id)
41-
self._workflow_definitions[workflow_definition_id] = workflow_definition
90+
workflow[workflow_definition_id] = workflow_definition
91+
92+
with open(_WORKFLOW_PICKLE_FILE, "wb") as pickle_file:
93+
Pickler(pickle_file).dump(workflow)
94+
UnitTestAPIAdapter.lock.release()
95+
4296
return {"id": workflow_definition_id}
4397

44-
def get_workflow(self, *, workflow_definition_id: str) -> Dict[str, Any]:
45-
if workflow_definition_id not in self._workflow_definitions:
46-
return {}
47-
return {"workflow": self._workflow_definitions[workflow_definition_id]}
98+
def get_workflow(self, *, workflow_id: str) -> Dict[str, Any]:
99+
UnitTestAPIAdapter.lock.acquire()
100+
with open(_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
101+
workflow = Unpickler(pickle_file).load()
102+
UnitTestAPIAdapter.lock.release()
103+
104+
return {"workflow": workflow[workflow_id]} if workflow_id in workflow else {}
48105

49106
def get_workflow_by_name(self, *, name: str, version: str) -> Dict[str, Any]:
107+
UnitTestAPIAdapter.lock.acquire()
108+
with open(_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
109+
workflow = Unpickler(pickle_file).load()
110+
UnitTestAPIAdapter.lock.release()
111+
50112
item = {}
51-
for wfid, value in self._workflow_definitions.items():
113+
for wfid, value in workflow.items():
52114
if value["name"] == name:
53115
item = {"id": wfid, "workflow": value}
54116
return item
55117

56-
def create_running_workflow(self, *, workflow_definition_id: str) -> str:
57-
next_id: int = len(self._running_workflow) + 1
118+
def create_running_workflow(
119+
self,
120+
*,
121+
user_id: str,
122+
workflow_id: str,
123+
project_id: str,
124+
variables: Dict[str, Any],
125+
) -> str:
126+
assert user_id
127+
assert isinstance(variables, dict)
128+
129+
UnitTestAPIAdapter.lock.acquire()
130+
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
131+
running_workflow = Unpickler(pickle_file).load()
132+
133+
next_id: int = len(running_workflow) + 1
58134
running_workflow_id: str = _RUNNING_WORKFLOW_ID_FORMAT.format(id=next_id)
59-
record = {"done": False, "success": False, "workflow": workflow_definition_id}
60-
self._running_workflow[running_workflow_id] = record
135+
record = {
136+
"user_id": user_id,
137+
"done": False,
138+
"success": False,
139+
"workflow": workflow_id,
140+
"project_id": project_id,
141+
"variables": variables,
142+
}
143+
running_workflow[running_workflow_id] = record
144+
145+
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "wb") as pickle_file:
146+
Pickler(pickle_file).dump(running_workflow)
147+
UnitTestAPIAdapter.lock.release()
148+
61149
return {"id": running_workflow_id}
62150

151+
def set_running_workflow_done(
152+
self,
153+
*,
154+
running_workflow_id: str,
155+
success: bool,
156+
error: Optional[int] = None,
157+
error_msg: Optional[str] = None,
158+
) -> None:
159+
UnitTestAPIAdapter.lock.acquire()
160+
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
161+
running_workflow = Unpickler(pickle_file).load()
162+
163+
assert running_workflow_id in running_workflow
164+
running_workflow[running_workflow_id]["done"] = True
165+
running_workflow[running_workflow_id]["success"] = success
166+
running_workflow[running_workflow_id]["error"] = error
167+
running_workflow[running_workflow_id]["error_msg"] = error_msg
168+
169+
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "wb") as pickle_file:
170+
Pickler(pickle_file).dump(running_workflow)
171+
UnitTestAPIAdapter.lock.release()
172+
63173
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
64-
if running_workflow_id not in self._running_workflow:
174+
UnitTestAPIAdapter.lock.acquire()
175+
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
176+
running_workflow = Unpickler(pickle_file).load()
177+
UnitTestAPIAdapter.lock.release()
178+
179+
if running_workflow_id not in running_workflow:
65180
return {}
66-
return {"running_workflow": self._running_workflow[running_workflow_id]}
181+
return {"running_workflow": running_workflow[running_workflow_id]}
67182

68183
def create_running_workflow_step(
69184
self, *, running_workflow_id: str, step: str
70185
) -> str:
71-
next_id: int = len(self._running_workflow_steps) + 1
186+
UnitTestAPIAdapter.lock.acquire()
187+
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
188+
running_workflow_step = Unpickler(pickle_file).load()
189+
190+
next_id: int = len(running_workflow_step) + 1
72191
running_workflow_step_id: str = _RUNNING_WORKFLOW_STEP_ID_FORMAT.format(
73192
id=next_id
74193
)
@@ -78,64 +197,124 @@ def create_running_workflow_step(
78197
"success": False,
79198
"running_workflow": running_workflow_id,
80199
}
81-
self._running_workflow_steps[running_workflow_step_id] = record
200+
running_workflow_step[running_workflow_step_id] = record
201+
202+
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "wb") as pickle_file:
203+
Pickler(pickle_file).dump(running_workflow_step)
204+
UnitTestAPIAdapter.lock.release()
205+
82206
return {"id": running_workflow_step_id}
83207

84208
def get_running_workflow_step(
85209
self, *, running_workflow_step_id: str
86210
) -> Dict[str, Any]:
87-
if running_workflow_step_id not in self._running_workflow_steps:
211+
UnitTestAPIAdapter.lock.acquire()
212+
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
213+
running_workflow_step = Unpickler(pickle_file).load()
214+
UnitTestAPIAdapter.lock.release()
215+
216+
if running_workflow_step_id not in running_workflow_step:
88217
return {}
89218
return {
90-
"running_workflow_step": self._running_workflow_steps[
91-
running_workflow_step_id
92-
]
219+
"running_workflow_step": running_workflow_step[running_workflow_step_id]
93220
}
94221

222+
def set_running_workflow_step_done(
223+
self,
224+
*,
225+
running_workflow_step_id: str,
226+
success: bool,
227+
error: Optional[int] = None,
228+
error_msg: Optional[str] = None,
229+
) -> None:
230+
UnitTestAPIAdapter.lock.acquire()
231+
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
232+
running_workflow_step = Unpickler(pickle_file).load()
233+
234+
assert running_workflow_step_id in running_workflow_step
235+
running_workflow_step[running_workflow_step_id]["done"] = True
236+
running_workflow_step[running_workflow_step_id]["success"] = success
237+
running_workflow_step[running_workflow_step_id]["error"] = error
238+
running_workflow_step[running_workflow_step_id]["error_msg"] = error_msg
239+
240+
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "wb") as pickle_file:
241+
Pickler(pickle_file).dump(running_workflow_step)
242+
UnitTestAPIAdapter.lock.release()
243+
95244
def get_running_workflow_steps(
96245
self, *, running_workflow_id: str
97246
) -> List[Dict[str, Any]]:
247+
UnitTestAPIAdapter.lock.acquire()
248+
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
249+
running_workflow_step = Unpickler(pickle_file).load()
250+
UnitTestAPIAdapter.lock.release()
251+
98252
steps = []
99-
for key, value in self._running_workflow_steps.items():
253+
for key, value in running_workflow_step.items():
100254
if value["running_workflow"] == running_workflow_id:
101255
item = {"running_workflow_step": value, "id": key}
102256
steps.append(item)
103257
return {"count": len(steps), "running_workflow_steps": steps}
104258

105259
def create_instance(self, *, running_workflow_step_id: str) -> Dict[str, Any]:
106-
next_id: int = len(self._instances) + 1
260+
UnitTestAPIAdapter.lock.acquire()
261+
with open(_INSTANCE_PICKLE_FILE, "rb") as pickle_file:
262+
instances = Unpickler(pickle_file).load()
263+
264+
next_id: int = len(instances) + 1
107265
instance_id: str = _INSTANCE_ID_FORMAT.format(id=next_id)
108266
record = {
109267
"running_workflow_step": running_workflow_step_id,
110268
}
111-
self._instances[instance_id] = record
269+
instances[instance_id] = record
270+
271+
with open(_INSTANCE_PICKLE_FILE, "wb") as pickle_file:
272+
Pickler(pickle_file).dump(instances)
273+
UnitTestAPIAdapter.lock.release()
274+
112275
return {"id": instance_id}
113276

114277
def get_instance(self, *, instance_id: str) -> Dict[str, Any]:
115-
if instance_id not in self._instances:
116-
return {}
117-
return self._instances[instance_id]
278+
UnitTestAPIAdapter.lock.acquire()
279+
with open(_INSTANCE_PICKLE_FILE, "rb") as pickle_file:
280+
instances = Unpickler(pickle_file).load()
281+
UnitTestAPIAdapter.lock.release()
282+
283+
return {} if instance_id not in instances else instances[instance_id]
118284

119285
def create_task(self, *, instance_id: str) -> Dict[str, Any]:
120-
next_id: int = len(self._instances) + 1
121-
task_id: str = _INSTANCE_ID_FORMAT.format(id=next_id)
286+
UnitTestAPIAdapter.lock.acquire()
287+
with open(_TASK_PICKLE_FILE, "rb") as pickle_file:
288+
tasks = Unpickler(pickle_file).load()
289+
290+
next_id: int = len(tasks) + 1
291+
task_id: str = _TASK_ID_FORMAT.format(id=next_id)
122292
record = {
123293
"done": False,
124294
"exit_code": 0,
125295
}
126-
self._tasks[task_id] = record
296+
tasks[task_id] = record
297+
298+
with open(_TASK_PICKLE_FILE, "wb") as pickle_file:
299+
Pickler(pickle_file).dump(tasks)
300+
UnitTestAPIAdapter.lock.release()
301+
127302
return {"id": task_id}
128303

129304
def get_task(self, *, task_id: str) -> Dict[str, Any]:
130-
if task_id not in self._tasks:
131-
return {}
132-
return self._tasks[task_id]
305+
UnitTestAPIAdapter.lock.acquire()
306+
with open(_TASK_PICKLE_FILE, "rb") as pickle_file:
307+
tasks = Unpickler(pickle_file).load()
308+
UnitTestAPIAdapter.lock.release()
309+
310+
return {} if task_id not in tasks else tasks[task_id]
133311

134312
def get_job(
135313
self, *, collection: str, job: str, version: str
136314
) -> Optional[Dict[str, Any]]:
137315
assert collection == _JOB_DEFINITIONS["collection"]
138316
assert job in _JOB_DEFINITIONS["jobs"]
317+
assert version
139318

140319
jd = _JOB_DEFINITIONS["jobs"][job]
141320
response = {"command": jd["command"]}

tests/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""An emulation of the Data Manager config module.
2+
A module of common constants."""
3+
4+
# The project ID used in the tests (there is only one atm)
5+
TEST_PROJECT_ID: str = "project-00000000-0000-0000-0000-000000000001"

0 commit comments

Comments
 (0)