From 55fb6606d7add7ec08c108499d5938ab30d9500c Mon Sep 17 00:00:00 2001 From: Alan Christie Date: Thu, 20 Mar 2025 15:43:27 +0100 Subject: [PATCH] docs: Significant doc update --- workflow/workflow_engine.py | 111 +++++++++++++++++++++++------------- 1 file changed, 71 insertions(+), 40 deletions(-) diff --git a/workflow/workflow_engine.py b/workflow/workflow_engine.py index 9128d05..d7c7f43 100644 --- a/workflow/workflow_engine.py +++ b/workflow/workflow_engine.py @@ -97,9 +97,14 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None: ) def _handle_workflow_start_message(self, r_wfid: str) -> None: - """Logic to handle a START message. Here we use the running workflow - (and workflow) to find the first step in the workflow and launch it, passing - the running workflow variables to the launcher.""" + """Logic to handle a START message. This is the beginning of a new + running workflow. We use the running workflow (and workflow) to find the + first step in the Workflow and launch it, passing the running workflow variables + to the launcher. + + The first step is relatively easy (?) - all the variables + (for the first step's 'command') will (must) be defined + in the RunningWorkflow's variables.""" rwf_response, _ = self._wapi_adapter.get_running_workflow( running_workflow_id=r_wfid @@ -131,22 +136,34 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None: assert "id" in response r_wfsid: str = response["id"] + # Launch the first step. + # If there's a launch problem the step (and running workflow) will have + # and error, stopping it. There will be no Pod event as the launch has failed. self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step) def _handle_pod_message(self, msg: PodMessage) -> None: """Handles a PodMessage. This is a message that signals the completion of a - step within a workflow. Steps run as "instances" and the Pod message - identifies the Instance. Using the Instance record we can get the - "running workflow step" and then identify the "running workflow" and the - "workflow". + prior step Job within an existing running workflow. + + Steps run as "instances" and the Pod message identifies the Instance. + Using the Instance record we can get the "running workflow step", + and then identify the "running workflow" and the "workflow". First thing is to adjust the workflow step with the step's success state and - optional error code. If the step was successful we can find the next step - and launch that, or consider the last step to have run and modify the - running workflow record and set's it's success status.""" + optional error code. If the step was successful, armed with the step's + Workflow we can determine what needs to be done next - + is this the end or is there another step to run? + + If there's another step to run we must determine what variables are + available and present them to the next step. It doesn't matter if we + provide variables the next step's command does not need, but we MUST + provide all the variables that the next step's command does need. + + We also have a 'housekeeping' responsibility - i.e. to keep the + RunningWorkflowStep and RunningWorkflow status up to date.""" assert msg - # The PodMessage has a 'instance', 'has_exit_code', and 'exit_code' values. + # The PodMessage has an 'instance', 'has_exit_code', and 'exit_code' values. _LOGGER.info("PodMessage:\n%s", str(msg)) # ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER @@ -161,6 +178,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None: _LOGGER.error("PodMessage has no exit code") return + # The Instance tells us whether the Step (Job) was successful + # (i.e. we can simply check the 'exit_code'). instance_id: str = msg.instance exit_code: int = msg.exit_code response, _ = self._wapi_adapter.get_instance(instance_id=instance_id) @@ -189,6 +208,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None: "API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response) ) + # If the Step failed there's no need for us to inspect the Workflow + # (for the next step) as we simply stop here, reporting the appropriate status). if exit_code: # The job was launched but it failed. # Set a step error, @@ -196,7 +217,9 @@ def _handle_pod_message(self, msg: PodMessage) -> None: self._set_step_error(step_name, r_wfid, r_wfsid, exit_code, "Job failed") return - # The prior step completed successfully if we get here. + # If we get here the prior step completed successfully + # and so we can mark the Step as DOne (successfully), + # and then inspect the Workflow to determine the next step. self._wapi_adapter.set_running_workflow_step_done( running_workflow_step_id=r_wfsid, @@ -207,14 +230,15 @@ def _handle_pod_message(self, msg: PodMessage) -> None: wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid) _LOGGER.debug("API.get_workflow(%s) returned: -\n%s", wfid, str(wf_response)) - # Given the step for the instance just finished (successfully), - # find the next step n the workflow - # (using the name of the prior step as an index) - # and launch it. + # We have the step from the Instance that's just finished, + # so we can use that to find the next step in the Workflow definition. + # (using the name of the completed step step as an index). + # Once found, we can launch it (with any variables we think we need). # - # If there are no more steps then the workflow is done. + # If there are no more steps then the RunningWorkflow is set to + # finished (done). - lr: Optional[LaunchResult] = None + launch_attempted: bool = False for step in wf_response["steps"]: if step["name"] == step_name: step_index = wf_response["steps"].index(step) @@ -245,14 +269,14 @@ def _handle_pod_message(self, msg: PodMessage) -> None: step=next_step, ) - # Something was started (or there was a launch error). + # Something was started (or there was a launch error and the step + # and running workflow error will have been set). + # Regardless we can stop now. + launch_attempted = True break - # If there's no launch result this must be the (successful) end of the workflow. - # If there is a launch result it was either successful - # (and not the end of the workflow) or unsuccessful - # (and the workflow will have been marked as done anyway). - if lr is None: + # If no launch was attempted we can assume this is the end of the running workflow. + if not launch_attempted: self._wapi_adapter.set_running_workflow_done( running_workflow_id=r_wfid, success=True, @@ -289,28 +313,35 @@ def _validate_step_command( # The step's 'specification' is a string - pass it directly to the # launcher along with any (optional) 'workflow variables'. The launcher - # will apply the variables to step's Job command but we need to handle + # will apply the variables to the step's Job command but we need to handle # any launch problems. The validator should have checked to ensure that # variable expansion will work, but we must prepare for the unexpected. # - # What the engine has to do here is make sure that the definition + # What the engine has to do here is make sure that the Job # that's about to be launched has all its configuration requirements - # satisfied (inputs, outputs and options). Basically the - # command must be successfully rendered with what we have. + # satisfied (inputs, outputs and options). Basically we must ensure + # that the Job definition's 'command' can be compiled by applying + # the available variables. + # + # To prevent launcher errors relating to decoding we get the command ourselves + # and then apply the current set of variables. And we use the JobDecoder's + # 'decode()' method to do this. It returns a tuple (str and boolean). + # If the boolean is True then the command can be compiled + # (i.e. it has no missing variables) and the launcher should not complain + # about the command (as we'll pass the same variables to it). + # If the returned boolean is False then we can expect the returned str + # to contain an error message. # - # To do this we give the command and our variables - # to the Job Decoder's 'decode()' method. It returns a tuple (str and boolean). - # If the boolean is True then the command has no undefined configuration - # and can be launched. If it is False then the returned str contains an - # error message. + # Remember that variables can exist in the specification too! + # The full set of step variables can be obtained + # (in descending order of priority) from... # - # Remember that variables can exist in the specification too. - # So, the full set of step variables (in ascending order of priority) - # is... + # 1. The RunningWorkflow + # 2. The Workflow + # 3. The Job Specification # - # 1. The specification - # 2. The workflow - # 3. The RunningWorkflow + # If variable 'x' is defined in all three then the RunningWorkflow's + # value must be used. all_variables = step_spec.pop("variables") if "variables" in step_spec else {} if workflow_variables: @@ -380,7 +411,7 @@ def _launch( if lr.error_num: self._set_step_error(step_name, rwf_id, rwfs_id, lr.error_num, lr.error_msg) else: - _LOGGER.info("Launched first step '%s' (command=%s)", step_name, lr.command) + _LOGGER.info("Launched step '%s' (command=%s)", step_name, lr.command) def _set_step_error( self,