
Commit 17da698

Merge pull request #38 from InformaticsMatters/concat
Initial combiner mechanism
2 parents 4b1e868 + cdddc35 commit 17da698

22 files changed (+576, -520 lines)

tests/instance_launcher.py

Lines changed: 7 additions & 0 deletions

@@ -76,13 +76,20 @@ def launch(self, *, launch_parameters: LaunchParameters) -> LaunchResult:

         os.makedirs(EXECUTION_DIRECTORY, exist_ok=True)

+        if launch_parameters.step_replication_number:
+            assert (
+                launch_parameters.step_replication_number
+                <= launch_parameters.total_number_of_replicas
+            )
+
         # Create a running workflow step
         assert launch_parameters.running_workflow_id
         assert launch_parameters.step_name
         response, _ = self._api_adapter.create_running_workflow_step(
             running_workflow_id=launch_parameters.running_workflow_id,
             step=launch_parameters.step_name,
             replica=launch_parameters.step_replication_number,
+            replicas=launch_parameters.total_number_of_replicas,
         )
         assert "id" in response
         rwfs_id: str = response["id"]
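For orientation, a minimal sketch (not part of this commit) of the replica bookkeeping the new guard enforces: a replicated step carries a non-zero step_replication_number that must not exceed total_number_of_replicas, while an un-replicated step leaves the number at zero. FakeLaunchParameters and the values below are hypothetical stand-ins for the real LaunchParameters.

# Hypothetical, simplified stand-in for LaunchParameters, used only to
# illustrate the replication fields the launcher now checks.
from dataclasses import dataclass


@dataclass
class FakeLaunchParameters:
    running_workflow_id: str
    step_name: str
    step_replication_number: int = 0  # 0 => the step is not replicated
    total_number_of_replicas: int = 1


def check(p: FakeLaunchParameters) -> None:
    # Mirrors the guard added to tests/instance_launcher.py
    if p.step_replication_number:
        assert p.step_replication_number <= p.total_number_of_replicas


check(FakeLaunchParameters("r-workflow-000", "combine"))     # un-replicated step
check(FakeLaunchParameters("r-workflow-000", "copy", 1, 2))  # replica 1 of 2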

tests/job-definitions/job-definitions.yaml

Lines changed: 23 additions & 2 deletions

@@ -131,13 +131,34 @@ jobs:

   concatenate:
     command: >-
-      concatenate.py {% for ifile in inputFile %}{{ ifile }} {% endfor %} --outputFile {{ outputFile }}
+      concatenate.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}
+    # Simulate a multiple input files Job (combiner)...
+    variables:
+      inputs:
+        properties:
+          inputFile:
+            type: files
+      options:
+        type: object
+        properties:
+          inputDirPrefix:
+            title: Optional inoput directory prefix
+            type: string
+      outputs:
+        properties:
+          outputBase:
+            creates: '{{ outputFile }}'
+            type: file

   splitsmiles:
     command: >-
       copyf.py {{ inputFile }}
-    # Simulate multiple output files...
+    # Simulate a multiple output files Job (splitetr)...
     variables:
+      inputs:
+        properties:
+          inputFile:
+            type: file
       outputs:
         properties:
           outputBase:
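The simplified command template no longer loops over a list of input files; it substitutes the inputFile and outputFile variables directly. A small sketch of how the template expands, assuming a Jinja2-compatible renderer (the renderer and the file names below are assumptions, not taken from the commit):

from jinja2 import Template

command = "concatenate.py --inputFile {{ inputFile }} --outputFile {{ outputFile }}"
rendered = Template(command).render(
    inputFile="chunk_1.smi",       # illustrative value only
    outputFile="combination.smi",  # illustrative value only
)
print(rendered)
# concatenate.py --inputFile chunk_1.smi --outputFile combination.smi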

tests/jobs/concatenate.py

Lines changed: 7 additions & 4 deletions

@@ -2,13 +2,16 @@

 parser = argparse.ArgumentParser(
     prog="addcol",
-    description="Takes a list of files and writes them into single outputfile",
+    description="Takes an optional directory prefix and a file,"
+    " and combines all the input files that are found"
+    " into single outputfile",
 )
-parser.add_argument("inputFile", nargs="+", type=argparse.FileType("r"))
+parser.add_argument("--inputDirPrefix")
+parser.add_argument("--inputFile", required=True)
 parser.add_argument("-o", "--outputFile", required=True)
 args = parser.parse_args()


 with open(args.outputFile, "wt", encoding="utf8") as ofile:
-    for f in args.inputFile:
-        ofile.write(f.read())
+    with open(args.inputFile, "rt", encoding="utf8") as ifile:
+        ofile.write(ifile.read())
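A hypothetical invocation of the rewritten combiner script (a sketch, not part of the commit): the old script accepted one or more positional input files, while the new one copies a single --inputFile into --outputFile. The file names are illustrative only.

import subprocess
import sys

subprocess.run(
    [
        sys.executable,
        "concatenate.py",
        "--inputFile", "chunk_1.smi",       # illustrative file name
        "--outputFile", "combination.smi",  # illustrative file name
    ],
    check=True,
)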

tests/test_decoder.py

Lines changed: 0 additions & 38 deletions

@@ -184,41 +184,3 @@ def test_get_workflow_steps():
     assert len(steps) == 2
     assert steps[0]["name"] == "step1"
     assert steps[1]["name"] == "step2"
-
-
-def test_get_step_input_variable_names_when_duplicates():
-    # Arrange
-    workflow_filename: str = os.path.join(
-        os.path.dirname(__file__),
-        "workflow-definitions",
-        "duplicate-step-input-output-variable-names.yaml",
-    )
-    with open(workflow_filename, "r", encoding="utf8") as wf_file:
-        definition: Dict[str, Any] = yaml.safe_load(wf_file)
-
-    # Act
-    inputs = decoder.get_step_input_variable_names(definition, "step-1")
-
-    # Assert
-    assert len(inputs) == 2
-    assert inputs[0] == "inputFile"
-    assert inputs[1] == "inputFile"
-
-
-def test_get_step_output_variable_names_when_duplicates():
-    # Arrange
-    workflow_filename: str = os.path.join(
-        os.path.dirname(__file__),
-        "workflow-definitions",
-        "duplicate-step-input-output-variable-names.yaml",
-    )
-    with open(workflow_filename, "r", encoding="utf8") as wf_file:
-        definition: Dict[str, Any] = yaml.safe_load(wf_file)
-
-    # Act
-    outputs = decoder.get_step_output_variable_names(definition, "step-2")
-
-    # Assert
-    assert len(outputs) == 2
-    assert outputs[0] == "outputFile"
-    assert outputs[1] == "outputFile"

tests/test_test_wapi_adapter.py

Lines changed: 0 additions & 66 deletions

@@ -421,72 +421,6 @@ def test_create_instance_and_get_step_instance_directory_by_name():
     assert response["instance_directory"] == f".{i_id}"


-def test_get_workflow_steps_driving_this_step_when_1st_step():
-    # Arrange
-    utaa = UnitTestWorkflowAPIAdapter()
-    response = utaa.create_workflow(
-        workflow_definition={
-            "name": "blah",
-            "steps": [{"name": "step-1"}, {"name": "step-2"}, {"name": "step-3"}],
-        }
-    )
-    response = utaa.create_running_workflow(
-        user_id="dlister",
-        workflow_id=response["id"],
-        project_id=TEST_PROJECT_ID,
-        variables={},
-    )
-    response, _ = utaa.create_running_workflow_step(
-        running_workflow_id=response["id"], step="step-1"
-    )
-    rwfs_id = response["id"]
-
-    # Act
-    response, _ = utaa.get_workflow_steps_driving_this_step(
-        running_workflow_step_id=rwfs_id
-    )
-
-    # Assert
-    assert response["caller_step_index"] == 0
-    assert len(response["steps"]) == 3
-    assert response["steps"][0]["name"] == "step-1"
-    assert response["steps"][1]["name"] == "step-2"
-    assert response["steps"][2]["name"] == "step-3"
-
-
-def test_get_workflow_steps_driving_this_step_when_2nd_step():
-    # Arrange
-    utaa = UnitTestWorkflowAPIAdapter()
-    response = utaa.create_workflow(
-        workflow_definition={
-            "name": "blah",
-            "steps": [{"name": "step-1"}, {"name": "step-2"}, {"name": "step-3"}],
-        }
-    )
-    response = utaa.create_running_workflow(
-        user_id="dlister",
-        workflow_id=response["id"],
-        project_id=TEST_PROJECT_ID,
-        variables={},
-    )
-    response, _ = utaa.create_running_workflow_step(
-        running_workflow_id=response["id"], step="step-2"
-    )
-    rwfs_id = response["id"]
-
-    # Act
-    response, _ = utaa.get_workflow_steps_driving_this_step(
-        running_workflow_step_id=rwfs_id
-    )
-
-    # Assert
-    assert response["caller_step_index"] == 1
-    assert len(response["steps"]) == 3
-    assert response["steps"][0]["name"] == "step-1"
-    assert response["steps"][1]["name"] == "step-2"
-    assert response["steps"][2]["name"] == "step-3"
-
-
 def test_get_running_workflow_step_by_name():
     # Arrange
     utaa = UnitTestWorkflowAPIAdapter()

tests/test_workflow_engine_examples.py

Lines changed: 9 additions & 11 deletions

@@ -398,12 +398,12 @@ def test_workflow_engine_simple_python_molprops_with_options(basic_engine):
     assert project_file_exists(output_file_2)


-def test_workflow_engine_simple_python_fanout(basic_engine):
+def test_workflow_engine_simple_python_split_combine(basic_engine):
     # Arrange
     md, da = basic_engine

     da.mock_get_running_workflow_step_output_values_for_output(
-        step_name="first-step",
+        step_name="split",
         output_variable="outputBase",
         output=["chunk_1.smi", "chunk_2.smi"],
     )

@@ -427,8 +427,8 @@ def test_workflow_engine_simple_python_fanout(basic_engine):
     r_wfid = start_workflow(
         md,
         da,
-        "simple-python-fanout",
-        {"candidateMolecules": input_file_1},
+        "simple-python-split-combine",
+        {"candidateMolecules": input_file_1, "combination": "combination.smi"},
     )

     # Assert

@@ -439,10 +439,8 @@ def test_workflow_engine_simple_python_fanout(basic_engine):
     print("response")
     pprint(response)

-    assert response["count"] == 3
-    assert response["running_workflow_steps"][0]["done"]
-    assert response["running_workflow_steps"][0]["success"]
-    assert response["running_workflow_steps"][1]["done"]
-    assert response["running_workflow_steps"][1]["success"]
-    assert response["running_workflow_steps"][2]["done"]
-    assert response["running_workflow_steps"][2]["success"]
+    assert response["count"] == 4
+    rwf_steps = response["running_workflow_steps"]
+    for rwf_step in rwf_steps:
+        assert rwf_step["done"]
+        assert rwf_step["success"]

tests/test_workflow_validator_for_run_level.py

Lines changed: 0 additions & 24 deletions

@@ -215,27 +215,3 @@ def test_validate_simple_python_molprops_with_missing_input():
     assert error.error_msg == [
         "Missing workflow variable values for: candidateMolecules"
     ]
-
-
-def test_validate_duplicate_step_output_variable_names():
-    # Arrange
-    workflow_filename: str = os.path.join(
-        os.path.dirname(__file__),
-        "workflow-definitions",
-        "duplicate-step-input-output-variable-names.yaml",
-    )
-    with open(workflow_filename, "r", encoding="utf8") as workflow_file:
-        workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
-    assert workflow
-
-    # Act
-    error = WorkflowValidator.validate(
-        level=ValidationLevel.TAG,
-        workflow_definition=workflow,
-    )
-
-    # Assert
-    assert error.error_num == 3
-    assert error.error_msg == [
-        "Duplicate step output variable: outputFile (step=step-2)"
-    ]

tests/test_workflow_validator_for_tag_level.py

Lines changed: 0 additions & 24 deletions

@@ -149,27 +149,3 @@ def test_validate_simple_python_molprops_with_options():
     # Assert
     assert error.error_num == 0
     assert error.error_msg is None
-
-
-def test_validate_duplicate_step_output_variable_names():
-    # Arrange
-    workflow_filename: str = os.path.join(
-        os.path.dirname(__file__),
-        "workflow-definitions",
-        "duplicate-step-input-output-variable-names.yaml",
-    )
-    with open(workflow_filename, "r", encoding="utf8") as workflow_file:
-        workflow: dict[str, Any] = yaml.load(workflow_file, Loader=yaml.FullLoader)
-    assert workflow
-
-    # Act
-    error = WorkflowValidator.validate(
-        level=ValidationLevel.TAG,
-        workflow_definition=workflow,
-    )
-
-    # Assert
-    assert error.error_num == 3
-    assert error.error_msg == [
-        "Duplicate step output variable: outputFile (step=step-2)"
-    ]

tests/wapi_adapter.py

Lines changed: 24 additions & 40 deletions

@@ -112,6 +112,26 @@ def get_running_steps(
         # Does nothing at the moment - this is used for the STOP logic.
         return {"count": 0, "steps": []}, 0

+    def get_status_of_all_step_instances_by_name(
+        self, *, running_workflow_id: str, name: str
+    ) -> tuple[dict[str, Any], int]:
+        UnitTestWorkflowAPIAdapter.lock.acquire()
+        with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
+            running_workflow_step = Unpickler(pickle_file).load()
+        UnitTestWorkflowAPIAdapter.lock.release()
+
+        steps: list[dict[str, Any]] = []
+        for rwfs_id, record in running_workflow_step.items():
+            if record["running_workflow"]["id"] != running_workflow_id:
+                continue
+            if record["name"] == name:
+                response = record
+                response["id"] = rwfs_id
+                if record["replica"] == 0:
+                    _ = response.pop("replica")
+                steps.append(response)
+        return {"count": len(steps), "status": steps}, 0
+
     def set_running_workflow_done(
         self,
         *,

@@ -140,10 +160,11 @@ def create_running_workflow_step(
         running_workflow_id: str,
         step: str,
         replica: int = 0,
+        replicas: int = 1,
         prior_running_workflow_step_id: str | None = None,
     ) -> tuple[dict[str, Any], int]:
-        if replica:
-            assert replica > 0
+        assert replica >= 0
+        assert replicas > replica

         UnitTestWorkflowAPIAdapter.lock.acquire()
         with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:

@@ -158,6 +179,7 @@ def create_running_workflow_step(
             "done": False,
             "success": False,
             "replica": replica,
+            "replicas": replicas,
             "variables": {},
             "running_workflow": {"id": running_workflow_id},
         }

@@ -249,44 +271,6 @@ def set_running_workflow_step_done(
             Pickler(pickle_file).dump(running_workflow_step)
         UnitTestWorkflowAPIAdapter.lock.release()

-    def get_workflow_steps_driving_this_step(
-        self,
-        *,
-        running_workflow_step_id: str,
-    ) -> tuple[dict[str, Any], int]:
-        # To accomplish this we get the running workflow for the step,
-        # then the workflow, then the steps from that workflow.
-        # We return a dictionary and an HTTP response code.
-        UnitTestWorkflowAPIAdapter.lock.acquire()
-        with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:
-            running_workflow_step = Unpickler(pickle_file).load()
-        UnitTestWorkflowAPIAdapter.lock.release()
-
-        assert running_workflow_step_id in running_workflow_step
-
-        running_workflow_id: str = running_workflow_step[running_workflow_step_id][
-            "running_workflow"
-        ]["id"]
-        rwf_response, _ = self.get_running_workflow(
-            running_workflow_id=running_workflow_id
-        )
-        assert rwf_response
-        workflow_id: str = rwf_response["workflow"]["id"]
-        wf_response, _ = self.get_workflow(workflow_id=workflow_id)
-        assert wf_response
-        # Find the caller's python in the step sequence (-1 if not found)
-        caller_step_index: int = -1
-        index: int = 0
-        for step in wf_response["steps"]:
-            if step["name"] == running_workflow_step[running_workflow_step_id]["name"]:
-                caller_step_index = index
-                break
-            index += 1
-        return {
-            "caller_step_index": caller_step_index,
-            "steps": wf_response["steps"].copy(),
-        }, 0
-
     def get_instance(self, *, instance_id: str) -> tuple[dict[str, Any], int]:
         UnitTestWorkflowAPIAdapter.lock.acquire()
         with open(_INSTANCE_PICKLE_FILE, "rb") as pickle_file:
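A sketch (not from the commit) of how a caller might use the new get_status_of_all_step_instances_by_name() method of the unit-test adapter to inspect every instance of a named step. The import path, running-workflow id and step name below are assumptions, and the adapter's pickle-backed records must already have been created through the usual create_* calls.

from wapi_adapter import UnitTestWorkflowAPIAdapter  # import path may differ

utaa = UnitTestWorkflowAPIAdapter()
# ...workflow, running-workflow and step records created via the adapter first...
response, _ = utaa.get_status_of_all_step_instances_by_name(
    running_workflow_id="r-workflow-000",  # illustrative id
    name="copy",                           # illustrative step name
)
# One entry per step instance; replicated instances keep their "replica" value,
# un-replicated ones (replica == 0) have the key removed.
for status in response["status"]:
    print(status["id"], status["done"], status["success"])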
