Skip to content

Commit 55fb660

Browse files
author
Alan Christie
committed
docs: Significant doc update
1 parent 53ad3ca commit 55fb660

File tree

1 file changed

+71
-40
lines changed

1 file changed

+71
-40
lines changed

workflow/workflow_engine.py

Lines changed: 71 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,14 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
9797
)
9898

9999
def _handle_workflow_start_message(self, r_wfid: str) -> None:
100-
"""Logic to handle a START message. Here we use the running workflow
101-
(and workflow) to find the first step in the workflow and launch it, passing
102-
the running workflow variables to the launcher."""
100+
"""Logic to handle a START message. This is the beginning of a new
101+
running workflow. We use the running workflow (and workflow) to find the
102+
first step in the Workflow and launch it, passing the running workflow variables
103+
to the launcher.
104+
105+
The first step is relatively easy (?) - all the variables
106+
(for the first step's 'command') will (must) be defined
107+
in the RunningWorkflow's variables."""
103108

104109
rwf_response, _ = self._wapi_adapter.get_running_workflow(
105110
running_workflow_id=r_wfid
@@ -131,22 +136,34 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
131136
assert "id" in response
132137
r_wfsid: str = response["id"]
133138

139+
# Launch the first step.
140+
# If there's a launch problem the step (and running workflow) will have
141+
# and error, stopping it. There will be no Pod event as the launch has failed.
134142
self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)
135143

136144
def _handle_pod_message(self, msg: PodMessage) -> None:
137145
"""Handles a PodMessage. This is a message that signals the completion of a
138-
step within a workflow. Steps run as "instances" and the Pod message
139-
identifies the Instance. Using the Instance record we can get the
140-
"running workflow step" and then identify the "running workflow" and the
141-
"workflow".
146+
prior step Job within an existing running workflow.
147+
148+
Steps run as "instances" and the Pod message identifies the Instance.
149+
Using the Instance record we can get the "running workflow step",
150+
and then identify the "running workflow" and the "workflow".
142151
143152
First thing is to adjust the workflow step with the step's success state and
144-
optional error code. If the step was successful we can find the next step
145-
and launch that, or consider the last step to have run and modify the
146-
running workflow record and set's it's success status."""
153+
optional error code. If the step was successful, armed with the step's
154+
Workflow we can determine what needs to be done next -
155+
is this the end or is there another step to run?
156+
157+
If there's another step to run we must determine what variables are
158+
available and present them to the next step. It doesn't matter if we
159+
provide variables the next step's command does not need, but we MUST
160+
provide all the variables that the next step's command does need.
161+
162+
We also have a 'housekeeping' responsibility - i.e. to keep the
163+
RunningWorkflowStep and RunningWorkflow status up to date."""
147164
assert msg
148165

149-
# The PodMessage has a 'instance', 'has_exit_code', and 'exit_code' values.
166+
# The PodMessage has an 'instance', 'has_exit_code', and 'exit_code' values.
150167
_LOGGER.info("PodMessage:\n%s", str(msg))
151168

152169
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
@@ -161,6 +178,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
161178
_LOGGER.error("PodMessage has no exit code")
162179
return
163180

181+
# The Instance tells us whether the Step (Job) was successful
182+
# (i.e. we can simply check the 'exit_code').
164183
instance_id: str = msg.instance
165184
exit_code: int = msg.exit_code
166185
response, _ = self._wapi_adapter.get_instance(instance_id=instance_id)
@@ -189,14 +208,18 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
189208
"API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response)
190209
)
191210

211+
# If the Step failed there's no need for us to inspect the Workflow
212+
# (for the next step) as we simply stop here, reporting the appropriate status).
192213
if exit_code:
193214
# The job was launched but it failed.
194215
# Set a step error,
195216
# This will also set a workflow error so we can leave.
196217
self._set_step_error(step_name, r_wfid, r_wfsid, exit_code, "Job failed")
197218
return
198219

199-
# The prior step completed successfully if we get here.
220+
# If we get here the prior step completed successfully
221+
# and so we can mark the Step as DOne (successfully),
222+
# and then inspect the Workflow to determine the next step.
200223

201224
self._wapi_adapter.set_running_workflow_step_done(
202225
running_workflow_step_id=r_wfsid,
@@ -207,14 +230,15 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
207230
wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid)
208231
_LOGGER.debug("API.get_workflow(%s) returned: -\n%s", wfid, str(wf_response))
209232

210-
# Given the step for the instance just finished (successfully),
211-
# find the next step n the workflow
212-
# (using the name of the prior step as an index)
213-
# and launch it.
233+
# We have the step from the Instance that's just finished,
234+
# so we can use that to find the next step in the Workflow definition.
235+
# (using the name of the completed step step as an index).
236+
# Once found, we can launch it (with any variables we think we need).
214237
#
215-
# If there are no more steps then the workflow is done.
238+
# If there are no more steps then the RunningWorkflow is set to
239+
# finished (done).
216240

217-
lr: Optional[LaunchResult] = None
241+
launch_attempted: bool = False
218242
for step in wf_response["steps"]:
219243
if step["name"] == step_name:
220244
step_index = wf_response["steps"].index(step)
@@ -245,14 +269,14 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
245269
step=next_step,
246270
)
247271

248-
# Something was started (or there was a launch error).
272+
# Something was started (or there was a launch error and the step
273+
# and running workflow error will have been set).
274+
# Regardless we can stop now.
275+
launch_attempted = True
249276
break
250277

251-
# If there's no launch result this must be the (successful) end of the workflow.
252-
# If there is a launch result it was either successful
253-
# (and not the end of the workflow) or unsuccessful
254-
# (and the workflow will have been marked as done anyway).
255-
if lr is None:
278+
# If no launch was attempted we can assume this is the end of the running workflow.
279+
if not launch_attempted:
256280
self._wapi_adapter.set_running_workflow_done(
257281
running_workflow_id=r_wfid,
258282
success=True,
@@ -289,28 +313,35 @@ def _validate_step_command(
289313

290314
# The step's 'specification' is a string - pass it directly to the
291315
# launcher along with any (optional) 'workflow variables'. The launcher
292-
# will apply the variables to step's Job command but we need to handle
316+
# will apply the variables to the step's Job command but we need to handle
293317
# any launch problems. The validator should have checked to ensure that
294318
# variable expansion will work, but we must prepare for the unexpected.
295319
#
296-
# What the engine has to do here is make sure that the definition
320+
# What the engine has to do here is make sure that the Job
297321
# that's about to be launched has all its configuration requirements
298-
# satisfied (inputs, outputs and options). Basically the
299-
# command must be successfully rendered with what we have.
322+
# satisfied (inputs, outputs and options). Basically we must ensure
323+
# that the Job definition's 'command' can be compiled by applying
324+
# the available variables.
325+
#
326+
# To prevent launcher errors relating to decoding we get the command ourselves
327+
# and then apply the current set of variables. And we use the JobDecoder's
328+
# 'decode()' method to do this. It returns a tuple (str and boolean).
329+
# If the boolean is True then the command can be compiled
330+
# (i.e. it has no missing variables) and the launcher should not complain
331+
# about the command (as we'll pass the same variables to it).
332+
# If the returned boolean is False then we can expect the returned str
333+
# to contain an error message.
300334
#
301-
# To do this we give the command and our variables
302-
# to the Job Decoder's 'decode()' method. It returns a tuple (str and boolean).
303-
# If the boolean is True then the command has no undefined configuration
304-
# and can be launched. If it is False then the returned str contains an
305-
# error message.
335+
# Remember that variables can exist in the specification too!
336+
# The full set of step variables can be obtained
337+
# (in descending order of priority) from...
306338
#
307-
# Remember that variables can exist in the specification too.
308-
# So, the full set of step variables (in ascending order of priority)
309-
# is...
339+
# 1. The RunningWorkflow
340+
# 2. The Workflow
341+
# 3. The Job Specification
310342
#
311-
# 1. The specification
312-
# 2. The workflow
313-
# 3. The RunningWorkflow
343+
# If variable 'x' is defined in all three then the RunningWorkflow's
344+
# value must be used.
314345

315346
all_variables = step_spec.pop("variables") if "variables" in step_spec else {}
316347
if workflow_variables:
@@ -380,7 +411,7 @@ def _launch(
380411
if lr.error_num:
381412
self._set_step_error(step_name, rwf_id, rwfs_id, lr.error_num, lr.error_msg)
382413
else:
383-
_LOGGER.info("Launched first step '%s' (command=%s)", step_name, lr.command)
414+
_LOGGER.info("Launched step '%s' (command=%s)", step_name, lr.command)
384415

385416
def _set_step_error(
386417
self,

0 commit comments

Comments
 (0)