Skip to content

Commit e88b36b

Browse files
author
Alan Christie
committed
feat: Major launch refactoring - now gets job and validates variables
1 parent 5962207 commit e88b36b

File tree

2 files changed

+141
-75
lines changed

2 files changed

+141
-75
lines changed

.pre-commit-config.yaml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ repos:
7171
name: pylint
7272
entry: pylint
7373
additional_dependencies:
74-
- jinja2==3.0.3
75-
- jsonschema >= 3.2.0, < 4.0
74+
- jinja2 >= 3.1.3
75+
- jsonschema >= 4.21.1
7676
- pyyaml >= 5.3.1, < 7.0
77-
- im-protobuf >= 7.0.0
77+
- im-protobuf >= 8.2.0
78+
- im-data-manager-job-decoder >= 2.1.0
7879
files: ^workflow/.*\.py$

workflow/workflow_engine.py

Lines changed: 137 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import sys
2828
from typing import Any, Dict, Optional
2929

30+
from decoder.decoder import TextEncoding, decode
3031
from google.protobuf.message import Message
3132
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
3233
from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage
@@ -107,7 +108,6 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
107108
"API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response)
108109
)
109110
assert "running_user" in rwf_response
110-
launching_user_name: str = rwf_response["running_user"]
111111
# Now get the workflow definition (to get all the steps)
112112
wfid = rwf_response["workflow"]["id"]
113113
wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid)
@@ -117,6 +117,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
117117
# and create a corresponding RunningWorkflowStep record...
118118
first_step: Dict[str, Any] = wf_response["steps"][0]
119119
first_step_name: str = first_step["name"]
120+
# We need this even if the following goes wrong.
120121
response, _ = self._wapi_adapter.create_running_workflow_step(
121122
running_workflow_id=r_wfid,
122123
step=first_step_name,
@@ -128,48 +129,9 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
128129
str(response),
129130
)
130131
assert "id" in response
131-
r_wfsid = response["id"]
132+
r_wfsid: str = response["id"]
132133

133-
# The step's 'specification' is a string - pass it directly to the
134-
# launcher along with any (optional) 'variables'. The launcher
135-
# will apply the variables to step's Job command but we need to handle
136-
# any launch problems. The validator should have checked to ensure that
137-
# variable expansion will work, but we must prepare for the unexpected.
138-
139-
project_id = rwf_response["project"]["id"]
140-
variables: dict[str, Any] | None = rwf_response.get("variables")
141-
142-
_LOGGER.info(
143-
"Launching first step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
144-
" (name=%s project=%s, variables=%s)",
145-
r_wfid,
146-
r_wfsid,
147-
first_step_name,
148-
rwf_response["name"],
149-
project_id,
150-
variables,
151-
)
152-
153-
lp: LaunchParameters = LaunchParameters(
154-
project_id=project_id,
155-
name=first_step_name,
156-
debug=rwf_response.get("debug"),
157-
launching_user_name=launching_user_name,
158-
launching_user_api_token=rwf_response["running_user_api_token"],
159-
specification=json.loads(first_step["specification"]),
160-
specification_variables=variables,
161-
running_workflow_id=r_wfid,
162-
running_workflow_step_id=r_wfsid,
163-
)
164-
lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
165-
if lr.error_num:
166-
self._set_step_error(
167-
first_step_name, r_wfid, r_wfsid, lr.error_num, lr.error_msg
168-
)
169-
else:
170-
_LOGGER.info(
171-
"Launched first step '%s' (command=%s)", first_step_name, lr.command
172-
)
134+
self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)
173135

174136
def _handle_pod_message(self, msg: PodMessage) -> None:
175137
"""Handles a PodMessage. This is a message that signals the completion of a
@@ -266,43 +228,22 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
266228
running_workflow_id=r_wfid,
267229
step=next_step_name,
268230
)
231+
assert "id" in rwfs_response
232+
r_wfsid = rwfs_response["id"]
233+
assert r_wfsid
269234
_LOGGER.debug(
270235
"API.create_running_workflow_step(%s, %s) returned: -\n%s",
271236
r_wfid,
272237
next_step_name,
273238
str(response),
274239
)
275-
assert "id" in rwfs_response
276-
new_r_wfsid: str = rwfs_response["id"]
277-
project_id: str = rwf_response["project"]["id"]
278-
variables: dict[str, Any] | None = rwf_response.get("variables")
279-
lp: LaunchParameters = LaunchParameters(
280-
project_id=project_id,
281-
name=next_step_name,
282-
debug=rwf_response.get("debug"),
283-
launching_user_name=rwf_response["running_user"],
284-
launching_user_api_token=rwf_response["running_user_api_token"],
285-
specification=json.loads(next_step["specification"]),
286-
specification_variables=variables,
287-
running_workflow_id=r_wfid,
288-
running_workflow_step_id=new_r_wfsid,
240+
241+
self._launch(
242+
wf=wf_response,
243+
rwf=rwf_response,
244+
rwfs_id=r_wfsid,
245+
step=next_step,
289246
)
290-
lr = self._instance_launcher.launch(launch_parameters=lp)
291-
# Handle a launch error?
292-
if lr.error_num:
293-
self._set_step_error(
294-
next_step_name,
295-
r_wfid,
296-
new_r_wfsid,
297-
lr.error_num,
298-
lr.error_msg,
299-
)
300-
else:
301-
_LOGGER.info(
302-
"Launched step: %s (command=%s)",
303-
next_step["name"],
304-
lr.command,
305-
)
306247

307248
# Something was started (or there was a launch error).
308249
break
@@ -317,6 +258,130 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
317258
success=True,
318259
)
319260

261+
def _validate_step_command(
262+
self,
263+
*,
264+
step: dict[str, Any],
265+
workflow_variables: dict[str, Any] | None,
266+
running_workflow_variables: dict[str, Any] | None = None,
267+
) -> str | dict[str, Any]:
268+
"""Returns an error message if the command isn't valid.
269+
Without a message we return all the variables that were (successfully)
270+
applied to the command."""
271+
# We get the Job from the step specification, which must contain
272+
# the keys "collection", "job", and "version". Here we assume that
273+
# the workflow definition has passed the RUN-level validation
274+
# which means we can get these values.
275+
step_spec: dict[str, Any] = json.loads(step["specification"])
276+
job_collection: str = step_spec["collection"]
277+
job_job: str = step_spec["job"]
278+
job_version: str = step_spec["version"]
279+
job, _ = self._wapi_adapter.get_job(
280+
collection=job_collection, job=job_job, version=job_version
281+
)
282+
_LOGGER.debug(
283+
"API.get_job(%s, %s, %s) returned: -\n%s",
284+
job_collection,
285+
job_job,
286+
job_version,
287+
str(job),
288+
)
289+
290+
# The step's 'specification' is a string - pass it directly to the
291+
# launcher along with any (optional) 'workflow variables'. The launcher
292+
# will apply the variables to step's Job command but we need to handle
293+
# any launch problems. The validator should have checked to ensure that
294+
# variable expansion will work, but we must prepare for the unexpected.
295+
#
296+
# What the engine has to do here is make sure that the definition
297+
# that's about to be launched has all its configuration requirements
298+
# satisfied (inputs, outputs and options). Basically the
299+
# command must be successfully rendered with what we have.
300+
#
301+
# To do this we give the command and our variables
302+
# to the Job Decoder's 'decode()' method. It returns a tuple (str and boolean).
303+
# If the boolean is True then the command has no undefined configuration
304+
# and can be launched. If it is False then the returned str contains an
305+
# error message.
306+
#
307+
# Remember that variables can exist in (ascending order of priority): -
308+
# 1. The specification
309+
# 2. The workflow
310+
# 2. The RunningWorkflow
311+
312+
all_variables: dict[str, Any] = {}
313+
if "variables" in step_spec:
314+
all_variables = step_spec.pop("variables")
315+
if workflow_variables:
316+
all_variables = all_variables | workflow_variables
317+
if running_workflow_variables:
318+
all_variables = all_variables | running_workflow_variables
319+
message, success = decode(
320+
job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
321+
)
322+
323+
return all_variables if success else message
324+
325+
def _launch(
326+
self,
327+
*,
328+
wf: dict[str, Any],
329+
rwf: dict[str, Any],
330+
rwfs_id: str,
331+
step: dict[str, Any],
332+
) -> None:
333+
step_name: str = step["name"]
334+
rwf_id: str = rwf["id"]
335+
336+
_LOGGER.info("Validating step command: %s (step=%s)...", rwf_id, step_name)
337+
338+
# Now check the step command can be executed (by decoding it)
339+
workflow_variables: dict[str, Any] | None = wf.get("variables")
340+
running_workflow_variables: dict[str, Any] | None = rwf.get("variables")
341+
error_or_variables: str | dict[str, Any] = self._validate_step_command(
342+
step=step,
343+
workflow_variables=workflow_variables,
344+
running_workflow_variables=running_workflow_variables,
345+
)
346+
if isinstance(error_or_variables, str):
347+
error_msg = error_or_variables
348+
_LOGGER.warning(
349+
"First step '%s' failed command validation (%s)", step_name, error_msg
350+
)
351+
self._set_step_error(step_name, rwf_id, rwfs_id, 1, error_msg)
352+
return
353+
354+
project_id = rwf["project"]["id"]
355+
variables: dict[str, Any] = error_or_variables
356+
357+
_LOGGER.info(
358+
"Launching first step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
359+
" (name=%s project=%s, variables=%s)",
360+
rwf_id,
361+
rwfs_id,
362+
step_name,
363+
rwf["name"],
364+
project_id,
365+
variables,
366+
)
367+
368+
lp: LaunchParameters = LaunchParameters(
369+
project_id=project_id,
370+
name=step_name,
371+
debug=rwf.get("debug"),
372+
launching_user_name=rwf["running_user"],
373+
launching_user_api_token=rwf["running_user_api_token"],
374+
specification=json.loads(step["specification"]),
375+
specification_variables=variables,
376+
running_workflow_id=rwf_id,
377+
running_workflow_step_id=rwfs_id,
378+
)
379+
lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
380+
if lr.error_num:
381+
self._set_step_error(step_name, rwf_id, rwfs_id, lr.error_num, lr.error_msg)
382+
else:
383+
_LOGGER.info("Launched first step '%s' (command=%s)", step_name, lr.command)
384+
320385
def _set_step_error(
321386
self,
322387
step_name: str,

0 commit comments

Comments
 (0)