Skip to content

Commit 27bc0e7

Browse files
author
Alan Christie
committed
feat: Better engine (documentation and flow)
1 parent 0a4d5b2 commit 27bc0e7

File tree

1 file changed

+164
-106
lines changed

1 file changed

+164
-106
lines changed

workflow/workflow_engine.py

Lines changed: 164 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import logging
2626
import sys
27+
from typing import Any, Dict, Optional
2728

2829
from google.protobuf.message import Message
2930
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
@@ -75,64 +76,69 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
7576
step (or steps) to launch (run) first."""
7677
assert msg
7778

78-
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
79-
# AND THE INSTANCE LAUNCHER FOR THE SIMPLEST OF WORKFLOWS: -
80-
# THE "TWO-STEP NOP".
81-
# THERE IS NWO SPECIFICATION MANIPULATION NEEDED FOR THIS EXAMPLE
82-
# THE STEPS HAVE NO INPUTS OR OUTPUTS.
83-
# THIS FUNCTION PROBABLY NEEDS TO BE A LOT MORE SOPHISTICATED!
79+
_LOGGER.debug("WorkflowMessage:\n%s", str(msg))
80+
assert msg.action in ["START", "STOP"]
8481

85-
_LOGGER.debug("WE> WorkflowMessage:\n%s", str(msg))
86-
87-
action = msg.action
8882
r_wfid = msg.running_workflow
89-
90-
assert action in ["START", "STOP"]
91-
if action == "START":
92-
# Using the running workflow...
93-
response = self._api_adapter.get_running_workflow(
94-
running_workflow_id=r_wfid
95-
)
96-
assert "running_workflow" in response
97-
running_workflow = response["running_workflow"]
98-
_LOGGER.debug("RunningWorkflow: %s", running_workflow)
99-
# ...get the workflow definition...
100-
workflow_id = running_workflow["workflow"]["id"]
101-
response = self._api_adapter.get_workflow(workflow_id=workflow_id)
102-
assert "workflow" in response
103-
workflow = response["workflow"]
104-
105-
# Now find the first step and create a RunningWorkflowStep record...
106-
first_step: str = workflow["steps"][0]["name"]
107-
response = self._api_adapter.create_running_workflow_step(
108-
running_workflow_id=r_wfid,
109-
step=first_step,
83+
if msg.action == "START":
84+
self._handle_workflow_start_message(r_wfid)
85+
else:
86+
# STOP is not implemented yet and probably not for some time.
87+
# So just log and ignore for now!
88+
_LOGGER.warning(
89+
"Got STOP action for %s - but it's not implemented yet!", r_wfid
11090
)
111-
assert "id" in response
112-
running_workflow_step_id = response["id"]
113-
114-
# The step's 'specification' is a string here - pass it directly to the
115-
# launcher along with any appropriate 'variables'. The launcher
116-
# will get the step's Job command and apply the variables to it to
117-
# form the command that will be executed for the step.
118-
step = workflow["steps"][0]
119-
project_id = running_workflow["project_id"]
120-
variables = running_workflow["variables"]
121-
lr: LaunchResult = self._instance_launcher.launch(
122-
project_id=project_id,
123-
running_workflow_id=msg.running_workflow,
124-
running_workflow_step_id=running_workflow_step_id,
125-
step_specification=step["specification"],
126-
variables=variables,
91+
92+
def _handle_workflow_start_message(self, r_wfid: str) -> None:
93+
"""Logic to handle a START message. Here we use the running workflow
94+
(and workflow) to find the first step in the workflow and launch it, passing
95+
the running workflow variables to the launcher."""
96+
97+
response = self._api_adapter.get_running_workflow(running_workflow_id=r_wfid)
98+
assert "running_workflow" in response
99+
running_workflow = response["running_workflow"]
100+
_LOGGER.debug("RunningWorkflow: %s", running_workflow)
101+
# Now get the workflow definition (to get all the steps)
102+
wfid = running_workflow["workflow"]["id"]
103+
response = self._api_adapter.get_workflow(workflow_id=wfid)
104+
assert "workflow" in response
105+
workflow = response["workflow"]
106+
107+
# Now find the first step,
108+
# and create a corresponding RunningWorkflowStep record...
109+
first_step: Dict[str, Any] = workflow["steps"][0]
110+
first_step_name: str = first_step["name"]
111+
response = self._api_adapter.create_running_workflow_step(
112+
running_workflow_id=r_wfid,
113+
step=first_step_name,
114+
)
115+
assert "id" in response
116+
r_wfsid = response["id"]
117+
118+
# The step's 'specification' is a string - pass it directly to the
119+
# launcher along with any appropriate 'variables'. The launcher
120+
# will apply the variables to the step's Job command but we need to handle
121+
# any launch problems. The validator should have checked to ensure that
122+
# variable expansion will work, but we must prepare for the unexpected.
123+
124+
project_id = running_workflow["project_id"]
125+
variables = running_workflow["variables"]
126+
lr: LaunchResult = self._instance_launcher.launch(
127+
project_id=project_id,
128+
running_workflow_id=r_wfid,
129+
running_workflow_step_id=r_wfsid,
130+
step_specification=first_step["specification"],
131+
variables=variables,
132+
)
133+
if lr.error:
134+
self._set_step_error(
135+
first_step_name, r_wfid, r_wfsid, lr.error, lr.error_msg
127136
)
128-
assert lr.error == 0
137+
else:
129138
_LOGGER.info(
130-
"Launched initial step: %s (command=%s)", first_step, lr.command
139+
"Launched first step '%s' (command=%s)", first_step_name, lr.command
131140
)
132141

133-
else:
134-
_LOGGER.info("action=%s", msg.action)
135-
136142
def _handle_pod_message(self, msg: PodMessage) -> None:
137143
"""Handles a PodMessage. This is a message that signals the completion of a
138144
step within a workflow. Steps run as "instances" and the Pod message
@@ -147,89 +153,141 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
147153
assert msg
148154

149155
# The PodMessage has 'instance', 'has_exit_code', and 'exit_code' values.
150-
_LOGGER.debug("WE> PodMessage:\n%s", str(msg))
156+
_LOGGER.debug("PodMessage:\n%s", str(msg))
151157

152158
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
153159
# AND THE INSTANCE LAUNCHER FOR THE SIMPLEST OF WORKFLOWS: -
154160
# THE "TWO-STEP NOP".
155-
# THERE IS NWO SPECIFICATION MANIPULATION NEEDED FOR THIS EXAMPLE
161+
# THERE IS NO SPECIFICATION MANIPULATION NEEDED FOR THIS EXAMPLE
156162
# THE STEPS HAVE NO INPUTS OR OUTPUTS.
157163
# THIS FUNCTION PROBABLY NEEDS TO BE A LOT MORE SOPHISTICATED!
158164

159165
# Ignore anything without an exit code.
160166
if not msg.has_exit_code:
161-
_LOGGER.warning("WE> PodMessage: No exit code")
167+
_LOGGER.error("PodMessage has no exit code")
162168
return
163169

164170
instance_id: str = msg.instance
165171
exit_code: int = msg.exit_code
166-
_LOGGER.debug(
167-
"WE> PodMessage: instance=%s, exit_code=%d", instance_id, exit_code
168-
)
169172
response = self._api_adapter.get_instance(instance_id=instance_id)
170-
running_workflow_step_id: str = response["running_workflow_step"]
173+
r_wfsid: str = response["running_workflow_step"]
171174
response = self._api_adapter.get_running_workflow_step(
172-
running_workflow_step_id=running_workflow_step_id
175+
running_workflow_step_id=r_wfsid
173176
)
174177
step_name: str = response["running_workflow_step"]["step"]
175178

176-
# Set the step as completed (successful or otherwise)
177-
success: bool = exit_code == 0
179+
# Get the step's running workflow record.
180+
r_wfid = response["running_workflow_step"]["running_workflow"]
181+
assert r_wfid
182+
response = self._api_adapter.get_running_workflow(running_workflow_id=r_wfid)
183+
184+
if exit_code:
185+
# The job was launched but it failed.
186+
# Set a step error.
187+
# This will also set a workflow error so we can leave.
188+
self._set_step_error(step_name, r_wfid, r_wfsid, exit_code, "Job failed")
189+
return
190+
191+
# The prior step completed successfully if we get here.
192+
178193
self._api_adapter.set_running_workflow_step_done(
179-
running_workflow_step_id=running_workflow_step_id,
180-
success=success,
194+
running_workflow_step_id=r_wfsid,
195+
success=True,
181196
)
182197

183-
# Get the step's running workflow and workflow IDs and records.
184-
running_workflow_id = response["running_workflow_step"]["running_workflow"]
185-
assert running_workflow_id
186-
response = self._api_adapter.get_running_workflow(
187-
running_workflow_id=running_workflow_id
188-
)
189198
running_workflow = response["running_workflow"]
190-
workflow_id = running_workflow["workflow"]["id"]
191-
assert workflow_id
192-
response = self._api_adapter.get_workflow(workflow_id=workflow_id)
193-
194-
end_of_workflow: bool = True
195-
if success:
196-
# Given the step for the instance just finished,
197-
# find the next step in the workflow and launch it.
198-
workflow = response["workflow"]
199-
for step in workflow["steps"]:
200-
if step["name"] == step_name:
201-
step_index = workflow["steps"].index(step)
202-
if step_index + 1 < len(workflow["steps"]):
203-
next_step = workflow["steps"][step_index + 1]
204-
response = self._api_adapter.create_running_workflow_step(
205-
running_workflow_id=running_workflow_id,
206-
step=next_step["name"],
207-
)
208-
assert "id" in response
209-
running_workflow_step_id = response["id"]
210-
project_id = running_workflow["project_id"]
211-
variables = running_workflow["variables"]
212-
lr: LaunchResult = self._instance_launcher.launch(
213-
project_id=project_id,
214-
running_workflow_id=running_workflow_id,
215-
running_workflow_step_id=running_workflow_step_id,
216-
step_specification=next_step["specification"],
217-
variables=variables,
199+
wfid = running_workflow["workflow"]["id"]
200+
assert wfid
201+
response = self._api_adapter.get_workflow(workflow_id=wfid)
202+
workflow = response["workflow"]
203+
204+
# Given the step for the instance just finished (successfully),
205+
# find the next step in the workflow
206+
# (using the name of the prior step as an index)
207+
# and launch it.
208+
#
209+
# If there are no more steps then the workflow is done.
210+
211+
lr: Optional[LaunchResult] = None
212+
for step in workflow["steps"]:
213+
if step["name"] == step_name:
214+
step_index = workflow["steps"].index(step)
215+
if step_index + 1 < len(workflow["steps"]):
216+
217+
# There's another step - for this simple logic it is the next step.
218+
219+
next_step = workflow["steps"][step_index + 1]
220+
next_step_name = next_step["name"]
221+
response = self._api_adapter.create_running_workflow_step(
222+
running_workflow_id=r_wfid,
223+
step=next_step_name,
224+
)
225+
assert "id" in response
226+
r_wfsid = response["id"]
227+
project_id = running_workflow["project_id"]
228+
variables = running_workflow["variables"]
229+
lr = self._instance_launcher.launch(
230+
project_id=project_id,
231+
running_workflow_id=r_wfid,
232+
running_workflow_step_id=r_wfsid,
233+
step_specification=next_step["specification"],
234+
variables=variables,
235+
)
236+
# Handle a launch error?
237+
if lr.error:
238+
self._set_step_error(
239+
next_step_name,
240+
r_wfid,
241+
r_wfsid,
242+
lr.error,
243+
lr.error_msg,
218244
)
219-
end_of_workflow = False
220-
assert lr.error == 0
245+
else:
221246
_LOGGER.info(
222247
"Launched step: %s (command=%s)",
223248
next_step["name"],
224249
lr.command,
225250
)
226-
break
227251

228-
# If there are no more steps then the workflow is done
229-
# so we need to set the running workflow as done
230-
# and set its success status too.
231-
if end_of_workflow:
252+
# Something was started (or there was a launch error).
253+
break
254+
255+
# If there's no launch result this must be the (successful) end of the workflow.
256+
# If there is a launch result it was either successful
257+
# (and not the end of the workflow) or unsuccessful
258+
# (and the workflow will have been marked as done anyway).
259+
if lr is None:
232260
self._api_adapter.set_running_workflow_done(
233-
running_workflow_id=running_workflow_id,
234-
success=success,
261+
running_workflow_id=r_wfid,
262+
success=True,
235263
)
264+
265+
def _set_step_error(
266+
self,
267+
step_name: str,
268+
r_wfid: str,
269+
r_wfsid: str,
270+
error: Optional[int],
271+
error_msg: Optional[str],
272+
) -> None:
273+
"""Set the error state for a running workflow step (and the running workflow).
274+
Calling this method essentially 'ends' the running workflow."""
275+
_LOGGER.warning(
276+
"Failed to launch step '%s' (error=%d error_msg=%s)",
277+
step_name,
278+
error,
279+
error_msg,
280+
)
281+
self._api_adapter.set_running_workflow_step_done(
282+
running_workflow_step_id=r_wfsid,
283+
success=False,
284+
error=error,
285+
error_msg=error_msg,
286+
)
287+
# We must also set the running workflow as done (failed)
288+
self._api_adapter.set_running_workflow_done(
289+
running_workflow_id=r_wfid,
290+
success=False,
291+
error=error,
292+
error_msg=error_msg,
293+
)

0 commit comments

Comments
 (0)