@@ -97,9 +97,14 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
97
97
)
98
98
99
99
def _handle_workflow_start_message (self , r_wfid : str ) -> None :
100
- """Logic to handle a START message. Here we use the running workflow
101
- (and workflow) to find the first step in the workflow and launch it, passing
102
- the running workflow variables to the launcher."""
100
+ """Logic to handle a START message. This is the beginning of a new
101
+ running workflow. We use the running workflow (and workflow) to find the
102
+ first step in the Workflow and launch it, passing the running workflow variables
103
+ to the launcher.
104
+
105
+ The first step is relatively easy (?) - all the variables
106
+ (for the first step's 'command') will (must) be defined
107
+ in the RunningWorkflow's variables."""
103
108
104
109
rwf_response , _ = self ._wapi_adapter .get_running_workflow (
105
110
running_workflow_id = r_wfid
@@ -131,22 +136,34 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
131
136
assert "id" in response
132
137
r_wfsid : str = response ["id" ]
133
138
139
+ # Launch the first step.
140
+ # If there's a launch problem the step (and running workflow) will have
141
+ # an error, stopping it. There will be no Pod event as the launch has failed.
134
142
self ._launch (wf = wf_response , rwf = rwf_response , rwfs_id = r_wfsid , step = first_step )
135
143
136
144
def _handle_pod_message (self , msg : PodMessage ) -> None :
137
145
"""Handles a PodMessage. This is a message that signals the completion of a
138
- step within a workflow. Steps run as "instances" and the Pod message
139
- identifies the Instance. Using the Instance record we can get the
140
- "running workflow step" and then identify the "running workflow" and the
141
- "workflow".
146
+ prior step Job within an existing running workflow.
147
+
148
+ Steps run as "instances" and the Pod message identifies the Instance.
149
+ Using the Instance record we can get the "running workflow step",
150
+ and then identify the "running workflow" and the "workflow".
142
151
143
152
First thing is to adjust the workflow step with the step's success state and
144
- optional error code. If the step was successful we can find the next step
145
- and launch that, or consider the last step to have run and modify the
146
- running workflow record and set's it's success status."""
153
+ optional error code. If the step was successful, armed with the step's
154
+ Workflow we can determine what needs to be done next -
155
+ is this the end or is there another step to run?
156
+
157
+ If there's another step to run we must determine what variables are
158
+ available and present them to the next step. It doesn't matter if we
159
+ provide variables the next step's command does not need, but we MUST
160
+ provide all the variables that the next step's command does need.
161
+
162
+ We also have a 'housekeeping' responsibility - i.e. to keep the
163
+ RunningWorkflowStep and RunningWorkflow status up to date."""
147
164
assert msg
148
165
149
- # The PodMessage has a 'instance', 'has_exit_code', and 'exit_code' values.
166
+ # The PodMessage has an 'instance', 'has_exit_code', and 'exit_code' values.
150
167
_LOGGER .info ("PodMessage:\n %s" , str (msg ))
151
168
152
169
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
@@ -161,6 +178,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
161
178
_LOGGER .error ("PodMessage has no exit code" )
162
179
return
163
180
181
+ # The Instance tells us whether the Step (Job) was successful
182
+ # (i.e. we can simply check the 'exit_code').
164
183
instance_id : str = msg .instance
165
184
exit_code : int = msg .exit_code
166
185
response , _ = self ._wapi_adapter .get_instance (instance_id = instance_id )
@@ -189,14 +208,18 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
189
208
"API.get_running_workflow(%s) returned: -\n %s" , r_wfid , str (rwf_response )
190
209
)
191
210
211
+ # If the Step failed there's no need for us to inspect the Workflow
212
+ # (for the next step) as we simply stop here, reporting the appropriate status.
192
213
if exit_code :
193
214
# The job was launched but it failed.
194
215
# Set a step error,
195
216
# This will also set a workflow error so we can leave.
196
217
self ._set_step_error (step_name , r_wfid , r_wfsid , exit_code , "Job failed" )
197
218
return
198
219
199
- # The prior step completed successfully if we get here.
220
+ # If we get here the prior step completed successfully
221
+ # and so we can mark the Step as Done (successfully),
222
+ # and then inspect the Workflow to determine the next step.
200
223
201
224
self ._wapi_adapter .set_running_workflow_step_done (
202
225
running_workflow_step_id = r_wfsid ,
@@ -207,14 +230,15 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
207
230
wf_response , _ = self ._wapi_adapter .get_workflow (workflow_id = wfid )
208
231
_LOGGER .debug ("API.get_workflow(%s) returned: -\n %s" , wfid , str (wf_response ))
209
232
210
- # Given the step for the instance just finished (successfully) ,
211
- # find the next step n the workflow
212
- # (using the name of the prior step as an index)
213
- # and launch it.
233
+ # We have the step from the Instance that's just finished,
234
+ # so we can use that to find the next step in the Workflow definition.
235
+ # (using the name of the completed step as an index).
236
+ # Once found, we can launch it (with any variables we think we need) .
214
237
#
215
- # If there are no more steps then the workflow is done.
238
+ # If there are no more steps then the RunningWorkflow is set to
239
+ # finished (done).
216
240
217
- lr : Optional [ LaunchResult ] = None
241
+ launch_attempted : bool = False
218
242
for step in wf_response ["steps" ]:
219
243
if step ["name" ] == step_name :
220
244
step_index = wf_response ["steps" ].index (step )
@@ -245,14 +269,14 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
245
269
step = next_step ,
246
270
)
247
271
248
- # Something was started (or there was a launch error).
272
+ # Something was started (or there was a launch error and the step
273
+ # and running workflow error will have been set).
274
+ # Regardless we can stop now.
275
+ launch_attempted = True
249
276
break
250
277
251
- # If there's no launch result this must be the (successful) end of the workflow.
252
- # If there is a launch result it was either successful
253
- # (and not the end of the workflow) or unsuccessful
254
- # (and the workflow will have been marked as done anyway).
255
- if lr is None :
278
+ # If no launch was attempted we can assume this is the end of the running workflow.
279
+ if not launch_attempted :
256
280
self ._wapi_adapter .set_running_workflow_done (
257
281
running_workflow_id = r_wfid ,
258
282
success = True ,
@@ -289,28 +313,35 @@ def _validate_step_command(
289
313
290
314
# The step's 'specification' is a string - pass it directly to the
291
315
# launcher along with any (optional) 'workflow variables'. The launcher
292
- # will apply the variables to step's Job command but we need to handle
316
+ # will apply the variables to the step's Job command but we need to handle
293
317
# any launch problems. The validator should have checked to ensure that
294
318
# variable expansion will work, but we must prepare for the unexpected.
295
319
#
296
- # What the engine has to do here is make sure that the definition
320
+ # What the engine has to do here is make sure that the Job
297
321
# that's about to be launched has all its configuration requirements
298
- # satisfied (inputs, outputs and options). Basically the
299
- # command must be successfully rendered with what we have.
322
+ # satisfied (inputs, outputs and options). Basically we must ensure
323
+ # that the Job definition's 'command' can be compiled by applying
324
+ # the available variables.
325
+ #
326
+ # To prevent launcher errors relating to decoding we get the command ourselves
327
+ # and then apply the current set of variables. And we use the JobDecoder's
328
+ # 'decode()' method to do this. It returns a tuple (str and boolean).
329
+ # If the boolean is True then the command can be compiled
330
+ # (i.e. it has no missing variables) and the launcher should not complain
331
+ # about the command (as we'll pass the same variables to it).
332
+ # If the returned boolean is False then we can expect the returned str
333
+ # to contain an error message.
300
334
#
301
- # To do this we give the command and our variables
302
- # to the Job Decoder's 'decode()' method. It returns a tuple (str and boolean).
303
- # If the boolean is True then the command has no undefined configuration
304
- # and can be launched. If it is False then the returned str contains an
305
- # error message.
335
+ # Remember that variables can exist in the specification too!
336
+ # The full set of step variables can be obtained
337
+ # (in descending order of priority) from...
306
338
#
307
- # Remember that variables can exist in the specification too.
308
- # So, the full set of step variables (in ascending order of priority)
309
- # is...
339
+ # 1. The RunningWorkflow
340
+ # 2. The Workflow
341
+ # 3. The Job Specification
310
342
#
311
- # 1. The specification
312
- # 2. The workflow
313
- # 3. The RunningWorkflow
343
+ # If variable 'x' is defined in all three then the RunningWorkflow's
344
+ # value must be used.
314
345
315
346
all_variables = step_spec .pop ("variables" ) if "variables" in step_spec else {}
316
347
if workflow_variables :
@@ -380,7 +411,7 @@ def _launch(
380
411
if lr .error_num :
381
412
self ._set_step_error (step_name , rwf_id , rwfs_id , lr .error_num , lr .error_msg )
382
413
else :
383
- _LOGGER .info ("Launched first step '%s' (command=%s)" , step_name , lr .command )
414
+ _LOGGER .info ("Launched step '%s' (command=%s)" , step_name , lr .command )
384
415
385
416
def _set_step_error (
386
417
self ,
0 commit comments