
Commit 7620a51

Merge pull request #5 from InformaticsMatters/prototype-work-alan
Rework launcher
2 parents a7c1260 + e88b36b commit 7620a51

File tree (4 files changed: +205 -77 lines)

  .pre-commit-config.yaml
  tests/job-definitions/job-definitions.yaml
  workflow/workflow_engine.py
  workflow/workflow_validator.py

.pre-commit-config.yaml

Lines changed: 4 additions & 3 deletions

@@ -71,8 +71,9 @@ repos:
     name: pylint
     entry: pylint
     additional_dependencies:
-    - jinja2==3.0.3
-    - jsonschema >= 3.2.0, < 4.0
+    - jinja2 >= 3.1.3
+    - jsonschema >= 4.21.1
     - pyyaml >= 5.3.1, < 7.0
-    - im-protobuf >= 7.0.0
+    - im-protobuf >= 8.2.0
+    - im-data-manager-job-decoder >= 2.1.0
     files: ^workflow/.*\.py$

tests/job-definitions/job-definitions.yaml

Lines changed: 11 additions & 2 deletions

@@ -85,15 +85,24 @@ jobs:
 
   shortcut-example-1-process-b:
     command: >-
-      shortcut-example-1-process-b.py --inputFile {{ inputFile }}--outputFile {{ outputFile }}
+      shortcut-example-1-process-b.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}
     variables:
+      inputs:
+        type: object
+        required:
+        - inputFile
+        properties:
+          inputFile:
+            title: Molecules to pick from
+            mime-types:
+            - chemical/x-mdl-sdfile
+            type: file
       outputs:
         type: object
         properties:
           title: Output file
           mime-types:
           - chemical/x-mdl-sdfile
-          - squonk/x-smiles
           creates: '{{ outputFile }}'
           type: file
       options:
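The corrected command is a Jinja2 template (consistent with the jinja2 dependency pinned above and the TextEncoding.JINJA2_3_0 encoding used in workflow_engine.py below). A minimal, illustrative sketch of how the two declared variables render into a runnable command line; the file names are made up, and the real engine delegates this rendering to the Job Decoder:

from jinja2 import Environment, StrictUndefined

# The corrected command template from the job definition above.
TEMPLATE = (
    "shortcut-example-1-process-b.py "
    "--inputFile {{ inputFile }} --outputFile {{ outputFile }}"
)

# StrictUndefined makes rendering fail loudly if a declared variable is
# missing, mirroring the check the engine performs before launching a step.
env = Environment(undefined=StrictUndefined)
command = env.from_string(TEMPLATE).render(
    inputFile="candidates.sdf",  # illustrative values only
    outputFile="picked.sdf",
)
print(command)
# shortcut-example-1-process-b.py --inputFile candidates.sdf --outputFile picked.sdf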

workflow/workflow_engine.py

Lines changed: 137 additions & 72 deletions

@@ -27,6 +27,7 @@
 import sys
 from typing import Any, Dict, Optional
 
+from decoder.decoder import TextEncoding, decode
 from google.protobuf.message import Message
 from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
 from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage
@@ -107,7 +108,6 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
             "API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response)
         )
         assert "running_user" in rwf_response
-        launching_user_name: str = rwf_response["running_user"]
         # Now get the workflow definition (to get all the steps)
         wfid = rwf_response["workflow"]["id"]
         wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid)
@@ -117,6 +117,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
         # and create a corresponding RunningWorkflowStep record...
         first_step: Dict[str, Any] = wf_response["steps"][0]
         first_step_name: str = first_step["name"]
+        # We need this even if the following goes wrong.
         response, _ = self._wapi_adapter.create_running_workflow_step(
             running_workflow_id=r_wfid,
             step=first_step_name,
@@ -128,48 +129,9 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
             str(response),
         )
         assert "id" in response
-        r_wfsid = response["id"]
+        r_wfsid: str = response["id"]
 
-        # The step's 'specification' is a string - pass it directly to the
-        # launcher along with any (optional) 'variables'. The launcher
-        # will apply the variables to step's Job command but we need to handle
-        # any launch problems. The validator should have checked to ensure that
-        # variable expansion will work, but we must prepare for the unexpected.
-
-        project_id = rwf_response["project"]["id"]
-        variables: dict[str, Any] | None = rwf_response.get("variables")
-
-        _LOGGER.info(
-            "Launching first step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
-            " (name=%s project=%s, variables=%s)",
-            r_wfid,
-            r_wfsid,
-            first_step_name,
-            rwf_response["name"],
-            project_id,
-            variables,
-        )
-
-        lp: LaunchParameters = LaunchParameters(
-            project_id=project_id,
-            name=first_step_name,
-            debug=rwf_response.get("debug"),
-            launching_user_name=launching_user_name,
-            launching_user_api_token=rwf_response["running_user_api_token"],
-            specification=json.loads(first_step["specification"]),
-            specification_variables=variables,
-            running_workflow_id=r_wfid,
-            running_workflow_step_id=r_wfsid,
-        )
-        lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
-        if lr.error_num:
-            self._set_step_error(
-                first_step_name, r_wfid, r_wfsid, lr.error_num, lr.error_msg
-            )
-        else:
-            _LOGGER.info(
-                "Launched first step '%s' (command=%s)", first_step_name, lr.command
-            )
+        self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)
 
     def _handle_pod_message(self, msg: PodMessage) -> None:
         """Handles a PodMessage. This is a message that signals the completion of a
@@ -266,43 +228,22 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
                 running_workflow_id=r_wfid,
                 step=next_step_name,
             )
+            assert "id" in rwfs_response
+            r_wfsid = rwfs_response["id"]
+            assert r_wfsid
             _LOGGER.debug(
                 "API.create_running_workflow_step(%s, %s) returned: -\n%s",
                 r_wfid,
                 next_step_name,
                 str(response),
            )
-            assert "id" in rwfs_response
-            new_r_wfsid: str = rwfs_response["id"]
-            project_id: str = rwf_response["project"]["id"]
-            variables: dict[str, Any] | None = rwf_response.get("variables")
-            lp: LaunchParameters = LaunchParameters(
-                project_id=project_id,
-                name=next_step_name,
-                debug=rwf_response.get("debug"),
-                launching_user_name=rwf_response["running_user"],
-                launching_user_api_token=rwf_response["running_user_api_token"],
-                specification=json.loads(next_step["specification"]),
-                specification_variables=variables,
-                running_workflow_id=r_wfid,
-                running_workflow_step_id=new_r_wfsid,
+
+            self._launch(
+                wf=wf_response,
+                rwf=rwf_response,
+                rwfs_id=r_wfsid,
+                step=next_step,
             )
-            lr = self._instance_launcher.launch(launch_parameters=lp)
-            # Handle a launch error?
-            if lr.error_num:
-                self._set_step_error(
-                    next_step_name,
-                    r_wfid,
-                    new_r_wfsid,
-                    lr.error_num,
-                    lr.error_msg,
-                )
-            else:
-                _LOGGER.info(
-                    "Launched step: %s (command=%s)",
-                    next_step["name"],
-                    lr.command,
-                )
 
             # Something was started (or there was a launch error).
             break
@@ -317,6 +258,130 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
             success=True,
         )
 
+    def _validate_step_command(
+        self,
+        *,
+        step: dict[str, Any],
+        workflow_variables: dict[str, Any] | None,
+        running_workflow_variables: dict[str, Any] | None = None,
+    ) -> str | dict[str, Any]:
+        """Returns an error message if the command isn't valid.
+        Without a message we return all the variables that were (successfully)
+        applied to the command."""
+        # We get the Job from the step specification, which must contain
+        # the keys "collection", "job", and "version". Here we assume that
+        # the workflow definition has passed the RUN-level validation
+        # which means we can get these values.
+        step_spec: dict[str, Any] = json.loads(step["specification"])
+        job_collection: str = step_spec["collection"]
+        job_job: str = step_spec["job"]
+        job_version: str = step_spec["version"]
+        job, _ = self._wapi_adapter.get_job(
+            collection=job_collection, job=job_job, version=job_version
+        )
+        _LOGGER.debug(
+            "API.get_job(%s, %s, %s) returned: -\n%s",
+            job_collection,
+            job_job,
+            job_version,
+            str(job),
+        )
+
+        # The step's 'specification' is a string - pass it directly to the
+        # launcher along with any (optional) 'workflow variables'. The launcher
+        # will apply the variables to step's Job command but we need to handle
+        # any launch problems. The validator should have checked to ensure that
+        # variable expansion will work, but we must prepare for the unexpected.
+        #
+        # What the engine has to do here is make sure that the definition
+        # that's about to be launched has all its configuration requirements
+        # satisfied (inputs, outputs and options). Basically the
+        # command must be successfully rendered with what we have.
+        #
+        # To do this we give the command and our variables
+        # to the Job Decoder's 'decode()' method. It returns a tuple (str and boolean).
+        # If the boolean is True then the command has no undefined configuration
+        # and can be launched. If it is False then the returned str contains an
+        # error message.
+        #
+        # Remember that variables can exist in (ascending order of priority): -
+        # 1. The specification
+        # 2. The workflow
+        # 2. The RunningWorkflow
+
+        all_variables: dict[str, Any] = {}
+        if "variables" in step_spec:
+            all_variables = step_spec.pop("variables")
+        if workflow_variables:
+            all_variables = all_variables | workflow_variables
+        if running_workflow_variables:
+            all_variables = all_variables | running_workflow_variables
+        message, success = decode(
+            job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
+        )
+
+        return all_variables if success else message
+
+    def _launch(
+        self,
+        *,
+        wf: dict[str, Any],
+        rwf: dict[str, Any],
+        rwfs_id: str,
+        step: dict[str, Any],
+    ) -> None:
+        step_name: str = step["name"]
+        rwf_id: str = rwf["id"]
+
+        _LOGGER.info("Validating step command: %s (step=%s)...", rwf_id, step_name)
+
+        # Now check the step command can be executed (by decoding it)
+        workflow_variables: dict[str, Any] | None = wf.get("variables")
+        running_workflow_variables: dict[str, Any] | None = rwf.get("variables")
+        error_or_variables: str | dict[str, Any] = self._validate_step_command(
+            step=step,
+            workflow_variables=workflow_variables,
+            running_workflow_variables=running_workflow_variables,
+        )
+        if isinstance(error_or_variables, str):
+            error_msg = error_or_variables
+            _LOGGER.warning(
+                "First step '%s' failed command validation (%s)", step_name, error_msg
+            )
+            self._set_step_error(step_name, rwf_id, rwfs_id, 1, error_msg)
+            return
+
+        project_id = rwf["project"]["id"]
+        variables: dict[str, Any] = error_or_variables
+
+        _LOGGER.info(
+            "Launching first step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
+            " (name=%s project=%s, variables=%s)",
+            rwf_id,
+            rwfs_id,
+            step_name,
+            rwf["name"],
+            project_id,
+            variables,
+        )
+
+        lp: LaunchParameters = LaunchParameters(
+            project_id=project_id,
+            name=step_name,
+            debug=rwf.get("debug"),
+            launching_user_name=rwf["running_user"],
+            launching_user_api_token=rwf["running_user_api_token"],
+            specification=json.loads(step["specification"]),
+            specification_variables=variables,
+            running_workflow_id=rwf_id,
+            running_workflow_step_id=rwfs_id,
+        )
+        lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
+        if lr.error_num:
+            self._set_step_error(step_name, rwf_id, rwfs_id, lr.error_num, lr.error_msg)
+        else:
+            _LOGGER.info("Launched first step '%s' (command=%s)", step_name, lr.command)
+
     def _set_step_error(
         self,
         step_name: str,
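The core of the rework is _validate_step_command(), which merges variables from the step specification, the Workflow and the RunningWorkflow (later sources win) and only lets _launch() proceed when the Job Decoder can fully render the command. Below is a minimal sketch of that precedence rule using made-up variable names and values; the merged dictionary is what the engine hands to decode() alongside the Job's command template:

from typing import Any

# Made-up values, lowest priority first.
step_spec_variables: dict[str, Any] = {"outputFile": "step-default.sdf", "count": 10}
workflow_variables: dict[str, Any] = {"count": 25}
running_workflow_variables: dict[str, Any] = {"inputFile": "candidates.sdf"}

# Dict union keeps the right-hand value on key clashes, so each later
# source overrides the earlier ones (the ascending priority described
# in the comments above).
all_variables: dict[str, Any] = dict(step_spec_variables)
all_variables = all_variables | workflow_variables
all_variables = all_variables | running_workflow_variables

print(all_variables)
# {'outputFile': 'step-default.sdf', 'count': 25, 'inputFile': 'candidates.sdf'}

# The engine then calls decode(job["command"], all_variables, "command",
# TextEncoding.JINJA2_3_0); a False success flag means the command still
# has undefined configuration, so the step is marked as an error instead
# of being launched.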

workflow/workflow_validator.py

Lines changed: 53 additions & 0 deletions

@@ -1,5 +1,6 @@
 """The WorkflowEngine validation logic."""
 
+import json
 from dataclasses import dataclass
 from enum import Enum
 from typing import Any
@@ -47,7 +48,59 @@ def validate(
         if workflow_inputs:
             assert isinstance(workflow_inputs, dict)
 
+        # ALl levels require a schema validation
         if error := validate_schema(workflow_definition):
             return ValidationResult(error_num=1, error_msg=[error])
 
+        if level == ValidationLevel.RUN:
+            run_level_result: ValidationResult = WorkflowValidator._validate_run_level(
+                workflow_definition=workflow_definition,
+                workflow_inputs=workflow_inputs,
+            )
+            if run_level_result.error_num:
+                return run_level_result
+
+        return _VALIDATION_SUCCESS
+
+    @classmethod
+    def _validate_run_level(
+        cls,
+        *,
+        workflow_definition: dict[str, Any],
+        workflow_inputs: dict[str, Any] | None = None,
+    ) -> ValidationResult:
+        assert workflow_definition
+        del workflow_inputs
+
+        # RUN level requires that the specification is a valid JSON string.
+        # and contains properties for 'collection', 'job', and 'version'.
+        try:
+            specification = json.loads(workflow_definition["specification"])
+        except json.decoder.JSONDecodeError as e:
+            return ValidationResult(
+                error_num=1,
+                error_msg=[
+                    f"Error decoding specification, which is not valid JSON: {e}"
+                ],
+            )
+        except TypeError as e:
+            return ValidationResult(
+                error_num=2,
+                error_msg=[
+                    f"Error decoding specification, which is not valid JSON: {e}"
+                ],
+            )
+        expected_keys: set[str] = {"collection", "job", "version"}
+        missing_keys: list[str] = []
+        missing_keys.extend(
+            expected_key
+            for expected_key in expected_keys
+            if expected_key not in specification
+        )
+        if missing_keys:
+            return ValidationResult(
+                error_num=2,
+                error_msg=[f"Specification is missing: {', '.join(missing_keys)}"],
+            )
+
         return _VALIDATION_SUCCESS
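A standalone sketch of the RUN-level check that the new _validate_run_level() performs, using an illustrative specification (the collection value is made up, the job name comes from the test definitions above) that is valid JSON but lacks the mandatory version key:

import json

# Illustrative specification: valid JSON, but "version" is missing.
specification_text = (
    '{"collection": "example-collection", "job": "shortcut-example-1-process-b"}'
)

specification = json.loads(specification_text)  # JSONDecodeError would be error_num=1
expected_keys = {"collection", "job", "version"}
missing_keys = [key for key in expected_keys if key not in specification]
if missing_keys:
    # _validate_run_level() reports this case as error_num=2.
    print(f"Specification is missing: {', '.join(missing_keys)}")
# Prints: Specification is missing: version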
