Skip to content

Commit 4711816

Browse files
author
Alan Christie
committed
refactor: Major 'launcher' refactor based on needs of exiting DM launcher logic
1 parent a37803e commit 4711816

8 files changed

+163
-129
lines changed

tests/api_adapter.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
_TASK_PICKLE_FILE: str = f"{_PICKLE_DIRECTORY}/task.pickle"
5353

5454

55-
class UnitTestAPIAdapter(WorkflowAPIAdapter):
55+
class UnitTestWorkflowAPIAdapter(WorkflowAPIAdapter):
5656
"""A minimal API adapter. It serves-up Job Definitions
5757
from the job-definitions/job-definitions.yaml file and provides basic
5858
storage for Workflow Definitions and related tables.
@@ -66,7 +66,7 @@ class UnitTestAPIAdapter(WorkflowAPIAdapter):
6666
def __init__(self):
6767
super().__init__()
6868
# Safely initialise the pickle files
69-
UnitTestAPIAdapter.lock.acquire()
69+
UnitTestWorkflowAPIAdapter.lock.acquire()
7070
if not os.path.exists(_PICKLE_DIRECTORY):
7171
os.makedirs(_PICKLE_DIRECTORY)
7272
for file in [
@@ -78,10 +78,10 @@ def __init__(self):
7878
]:
7979
with open(file, "wb") as pickle_file:
8080
Pickler(pickle_file).dump({})
81-
UnitTestAPIAdapter.lock.release()
81+
UnitTestWorkflowAPIAdapter.lock.release()
8282

8383
def create_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
84-
UnitTestAPIAdapter.lock.acquire()
84+
UnitTestWorkflowAPIAdapter.lock.acquire()
8585
with open(_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
8686
workflow = Unpickler(pickle_file).load()
8787

@@ -91,23 +91,23 @@ def create_workflow(self, *, workflow_definition: Dict[str, Any]) -> str:
9191

9292
with open(_WORKFLOW_PICKLE_FILE, "wb") as pickle_file:
9393
Pickler(pickle_file).dump(workflow)
94-
UnitTestAPIAdapter.lock.release()
94+
UnitTestWorkflowAPIAdapter.lock.release()
9595

9696
return {"id": workflow_definition_id}
9797

9898
def get_workflow(self, *, workflow_id: str) -> Dict[str, Any]:
99-
UnitTestAPIAdapter.lock.acquire()
99+
UnitTestWorkflowAPIAdapter.lock.acquire()
100100
with open(_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
101101
workflow = Unpickler(pickle_file).load()
102-
UnitTestAPIAdapter.lock.release()
102+
UnitTestWorkflowAPIAdapter.lock.release()
103103

104104
return {"workflow": workflow[workflow_id]} if workflow_id in workflow else {}
105105

106106
def get_workflow_by_name(self, *, name: str, version: str) -> Dict[str, Any]:
107-
UnitTestAPIAdapter.lock.acquire()
107+
UnitTestWorkflowAPIAdapter.lock.acquire()
108108
with open(_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
109109
workflow = Unpickler(pickle_file).load()
110-
UnitTestAPIAdapter.lock.release()
110+
UnitTestWorkflowAPIAdapter.lock.release()
111111

112112
item = {}
113113
for wfid, value in workflow.items():
@@ -126,7 +126,7 @@ def create_running_workflow(
126126
assert user_id
127127
assert isinstance(variables, dict)
128128

129-
UnitTestAPIAdapter.lock.acquire()
129+
UnitTestWorkflowAPIAdapter.lock.acquire()
130130
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
131131
running_workflow = Unpickler(pickle_file).load()
132132

@@ -144,7 +144,7 @@ def create_running_workflow(
144144

145145
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "wb") as pickle_file:
146146
Pickler(pickle_file).dump(running_workflow)
147-
UnitTestAPIAdapter.lock.release()
147+
UnitTestWorkflowAPIAdapter.lock.release()
148148

149149
return {"id": running_workflow_id}
150150

@@ -156,7 +156,7 @@ def set_running_workflow_done(
156156
error: Optional[int] = None,
157157
error_msg: Optional[str] = None,
158158
) -> None:
159-
UnitTestAPIAdapter.lock.acquire()
159+
UnitTestWorkflowAPIAdapter.lock.acquire()
160160
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
161161
running_workflow = Unpickler(pickle_file).load()
162162

@@ -168,13 +168,13 @@ def set_running_workflow_done(
168168

169169
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "wb") as pickle_file:
170170
Pickler(pickle_file).dump(running_workflow)
171-
UnitTestAPIAdapter.lock.release()
171+
UnitTestWorkflowAPIAdapter.lock.release()
172172

173173
def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
174-
UnitTestAPIAdapter.lock.acquire()
174+
UnitTestWorkflowAPIAdapter.lock.acquire()
175175
with open(_RUNNING_WORKFLOW_PICKLE_FILE, "rb") as pickle_file:
176176
running_workflow = Unpickler(pickle_file).load()
177-
UnitTestAPIAdapter.lock.release()
177+
UnitTestWorkflowAPIAdapter.lock.release()
178178

179179
if running_workflow_id not in running_workflow:
180180
return {}
@@ -183,7 +183,7 @@ def get_running_workflow(self, *, running_workflow_id: str) -> Dict[str, Any]:
183183
def create_running_workflow_step(
184184
self, *, running_workflow_id: str, step: str
185185
) -> str:
186-
UnitTestAPIAdapter.lock.acquire()
186+
UnitTestWorkflowAPIAdapter.lock.acquire()
187187
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
188188
running_workflow_step = Unpickler(pickle_file).load()
189189

@@ -201,17 +201,17 @@ def create_running_workflow_step(
201201

202202
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "wb") as pickle_file:
203203
Pickler(pickle_file).dump(running_workflow_step)
204-
UnitTestAPIAdapter.lock.release()
204+
UnitTestWorkflowAPIAdapter.lock.release()
205205

206206
return {"id": running_workflow_step_id}
207207

208208
def get_running_workflow_step(
209209
self, *, running_workflow_step_id: str
210210
) -> Dict[str, Any]:
211-
UnitTestAPIAdapter.lock.acquire()
211+
UnitTestWorkflowAPIAdapter.lock.acquire()
212212
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
213213
running_workflow_step = Unpickler(pickle_file).load()
214-
UnitTestAPIAdapter.lock.release()
214+
UnitTestWorkflowAPIAdapter.lock.release()
215215

216216
if running_workflow_step_id not in running_workflow_step:
217217
return {}
@@ -227,7 +227,7 @@ def set_running_workflow_step_done(
227227
error: Optional[int] = None,
228228
error_msg: Optional[str] = None,
229229
) -> None:
230-
UnitTestAPIAdapter.lock.acquire()
230+
UnitTestWorkflowAPIAdapter.lock.acquire()
231231
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
232232
running_workflow_step = Unpickler(pickle_file).load()
233233

@@ -239,15 +239,15 @@ def set_running_workflow_step_done(
239239

240240
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "wb") as pickle_file:
241241
Pickler(pickle_file).dump(running_workflow_step)
242-
UnitTestAPIAdapter.lock.release()
242+
UnitTestWorkflowAPIAdapter.lock.release()
243243

244244
def get_running_workflow_steps(
245245
self, *, running_workflow_id: str
246246
) -> List[Dict[str, Any]]:
247-
UnitTestAPIAdapter.lock.acquire()
247+
UnitTestWorkflowAPIAdapter.lock.acquire()
248248
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
249249
running_workflow_step = Unpickler(pickle_file).load()
250-
UnitTestAPIAdapter.lock.release()
250+
UnitTestWorkflowAPIAdapter.lock.release()
251251

252252
steps = []
253253
for key, value in running_workflow_step.items():
@@ -257,7 +257,7 @@ def get_running_workflow_steps(
257257
return {"count": len(steps), "running_workflow_steps": steps}
258258

259259
def create_instance(self, *, running_workflow_step_id: str) -> Dict[str, Any]:
260-
UnitTestAPIAdapter.lock.acquire()
260+
UnitTestWorkflowAPIAdapter.lock.acquire()
261261
with open(_INSTANCE_PICKLE_FILE, "rb") as pickle_file:
262262
instances = Unpickler(pickle_file).load()
263263

@@ -270,20 +270,20 @@ def create_instance(self, *, running_workflow_step_id: str) -> Dict[str, Any]:
270270

271271
with open(_INSTANCE_PICKLE_FILE, "wb") as pickle_file:
272272
Pickler(pickle_file).dump(instances)
273-
UnitTestAPIAdapter.lock.release()
273+
UnitTestWorkflowAPIAdapter.lock.release()
274274

275275
return {"id": instance_id}
276276

277277
def get_instance(self, *, instance_id: str) -> Dict[str, Any]:
278-
UnitTestAPIAdapter.lock.acquire()
278+
UnitTestWorkflowAPIAdapter.lock.acquire()
279279
with open(_INSTANCE_PICKLE_FILE, "rb") as pickle_file:
280280
instances = Unpickler(pickle_file).load()
281-
UnitTestAPIAdapter.lock.release()
281+
UnitTestWorkflowAPIAdapter.lock.release()
282282

283283
return {} if instance_id not in instances else instances[instance_id]
284284

285285
def create_task(self, *, instance_id: str) -> Dict[str, Any]:
286-
UnitTestAPIAdapter.lock.acquire()
286+
UnitTestWorkflowAPIAdapter.lock.acquire()
287287
with open(_TASK_PICKLE_FILE, "rb") as pickle_file:
288288
tasks = Unpickler(pickle_file).load()
289289

@@ -297,15 +297,15 @@ def create_task(self, *, instance_id: str) -> Dict[str, Any]:
297297

298298
with open(_TASK_PICKLE_FILE, "wb") as pickle_file:
299299
Pickler(pickle_file).dump(tasks)
300-
UnitTestAPIAdapter.lock.release()
300+
UnitTestWorkflowAPIAdapter.lock.release()
301301

302302
return {"id": task_id}
303303

304304
def get_task(self, *, task_id: str) -> Dict[str, Any]:
305-
UnitTestAPIAdapter.lock.acquire()
305+
UnitTestWorkflowAPIAdapter.lock.acquire()
306306
with open(_TASK_PICKLE_FILE, "rb") as pickle_file:
307307
tasks = Unpickler(pickle_file).load()
308-
UnitTestAPIAdapter.lock.release()
308+
UnitTestWorkflowAPIAdapter.lock.release()
309309

310310
return {} if task_id not in tasks else tasks[task_id]
311311

tests/instance_launcher.py

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,21 @@
1515
(so the start of each test begins with an empty project directory).
1616
"""
1717

18-
import json
1918
import os
2019
import shutil
2120
import subprocess
2221
from datetime import datetime, timezone
2322
from subprocess import CompletedProcess
24-
from typing import Any, Dict, List
23+
from typing import List
2524

2625
from decoder import decoder as job_decoder
2726
from decoder.decoder import TextEncoding
2827
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
2928

30-
from tests.api_adapter import UnitTestAPIAdapter
29+
from tests.api_adapter import UnitTestWorkflowAPIAdapter
3130
from tests.config import TEST_PROJECT_ID
3231
from tests.message_dispatcher import UnitTestMessageDispatcher
33-
from workflow.workflow_abc import InstanceLauncher, LaunchResult
32+
from workflow.workflow_abc import InstanceLauncher, LaunchParameters, LaunchResult
3433

3534
# Full path to the 'jobs' directory
3635
_JOB_PATH: str = os.path.join(os.path.dirname(__file__), "jobs")
@@ -48,65 +47,55 @@ class UnitTestInstanceLauncher(InstanceLauncher):
4847
"""A unit test instance launcher."""
4948

5049
def __init__(
51-
self, api_adapter: UnitTestAPIAdapter, msg_dispatcher: UnitTestMessageDispatcher
50+
self,
51+
wapi_adapter: UnitTestWorkflowAPIAdapter,
52+
msg_dispatcher: UnitTestMessageDispatcher,
5253
):
5354
super().__init__()
5455

55-
self._api_adapter = api_adapter
56+
self._api_adapter = wapi_adapter
5657
self._msg_dispatcher = msg_dispatcher
5758

5859
# Every launcher starts with an empty execution directory...
5960
print(f"Removing execution directory ({_EXECUTION_DIRECTORY})")
6061
assert _EXECUTION_DIRECTORY.startswith("tests/project-root")
6162
shutil.rmtree(_EXECUTION_DIRECTORY, ignore_errors=True)
6263

63-
def launch(
64-
self,
65-
*,
66-
project_id: str,
67-
running_workflow_id: str,
68-
running_workflow_step_id: str,
69-
step_specification: str,
70-
variables: Dict[str, Any],
71-
) -> LaunchResult:
72-
assert project_id
73-
assert running_workflow_id
74-
assert step_specification
75-
assert isinstance(step_specification, str)
76-
assert isinstance(variables, dict)
77-
78-
assert project_id == TEST_PROJECT_ID
64+
def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
65+
assert launch_parameters
66+
assert launch_parameters.project_id == TEST_PROJECT_ID
67+
assert launch_parameters.specification
68+
assert isinstance(launch_parameters.specification, dict)
7969

8070
os.makedirs(_EXECUTION_DIRECTORY, exist_ok=True)
8171

8272
# We're passed a RunningWorkflowStep ID but a record is expected to have been
8373
# created bt the caller, we simply create instance records.
8474
response = self._api_adapter.get_running_workflow_step(
85-
running_workflow_step_id=running_workflow_step_id
75+
running_workflow_step_id=launch_parameters.running_workflow_step_id
8676
)
8777
assert "running_workflow_step" in response
8878
# Now simulate the creation of a Task and Instance record
8979
response = self._api_adapter.create_instance(
90-
running_workflow_step_id=running_workflow_step_id
80+
running_workflow_step_id=launch_parameters.running_workflow_step_id
9181
)
9282
instance_id = response["id"]
9383
response = self._api_adapter.create_task(instance_id=instance_id)
9484
task_id = response["id"]
9585

9686
# Apply variables to the step's Job command.
97-
step_specification_map = json.loads(step_specification)
9887
job = self._api_adapter.get_job(
99-
collection=step_specification_map["collection"],
100-
job=step_specification_map["job"],
88+
collection=launch_parameters.specification["collection"],
89+
job=launch_parameters.specification["job"],
10190
version="do-not-care",
10291
)
10392
assert job
10493

10594
# Now apply the variables to the command
10695
decoded_command, status = job_decoder.decode(
10796
job["command"],
108-
variables,
109-
running_workflow_step_id,
97+
launch_parameters.specification_variables,
98+
launch_parameters.running_workflow_step_id,
11099
TextEncoding.JINJA2_3_0,
111100
)
112101
print(f"Decoded command: {decoded_command}")

0 commit comments

Comments
 (0)