Skip to content

Commit 6553b58

Browse files
author
Alan Christie
committed
feat: Fixed APIadapter (multiprocessing)
1 parent 94d99e9 commit 6553b58

File tree

8 files changed

+224
-47
lines changed

8 files changed

+224
-47
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
.coverage
22
dist/
33
**/__pycache__/
4+
**/*.pickle

tests/api_adapter.py

Lines changed: 127 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import os
2+
from multiprocessing import Lock
3+
from pickle import Pickler, Unpickler
24
from typing import Any, Dict, List, Optional
35

46
import yaml
@@ -14,61 +16,114 @@
1416
assert _JOB_DEFINITIONS
1517

1618
_INSTANCE_ID_FORMAT: str = "instance-00000000-0000-0000-0000-{id:012d}"
19+
_TASK_ID_FORMAT: str = "task-00000000-0000-0000-0000-{id:012d}"
1720
_WORKFLOW_DEFINITION_ID_FORMAT: str = "workflow-00000000-0000-0000-0000-{id:012d}"
1821
_RUNNING_WORKFLOW_ID_FORMAT: str = "r-workflow-00000000-0000-0000-0000-{id:012d}"
1922
_RUNNING_WORKFLOW_STEP_ID_FORMAT: str = (
2023
"r-workflow-step-00000000-0000-0000-0000-{id:012d}"
2124
)
2225

26+
_WORKFLOW_PICKLE_FILE: str = "workflow.pickle"
27+
_RUNNING_WORKFLOW_PICKLE_FILE: str = "running-workflow.pickle"
28+
_RUNNING_WORKFLOW_STEP_PICKLE_FILE: str = "running-workflow-step.pickle"
29+
_INSTANCE_PICKLE_FILE: str = "instance.pickle"
30+
_TASK_PICKLE_FILE: str = "task.pickle"
31+
2332

2433
class UnitTestAPIAdapter(APIAdapter):
2534
"""A minimal API adapter. It serves-up Job Definitions
2635
from the job-definitions/job-definitions.yaml file and provides basic
2736
(in-memory) storage for Workflow Definitions and related tables."""
2837

38+
mp_lock = Lock()
39+
2940
def __init__(self):
3041
super().__init__()
31-
# A map of workflow definitions, keyed by workflow definition ID.
32-
self._workflow_definitions: Dict[str, Dict[str, Any]] = {}
33-
self._running_workflow: Dict[str, Dict[str, Any]] = {}
34-
self._running_workflow_steps: Dict[str, Dict[str, Any]] = {}
35-
self._instances: Dict[str, Dict[str, Any]] = {}
36-
self._tasks: Dict[str, Dict[str, Any]] = {}
42+
# Safely initialise the pickle files
43+
UnitTestAPIAdapter.mp_lock.acquire()
44+
with open(f"tests/{_WORKFLOW_PICKLE_FILE}", "wb") as pickle_file:
45+
Pickler(pickle_file).dump({})
46+
with open(f"tests/{_RUNNING_WORKFLOW_PICKLE_FILE}", "wb") as pickle_file:
47+
Pickler(pickle_file).dump({})
48+
with open(f"tests/{_RUNNING_WORKFLOW_STEP_PICKLE_FILE}", "wb") as pickle_file:
49+
Pickler(pickle_file).dump({})
50+
with open(f"tests/{_INSTANCE_PICKLE_FILE}", "wb") as pickle_file:
51+
Pickler(pickle_file).dump({})
52+
with open(f"tests/{_TASK_PICKLE_FILE}", "wb") as pickle_file:
53+
Pickler(pickle_file).dump({})
54+
UnitTestAPIAdapter.mp_lock.release()
3755

3856
def create_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
39-
next_id: int = len(self._workflow_definitions) + 1
57+
UnitTestAPIAdapter.mp_lock.acquire()
58+
with open(f"tests/{_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
59+
workflow = Unpickler(pickle_file).load()
60+
61+
next_id: int = len(workflow) + 1
4062
workflow_definition_id: str = _WORKFLOW_DEFINITION_ID_FORMAT.format(id=next_id)
41-
self._workflow_definitions[workflow_definition_id] = workflow_definition
63+
workflow[workflow_definition_id] = workflow_definition
64+
65+
with open(f"tests/{_WORKFLOW_PICKLE_FILE}", "wb") as pickle_file:
66+
Pickler(pickle_file).dump(workflow)
67+
UnitTestAPIAdapter.mp_lock.release()
68+
4269
return {"id": workflow_definition_id}
4370

4471
def get_workflow(self, *, workflow_definition_id: str) -> Dict[str, Any]:
45-
if workflow_definition_id not in self._workflow_definitions:
72+
UnitTestAPIAdapter.mp_lock.acquire()
73+
with open(f"tests/{_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
74+
workflow = Unpickler(pickle_file).load()
75+
UnitTestAPIAdapter.mp_lock.release()
76+
77+
if workflow_definition_id not in workflow:
4678
return {}
47-
return {"workflow": self._workflow_definitions[workflow_definition_id]}
79+
return {"workflow": workflow[workflow_definition_id]}
4880

4981
def get_workflow_by_name(self, *, name: str, version: str) -> Dict[str, Any]:
82+
UnitTestAPIAdapter.mp_lock.acquire()
83+
with open(f"tests/{_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
84+
workflow = Unpickler(pickle_file).load()
85+
UnitTestAPIAdapter.mp_lock.release()
86+
5087
item = {}
51-
for wfid, value in self._workflow_definitions.items():
88+
for wfid, value in workflow.items():
5289
if value["name"] == name:
5390
item = {"id": wfid, "workflow": value}
5491
return item
5592

5693
def create_running_workflow(self, *, workflow_definition_id: str) -> str:
57-
next_id: int = len(self._running_workflow) + 1
94+
UnitTestAPIAdapter.mp_lock.acquire()
95+
with open(f"tests/{_RUNNING_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
96+
running_workflow = Unpickler(pickle_file).load()
97+
98+
next_id: int = len(running_workflow) + 1
5899
running_workflow_id: str = _RUNNING_WORKFLOW_ID_FORMAT.format(id=next_id)
59100
record = {"done": False, "success": False, "workflow": workflow_definition_id}
60-
self._running_workflow[running_workflow_id] = record
101+
running_workflow[running_workflow_id] = record
102+
103+
with open(f"tests/{_RUNNING_WORKFLOW_PICKLE_FILE}", "wb") as pickle_file:
104+
Pickler(pickle_file).dump(running_workflow)
105+
UnitTestAPIAdapter.mp_lock.release()
106+
61107
return {"id": running_workflow_id}
62108

63109
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
64-
if running_workflow_id not in self._running_workflow:
110+
UnitTestAPIAdapter.mp_lock.acquire()
111+
with open(f"tests/{_RUNNING_WORKFLOW_PICKLE_FILE}", "rb") as pickle_file:
112+
running_workflow = Unpickler(pickle_file).load()
113+
UnitTestAPIAdapter.mp_lock.release()
114+
115+
if running_workflow_id not in running_workflow:
65116
return {}
66-
return {"running_workflow": self._running_workflow[running_workflow_id]}
117+
return {"running_workflow": running_workflow[running_workflow_id]}
67118

68119
def create_running_workflow_step(
69120
self, *, running_workflow_id: str, step: str
70121
) -> str:
71-
next_id: int = len(self._running_workflow_steps) + 1
122+
UnitTestAPIAdapter.mp_lock.acquire()
123+
with open(f"tests/{_RUNNING_WORKFLOW_STEP_PICKLE_FILE}", "rb") as pickle_file:
124+
running_workflow_step = Unpickler(pickle_file).load()
125+
126+
next_id: int = len(running_workflow_step) + 1
72127
running_workflow_step_id: str = _RUNNING_WORKFLOW_STEP_ID_FORMAT.format(
73128
id=next_id
74129
)
@@ -78,58 +133,99 @@ def create_running_workflow_step(
78133
"success": False,
79134
"running_workflow": running_workflow_id,
80135
}
81-
self._running_workflow_steps[running_workflow_step_id] = record
136+
running_workflow_step[running_workflow_step_id] = record
137+
138+
with open(f"tests/{_RUNNING_WORKFLOW_STEP_PICKLE_FILE}", "wb") as pickle_file:
139+
Pickler(pickle_file).dump(running_workflow_step)
140+
UnitTestAPIAdapter.mp_lock.release()
141+
82142
return {"id": running_workflow_step_id}
83143

84144
def get_running_workflow_step(
85145
self, *, running_workflow_step_id: str
86146
) -> Dict[str, Any]:
87-
if running_workflow_step_id not in self._running_workflow_steps:
147+
UnitTestAPIAdapter.mp_lock.acquire()
148+
with open(f"tests/{_RUNNING_WORKFLOW_STEP_PICKLE_FILE}", "rb") as pickle_file:
149+
running_workflow_step = Unpickler(pickle_file).load()
150+
UnitTestAPIAdapter.mp_lock.release()
151+
152+
if running_workflow_step_id not in running_workflow_step:
88153
return {}
89154
return {
90-
"running_workflow_step": self._running_workflow_steps[
91-
running_workflow_step_id
92-
]
155+
"running_workflow_step": running_workflow_step[running_workflow_step_id]
93156
}
94157

95158
def get_running_workflow_steps(
96159
self, *, running_workflow_id: str
97160
) -> List[Dict[str, Any]]:
161+
UnitTestAPIAdapter.mp_lock.acquire()
162+
with open(f"tests/{_RUNNING_WORKFLOW_STEP_PICKLE_FILE}", "rb") as pickle_file:
163+
running_workflow_step = Unpickler(pickle_file).load()
164+
UnitTestAPIAdapter.mp_lock.release()
165+
98166
steps = []
99-
for key, value in self._running_workflow_steps.items():
167+
for key, value in running_workflow_step.items():
100168
if value["running_workflow"] == running_workflow_id:
101169
item = {"running_workflow_step": value, "id": key}
102170
steps.append(item)
103171
return {"count": len(steps), "running_workflow_steps": steps}
104172

105173
def create_instance(self, *, running_workflow_step_id: str) -> Dict[str, Any]:
106-
next_id: int = len(self._instances) + 1
174+
UnitTestAPIAdapter.mp_lock.acquire()
175+
with open(f"tests/{_INSTANCE_PICKLE_FILE}", "rb") as pickle_file:
176+
instances = Unpickler(pickle_file).load()
177+
178+
next_id: int = len(instances) + 1
107179
instance_id: str = _INSTANCE_ID_FORMAT.format(id=next_id)
108180
record = {
109181
"running_workflow_step": running_workflow_step_id,
110182
}
111-
self._instances[instance_id] = record
183+
instances[instance_id] = record
184+
185+
with open(f"tests/{_INSTANCE_PICKLE_FILE}", "wb") as pickle_file:
186+
Pickler(pickle_file).dump(instances)
187+
UnitTestAPIAdapter.mp_lock.release()
188+
112189
return {"id": instance_id}
113190

114191
def get_instance(self, *, instance_id: str) -> Dict[str, Any]:
115-
if instance_id not in self._instances:
192+
UnitTestAPIAdapter.mp_lock.acquire()
193+
with open(f"tests/{_INSTANCE_PICKLE_FILE}", "rb") as pickle_file:
194+
instances = Unpickler(pickle_file).load()
195+
UnitTestAPIAdapter.mp_lock.release()
196+
197+
if instance_id not in instances:
116198
return {}
117-
return self._instances[instance_id]
199+
return instances[instance_id]
118200

119201
def create_task(self, *, instance_id: str) -> Dict[str, Any]:
120-
next_id: int = len(self._instances) + 1
121-
task_id: str = _INSTANCE_ID_FORMAT.format(id=next_id)
202+
UnitTestAPIAdapter.mp_lock.acquire()
203+
with open(f"tests/{_TASK_PICKLE_FILE}", "rb") as pickle_file:
204+
tasks = Unpickler(pickle_file).load()
205+
206+
next_id: int = len(tasks) + 1
207+
task_id: str = _TASK_ID_FORMAT.format(id=next_id)
122208
record = {
123209
"done": False,
124210
"exit_code": 0,
125211
}
126-
self._tasks[task_id] = record
212+
tasks[task_id] = record
213+
214+
with open(f"tests/{_TASK_PICKLE_FILE}", "wb") as pickle_file:
215+
Pickler(pickle_file).dump(tasks)
216+
UnitTestAPIAdapter.mp_lock.release()
217+
127218
return {"id": task_id}
128219

129220
def get_task(self, *, task_id: str) -> Dict[str, Any]:
130-
if task_id not in self._tasks:
221+
UnitTestAPIAdapter.mp_lock.acquire()
222+
with open(f"tests/{_TASK_PICKLE_FILE}", "rb") as pickle_file:
223+
tasks = Unpickler(pickle_file).load()
224+
UnitTestAPIAdapter.mp_lock.release()
225+
226+
if task_id not in tasks:
131227
return {}
132-
return self._tasks[task_id]
228+
return tasks[task_id]
133229

134230
def get_job(
135231
self, *, collection: str, job: str, version: str

tests/message_dispatcher.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ def __init__(self, msg_queue: UnitTestMessageQueue):
1212
self._msg_queue: UnitTestMessageQueue = msg_queue
1313

1414
def send(self, message: Message) -> None:
15+
print(f"UnitTestMessageDispatcher.send:\n{message}")
1516
self._msg_queue.put(message)

tests/message_queue.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ def __init__(self, receiver: Optional[Callable[[Message], None]] = None):
1818
self._queue = Queue()
1919
self._receiver = receiver
2020

21+
def set_receiver(self, receiver: Callable[[Message], None]):
22+
"""Set or replace the receiver function, used unit-testing the WorkflowEngine."""
23+
assert receiver
24+
self._receiver = receiver
25+
2126
def run(self):
2227
while not self._stop.is_set():
2328
with suppress(Empty):

tests/test_test_instance_launcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def test_get_nop_job(basic_launcher):
3838
workflow_id="workflow-00000000-0000-0000-0000-000000000000",
3939
running_workflow_step_id=rwfsid,
4040
workflow_definition={},
41-
step_specification={"job": "nop"},
41+
step_specification={"job": "nop", "variables": {"x": 1}},
4242
)
4343

4444
# Assert

tests/test_workflow_engine_with_examples.py renamed to tests/test_workflow_engine_examples.py

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
# A unit (functional) test for the WorkflowEngine's handling of 'Example 1'.
2+
import os
23
import time
34
from datetime import datetime, timezone
45

56
import pytest
7+
import yaml
68

79
pytestmark = pytest.mark.unit
810

@@ -23,33 +25,54 @@ def basic_engine():
2325
instance_launcher = UnitTestInstanceLauncher(
2426
api_adapter=api_adapter, msg_dispatcher=message_dispatcher
2527
)
28+
workflow_engine = WorkflowEngine(
29+
api_adapter=api_adapter, instance_launcher=instance_launcher
30+
)
31+
message_queue.set_receiver(workflow_engine.handle_message)
2632
return [
2733
api_adapter,
34+
message_queue,
2835
message_dispatcher,
29-
WorkflowEngine(api_adapter=api_adapter, instance_launcher=instance_launcher),
36+
workflow_engine,
3037
]
3138

3239

33-
def test_workflow_engine_with_example_1(basic_engine):
40+
def test_workflow_engine_with_two_step_nop(basic_engine):
3441
# Arrange
35-
da, md, _ = basic_engine
36-
# LOAD THE EXAMPLE-1 WORKFLOW DEFINITION INTO THE DATABASE
37-
# TODO
38-
# SIMULATE THE API CREATION OF A RUNNING WORKFLOW FROM THE WORKFLOW
39-
wfid = da.create_workflow(workflow_definition={"name": "example-1"})
42+
da, mq, md, _ = basic_engine
43+
44+
# Act
45+
# To test the WorkflowEngine we need to:
46+
# 1. Start the message queue
47+
# 2. Load and create a Workflow Definition
48+
# 3. Create a Running Workflow record
49+
# 4. Send a Workflow START message
50+
#
51+
# 1. (Start the message queue)
52+
mq.start()
53+
# 2. (Load/create the workflow definition to be tested)
54+
workflow_file_name = "example-two-step-nop"
55+
workflow_path = os.path.join(
56+
os.path.dirname(__file__), "workflow-definitions", f"{workflow_file_name}.yaml"
57+
)
58+
with open(workflow_path, "r", encoding="utf8") as wf_file:
59+
wf_definition = yaml.load(wf_file, Loader=yaml.FullLoader)
60+
assert wf_definition
61+
wfid = da.create_workflow(workflow_definition=wf_definition)
4062
assert wfid
63+
print(f"Created workflow definition {wfid}")
64+
# 3. (Create a running workflow record)
4165
response = da.create_running_workflow(workflow_definition_id=wfid)
4266
r_wfid = response["id"]
4367
assert r_wfid
44-
45-
# Act
46-
# SEND A MESSAGE TO THE ENGINE (VIA THE MESSAGE DISPATCHER) TO START THE WORKFLOW
47-
# THE RUNNING WORKFLOW WILL HAVE THE ID "1"
68+
print(f"Created running workflow {r_wfid}")
69+
# 4. (Send the Workflow START message)
4870
msg = WorkflowMessage()
4971
msg.timestamp = f"{datetime.now(timezone.utc).isoformat()}Z"
5072
msg.action = "START"
5173
msg.running_workflow = r_wfid
5274
md.send(msg)
75+
print("Sent START message")
5376

5477
# Assert
5578
# Wait until the workflow is done (successfully)
@@ -59,6 +82,7 @@ def test_workflow_engine_with_example_1(basic_engine):
5982
r_wf = None
6083
while not done:
6184
response = da.get_running_workflow(running_workflow_id=r_wfid)
85+
assert "running_workflow" in response
6286
r_wf = response["running_workflow"]
6387
if r_wf["done"]:
6488
done = True
@@ -67,14 +91,17 @@ def test_workflow_engine_with_example_1(basic_engine):
6791
if attempts > 10:
6892
break
6993
time.sleep(0.5)
94+
# Stop the message queue
95+
print("Stopping message queue...")
96+
mq.stop()
97+
mq.join()
98+
print("Stopped")
7099
assert r_wf
71-
# TODO - The following should be 'success' but the implementation does not set it yet
72-
assert not r_wf["success"]
100+
assert r_wf["success"]
73101
# Now check there are the right number of RunningWorkflowStep Records
74102
# (and they're all set to success/done)
75103
response = da.get_running_workflow_steps(running_workflow_id=r_wfid)
76-
# TODO - The following should not be zero but the implementation does not set it yet
77-
assert response["count"] == 0
104+
assert response["count"] == 2
78105
for step in response["running_workflow_steps"]:
79106
assert step["running_workflow_step"]["done"]
80107
assert step["running_workflow_step"]["success"]

0 commit comments

Comments
 (0)