1
1
"""The WorkflowEngine execution logic.
2
+
3
+ It responds to Pod and Workflow protocol buffer messages received by its
4
+ 'handle_message()' function, messages delivered by the message handler in the PBC Pod.
5
+ There are no other methods in this class.
6
+
7
+ Its role is to translate a pre-validated workflow definition into the ordered execution
8
+ of step "Jobs" that manifest as Pod "Instances" that run in a project directory in the
9
+ DM.
10
+
11
+ Workflow messages initiate (START) and terminate (STOP) workflows. Pod messages signal
12
+ the end of individual workflow steps and carry the exit code of the executed Job.
13
+ The engine used START messages to launch the first "step" in a workflow and the Pod
14
+ messages to signal the success (or failure) of a prior step. A step's success is used,
15
+ along with it's original workflow definition to determine the next action
16
+ (run the next step or signal the end of the workflow).
17
+
18
+ Before a START message is transmitted the author (typically the Workflow Validator)
19
+ will have created a RunningWorkflow record in the DM. The ID of this record is passed
20
+ in the START message that is sent. The engine uses this ID to find the running workflow
21
+ and the workflow. The engine creates RunningWorkflowStep records for each step that
22
+ is executed, and it uses thew InstanceLauncher to launch the Job (a Pod) for each step.
2
23
"""
3
24
4
25
import logging
8
29
from informaticsmatters .protobuf .datamanager .pod_message_pb2 import PodMessage
9
30
from informaticsmatters .protobuf .datamanager .workflow_message_pb2 import WorkflowMessage
10
31
11
- from .workflow_abc import APIAdapter , InstanceLauncher
32
+ from .workflow_abc import APIAdapter , InstanceLauncher , LaunchResult
12
33
13
34
_LOGGER : logging .Logger = logging .getLogger (__name__ )
14
35
_LOGGER .setLevel (logging .INFO )
15
36
_LOGGER .addHandler (logging .StreamHandler (sys .stdout ))
16
37
17
38
18
39
class WorkflowEngine :
19
- """The workflow engine. An event-driven engine that manages the execution
20
- of validated workflow definitions.
21
- """
40
+ """The workflow engine."""
22
41
23
42
def __init__ (
24
43
self ,
@@ -31,35 +50,29 @@ def __init__(
31
50
self ._instance_launcher = instance_launcher
32
51
33
52
def handle_message (self , msg : Message ) -> None :
34
- """Expect Pod and Workflow messages.
35
-
36
- WorkflowMessages signal the need to start (or stop) validated workflows -
37
- i.e. take a workflow and create a running workflows (and its steps).
38
-
39
- PodMessages signal the completion of a Pod (an Instance/Job) that is part
40
- of a running workflow. Given a Pod Message, we use it to identify the
41
- Instance exit code, running workflow, and step and decide what to do next,
42
- which might be to run the next step in the workflow or abandon the workflow.
53
+ """Expect Workflow and Pod messages.
43
54
44
55
Only pod messages relating to workflow instances will be delivered to this method.
45
- The Pod message has an 'instance' property that provides the UUID of
56
+ The Pod message has an 'instance' property that contains the UUID of
46
57
the instance that was run. This is used to correlate the instance with the
47
- running workflow step.
58
+ running workflow step, and (ultimately the running workflow and workflow) .
48
59
"""
49
60
assert msg
50
61
51
- _LOGGER .debug ("> WE.handle_message() : GOT WorkflowMessage :\n %s" , str (msg ))
62
+ _LOGGER .debug ("Message :\n %s" , str (msg ))
52
63
53
- # Is this a WorkflowMessage or a PodMessage?
54
64
if isinstance (msg , PodMessage ):
55
65
self ._handle_pod_message (msg )
56
66
else :
57
67
self ._handle_workflow_message (msg )
58
68
59
69
def _handle_workflow_message (self , msg : WorkflowMessage ) -> None :
60
- """Handles a WorkflowMessage. This is a message that signals a START or STOP
61
- of a workflow. On START we will load the workflow definition and run (launch)
62
- the first step."""
70
+ """WorkflowMessages signal the need to start (or stop) a workflow using its
71
+ 'action' string field (one of 'START' or 'START').
72
+ The message contains a 'running_workflow' field that contains the UUID
73
+ of an existing RunningWorkflow record in the DM. Using this
74
+ we can locate the Workflow record and interrogate that to identify which
75
+ step (or steps) to launch (run) first."""
63
76
assert msg
64
77
65
78
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
@@ -71,40 +84,51 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
71
84
72
85
_LOGGER .debug ("WE> WorkflowMessage:\n %s" , str (msg ))
73
86
74
- if msg .action == "START" :
75
- # Using the running workflow get the workflow definition
87
+ action = msg .action
88
+ r_wfid = msg .running_workflow
89
+
90
+ assert action in ["START" , "STOP" ]
91
+ if action == "START" :
92
+ # Using the running workflow...
76
93
response = self ._api_adapter .get_running_workflow (
77
- running_workflow_id = msg . running_workflow
94
+ running_workflow_id = r_wfid
78
95
)
79
96
assert "running_workflow" in response
80
97
running_workflow = response ["running_workflow" ]
81
- _LOGGER .info ("RunningWorkflow: %s" , running_workflow )
82
-
98
+ _LOGGER .debug ("RunningWorkflow: %s" , running_workflow )
99
+ # ...get the workflow definition...
83
100
workflow_id = running_workflow ["workflow" ]["id" ]
84
101
response = self ._api_adapter .get_workflow (workflow_id = workflow_id )
85
102
assert "workflow" in response
86
-
87
103
workflow = response ["workflow" ]
88
- # Now find the first step
89
- # and create a RunningWorkflowStep record prior to launching the instance
104
+
105
+ # Now find the first step and create a RunningWorkflowStep record...
106
+ first_step : str = workflow ["steps" ][0 ]["name" ]
90
107
response = self ._api_adapter .create_running_workflow_step (
91
- running_workflow_id = msg . running_workflow ,
92
- step = workflow [ "steps" ][ 0 ][ "name" ] ,
108
+ running_workflow_id = r_wfid ,
109
+ step = first_step ,
93
110
)
94
111
assert "id" in response
95
112
running_workflow_step_id = response ["id" ]
96
- # The step specification is a string here - pass it directly to the launcher
97
- # which will get the Job command and apply the provided variables to it.
113
+
114
+ # The step's 'specification' is a string here - pass it directly to the
115
+ # launcher along with any appropriate 'variables'. The launcher
116
+ # will get the step's Job command and apply the variables to it to
117
+ # form the command that will be executed for the step.
98
118
step = workflow ["steps" ][0 ]
99
119
project_id = running_workflow ["project_id" ]
100
120
variables = running_workflow ["variables" ]
101
- self ._instance_launcher .launch (
121
+ lr : LaunchResult = self ._instance_launcher .launch (
102
122
project_id = project_id ,
103
123
running_workflow_id = msg .running_workflow ,
104
124
running_workflow_step_id = running_workflow_step_id ,
105
125
step_specification = step ["specification" ],
106
126
variables = variables ,
107
127
)
128
+ assert lr .error == 0
129
+ _LOGGER .info (
130
+ "Launched initial step: %s (command=%s)" , first_step , lr .command
131
+ )
108
132
109
133
else :
110
134
_LOGGER .info ("action=%s" , msg .action )
@@ -142,12 +166,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
142
166
_LOGGER .debug (
143
167
"WE> PodMessage: instance=%s, exit_code=%d" , instance_id , exit_code
144
168
)
145
-
146
- # Ignore instances without a running workflow step
147
169
response = self ._api_adapter .get_instance (instance_id = instance_id )
148
- if "running_workflow_step" not in response :
149
- _LOGGER .warning ("WE> PodMessage: Without running_workflow_step" )
150
- return
151
170
running_workflow_step_id : str = response ["running_workflow_step" ]
152
171
response = self ._api_adapter .get_running_workflow_step (
153
172
running_workflow_step_id = running_workflow_step_id
@@ -176,9 +195,6 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
176
195
if success :
177
196
# Given the step for the instance just finished,
178
197
# find the next step in the workflow and launch it.
179
- # If there are no more steps then the workflow is done
180
- # so we need to set the running workflow as done
181
- # and set it's success status too.
182
198
workflow = response ["workflow" ]
183
199
for step in workflow ["steps" ]:
184
200
if step ["name" ] == step_name :
@@ -193,16 +209,25 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
193
209
running_workflow_step_id = response ["id" ]
194
210
project_id = running_workflow ["project_id" ]
195
211
variables = running_workflow ["variables" ]
196
- self ._instance_launcher .launch (
212
+ lr : LaunchResult = self ._instance_launcher .launch (
197
213
project_id = project_id ,
198
214
running_workflow_id = running_workflow_id ,
199
215
running_workflow_step_id = running_workflow_step_id ,
200
216
step_specification = next_step ["specification" ],
201
217
variables = variables ,
202
218
)
203
219
end_of_workflow = False
220
+ assert lr .error == 0
221
+ _LOGGER .info (
222
+ "Launched step: %s (command=%s)" ,
223
+ next_step ["name" ],
224
+ lr .command ,
225
+ )
204
226
break
205
227
228
+ # If there are no more steps then the workflow is done
229
+ # so we need to set the running workflow as done
230
+ # and set its success status too.
206
231
if end_of_workflow :
207
232
self ._api_adapter .set_running_workflow_done (
208
233
running_workflow_id = running_workflow_id ,
0 commit comments