Skip to content

Commit 19e58ae

Browse files
author
Alan Christie
committed
refactor: More combiner logic
1 parent c08ed5c commit 19e58ae

File tree

6 files changed

+119
-159
lines changed

6 files changed

+119
-159
lines changed

tests/instance_launcher.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ def launch(self, *, launch_parameters: LaunchParameters) -> LaunchResult:
7676

7777
os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)
7878

79+
if launch_parameters.step_replication_number:
80+
assert (
81+
launch_parameters.step_replication_number
82+
<= launch_parameters.total_number_of_replicas
83+
)
84+
7985
# Create a running workflow step
8086
assert launch_parameters.running_workflow_id
8187
assert launch_parameters.step_name

tests/test_test_wapi_adapter.py

Lines changed: 0 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -421,72 +421,6 @@ def test_create_instance_and_get_step_instance_directory_by_name():
421421
assert response["instance_directory"] == f".{i_id}"
422422

423423

424-
def test_get_workflow_steps_driving_this_step_when_1st_step():
425-
# Arrange
426-
utaa = UnitTestWorkflowAPIAdapter()
427-
response = utaa.create_workflow(
428-
workflow_definition={
429-
"name": "blah",
430-
"steps": [{"name": "step-1"}, {"name": "step-2"}, {"name": "step-3"}],
431-
}
432-
)
433-
response = utaa.create_running_workflow(
434-
user_id="dlister",
435-
workflow_id=response["id"],
436-
project_id=TEST_PROJECT_ID,
437-
variables={},
438-
)
439-
response, _ = utaa.create_running_workflow_step(
440-
running_workflow_id=response["id"], step="step-1"
441-
)
442-
rwfs_id = response["id"]
443-
444-
# Act
445-
response, _ = utaa.get_workflow_steps_driving_this_step(
446-
running_workflow_step_id=rwfs_id
447-
)
448-
449-
# Assert
450-
assert response["caller_step_index"] == 0
451-
assert len(response["steps"]) == 3
452-
assert response["steps"][0]["name"] == "step-1"
453-
assert response["steps"][1]["name"] == "step-2"
454-
assert response["steps"][2]["name"] == "step-3"
455-
456-
457-
def test_get_workflow_steps_driving_this_step_when_2nd_step():
458-
# Arrange
459-
utaa = UnitTestWorkflowAPIAdapter()
460-
response = utaa.create_workflow(
461-
workflow_definition={
462-
"name": "blah",
463-
"steps": [{"name": "step-1"}, {"name": "step-2"}, {"name": "step-3"}],
464-
}
465-
)
466-
response = utaa.create_running_workflow(
467-
user_id="dlister",
468-
workflow_id=response["id"],
469-
project_id=TEST_PROJECT_ID,
470-
variables={},
471-
)
472-
response, _ = utaa.create_running_workflow_step(
473-
running_workflow_id=response["id"], step="step-2"
474-
)
475-
rwfs_id = response["id"]
476-
477-
# Act
478-
response, _ = utaa.get_workflow_steps_driving_this_step(
479-
running_workflow_step_id=rwfs_id
480-
)
481-
482-
# Assert
483-
assert response["caller_step_index"] == 1
484-
assert len(response["steps"]) == 3
485-
assert response["steps"][0]["name"] == "step-1"
486-
assert response["steps"][1]["name"] == "step-2"
487-
assert response["steps"][2]["name"] == "step-3"
488-
489-
490424
def test_get_running_workflow_step_by_name():
491425
# Arrange
492426
utaa = UnitTestWorkflowAPIAdapter()

tests/wapi_adapter.py

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ def get_running_steps(
112112
# Does nothing at the moment - this is used for the STOP logic.
113113
return {"count": 0, "steps": []}, 0
114114

115+
def get_status_of_all_step_instances_by_name(
116+
self, *, running_workflow_id: str, step_name: str
117+
) -> tuple[dict[str, Any], int]:
118+
# Need to implement!
119+
return {"count": 0, "status": []}, 0
120+
115121
def set_running_workflow_done(
116122
self,
117123
*,
@@ -249,44 +255,6 @@ def set_running_workflow_step_done(
249255
Pickler(pickle_file).dump(running_workflow_step)
250256
UnitTestWorkflowAPIAdapter.lock.release()
251257

252-
def get_workflow_steps_driving_this_step(
253-
self,
254-
*,
255-
running_workflow_step_id: str,
256-
) -> tuple[dict[str, Any], int]:
257-
# To accomplish this we get the running workflow for the step,
258-
# then the workflow, then the steps from that workflow.
259-
# We return a dictionary and an HTTP response code.
260-
UnitTestWorkflowAPIAdapter.lock.acquire()
261-
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
262-
running_workflow_step = Unpickler(pickle_file).load()
263-
UnitTestWorkflowAPIAdapter.lock.release()
264-
265-
assert running_workflow_step_id in running_workflow_step
266-
267-
running_workflow_id: str = running_workflow_step[running_workflow_step_id][
268-
"running_workflow"
269-
]["id"]
270-
rwf_response, _ = self.get_running_workflow(
271-
running_workflow_id=running_workflow_id
272-
)
273-
assert rwf_response
274-
workflow_id: str = rwf_response["workflow"]["id"]
275-
wf_response, _ = self.get_workflow(workflow_id=workflow_id)
276-
assert wf_response
277-
# Find the caller's python in the step sequence (-1 if not found)
278-
caller_step_index: int = -1
279-
index: int = 0
280-
for step in wf_response["steps"]:
281-
if step["name"] == running_workflow_step[running_workflow_step_id]["name"]:
282-
caller_step_index = index
283-
break
284-
index += 1
285-
return {
286-
"caller_step_index": caller_step_index,
287-
"steps": wf_response["steps"].copy(),
288-
}, 0
289-
290258
def get_instance(self, *, instance_id: str) -> tuple[dict[str, Any], int]:
291259
UnitTestWorkflowAPIAdapter.lock.acquire()
292260
with open(_INSTANCE_PICKLE_FILE, "rb") as pickle_file:

tests/workflow-definitions/simple-python-split-combine.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,19 @@ steps:
3636
from-step:
3737
name: split
3838
variable: outputBase
39+
40+
#- name: combine
41+
# description: Add some params
42+
# specification:
43+
# collection: workflow-engine-unit-test-jobs
44+
# job: concatenate
45+
# version: "1.0.0"
46+
# variables:
47+
# outputFile: results.smi
48+
# plumbing:
49+
# - variable: inputFile
50+
# from-step:
51+
# variable: outputFile
52+
# name: parallel
3953
out:
4054
- outputFile

workflow/workflow_abc.py

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,12 @@ class LaunchParameters:
4242
# If only one instance of the step is expected to run
4343
# this value can be left at 0 (zero). If this step's launch
4444
# is expected to be executed more than once the value should be
45-
# non-zero (and unique for this workflow run).
45+
# 1..'N'.
4646
step_replication_number: int = 0
47+
# The total number of replicas of this instance that are expected to be laucnhed.
48+
# if step_replication_number is set, this has to be set. It is 'N'.
49+
# If step_replication_number is zero this value is ignored.
50+
total_number_of_replicas: int = 0
4751
# The application ID (a custom resource name)
4852
# used to identify the 'type' of Instance to create.
4953
# For DM Jobs this will be 'datamanagerjobs.squonk.it'
@@ -144,7 +148,7 @@ def get_running_workflow(
144148
# "running_user": "alan",
145149
# "running_user_api_token": "123456789",
146150
# "done": False,
147-
# "success": false,
151+
# "success": False,
148152
# "error_num": 0,
149153
# "error_msg": "",
150154
# "workflow": {
@@ -171,7 +175,29 @@ def get_running_steps(
171175
# "count": 1,
172176
# "steps": [
173177
# {
174-
# "name:": "step-1234"
178+
# "name": "step-1234"
179+
# }
180+
# ]
181+
# }
182+
183+
@abstractmethod
184+
def get_status_of_all_step_instances_by_name(
185+
self, *, running_workflow_id: str, step_name: str
186+
) -> tuple[dict[str, Any], int]:
187+
"""Get a list of step execution statuses for the named step."""
188+
# Should return:
189+
# {
190+
# "count": 2,
191+
# "status": [
192+
# {
193+
# "done": True,
194+
# "success": True,
195+
# "running_workflow_step_id": "step-0001"
196+
# },
197+
# {
198+
# "done": False,
199+
# "success": False,
200+
# "running_workflow_step_id": "step-0002"
175201
# }
176202
# ]
177203
# }
@@ -195,9 +221,9 @@ def get_running_workflow_step(
195221
"""Get a RunningWorkflowStep Record"""
196222
# Should return:
197223
# {
198-
# "name:": "step-1234",
224+
# "name": "step-1234",
199225
# "done": False,
200-
# "success": false,
226+
# "success": False,
201227
# "error_num": 0,
202228
# "error_msg": "",
203229
# "variables": {
@@ -234,9 +260,9 @@ def get_running_workflow_step_by_name(
234260
# Should return:
235261
# {
236262
# "id": "r-workflow-step-00000000-0000-0000-0000-000000000001",
237-
# "name:": "step-1234",
263+
# "name": "step-1234",
238264
# "done": False,
239-
# "success": false,
265+
# "success": False,
240266
# "error_num": 0,
241267
# "error_msg": "",
242268
# "variables": {
@@ -274,26 +300,6 @@ def set_running_workflow_step_done(
274300
"""Set the success value for a RunningWorkflowStep Record,
275301
If not successful an error code and message should be provided."""
276302

277-
@abstractmethod
278-
def get_workflow_steps_driving_this_step(
279-
self,
280-
*,
281-
running_workflow_step_id: str,
282-
) -> tuple[dict[str, Any], int]:
283-
"""Get all the step records that belong to the Workflow for the given
284-
RunningWorkflowStep record ID. You are also given the caller's position
285-
in the list, which will be -1 if the caller is not present."""
286-
# It should return:
287-
# {
288-
# "caller_step_index": 0,
289-
# "steps": [
290-
# {
291-
# "name": "step-name"
292-
# "specification": "{}",
293-
# }
294-
# ]
295-
# }
296-
297303
@abstractmethod
298304
def get_instance(self, *, instance_id: str) -> tuple[dict[str, Any], int]:
299305
"""Get an Instance Record"""

0 commit comments

Comments
 (0)