Skip to content

Commit bea2cdb

Browse files
author
Alan Christie
committed
fix: First very basic combiner run
1 parent f336119 commit bea2cdb

File tree

3 files changed

+98
-70
lines changed

3 files changed

+98
-70
lines changed

tests/test_workflow_engine_examples.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -439,10 +439,8 @@ def test_workflow_engine_simple_python_split_combine(basic_engine):
439439
print("response")
440440
pprint(response)
441441

442-
assert response["count"] == 3
443-
assert response["running_workflow_steps"][0]["done"]
444-
assert response["running_workflow_steps"][0]["success"]
445-
assert response["running_workflow_steps"][1]["done"]
446-
assert response["running_workflow_steps"][1]["success"]
447-
assert response["running_workflow_steps"][2]["done"]
448-
assert response["running_workflow_steps"][2]["success"]
442+
assert response["count"] == 4
443+
rwf_steps = response["running_workflow_steps"]
444+
for rwf_step in rwf_steps:
445+
assert rwf_step["done"]
446+
assert rwf_step["success"]

tests/workflow-definitions/simple-python-split-combine.yaml

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,21 @@ steps:
3737
name: split
3838
variable: outputBase
3939

40-
#- name: combine
41-
# description: Combine the parallel files
42-
# specification:
43-
# collection: workflow-engine-unit-test-jobs
44-
# job: concatenate
45-
# version: "1.0.0"
46-
# variables:
47-
# outputFile: results.smi
48-
# plumbing:
49-
# - variable: inputFile
50-
# from-step:
51-
# name: parallel
52-
# variable: outputFile
53-
# - variable: outputFile
54-
# from-workflow:
55-
# variable: combination
56-
# - variable: outputFile
57-
# to-project:
40+
- name: combine
41+
description: Combine the parallel files
42+
specification:
43+
collection: workflow-engine-unit-test-jobs
44+
job: concatenate
45+
version: "1.0.0"
46+
variables:
47+
outputFile: results.smi
48+
plumbing:
49+
- variable: inputFile
50+
from-step:
51+
name: parallel
52+
variable: outputFile
53+
- variable: outputFile
54+
from-workflow:
55+
variable: combination
56+
- variable: outputFile
57+
to-project:

workflow/workflow_engine.py

Lines changed: 75 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -384,24 +384,28 @@ def _prepare_step(
384384
our_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
385385
step_definition=step_definition
386386
)
387+
step_is_combiner: bool = False
387388
step_name_being_combined: str | None = None
389+
combiner_input_variable: str | None = None
390+
num_step_recplicas_being_combined: int = 0
388391
for p_step_name, connections in our_plumbing.items():
389392
for connector in connections:
390393
if our_inputs.get(connector.out, {}).get("type") == "files":
391394
step_name_being_combined = p_step_name
395+
combiner_input_variable = connector.out
396+
step_is_combiner = True
392397
break
393398
if step_name_being_combined:
394399
break
395400
if step_name_being_combined:
396-
print("*** COMBINER")
397401
response, _ = self._wapi_adapter.get_status_of_all_step_instances_by_name(
398402
name=step_name_being_combined,
399403
running_workflow_id=rwf_id,
400404
)
401405
# Assume succes...
402406
assert "count" in response
403-
num_being_combined: int = response["count"]
404-
assert num_being_combined > 0
407+
num_step_recplicas_being_combined = response["count"]
408+
assert num_step_recplicas_being_combined > 0
405409
assert "status" in response
406410

407411
all_step_instances_done: bool = True
@@ -436,10 +440,8 @@ def _prepare_step(
436440
error_msg=f"Prior instance of step '{step_name_being_combined}' has failed",
437441
)
438442

439-
if step_name_being_combined:
440-
print("*** COMBINER : Able to start")
441-
442-
# Now compile a set of variables for this step.
443+
# I think we can start this step,
444+
# so compile a set of variables for it.
443445

444446
# Start with any variables provided in the step's specification.
445447
# A map that we will add to (and maybe even over-write)...
@@ -472,15 +474,39 @@ def _prepare_step(
472474
step_definition=step_definition
473475
)
474476
for prior_step_name, connections in prior_step_plumbing.items():
475-
# Retrieve the prior "running" step
476-
# in order to get the variables that were set there...
477-
prior_step, _ = self._wapi_adapter.get_running_workflow_step_by_name(
478-
name=prior_step_name, running_workflow_id=rwf_id
479-
)
480-
# Copy "in" value to "out"...
481-
for connector in connections:
482-
assert connector.in_ in prior_step["variables"]
483-
variables[connector.out] = prior_step["variables"][connector.in_]
477+
if step_is_combiner and prior_step_name == step_name_being_combined:
478+
assert combiner_input_variable
479+
input_source_list: list[str] = []
480+
for replica in range(1, num_step_recplicas_being_combined + 1):
481+
prior_step, _ = (
482+
self._wapi_adapter.get_running_workflow_step_by_name(
483+
name=prior_step_name,
484+
replica=replica,
485+
running_workflow_id=rwf_id,
486+
)
487+
)
488+
# Copy "in" value to "out"...
489+
for connector in connections:
490+
assert connector.in_ in prior_step["variables"]
491+
if connector.out == combiner_input_variable:
492+
input_source_list.append(
493+
prior_step["variables"][connector.in_]
494+
)
495+
else:
496+
variables[connector.out] = prior_step["variables"][
497+
connector.in_
498+
]
499+
variables[combiner_input_variable] = input_source_list
500+
else:
501+
# Retrieve the prior "running" step
502+
# in order to get the variables that were set there...
503+
prior_step, _ = self._wapi_adapter.get_running_workflow_step_by_name(
504+
name=prior_step_name, running_workflow_id=rwf_id
505+
)
506+
# Copy "in" value to "out"...
507+
for connector in connections:
508+
assert connector.in_ in prior_step["variables"]
509+
variables[connector.out] = prior_step["variables"][connector.in_]
484510

485511
# Now ... can the command be compiled!?
486512
job: dict[str, Any] = self._get_step_job(step=step_definition)
@@ -494,7 +520,8 @@ def _prepare_step(
494520
return StepPreparationResponse(iterations=0)
495521

496522
# Do we replicate this step (run it more than once)?
497-
# We do if a variable in this step's plumbing
523+
#
524+
# We do if this is not a combiner step and a variable in this step's plumbing
498525
# refers to an output of a prior step whose type is 'files'.
499526
# If the prior step is a 'splitter' we populate the 'replication_values' array
500527
# with the list of files the prior step genrated for its output.
@@ -503,36 +530,39 @@ def _prepare_step(
503530
# be more than one prior step variable that is 'files'!
504531
iter_values: list[str] = []
505532
iter_variable: str | None = None
506-
for p_step_name, connections in our_plumbing.items():
507-
# We need to get the Job definition for each step
508-
# and then check whether the (output) variable is of type 'files'...
509-
wf_step: dict[str, Any] = get_step(wf, p_step_name)
510-
assert wf_step
511-
job_definition: dict[str, Any] = self._get_step_job(step=wf_step)
512-
jd_outputs: dict[str, Any] = job_defintion_decoder.get_outputs(
513-
job_definition
514-
)
515-
for connector in connections:
516-
if jd_outputs.get(connector.in_, {}).get("type") == "files":
517-
iter_variable = connector.out
518-
# Get the prior running step's output values
519-
response, _ = self._wapi_adapter.get_running_workflow_step_by_name(
520-
name=p_step_name,
521-
running_workflow_id=rwf_id,
522-
)
523-
rwfs_id = response["id"]
524-
assert rwfs_id
525-
result, _ = (
526-
self._wapi_adapter.get_running_workflow_step_output_values_for_output(
527-
running_workflow_step_id=rwfs_id,
528-
output_variable=connector.in_,
533+
if not step_is_combiner:
534+
for p_step_name, connections in our_plumbing.items():
535+
# We need to get the Job definition for each step
536+
# and then check whether the (output) variable is of type 'files'...
537+
wf_step: dict[str, Any] = get_step(wf, p_step_name)
538+
assert wf_step
539+
job_definition: dict[str, Any] = self._get_step_job(step=wf_step)
540+
jd_outputs: dict[str, Any] = job_defintion_decoder.get_outputs(
541+
job_definition
542+
)
543+
for connector in connections:
544+
if jd_outputs.get(connector.in_, {}).get("type") == "files":
545+
iter_variable = connector.out
546+
# Get the prior running step's output values
547+
response, _ = (
548+
self._wapi_adapter.get_running_workflow_step_by_name(
549+
name=p_step_name,
550+
running_workflow_id=rwf_id,
551+
)
529552
)
530-
)
531-
iter_values = result["output"].copy()
553+
rwfs_id = response["id"]
554+
assert rwfs_id
555+
result, _ = (
556+
self._wapi_adapter.get_running_workflow_step_output_values_for_output(
557+
running_workflow_step_id=rwfs_id,
558+
output_variable=connector.in_,
559+
)
560+
)
561+
iter_values = result["output"].copy()
562+
break
563+
# Stop if we've got an iteration variable
564+
if iter_variable:
532565
break
533-
# Stop if we've got an iteration variable
534-
if iter_variable:
535-
break
536566

537567
num_step_instances: int = max(1, len(iter_values))
538568
return StepPreparationResponse(

0 commit comments

Comments
 (0)