17
17
18
18
class WorkflowEngine :
19
19
"""The workflow engine. An event-driven engine that manages the execution
20
- of workflow instances. The engine is responsible for launching instances or
21
- reporting failures and conclusions.
20
+ of validated workflow definitions.
22
21
"""
23
22
24
23
def __init__ (
@@ -32,20 +31,24 @@ def __init__(
32
31
self ._instance_launcher = instance_launcher
33
32
34
33
def handle_message (self , msg : Message ) -> None :
35
- """Given a Pod Message, we use it to identify the Pod (Instance) exit code,
36
- workflow and step and decide what to do next.
34
+ """Expect Pod and Workflow messages.
35
+
36
+ WorkflowMessages signal the need to start (or stop) validated workflows -
37
+ i.e. take a workflow and create a running workflows (and its steps).
38
+
39
+ PodMessages signal the completion of a Pod (an Instance/Job) that is part
40
+ of a running workflow. Given a Pod Message, we use it to identify the
41
+ Instance exit code, running workflow, and step and decide what to do next,
42
+ which might be to run the next step in the workflow or abandon the workflow.
37
43
38
44
Only pod messages relating to workflow instances will be delivered to this method.
39
45
The Pod message has an 'instance' property that provides the UUID of
40
- the instance that was run. This can be used to correlate the instance with the
46
+ the instance that was run. This is used to correlate the instance with the
41
47
running workflow step.
42
-
43
- Additionally we will encounter WorkflowMessages that signal the need to
44
- start and stop workflows.
45
48
"""
46
49
assert msg
47
50
48
- _LOGGER .info ("> WE.handle_message() : GOT WorkflowMessage:\n %s" , str (msg ))
51
+ _LOGGER .debug ("> WE.handle_message() : GOT WorkflowMessage:\n %s" , str (msg ))
49
52
50
53
# Is this a WorkflowMessage or a PodMessage?
51
54
if isinstance (msg , PodMessage ):
@@ -66,7 +69,8 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
66
69
# THE STEPS HAVE NO INPUTS OR OUTPUTS.
67
70
# THIS FUNCTION PROBABLY NEEDS TO BE A LOT MORE SOPHISTICATED!
68
71
69
- _LOGGER .info ("WE> WorkflowMessage:\n %s" , str (msg ))
72
+ _LOGGER .debug ("WE> WorkflowMessage:\n %s" , str (msg ))
73
+
70
74
if msg .action == "START" :
71
75
# Using the running workflow get the workflow definition
72
76
response = self ._api_adapter .get_running_workflow (
@@ -75,11 +79,11 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
75
79
assert "running_workflow" in response
76
80
running_workflow = response ["running_workflow" ]
77
81
_LOGGER .info ("RunningWorkflow: %s" , running_workflow )
78
- project_id = running_workflow [ "project_id" ]
82
+
79
83
workflow_id = running_workflow ["workflow" ]["id" ]
80
- variables = running_workflow ["variables" ]
81
84
response = self ._api_adapter .get_workflow (workflow_id = workflow_id )
82
85
assert "workflow" in response
86
+
83
87
workflow = response ["workflow" ]
84
88
# Now find the first step
85
89
# and create a RunningWorkflowStep record prior to launching the instance
@@ -92,6 +96,8 @@ def _handle_workflow_message(self, msg: WorkflowMessage) -> None:
92
96
# The step specification is a string here - pass it directly to the launcher
93
97
# which will get the Job command and apply the provided variables to it.
94
98
step = workflow ["steps" ][0 ]
99
+ project_id = running_workflow ["project_id" ]
100
+ variables = running_workflow ["variables" ]
95
101
self ._instance_launcher .launch (
96
102
project_id = project_id ,
97
103
running_workflow_id = msg .running_workflow ,
@@ -117,7 +123,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
117
123
assert msg
118
124
119
125
# The PodMessage has a 'instance', 'has_exit_code', and 'exit_code' values.
120
- _LOGGER .info ("WE> PodMessage:\n %s" , str (msg ))
126
+ _LOGGER .debug ("WE> PodMessage:\n %s" , str (msg ))
121
127
122
128
# ALL THIS CODE ADDED SIMPLY TO DEMONSTRATE THE USE OF THE API ADAPTER
123
129
# AND THE INSTANCE LAUNCHER FOR THE SIMPLEST OF WORKFLOWS: -
@@ -133,7 +139,7 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
133
139
134
140
instance_id : str = msg .instance
135
141
exit_code : int = msg .exit_code
136
- _LOGGER .info (
142
+ _LOGGER .debug (
137
143
"WE> PodMessage: instance=%s, exit_code=%d" , instance_id , exit_code
138
144
)
139
145
@@ -161,9 +167,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
161
167
response = self ._api_adapter .get_running_workflow (
162
168
running_workflow_id = running_workflow_id
163
169
)
164
- project_id = response ["running_workflow" ]["project_id" ]
165
- workflow_id = response ["running_workflow" ]["workflow" ]["id" ]
166
- variables = response ["running_workflow" ]["variables" ]
170
+ running_workflow = response ["running_workflow" ]
171
+ workflow_id = running_workflow ["workflow" ]["id" ]
167
172
assert workflow_id
168
173
response = self ._api_adapter .get_workflow (workflow_id = workflow_id )
169
174
@@ -186,6 +191,8 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
186
191
)
187
192
assert "id" in response
188
193
running_workflow_step_id = response ["id" ]
194
+ project_id = running_workflow ["project_id" ]
195
+ variables = running_workflow ["variables" ]
189
196
self ._instance_launcher .launch (
190
197
project_id = project_id ,
191
198
running_workflow_id = running_workflow_id ,
0 commit comments