Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 71 additions & 40 deletions workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,14 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
)

def _handle_workflow_start_message(self, r_wfid: str) -> None:
"""Logic to handle a START message. Here we use the running workflow
(and workflow) to find the first step in the workflow and launch it, passing
the running workflow variables to the launcher."""
"""Logic to handle a START message. This is the beginning of a new
running workflow. We use the running workflow (and workflow) to find the
first step in the Workflow and launch it, passing the running workflow variables
to the launcher.

The first step is relatively easy (?) - all the variables
(for the first step's 'command') will (must) be defined
in the RunningWorkflow's variables."""

rwf_response, _ = self._wapi_adapter.get_running_workflow(
running_workflow_id=r_wfid
Expand Down Expand Up @@ -131,22 +136,34 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
assert "id" in response
r_wfsid: str = response["id"]

# Launch the first step.
# If there's a launch problem the step (and running workflow) will have
# an error, stopping it. There will be no Pod event as the launch has failed.
self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)

def _handle_pod_message(self, msg: PodMessage) -> None:
"""Handles a PodMessage. This is a message that signals the completion of a
step within a workflow. Steps run as "instances" and the Pod message
identifies the Instance. Using the Instance record we can get the
"running workflow step" and then identify the "running workflow" and the
"workflow".
prior step Job within an existing running workflow.

Steps run as "instances" and the Pod message identifies the Instance.
Using the Instance record we can get the "running workflow step",
and then identify the "running workflow" and the "workflow".

First thing is to adjust the workflow step with the step's success state and
optional error code. If the step was successful we can find the next step
and launch that, or consider the last step to have run and modify the
running workflow record and set its success status."""
optional error code. If the step was successful, armed with the step's
Workflow we can determine what needs to be done next -
is this the end or is there another step to run?

If there's another step to run we must determine what variables are
available and present them to the next step. It doesn't matter if we
provide variables the next step's command does not need, but we MUST
provide all the variables that the next step's command does need.

We also have a 'housekeeping' responsibility - i.e. to keep the
RunningWorkflowStep and RunningWorkflow status up to date."""
assert msg

# The PodMessage has a 'instance', 'has_exit_code', and 'exit_code' values.
# The PodMessage has 'instance', 'has_exit_code', and 'exit_code' values.
_LOGGER.info("PodMessage:\n%s", str(msg))

# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
Expand All @@ -161,6 +178,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
_LOGGER.error("PodMessage has no exit code")
return

# The Instance tells us whether the Step (Job) was successful
# (i.e. we can simply check the 'exit_code').
instance_id: str = msg.instance
exit_code: int = msg.exit_code
response, _ = self._wapi_adapter.get_instance(instance_id=instance_id)
Expand Down Expand Up @@ -189,14 +208,18 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
"API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response)
)

# If the Step failed there's no need for us to inspect the Workflow
# (for the next step) as we simply stop here, reporting the appropriate status.
if exit_code:
# The job was launched but it failed.
# Set a step error,
# This will also set a workflow error so we can leave.
self._set_step_error(step_name, r_wfid, r_wfsid, exit_code, "Job failed")
return

# The prior step completed successfully if we get here.
# If we get here the prior step completed successfully
# and so we can mark the Step as Done (successfully),
# and then inspect the Workflow to determine the next step.

self._wapi_adapter.set_running_workflow_step_done(
running_workflow_step_id=r_wfsid,
Expand All @@ -207,14 +230,15 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid)
_LOGGER.debug("API.get_workflow(%s) returned: -\n%s", wfid, str(wf_response))

# Given the step for the instance just finished (successfully),
# find the next step in the workflow
# (using the name of the prior step as an index)
# and launch it.
# We have the step from the Instance that's just finished,
# so we can use that to find the next step in the Workflow definition
# (using the name of the completed step as an index).
# Once found, we can launch it (with any variables we think we need).
#
# If there are no more steps then the workflow is done.
# If there are no more steps then the RunningWorkflow is set to
# finished (done).

lr: Optional[LaunchResult] = None
launch_attempted: bool = False
for step in wf_response["steps"]:
if step["name"] == step_name:
step_index = wf_response["steps"].index(step)
Expand Down Expand Up @@ -245,14 +269,14 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
step=next_step,
)

# Something was started (or there was a launch error).
# Something was started (or there was a launch error and the step
# and running workflow error will have been set).
# Regardless we can stop now.
launch_attempted = True
break

# If there's no launch result this must be the (successful) end of the workflow.
# If there is a launch result it was either successful
# (and not the end of the workflow) or unsuccessful
# (and the workflow will have been marked as done anyway).
if lr is None:
# If no launch was attempted we can assume this is the end of the running workflow.
if not launch_attempted:
self._wapi_adapter.set_running_workflow_done(
running_workflow_id=r_wfid,
success=True,
Expand Down Expand Up @@ -289,28 +313,35 @@ def _validate_step_command(

# The step's 'specification' is a string - pass it directly to the
# launcher along with any (optional) 'workflow variables'. The launcher
# will apply the variables to step's Job command but we need to handle
# will apply the variables to the step's Job command but we need to handle
# any launch problems. The validator should have checked to ensure that
# variable expansion will work, but we must prepare for the unexpected.
#
# What the engine has to do here is make sure that the definition
# What the engine has to do here is make sure that the Job
# that's about to be launched has all its configuration requirements
# satisfied (inputs, outputs and options). Basically the
# command must be successfully rendered with what we have.
# satisfied (inputs, outputs and options). Basically we must ensure
# that the Job definition's 'command' can be compiled by applying
# the available variables.
#
# To prevent launcher errors relating to decoding we get the command ourselves
# and then apply the current set of variables. And we use the JobDecoder's
# 'decode()' method to do this. It returns a tuple (str and boolean).
# If the boolean is True then the command can be compiled
# (i.e. it has no missing variables) and the launcher should not complain
# about the command (as we'll pass the same variables to it).
# If the returned boolean is False then we can expect the returned str
# to contain an error message.
#
# To do this we give the command and our variables
# to the Job Decoder's 'decode()' method. It returns a tuple (str and boolean).
# If the boolean is True then the command has no undefined configuration
# and can be launched. If it is False then the returned str contains an
# error message.
# Remember that variables can exist in the specification too!
# The full set of step variables can be obtained
# (in descending order of priority) from...
#
# Remember that variables can exist in the specification too.
# So, the full set of step variables (in ascending order of priority)
# is...
# 1. The RunningWorkflow
# 2. The Workflow
# 3. The Job Specification
#
# 1. The specification
# 2. The workflow
# 3. The RunningWorkflow
# If variable 'x' is defined in all three then the RunningWorkflow's
# value must be used.

all_variables = step_spec.pop("variables") if "variables" in step_spec else {}
if workflow_variables:
Expand Down Expand Up @@ -380,7 +411,7 @@ def _launch(
if lr.error_num:
self._set_step_error(step_name, rwf_id, rwfs_id, lr.error_num, lr.error_msg)
else:
_LOGGER.info("Launched first step '%s' (command=%s)", step_name, lr.command)
_LOGGER.info("Launched step '%s' (command=%s)", step_name, lr.command)

def _set_step_error(
self,
Expand Down