Skip to content

Commit 8f1c098

Browse files
author
Alan Christie
committed
fix: Major refactoring of logic (for new launch/workflow API)
1 parent 77e3cff commit 8f1c098

File tree

6 files changed

+121
-247
lines changed

6 files changed

+121
-247
lines changed

tests/instance_launcher.py

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,32 @@ def __init__(
6868
elif os.path.isdir(file_path):
6969
shutil.rmtree(file_path)
7070

71-
def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
71+
def launch(self, *, launch_parameters: LaunchParameters) -> LaunchResult:
7272
assert launch_parameters
7373
assert launch_parameters.project_id == TEST_PROJECT_ID
7474
assert launch_parameters.specification
7575
assert isinstance(launch_parameters.specification, dict)
7676

7777
os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)
7878

79-
# Create an Instance record (and dummy Task ID)
80-
response = self._api_adapter.create_instance(
81-
running_workflow_step_id=launch_parameters.running_workflow_step_id
79+
# Create a running workflow step
80+
assert launch_parameters.running_workflow_id
81+
assert launch_parameters.step_name
82+
response, _ = self._api_adapter.create_running_workflow_step(
83+
running_workflow_id=launch_parameters.running_workflow_id,
84+
step=launch_parameters.step_name,
85+
replica=launch_parameters.step_replication_number,
8286
)
87+
assert "id" in response
88+
rwfs_id: str = response["id"]
89+
# And add the variables we've been provided with
90+
if launch_parameters.variables:
91+
_ = self._api_adapter.set_running_workflow_step_variables(
92+
running_workflow_step_id=rwfs_id, variables=launch_parameters.variables
93+
)
94+
95+
# Create an Instance record (and dummy Task ID)
96+
response = self._api_adapter.create_instance(running_workflow_step_id=rwfs_id)
8397
instance_id = response["id"]
8498
task_id = "task-00000000-0000-0000-0000-000000000001"
8599

@@ -96,8 +110,8 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
96110
# The command may not need any, but we do the decoding anyway.
97111
decoded_command, status = job_decoder.decode(
98112
job["command"],
99-
launch_parameters.specification_variables,
100-
launch_parameters.running_workflow_step_id,
113+
launch_parameters.variables,
114+
rwfs_id,
101115
TextEncoding.JINJA2_3_0,
102116
)
103117
print(f"Decoded command: {decoded_command}")
@@ -129,6 +143,7 @@ def launch(self, launch_parameters: LaunchParameters) -> LaunchResult:
129143
self._msg_dispatcher.send(pod_message)
130144

131145
return LaunchResult(
146+
running_workflow_step_id=rwfs_id,
132147
instance_id=instance_id,
133148
task_id=task_id,
134149
command=" ".join(subprocess_cmd),

tests/test_test_instance_launcher.py

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,18 @@ def test_launch_nop(basic_launcher):
3333
project_id=TEST_PROJECT_ID,
3434
variables={},
3535
)
36-
response, _ = utaa.create_running_workflow_step(
37-
running_workflow_id=response["id"], step="step-1"
38-
)
39-
rwfsid = response["id"]
4036
lp: LaunchParameters = LaunchParameters(
4137
project_id=TEST_PROJECT_ID,
4238
name="Test Instance",
4339
launching_user_name="dlister",
4440
launching_user_api_token="1234567890",
4541
running_workflow_id=rwfid,
46-
running_workflow_step_id=rwfsid,
42+
step_name="step-1",
4743
specification={"collection": "workflow-engine-unit-test-jobs", "job": "nop"},
48-
specification_variables={},
4944
)
5045

5146
# Act
52-
result = launcher.launch(lp)
47+
result = launcher.launch(launch_parameters=lp)
5348

5449
# Assert
5550
assert result.error_num == 0
@@ -69,26 +64,21 @@ def test_launch_nop_fail(basic_launcher):
6964
variables={},
7065
)
7166
rwfid = response["id"]
72-
response, _ = utaa.create_running_workflow_step(
73-
running_workflow_id=response["id"], step="step-1"
74-
)
75-
rwfsid = response["id"]
7667
lp: LaunchParameters = LaunchParameters(
7768
project_id=TEST_PROJECT_ID,
7869
name="Test Instance",
7970
launching_user_name="dlister",
8071
launching_user_api_token="1234567890",
8172
running_workflow_id=rwfid,
82-
running_workflow_step_id=rwfsid,
73+
step_name="step-1",
8374
specification={
8475
"collection": "workflow-engine-unit-test-jobs",
8576
"job": "nop-fail",
8677
},
87-
specification_variables={},
8878
)
8979

9080
# Act
91-
result = launcher.launch(lp)
81+
result = launcher.launch(launch_parameters=lp)
9282

9383
# Assert
9484
assert result.error_num == 0
@@ -108,26 +98,22 @@ def test_launch_smiles_to_file(basic_launcher):
10898
variables={},
10999
)
110100
rwfid = response["id"]
111-
response, _ = utaa.create_running_workflow_step(
112-
running_workflow_id=response["id"], step="step-1"
113-
)
114-
rwfsid = response["id"]
115101
lp: LaunchParameters = LaunchParameters(
116102
project_id=TEST_PROJECT_ID,
117103
name="Test Instance",
118104
launching_user_name="dlister",
119105
launching_user_api_token="1234567890",
120106
running_workflow_id=rwfid,
121-
running_workflow_step_id=rwfsid,
107+
step_name="step-1",
122108
specification={
123109
"collection": "workflow-engine-unit-test-jobs",
124110
"job": "smiles-to-file",
125111
},
126-
specification_variables={"smiles": "C1=CC=CC=C1", "outputFile": "output.smi"},
112+
variables={"smiles": "C1=CC=CC=C1", "outputFile": "output.smi"},
127113
)
128114

129115
# Act
130-
result = launcher.launch(lp)
116+
result = launcher.launch(launch_parameters=lp)
131117

132118
# Assert
133119
assert result.error_num == 0

tests/test_workflow_engine_examples.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
398398
assert project_file_exists(output_file_2)
399399

400400

401+
@pytest.mark.skip(reason="WIP")
401402
def test_workflow_engine_simple_python_fanout(basic_engine):
402403
# Arrange
403404
md, da = basic_engine

tests/workflow-definitions/simple-python-fanout.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ steps:
1313
job: splitsmiles
1414
version: "1.0.0"
1515
variables:
16-
name: "count"
16+
name: count
1717
value: "1"
18+
outputFile: results.smi
1819
variable-mapping:
1920
- variable: inputFile
2021
from-workflow:
@@ -27,8 +28,9 @@ steps:
2728
job: append-col
2829
version: "1.0.0"
2930
variables:
30-
name: "desc1"
31+
name: desc1
3132
value: "777"
33+
outputFile: results.smi
3234
replicate:
3335
using:
3436
variable: inputFile

workflow/workflow_abc.py

Lines changed: 12 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -27,36 +27,23 @@ class LaunchParameters:
2727
specification: dict[str, Any]
2828
# An alternative way to pass variables to the specification.
2929
# If used it will replace any 'variables' already present in the specification.
30-
specification_variables: dict[str, Any] | None = None
30+
variables: dict[str, Any] | None = None
3131
# A string. In DM v4 converted to a boolean and set in the
3232
# instance Pod as a label. Setting this means the Instances
3333
# that are created will not be automatically removed by the Job operator.
3434
debug: str | None = None
3535
# The RunningWorkflow UUID.
3636
# Required if the Instance is part of a Workflow step.
3737
running_workflow_id: str | None = None
38-
# The RunningWorkflowStep UUID.
38+
# The RunningWorkflow's step name.
3939
# Required if the Instance is part of a Workflow step.
40-
running_workflow_step_id: str | None = None
41-
# A list of prior workflow steps that this step depends upon.
42-
#
43-
# This list gives the InstanceLauncher an opportunity to take the outputs
44-
# of a prior instance and link them to the instance directory for the
45-
# instance to be launched. We need to do this for Workflows because Instances
46-
# run as apart of a Workflow do not automatically have their outputs copied (linked)
47-
# to the Project directory when they complete. As an example, a step that relies
48-
# on the output files from two prior steps will provide the following list: -
49-
#
50-
# ["r-workflow-step-a04d", "r-workflow-step-d904"]
51-
running_workflow_step_prior_steps: list[str] | None = None
52-
# Workflow step Job inputs (for this step Instance). These Workflow Inputs (files)
53-
# are a list of Job input variable names for file variables where the
54-
# file is expected to be present in the Project directory. It is simply a list of
55-
# Job variable names. The launcher is expected to find the 'value' of these
56-
# variables and then move the file to the instance directory.
57-
#
58-
# ["inputFile"]
59-
running_workflow_step_inputs: list[str] | None = None
40+
step_name: str | None = None
41+
# The step replication number.
42+
# If only one instance of the step is expected to run
43+
# this value can be left at 0 (zero). If this step's launch
44+
# is expected to be executed more than once the value should be
45+
# non-zero (and unique for this workflow run).
46+
step_replication_number: int = 0
6047
# The application ID (a custom resource name)
6148
# used to identify the 'type' of Instance to create.
6249
# For DM Jobs this will be 'datamanagerjobs.squonk.it'
@@ -75,6 +62,9 @@ class LaunchResult:
7562
# The following optional properties
7663
# may not be present if there's a launch error.
7764
#
65+
# A running workflow step UUID
66+
# (if the step is part of a running workflow)
67+
running_workflow_step_id: str | None = None
7868
# The Instance UUID that was created for you.
7969
instance_id: str | None = None
8070
# The Task UUID that is handling the Instance launch
@@ -94,7 +84,6 @@ def launch(
9484
self,
9585
*,
9686
launch_parameters: LaunchParameters,
97-
**kwargs: str,
9887
) -> LaunchResult:
9988
"""Launch a (Job) Instance"""
10089

@@ -199,25 +188,6 @@ def set_running_workflow_done(
199188
"""Set the success value for a RunningWorkflow Record.
200189
If not successful an error code and message should be provided."""
201190

202-
@abstractmethod
203-
def create_running_workflow_step(
204-
self,
205-
*,
206-
running_workflow_id: str,
207-
step: str,
208-
replica: int = 0,
209-
prior_running_workflow_step_id: str | None = None,
210-
) -> tuple[dict[str, Any], int]:
211-
"""Create a RunningWorkflowStep Record (from a RunningWorkflow).
212-
If this is a replica (concurrent execution) of a step the replica
213-
value must be set to a value greater than 0. The replica is unique
214-
for a given step and is used to distinguish between running workflow steps
215-
generated from the same step name."""
216-
# Should return:
217-
# {
218-
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
219-
# }
220-
221191
@abstractmethod
222192
def get_running_workflow_step(
223193
self, *, running_workflow_step_id: str
@@ -292,17 +262,6 @@ def get_running_workflow_step_by_name(
292262
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
293263
# },
294264

295-
@abstractmethod
296-
def set_running_workflow_step_variables(
297-
self,
298-
*,
299-
running_workflow_step_id: str,
300-
variables: dict[str, Any],
301-
) -> None:
302-
"""Set the variables used prior to decoding the step command for each step.
303-
This can be used to understand step failures but will also be vital
304-
when adding variables values to subsequent steps from prior step values."""
305-
306265
@abstractmethod
307266
def set_running_workflow_step_done(
308267
self,

0 commit comments

Comments
 (0)