Skip to content

Commit 83e3fb6

Browse files
author
Alan Christie
committed
fix: Better doc and launcher now handles failures
1 parent e998045 commit 83e3fb6

File tree

2 files changed

+25
-15
lines changed

2 files changed

+25
-15
lines changed

tests/api_adapter.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
_JOB_DEFINITIONS: Dict[str, Any] = yaml.load(jd_file, Loader=yaml.FullLoader)
1616
assert _JOB_DEFINITIONS
1717

18+
# Table UUID formats
1819
_INSTANCE_ID_FORMAT: str = "instance-00000000-0000-0000-0000-{id:012d}"
1920
_TASK_ID_FORMAT: str = "task-00000000-0000-0000-0000-{id:012d}"
2021
_WORKFLOW_DEFINITION_ID_FORMAT: str = "workflow-00000000-0000-0000-0000-{id:012d}"
@@ -23,6 +24,7 @@
2324
"r-workflow-step-00000000-0000-0000-0000-{id:012d}"
2425
)
2526

27+
# Pickle files (representing each 'Table')
2628
_WORKFLOW_PICKLE_FILE: str = "workflow.pickle"
2729
_RUNNING_WORKFLOW_PICKLE_FILE: str = "running-workflow.pickle"
2830
_RUNNING_WORKFLOW_STEP_PICKLE_FILE: str = "running-workflow-step.pickle"
@@ -33,7 +35,11 @@
3335
class UnitTestAPIAdapter(APIAdapter):
3436
"""A minimal API adapter. It serves-up Job Definitions
3537
from the job-definitions/job-definitions.yaml file and provides basic
36-
(in-memory) storage for Workflow Definitions and related tables."""
38+
storage for Workflow Definitions and related tables.
39+
40+
Because the adapter is used by the multi-processing test suite, it uses both a lock
41+
and pickle files to store data, so that data can be shared between processes.
42+
"""
3743

3844
mp_lock = Lock()
3945

@@ -74,9 +80,7 @@ def get_workflow(self, *, workflow_id: str) -> Dict[str, Any]:
7480
workflow = Unpickler(pickle_file).load()
7581
UnitTestAPIAdapter.mp_lock.release()
7682

77-
if workflow_id not in workflow:
78-
return {}
79-
return {"workflow": workflow[workflow_id]}
83+
return {"workflow": workflow[workflow_id]} if workflow_id in workflow else {}
8084

8185
def get_workflow_by_name(self, *, name: str, version: str) -> Dict[str, Any]:
8286
UnitTestAPIAdapter.mp_lock.acquire()
@@ -224,9 +228,7 @@ def get_instance(self, *, instance_id: str) -> Dict[str, Any]:
224228
instances = Unpickler(pickle_file).load()
225229
UnitTestAPIAdapter.mp_lock.release()
226230

227-
if instance_id not in instances:
228-
return {}
229-
return instances[instance_id]
231+
return {} if instance_id not in instances else instances[instance_id]
230232

231233
def create_task(self, *, instance_id: str) -> Dict[str, Any]:
232234
UnitTestAPIAdapter.mp_lock.acquire()
@@ -253,9 +255,7 @@ def get_task(self, *, task_id: str) -> Dict[str, Any]:
253255
tasks = Unpickler(pickle_file).load()
254256
UnitTestAPIAdapter.mp_lock.release()
255257

256-
if task_id not in tasks:
257-
return {}
258-
return tasks[task_id]
258+
return {} if task_id not in tasks else tasks[task_id]
259259

260260
def get_job(
261261
self, *, collection: str, job: str, version: str

tests/instance_launcher.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,19 @@
1616
class UnitTestInstanceLauncher(InstanceLauncher):
1717
"""A unit test instance launcher, which runs the
1818
Python module that matches the job name in the provided specification.
19-
It also uses the UnitTestMessageDispatcher to send the simulated
20-
'end of instance' PodMessage (to the WorkflowEngine).
19+
20+
The Python module used to satisfy the step matches the job name in the
21+
step specification. If the step_specification's 'job' is 'my_job', then the launcher
22+
will run the Python module 'my_job.py' in the 'jobs' directory. The
23+
module is run synchronously - i.e. the launch() method waits for the
24+
module to complete.
25+
26+
It then uses the UnitTestMessageDispatcher to send a simulated
27+
'end of instance' PodMessage that will be received by the WorkflowEngine's
28+
'handle_message()' method. The 'exit code' of the module is passed to the
29+
WorkflowEngine through the PodMessage - so if the module fails (i.e. returns
30+
a non-zero exit code) then the WorkflowEngine will see that the PodMessage.
31+
This allows you to write jobs that fail and see how the WorkflowEngine responds.
2132
"""
2233

2334
def __init__(
@@ -63,8 +74,7 @@ def launch(
6374

6475
job_cmd: List[str] = ["python", job_module]
6576
print(f"Running job command: {job_cmd}")
66-
completed_process: CompletedProcess = subprocess.run(job_cmd, check=True)
67-
assert completed_process.returncode == 0
77+
completed_process: CompletedProcess = subprocess.run(job_cmd, check=False)
6878

6979
# Simulate a PodMessage (that will contain the instance ID),
7080
# filling-in only the fields that are of use to the Engine.
@@ -74,7 +84,7 @@ def launch(
7484
pod_message.instance = instance_id
7585
pod_message.task = task_id
7686
pod_message.has_exit_code = True
77-
pod_message.exit_code = 0
87+
pod_message.exit_code = completed_process.returncode
7888
self._msg_dispatcher.send(pod_message)
7989

8090
return LaunchResult(

0 commit comments

Comments
 (0)