Skip to content

Commit d686a7c

Browse files
author
Alan Christie
committed
feat: Launcher now creates PodMessage
1 parent b9fe02e commit d686a7c

File tree

3 files changed

+39
-14
lines changed

3 files changed

+39
-14
lines changed

tests/instance_launcher.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import os
22
import subprocess
3+
from datetime import datetime, timezone
34
from subprocess import CompletedProcess
4-
from typing import Any, Callable, Dict, List, Optional
5+
from typing import Any, Dict, List
56

67
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
78

@@ -32,15 +33,27 @@ def launch(
3233
*,
3334
project_id: str,
3435
workflow_id: str,
36+
running_workflow_step_id: str,
3537
workflow_definition: Dict[str, Any],
3638
step: str,
3739
step_specification: Dict[str, Any],
38-
completion_callback: Optional[Callable[[PodMessage], None]],
3940
) -> LaunchResult:
4041
assert project_id
4142
assert workflow_id
4243
assert step_specification
4344

45+
# We're passed a RunningWorkflowStep ID but a record is expected to have been
46+
# created bt the caller, we simply create instance records.
47+
response = self._api_adapter.get_running_workflow_step(
48+
running_workflow_step_id=running_workflow_step_id
49+
)
50+
assert "running_workflow_step" in response
51+
# Now simulate the creation of an Instance record
52+
response = self._api_adapter.create_instance(
53+
running_workflow_step_id=running_workflow_step_id
54+
)
55+
instance_id = response["instance_id"]
56+
4457
# Just run the Python module that matched the 'job' in the step specification.
4558
# Don't care about 'version' or 'collection'.
4659
job: str = step_specification["job"]
@@ -52,10 +65,20 @@ def launch(
5265
completed_process: CompletedProcess = subprocess.run(job_cmd, check=True)
5366
assert completed_process.returncode == 0
5467

68+
# Simulate a PodMessage (that will contain the instance ID),
69+
# filling-in only the fields that are of use to the Engine.
70+
pod_message = PodMessage()
71+
pod_message.timestamp = f"{datetime.now(timezone.utc).isoformat()}Z"
72+
pod_message.phase = "Completed"
73+
pod_message.instance = instance_id
74+
pod_message.has_exit_code = True
75+
pod_message.exit_code = 0
76+
self._msg_dispatcher.send(pod_message)
77+
5578
return LaunchResult(
5679
error=0,
5780
error_msg=None,
58-
instance_id="instance-00000000-0000-0000-0000-000000000000",
81+
instance_id=instance_id,
5982
task_id="task-00000000-0000-0000-0000-000000000000",
6083
command=" ".join(job_cmd),
6184
)

tests/test_test_instance_launcher.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,31 @@ def basic_launcher():
1515
api_adapter = UnitTestAPIAdapter()
1616
message_queue = UnitTestMessageQueue()
1717
message_dispatcher = UnitTestMessageDispatcher(msg_queue=message_queue)
18-
return UnitTestInstanceLauncher(
18+
instance_launcher = UnitTestInstanceLauncher(
1919
api_adapter=api_adapter, msg_dispatcher=message_dispatcher
2020
)
21+
return [api_adapter, instance_launcher]
2122

2223

2324
def test_get_nop_job(basic_launcher):
2425
# Arrange
26+
utaa = basic_launcher[0]
27+
launcher = basic_launcher[1]
28+
response = utaa.create_workflow(workflow_definition={"name": "blah"})
29+
response = utaa.create_running_workflow(workflow_definition_id=response["id"])
30+
response = utaa.create_running_workflow_step(
31+
running_workflow_id=response["id"], step="step-1"
32+
)
33+
rwfsid = response["id"]
2534

2635
# Act
27-
result = basic_launcher.launch(
36+
result = launcher.launch(
2837
project_id="project-00000000-0000-0000-0000-000000000000",
2938
workflow_id="workflow-00000000-0000-0000-0000-000000000000",
39+
running_workflow_step_id=rwfsid,
3040
workflow_definition={},
3141
step="step-1",
3242
step_specification={"job": "nop"},
33-
completion_callback=None,
3443
)
3544

3645
# Assert

workflow/workflow_abc.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44

55
from abc import ABC, abstractmethod
66
from dataclasses import dataclass
7-
from typing import Any, Callable, Dict, List, Optional
7+
from typing import Any, Dict, List, Optional
88

99
from google.protobuf.message import Message
10-
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
1110

1211

1312
@dataclass
@@ -33,7 +32,6 @@ def launch(
3332
workflow_definition: Dict[str, Any],
3433
step: str,
3534
step_specification: Dict[str, Any],
36-
completion_callback: Optional[Callable[[PodMessage], None]],
3735
) -> LaunchResult:
3836
"""Launch a (Job) Instance"""
3937

@@ -48,11 +46,6 @@ def launch(
4846
# step_specification by the WE? Remember that we have to deal with
4947
# "input Handlers" that manipulate the specification variables.
5048
# See _instance_preamble() in the DM's api_instance.py module.
51-
#
52-
# The completion_callback is only used in local testing and is a function
53-
# that should be able to process a PodMessage that indicates a workflow Job
54-
# has completed. When the WorkflowEngine is embedded in the data Manager
55-
# the Data Manager will not permit the use of this parameter.
5649

5750

5851
class APIAdapter(ABC):

0 commit comments

Comments
 (0)