7 changes: 4 additions & 3 deletions .pre-commit-config.yaml
@@ -71,8 +71,9 @@ repos:
     name: pylint
     entry: pylint
     additional_dependencies:
-    - jinja2==3.0.3
-    - jsonschema >= 3.2.0, < 4.0
+    - jinja2 >= 3.1.3
+    - jsonschema >= 4.21.1
     - pyyaml >= 5.3.1, < 7.0
-    - im-protobuf >= 7.0.0
+    - im-protobuf >= 8.2.0
+    - im-data-manager-job-decoder >= 2.1.0
     files: ^workflow/.*\.py$
13 changes: 11 additions & 2 deletions tests/job-definitions/job-definitions.yaml
@@ -85,15 +85,24 @@ jobs:
   shortcut-example-1-process-b:
     command: >-
-      shortcut-example-1-process-b.py --inputFile {{ inputFile }}--outputFile {{ outputFile }}
+      shortcut-example-1-process-b.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}
     variables:
       inputs:
         type: object
         required:
         - inputFile
         properties:
           inputFile:
             title: Molecules to pick from
             mime-types:
             - chemical/x-mdl-sdfile
             type: file
+      outputs:
+        type: object
+        properties:
+          title: Output file
+          mime-types:
+          - chemical/x-mdl-sdfile
+          - squonk/x-smiles
+          creates: '{{ outputFile }}'
+          type: file
     options:
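The one-character command fix above is worth illustrating: without the space before `--outputFile`, Jinja rendering fuses the input file name and the next flag into a single argument. A minimal sketch using the jinja2 package pinned in the pre-commit config (the file names here are made up):

```python
from jinja2 import Template

variables = {"inputFile": "in.sdf", "outputFile": "out.sdf"}

broken = Template(
    "shortcut-example-1-process-b.py"
    " --inputFile {{ inputFile }}--outputFile {{ outputFile }}"
)
fixed = Template(
    "shortcut-example-1-process-b.py"
    " --inputFile {{ inputFile }} --outputFile {{ outputFile }}"
)

# Without the space the rendered command has no '--outputFile' flag to parse:
print(broken.render(**variables))
# shortcut-example-1-process-b.py --inputFile in.sdf--outputFile out.sdf
print(fixed.render(**variables))
# shortcut-example-1-process-b.py --inputFile in.sdf --outputFile out.sdf
```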
209 changes: 137 additions & 72 deletions workflow/workflow_engine.py
@@ -27,6 +27,7 @@
 import sys
 from typing import Any, Dict, Optional

+from decoder.decoder import TextEncoding, decode
 from google.protobuf.message import Message
 from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
 from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage
@@ -107,7 +108,6 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
             "API.get_running_workflow(%s) returned: -\n%s", r_wfid, str(rwf_response)
         )
         assert "running_user" in rwf_response
-        launching_user_name: str = rwf_response["running_user"]
         # Now get the workflow definition (to get all the steps)
         wfid = rwf_response["workflow"]["id"]
         wf_response, _ = self._wapi_adapter.get_workflow(workflow_id=wfid)
@@ -117,6 +117,7 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
         # and create a corresponding RunningWorkflowStep record...
         first_step: Dict[str, Any] = wf_response["steps"][0]
         first_step_name: str = first_step["name"]
+        # We need this even if the following goes wrong.
         response, _ = self._wapi_adapter.create_running_workflow_step(
             running_workflow_id=r_wfid,
             step=first_step_name,
@@ -128,48 +129,9 @@ def _handle_workflow_start_message(self, r_wfid: str) -> None:
             str(response),
         )
         assert "id" in response
-        r_wfsid = response["id"]
+        r_wfsid: str = response["id"]

-        # The step's 'specification' is a string - pass it directly to the
-        # launcher along with any (optional) 'variables'. The launcher
-        # will apply the variables to step's Job command but we need to handle
-        # any launch problems. The validator should have checked to ensure that
-        # variable expansion will work, but we must prepare for the unexpected.
-
-        project_id = rwf_response["project"]["id"]
-        variables: dict[str, Any] | None = rwf_response.get("variables")
-
-        _LOGGER.info(
-            "Launching first step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
-            " (name=%s project=%s, variables=%s)",
-            r_wfid,
-            r_wfsid,
-            first_step_name,
-            rwf_response["name"],
-            project_id,
-            variables,
-        )
-
-        lp: LaunchParameters = LaunchParameters(
-            project_id=project_id,
-            name=first_step_name,
-            debug=rwf_response.get("debug"),
-            launching_user_name=launching_user_name,
-            launching_user_api_token=rwf_response["running_user_api_token"],
-            specification=json.loads(first_step["specification"]),
-            specification_variables=variables,
-            running_workflow_id=r_wfid,
-            running_workflow_step_id=r_wfsid,
-        )
-        lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
-        if lr.error_num:
-            self._set_step_error(
-                first_step_name, r_wfid, r_wfsid, lr.error_num, lr.error_msg
-            )
-        else:
-            _LOGGER.info(
-                "Launched first step '%s' (command=%s)", first_step_name, lr.command
-            )
+        self._launch(wf=wf_response, rwf=rwf_response, rwfs_id=r_wfsid, step=first_step)

     def _handle_pod_message(self, msg: PodMessage) -> None:
         """Handles a PodMessage. This is a message that signals the completion of a
@@ -266,43 +228,22 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
             running_workflow_id=r_wfid,
             step=next_step_name,
         )
+        assert "id" in rwfs_response
+        r_wfsid = rwfs_response["id"]
+        assert r_wfsid
         _LOGGER.debug(
             "API.create_running_workflow_step(%s, %s) returned: -\n%s",
             r_wfid,
             next_step_name,
             str(rwfs_response),
         )
-        assert "id" in rwfs_response
-        new_r_wfsid: str = rwfs_response["id"]
-        project_id: str = rwf_response["project"]["id"]
-        variables: dict[str, Any] | None = rwf_response.get("variables")
-        lp: LaunchParameters = LaunchParameters(
-            project_id=project_id,
-            name=next_step_name,
-            debug=rwf_response.get("debug"),
-            launching_user_name=rwf_response["running_user"],
-            launching_user_api_token=rwf_response["running_user_api_token"],
-            specification=json.loads(next_step["specification"]),
-            specification_variables=variables,
-            running_workflow_id=r_wfid,
-            running_workflow_step_id=new_r_wfsid,
+
+        self._launch(
+            wf=wf_response,
+            rwf=rwf_response,
+            rwfs_id=r_wfsid,
+            step=next_step,
         )
-        lr = self._instance_launcher.launch(launch_parameters=lp)
-        # Handle a launch error?
-        if lr.error_num:
-            self._set_step_error(
-                next_step_name,
-                r_wfid,
-                new_r_wfsid,
-                lr.error_num,
-                lr.error_msg,
-            )
-        else:
-            _LOGGER.info(
-                "Launched step: %s (command=%s)",
-                next_step["name"],
-                lr.command,
-            )

             # Something was started (or there was a launch error).
             break
@@ -317,6 +258,130 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
             success=True,
         )

+    def _validate_step_command(
+        self,
+        *,
+        step: dict[str, Any],
+        workflow_variables: dict[str, Any] | None,
+        running_workflow_variables: dict[str, Any] | None = None,
+    ) -> str | dict[str, Any]:
+        """Returns an error message if the command isn't valid. Otherwise we
+        return all the variables that were (successfully) applied
+        to the command."""
+        # We get the Job from the step specification, which must contain
+        # the keys "collection", "job", and "version". Here we assume that
+        # the workflow definition has passed the RUN-level validation,
+        # which means we can safely get these values.
+        step_spec: dict[str, Any] = json.loads(step["specification"])
+        job_collection: str = step_spec["collection"]
+        job_job: str = step_spec["job"]
+        job_version: str = step_spec["version"]
+        job, _ = self._wapi_adapter.get_job(
+            collection=job_collection, job=job_job, version=job_version
+        )
+        _LOGGER.debug(
+            "API.get_job(%s, %s, %s) returned: -\n%s",
+            job_collection,
+            job_job,
+            job_version,
+            str(job),
+        )
+
+        # The step's 'specification' is a string - pass it directly to the
+        # launcher along with any (optional) 'workflow variables'. The launcher
+        # will apply the variables to the step's Job command, but we need to
+        # handle any launch problems. The validator should have checked that
+        # variable expansion will work, but we must prepare for the unexpected.
+        #
+        # What the engine has to do here is make sure that the definition
+        # that's about to be launched has all its configuration requirements
+        # satisfied (inputs, outputs and options). Basically the
+        # command must be successfully rendered with what we have.
+        #
+        # To do this we give the command and our variables to the
+        # Job Decoder's 'decode()' method. It returns a tuple (str and boolean).
+        # If the boolean is True then the command has no undefined configuration
+        # and can be launched. If it is False then the returned str contains an
+        # error message.
+        #
+        # Remember that variables can exist in (ascending order of priority): -
+        # 1. The specification
+        # 2. The workflow
+        # 3. The RunningWorkflow

+        all_variables: dict[str, Any] = {}
+        if "variables" in step_spec:
+            all_variables = step_spec.pop("variables")
+        if workflow_variables:
+            all_variables = all_variables | workflow_variables
+        if running_workflow_variables:
+            all_variables = all_variables | running_workflow_variables
+        message, success = decode(
+            job["command"], all_variables, "command", TextEncoding.JINJA2_3_0
+        )
+
+        return all_variables if success else message
+
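A note on the precedence comment above: the method builds `all_variables` with Python's dict union operator, where later operands overwrite earlier keys, so RunningWorkflow variables beat workflow variables, which beat the step specification's own. A minimal sketch of that rule (the variable values here are hypothetical; the `decode()` call simply mirrors the one in `_validate_step_command`):

```python
from typing import Any

from decoder.decoder import TextEncoding, decode

spec_variables: dict[str, Any] = {"count": 1, "outputFile": "spec.sdf"}
workflow_variables: dict[str, Any] = {"outputFile": "workflow.sdf"}
running_workflow_variables: dict[str, Any] = {"count": 10}

# Later '|' operands win: ascending priority, as the comment describes.
all_variables = spec_variables | workflow_variables | running_workflow_variables
assert all_variables == {"count": 10, "outputFile": "workflow.sdf"}

# decode() reports whether the command can be fully rendered; on failure
# the first element of the returned tuple is the error message.
command = "pick.py --count {{ count }} --outputFile {{ outputFile }}"
message, success = decode(command, all_variables, "command", TextEncoding.JINJA2_3_0)
if not success:
    print(f"Cannot launch: {message}")
```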
+    def _launch(
+        self,
+        *,
+        wf: dict[str, Any],
+        rwf: dict[str, Any],
+        rwfs_id: str,
+        step: dict[str, Any],
+    ) -> None:
+        step_name: str = step["name"]
+        rwf_id: str = rwf["id"]
+
+        _LOGGER.info("Validating step command: %s (step=%s)...", rwf_id, step_name)
+
+        # Now check that the step command can be executed (by decoding it)
+        workflow_variables: dict[str, Any] | None = wf.get("variables")
+        running_workflow_variables: dict[str, Any] | None = rwf.get("variables")
+        error_or_variables: str | dict[str, Any] = self._validate_step_command(
+            step=step,
+            workflow_variables=workflow_variables,
+            running_workflow_variables=running_workflow_variables,
+        )
+        if isinstance(error_or_variables, str):
+            error_msg = error_or_variables
+            _LOGGER.warning(
+                "Step '%s' failed command validation (%s)", step_name, error_msg
+            )
+            self._set_step_error(step_name, rwf_id, rwfs_id, 1, error_msg)
+            return
+
+        project_id = rwf["project"]["id"]
+        variables: dict[str, Any] = error_or_variables
+
+        _LOGGER.info(
+            "Launching step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
+            " (name=%s project=%s, variables=%s)",
+            rwf_id,
+            rwfs_id,
+            step_name,
+            rwf["name"],
+            project_id,
+            variables,
+        )
+
+        lp: LaunchParameters = LaunchParameters(
+            project_id=project_id,
+            name=step_name,
+            debug=rwf.get("debug"),
+            launching_user_name=rwf["running_user"],
+            launching_user_api_token=rwf["running_user_api_token"],
+            specification=json.loads(step["specification"]),
+            specification_variables=variables,
+            running_workflow_id=rwf_id,
+            running_workflow_step_id=rwfs_id,
+        )
+        lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
+        if lr.error_num:
+            self._set_step_error(step_name, rwf_id, rwfs_id, lr.error_num, lr.error_msg)
+        else:
+            _LOGGER.info("Launched step '%s' (command=%s)", step_name, lr.command)
+
     def _set_step_error(
         self,
         step_name: str,
53 changes: 53 additions & 0 deletions workflow/workflow_validator.py
@@ -1,5 +1,6 @@
 """The WorkflowEngine validation logic."""

+import json
 from dataclasses import dataclass
 from enum import Enum
 from typing import Any
@@ -47,7 +48,59 @@ def validate(
         if workflow_inputs:
             assert isinstance(workflow_inputs, dict)

+        # All levels require schema validation
         if error := validate_schema(workflow_definition):
             return ValidationResult(error_num=1, error_msg=[error])

+        if level == ValidationLevel.RUN:
+            run_level_result: ValidationResult = WorkflowValidator._validate_run_level(
+                workflow_definition=workflow_definition,
+                workflow_inputs=workflow_inputs,
+            )
+            if run_level_result.error_num:
+                return run_level_result
+
         return _VALIDATION_SUCCESS
+
+    @classmethod
+    def _validate_run_level(
+        cls,
+        *,
+        workflow_definition: dict[str, Any],
+        workflow_inputs: dict[str, Any] | None = None,
+    ) -> ValidationResult:
+        assert workflow_definition
+        del workflow_inputs
+
+        # RUN level requires that the specification is a valid JSON string
+        # and contains properties for 'collection', 'job', and 'version'.
+        try:
+            specification = json.loads(workflow_definition["specification"])
+        except json.decoder.JSONDecodeError as e:
+            return ValidationResult(
+                error_num=1,
+                error_msg=[
+                    f"Error decoding specification, which is not valid JSON: {e}"
+                ],
+            )
+        except TypeError as e:
+            return ValidationResult(
+                error_num=2,
+                error_msg=[
+                    f"Error decoding specification, which is not valid JSON: {e}"
+                ],
+            )
+        expected_keys: set[str] = {"collection", "job", "version"}
+        missing_keys: list[str] = []
+        missing_keys.extend(
+            expected_key
+            for expected_key in expected_keys
+            if expected_key not in specification
+        )
+        if missing_keys:
+            return ValidationResult(
+                error_num=2,
+                error_msg=[f"Specification is missing: {', '.join(missing_keys)}"],
+            )
+
+        return _VALIDATION_SUCCESS
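To make the new RUN-level behaviour concrete, here is a sketch that exercises `_validate_run_level` directly (in production it is reached through `validate(level=ValidationLevel.RUN, ...)`, which also runs schema validation first). The collection and job names are made up, and it assumes `_VALIDATION_SUCCESS` carries an `error_num` of 0:

```python
import json

# A specification that satisfies the RUN-level rules: a JSON string
# containing 'collection', 'job' and 'version'.
good = {
    "specification": json.dumps(
        {"collection": "examples", "job": "process-b", "version": "1.0.0"}
    )
}
assert WorkflowValidator._validate_run_level(workflow_definition=good).error_num == 0

# A missing 'version' key is reported with error_num=2.
bad = {"specification": json.dumps({"collection": "examples", "job": "process-b"})}
result = WorkflowValidator._validate_run_level(workflow_definition=bad)
assert result.error_num == 2
assert result.error_msg == ["Specification is missing: version"]

# A specification that is not a string raises TypeError inside json.loads(),
# which is also mapped to error_num=2.
result = WorkflowValidator._validate_run_level(
    workflow_definition={"specification": {"already": "decoded"}}
)
assert result.error_num == 2
```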