Skip to content

Commit e0e7fd0

Browse files
author
Alan Christie
committed
feat: Refactor design (to include message dispatcher)
1 parent 1c46ee3 commit e0e7fd0

12 files changed

+199
-74
lines changed

tests/database_adapter.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,9 @@ def get_workflow_by_name(
2929
def get_job(
3030
self, *, collection: str, job: str, version: str
3131
) -> Optional[Dict[str, Any]]:
32-
if collection != _JOB_DEFINITIONS["collection"]:
33-
return 1
34-
if job not in _JOB_DEFINITIONS["jobs"]:
35-
return 2
36-
if version != _JOB_DEFINITIONS["jobs"][job]["version"]:
37-
return 3
32+
assert collection == _JOB_DEFINITIONS["collection"]
33+
assert job in _JOB_DEFINITIONS["jobs"]
34+
3835
jd = _JOB_DEFINITIONS["jobs"][job]
3936
response = {"command": jd["command"]}
4037
if "variables" in jd:

tests/instance_launcher.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import os
2+
import subprocess
3+
from subprocess import CompletedProcess
4+
from typing import Any, Callable, Dict, List, Optional
5+
6+
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
7+
8+
from workflow.workflow_abc import InstanceLauncher, LaunchResult
9+
10+
_JOB_DIRECTORY: str = os.path.join(os.path.dirname(__file__), "jobs")
11+
12+
_SUCCESS_LAUNCH_RESULT: LaunchResult = LaunchResult(
13+
error=0,
14+
error_msg=None,
15+
instance_id="instance-00000000-0000-0000-0000-000000000000",
16+
task_id="task-00000000-0000-0000-0000-000000000000",
17+
)
18+
19+
20+
class UnitTestInstanceLauncher(InstanceLauncher):
21+
"""A unit test instance launcher, which runs the
22+
Python module that matches the job name in the provided specification.
23+
"""
24+
25+
def launch(
26+
self,
27+
*,
28+
project_id: str,
29+
workflow_id: str,
30+
workflow_definition: Dict[str, Any],
31+
step: str,
32+
step_specification: Dict[str, Any],
33+
completion_callback: Optional[Callable[[PodMessage], None]],
34+
) -> LaunchResult:
35+
assert project_id
36+
assert workflow_id
37+
assert step_specification
38+
39+
# Just run the Python module that matched the 'job' in the step specification.
40+
# Don't care about 'version' or 'collection'.
41+
job: str = step_specification["job"]
42+
job_module = f"{_JOB_DIRECTORY}/{job}.py"
43+
assert os.path.isfile(job_module)
44+
45+
job_cmd: List[str] = ["python", job_module]
46+
print(f"Running job command: {job_cmd}")
47+
completed_process: CompletedProcess = subprocess.run(job_cmd, check=True)
48+
assert completed_process.returncode == 0
49+
50+
return _SUCCESS_LAUNCH_RESULT

tests/job-definitions/job-definitions.yaml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,17 @@
1010
# All jobs used for unit testing must be defined in this file.
1111
#
1212
# For each job you must provide: -
13-
# - Job version (and only one version for each Job)
1413
# - Job command
14+
#
15+
# The command is meaningless as the job that is run is expected to be
16+
# a Python 3 module called <job>.py in the tests/jobs directory.
17+
# So a nop job would have a corresponding Python module called nop.py.
18+
# The test instance_launcher module will run the job.
1519

1620
collection: workflow-engine-unit-test-jobs
1721

1822
jobs:
1923

2024
nop:
21-
version: "1.0.0"
2225
command: >-
2326
python --version

tests/jobs/nop.py

Whitespace-only changes.

tests/message_dispatcher.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from google.protobuf.message import Message
2+
3+
from workflow.workflow_abc import MessageDispatcher
4+
5+
6+
class UnitTestMessageDispatcher(MessageDispatcher):
7+
"""A minimal Message dispatcher to support testing."""
8+
9+
def send(self, message: Message) -> None:
10+
assert message

tests/test_database_adapter.py renamed to tests/test_test_database_adapter.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
# Tests for the decoder package.
2-
import os
3-
from typing import Any, Dict
42

53
import pytest
6-
import yaml
74

85
pytestmark = pytest.mark.unit
96

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Tests for the decoder package.
2+
3+
import pytest
4+
5+
pytestmark = pytest.mark.unit
6+
7+
from tests.instance_launcher import UnitTestInstanceLauncher
8+
9+
10+
def test_get_nop_job():
11+
# Arrange
12+
util = UnitTestInstanceLauncher()
13+
14+
# Act
15+
result = util.launch(
16+
project_id="project-00000000-0000-0000-0000-000000000000",
17+
workflow_id="workflow-00000000-0000-0000-0000-000000000000",
18+
workflow_definition={},
19+
step="step-1",
20+
step_specification={"job": "nop"},
21+
completion_callback=None,
22+
)
23+
24+
# Assert
25+
assert result.error == 0
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Tests for the decoder package.
2+
3+
import pytest
4+
5+
pytestmark = pytest.mark.unit
6+
7+
from tests.message_dispatcher import UnitTestMessageDispatcher
8+
9+
10+
def test_get_nop_job():
11+
# Arrange
12+
utmd = UnitTestMessageDispatcher()
13+
14+
# Act
15+
utmd.send(1)
16+
17+
# Assert

tests/test_worflow_validator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@
55
pytestmark = pytest.mark.unit
66

77
from tests.database_adapter import UnitTestDatabaseAdapter
8+
from tests.message_dispatcher import UnitTestMessageDispatcher
89
from tests.test_decoder_minimal import _MINIMAL_WORKFLOW
910
from workflow.worklfow_validator import ValidationLevel, WorkflowValidator
1011

1112

1213
def test_validate_minimal_for_create():
1314
# Arrange
1415
db_adapter = UnitTestDatabaseAdapter()
15-
validator = WorkflowValidator(db_adapter=db_adapter)
16+
msg_dispatcher = UnitTestMessageDispatcher()
17+
validator = WorkflowValidator(db_adapter=db_adapter, msg_dispatcher=msg_dispatcher)
1618

1719
# Act
1820
error = validator.validate(

workflow/workflow_abc.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
from abc import ABC, abstractmethod
66
from dataclasses import dataclass
7-
from typing import Any, Dict, Optional
7+
from typing import Any, Callable, Dict, Optional
8+
9+
from google.protobuf.message import Message
10+
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
811

912

1013
@dataclass
@@ -29,6 +32,7 @@ def launch(
2932
workflow_definition: Dict[str, Any],
3033
step: str,
3134
step_specification: Dict[str, Any],
35+
completion_callback: Optional[Callable[[PodMessage], None]],
3236
) -> LaunchResult:
3337
"""Launch a (Job) Instance"""
3438

@@ -43,6 +47,11 @@ def launch(
4347
# step_specification by the WE? Remember that we have to deal with
4448
# "input Handlers" that manipulate the specification variables.
4549
# See _instance_preamble() in the DM's api_instance.py module.
50+
#
51+
# The completion_callback is only used in local testing and is a function
52+
# that should be able to process a PodMessage that indicates a workflow Job
53+
# has completed. When the WorkflowEngine is embedded in the data Manager
54+
# the Data Manager will not permit the use of this parameter.
4655

4756

4857
class DatabaseAdapter(ABC):
@@ -79,3 +88,11 @@ def get_job(
7988
version: str,
8089
) -> Optional[Dict[str, Any]]:
8190
"""Get a Job"""
91+
92+
93+
class MessageDispatcher(ABC):
94+
"""The class handling the sending of messages (on the Data Manager message bus)."""
95+
96+
@abstractmethod
97+
def send(self, message: Message) -> None:
98+
"""Send a message"""

0 commit comments

Comments
 (0)