Skip to content

Commit 646128f

Browse files
author
Alan Christie
committed
feat: Populate dependent_instances
1 parent 02ad4ea commit 646128f

File tree

1 file changed

+24
-6
lines changed

1 file changed

+24
-6
lines changed

workflow/workflow_engine.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,18 @@ class StepPreparationResponse:
5959
can be launched - it's value indicates how many times. If a step can be launched
6060
'variables' will not be None. If a parallel set of steps can take place
6161
(even just one) 'replica_variable' will be set and 'replica_values'
62-
will be a list containing a value for each step instance. If preparation fails
63-
'erro_num' wil be set, and 'error_msg' should contain something useful."""
62+
will be a list containing a value for each step instance. If the step
63+
depends on a prior step the instance UUIDs of the steps will be listed
64+
in the 'dependent_instances' string list.
65+
66+
If preparation fails 'error_num' wil be set, and 'error_msg'
67+
should contain something useful."""
6468

6569
replicas: int
6670
variables: dict[str, Any] | None = None
6771
replica_variable: str | None = None
6872
replica_values: list[str] | None = None
73+
dependent_instances: set[str] | None = None
6974
error_num: int = 0
7075
error_msg: str | None = None
7176

@@ -402,14 +407,14 @@ def _prepare_step(
402407
our_inputs: dict[str, Any] = job_defintion_decoder.get_inputs(
403408
our_job_definition
404409
)
405-
our_plumbing: dict[str, list[Connector]] = get_step_prior_step_connections(
406-
step_definition=step_definition
410+
plumbing_of_prior_steps: dict[str, list[Connector]] = (
411+
get_step_prior_step_connections(step_definition=step_definition)
407412
)
408413
step_is_combiner: bool = False
409414
step_name_being_combined: str | None = None
410415
combiner_input_variable: str | None = None
411416
num_step_recplicas_being_combined: int = 0
412-
for p_step_name, connections in our_plumbing.items():
417+
for p_step_name, connections in plumbing_of_prior_steps.items():
413418
for connector in connections:
414419
if our_inputs.get(connector.out, {}).get("type") == "files":
415420
step_name_being_combined = p_step_name
@@ -549,7 +554,7 @@ def _prepare_step(
549554
iter_values: list[str] = []
550555
iter_variable: str | None = None
551556
if not step_is_combiner:
552-
for p_step_name, connections in our_plumbing.items():
557+
for p_step_name, connections in plumbing_of_prior_steps.items():
553558
# We need to get the Job definition for each step
554559
# and then check whether the (output) variable is of type 'files'...
555560
wf_step: dict[str, Any] = get_step(wf, p_step_name)
@@ -582,12 +587,24 @@ def _prepare_step(
582587
if iter_variable:
583588
break
584589

590+
# Get the list of instances we depend upon.
591+
dependent_instances: set[str] = set()
592+
for p_step_name in plumbing_of_prior_steps:
593+
# Assume any step can have multiple instances
594+
response, _ = self._wapi_adapter.get_status_of_all_step_instances_by_name(
595+
name=p_step_name,
596+
running_workflow_id=rwf_id,
597+
)
598+
for step in response["steps"]:
599+
dependent_instances.add(step["instance_id"])
600+
585601
num_step_instances: int = max(1, len(iter_values))
586602
return StepPreparationResponse(
587603
variables=variables,
588604
replicas=num_step_instances,
589605
replica_variable=iter_variable,
590606
replica_values=iter_values,
607+
dependent_instances=dependent_instances,
591608
)
592609

593610
def _launch(
@@ -652,6 +669,7 @@ def _launch(
652669
step_name=step_name,
653670
step_replication_number=replica,
654671
total_number_of_replicas=total_replicas,
672+
dependent_instances=step_preparation_response.dependent_instances,
655673
)
656674
lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)
657675
rwfs_id = lr.running_workflow_step_id

0 commit comments

Comments
 (0)