Skip to content

Commit 5a9903b

Browse files
author
Alan Christie
committed
feat: Some work on the refactored engine
1 parent b9e3f00 commit 5a9903b

File tree

5 files changed

+88
-157
lines changed

5 files changed

+88
-157
lines changed

tests/test_workflow_engine_examples.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def wait_for_workflow(
130130
# are the responsibility of the caller.
131131
attempts = 0
132132
done = False
133+
response = None
133134
while not done:
134135
response, _ = da.get_running_workflow(running_workflow_id=r_wfid)
135136
if response["done"]:
@@ -141,6 +142,7 @@ def wait_for_workflow(
141142
time.sleep(completion_poll_period_s)
142143
# When we get here the workflow must have finished (not timed-out),
143144
# and it must have passed (or failed) according the the caller's expectation.
145+
assert response
144146
assert response["done"]
145147
assert response["success"] == expect_success
146148

@@ -400,7 +402,7 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
400402
"simple-python-molprops-with-options",
401403
{
402404
"candidateMolecules": input_file_1,
403-
"outputFile": output_file_1,
405+
"clusteredMolecules": output_file_2,
404406
"rdkitPropertyName": "prop",
405407
"rdkitPropertyValue": 1.2,
406408
},

tests/workflow-definitions/example-smiles-to-file.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,12 @@ steps:
1313
collection: workflow-engine-unit-test-jobs
1414
job: smiles-to-file
1515
version: "1.0.0"
16+
variable-mapping:
17+
- variable: outputFile
18+
from-workflow:
19+
variable: outputFile
20+
- variable: smiles
21+
from-workflow:
22+
variable: smiles
23+
out:
24+
- outputFile

tests/workflow-definitions/simple-python-molprops-with-options.yaml

Lines changed: 3 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -4,62 +4,15 @@ kind-version: "2025.2"
44
name: python-workflow
55
description: A simple python experimental workflow
66

7-
# Some meaningless variables.
8-
# Just to make sure the decoder accepts this.
9-
# The Workflow engin eis not (yet) interested in this block.
10-
variables:
11-
inputs:
12-
type: object
13-
required:
14-
- inputFile
15-
properties:
16-
inputFile:
17-
title: Molecules to pick from
18-
mime-types:
19-
- squonk/x-smiles
20-
type: file
21-
seeds:
22-
title: Molecules that are already picked
23-
mime-types:
24-
- squonk/x-smiles
25-
type: file
26-
multiple: true
27-
outputs:
28-
type: object
29-
properties:
30-
outputFile:
31-
title: Output file
32-
mime-types:
33-
- chemical/x-csv
34-
creates: '{{ outputFile }}'
35-
type: file
36-
options:
37-
type: object
38-
required:
39-
- count
40-
properties:
41-
outputFile:
42-
title: Output file name
43-
type: string
44-
pattern: "^[A-Za-z0-9_/\\.\\-]+$"
45-
default: diverse.smi
46-
count:
47-
title: Number of molecules to pick
48-
type: integer
49-
minimum: 1
50-
threshold:
51-
title: Similarity threshold
52-
type: number
53-
minimum: 0
54-
maximum: 1
55-
567
steps:
578
- name: step1
589
description: Add column 1
5910
specification:
6011
collection: workflow-engine-unit-test-jobs
6112
job: rdkit-molprops
6213
version: "1.0.0"
14+
variables:
15+
outputFile: step1.out.smi
6316
variable-mapping:
6417
- variable: inputFile
6518
from-workflow:
@@ -70,9 +23,6 @@ steps:
7023
- variable: value
7124
from-workflow:
7225
variable: rdkitPropertyValue
73-
- variable: outputFile
74-
from-workflow:
75-
variable: clusteredMolecules
7626

7727
- name: step2
7828
description: Add column 2
@@ -90,6 +40,6 @@ steps:
9040
variable: outputFile
9141
- variable: outputFile
9242
from-workflow:
93-
variable: outputFile
43+
variable: clusteredMolecules
9444
out:
9545
- outputFile

workflow/decoder.py

Lines changed: 35 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
"""
55

66
import os
7-
from pprint import pprint
87
from typing import Any
98

109
import jsonschema
@@ -108,87 +107,41 @@ def get_step_input_variable_names(
108107
return variable_names
109108

110109

111-
def set_step_variables(
112-
*,
113-
workflow: dict[str, Any],
114-
inputs: list[dict[str, Any]],
115-
outputs: list[dict[str, Any]],
116-
step_outputs: dict[str, Any],
117-
previous_step_outputs: list[dict[str, Any]],
118-
workflow_variables: dict[str, Any],
119-
step_name: str,
120-
) -> dict[str, Any]:
121-
"""Prepare input- and output variables for the following step.
122-
123-
Inputs are defined in step definition but their values may
124-
come from previous step outputs.
125-
"""
126-
assert workflow
127-
128-
result = {}
129-
130-
print("ssv: wf vars:")
131-
pprint(workflow_variables)
132-
print("ssv: inputs:")
133-
pprint(inputs)
134-
print("ssv: outputs", outputs)
135-
print("ssv: step_outputs", step_outputs)
136-
print("ssv: prev step outputs", previous_step_outputs)
137-
print("ssv: step_name", step_name)
138-
139-
for item in inputs:
140-
p_key = item["input"]
141-
p_val = ""
142-
val = item["from"]
143-
if "workflow-input" in val.keys():
144-
p_val = workflow_variables[val["workflow-input"]]
145-
result[p_key] = p_val
146-
elif "step" in val.keys():
147-
# this links the variable to previous step output
148-
if previous_step_outputs:
149-
for out in previous_step_outputs:
150-
if out["output"] == val["output"]:
151-
# p_val = out["as"]
152-
if step_outputs["output"]:
153-
p_val = step_outputs["output"]
154-
print("\n!!!!!!!!!!!!!if clause!!!!!!!!!!!!!!!!!!!!!\n")
155-
print(p_val)
156-
else:
157-
# what do I need to do here??
158-
print("\n!!!!!!!!!!!!!else clause!!!!!!!!!!!!!!!!!!!!!\n")
159-
print(out)
160-
print(val)
161-
162-
# this bit handles multiple inputs: if a step
163-
# requires input from multiple steps, add them to
164-
# the list in result dict. this is the reason for
165-
# mypy ignore statements, mypy doesn't understand
166-
# redefinition
167-
if p_key in result:
168-
if not isinstance(result[p_key], set):
169-
result[p_key] = {result[p_key]} # type: ignore [assignment]
170-
result[p_key].add(p_val) # type: ignore [attr-defined]
171-
else:
172-
result[p_key] = p_val
173-
else:
174-
if val["output"] in workflow_variables:
175-
result[p_key] = workflow_variables[val["output"]]
176-
177-
for item in outputs:
178-
p_key = item["output"]
179-
# p_val = item["as"]
180-
# p_val = step_outputs["output"]
181-
p_val = "somefile.smi"
182-
result[p_key] = p_val
183-
184-
# options = set_variables_from_options_for_step(
185-
# definition=workflow,
186-
# variables=workflow_variables,
187-
# step_name=step_name,
188-
# )
189-
#
190-
# result |= options
191-
return result
110+
def get_step_workflow_variable_mapping(
111+
*, step: dict[str, Any]
112+
) -> list[tuple[str, str]]:
113+
"""Returns a list of workflow vaiable name to step variable name tuples
114+
for the given step."""
115+
variable_mapping: list[tuple[str, str]] = []
116+
if "variable-mapping" in step:
117+
for v_map in step["variable-mapping"]:
118+
if "from-workflow" in v_map:
119+
# Tuple is "from" -> "to"
120+
variable_mapping.append(
121+
(v_map["from-workflow"]["variable"], v_map["variable"])
122+
)
123+
return variable_mapping
124+
125+
126+
def get_step_prior_step_variable_mapping(
127+
*, step: dict[str, Any]
128+
) -> dict[str, list[tuple[str, str]]]:
129+
"""Returns list of tuples, indexed by prior step name, of source step vaiable name
130+
to this step's variable name."""
131+
variable_mapping: dict[str, list[tuple[str, str]]] = {}
132+
if "variable-mapping" in step:
133+
for v_map in step["variable-mapping"]:
134+
if "from-step" in v_map:
135+
step_name = v_map["from-step"]["name"]
136+
step_variable = v_map["from-step"]["variable"]
137+
# Tuple is "from" -> "to"
138+
if step_name in variable_mapping:
139+
variable_mapping[step_name].append(
140+
(step_variable, v_map["variable"])
141+
)
142+
else:
143+
variable_mapping[step_name] = [(step_variable, v_map["variable"])]
144+
return variable_mapping
192145

193146

194147
def get_step_replicator(*, step: dict[str, Any]) -> str | Any:

workflow/workflow_engine.py

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@
4141

4242
from .decoder import (
4343
get_step_input_variable_names,
44+
get_step_prior_step_variable_mapping,
4445
get_step_replicator,
45-
set_step_variables,
46+
get_step_workflow_variable_mapping,
4647
)
4748

4849
_LOGGER: logging.Logger = logging.getLogger(__name__)
@@ -485,16 +486,6 @@ def _validate_step_command(
485486

486487
print("final prev step outputs", previous_step_outputs)
487488

488-
step_vars = set_step_variables(
489-
workflow=workflow,
490-
workflow_variables=all_variables,
491-
inputs=inputs,
492-
outputs=outputs,
493-
step_outputs=step_outputs,
494-
previous_step_outputs=previous_step_outputs,
495-
step_name=running_wf_step["name"],
496-
)
497-
all_variables |= step_vars
498489
_LOGGER.debug(
499490
"Index %s (%s) all_variables=%s",
500491
our_step_index,
@@ -533,27 +524,52 @@ def _launch(
533524
wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
534525
running_workflow_step_id=rwfs_id,
535526
)
536-
print("wf_step_data")
537-
pprint(wf_step_data)
538527
assert wf_step_data["caller_step_index"] >= 0
539528
our_step_index: int = wf_step_data["caller_step_index"]
540529

541530
print("step in _launch:", step_name)
542531
pprint(step)
543532

533+
# Workflow variables set by the user...
534+
rwf_variables: dict[str, Any] = rwf.get("variables", {})
535+
544536
# Now check the step command can be executed
545537
# (by trying to decoding the Job command).
538+
# Before we do this we have to construct the variable map
539+
# for this step.
546540
#
547-
# We pass in the workflow variables (these are provided by the user
548-
# when the workflow is run. All workflow variables will be present in the
549-
# running workflow record)
550-
running_workflow_variables: dict[str, Any] | None = rwf.get("variables")
541+
# We start with any variables provided in the step specification
542+
all_variables: dict[str, Any] = step["specification"].get("variables", {})
543+
# We now have to iterate through the step's variable mapping block.
544+
# This will name any workflow variables (from)
545+
# and their corresponding step variable (to).
546+
step_wf_v_map: list[tuple[str, str]] = get_step_workflow_variable_mapping(
547+
step=step
548+
)
549+
for from_to in step_wf_v_map:
550+
all_variables[from_to[1]] = rwf_variables[from_to[0]]
551+
# We must now apply variables from prior steps identified in the
552+
# current step's mapping block. We're given a map indexed by
553+
# prior step name that's a list of tuples name the prior step's
554+
# variable (from) and the curent step variable (to).
555+
step_prior_v_map: dict[str, list[tuple[str, str]]] = (
556+
get_step_prior_step_variable_mapping(step=step)
557+
)
558+
for prior_step_name, v_map in step_prior_v_map.items():
559+
# Load the prior step
560+
prior_step, _ = self._wapi_adapter.get_running_workflow_step_by_name(
561+
name=prior_step_name, running_workflow_id=rwf_id
562+
)
563+
# Get its variables and copy the value
564+
for from_to in v_map:
565+
all_variables[from_to[1]] = prior_step["variables"][from_to[0]]
566+
551567
error_or_variables: str | dict[str, Any] = self._validate_step_command(
552568
running_workflow_step_id=rwfs_id,
553569
step=step,
554570
workflow_steps=wf_step_data["steps"],
555571
our_step_index=our_step_index,
556-
running_workflow_variables=running_workflow_variables,
572+
running_workflow_variables=all_variables,
557573
)
558574
if isinstance(error_or_variables, str):
559575
error_msg = error_or_variables
@@ -570,13 +586,14 @@ def _launch(
570586

571587
_LOGGER.info(
572588
"Launching step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
573-
" (name=%s project=%s, variables=%s)",
589+
" variables=%s name=%s project=%s, (all_variables=%s)",
574590
rwf_id,
575591
rwfs_id,
576592
step_name,
593+
variables,
577594
rwf["name"],
578595
project_id,
579-
variables,
596+
all_variables,
580597
)
581598

582599
# When we launch a step we need to identify all the prior steps in the workflow,
@@ -593,7 +610,7 @@ def _launch(
593610
prior_steps: list[str] = []
594611
if our_step_index > 0:
595612
# We need the step ID of the prior step.
596-
prior_step_name: str = wf_step_data["steps"][our_step_index - 1]["name"]
613+
prior_step_name = wf_step_data["steps"][our_step_index - 1]["name"]
597614
step_response, _ = self._wapi_adapter.get_running_workflow_step_by_name(
598615
name=prior_step_name,
599616
running_workflow_id=rwf_id,

0 commit comments

Comments
 (0)