Skip to content

Commit a5113f3

Browse files
author
Alan Christie
committed
fix: Fix iteration input path
1 parent d67ce50 commit a5113f3

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

workflow/workflow_engine.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class StepPreparationResponse:
118118

119119
replicas: int
120120
replica_variable: str | None = None
121+
replica_instance_id: str | None = None
121122
variables: dict[str, Any] = field(default_factory=dict)
122123
replica_values: list[str] = field(default_factory=list)
123124
dependent_instances: set[str] = field(default_factory=set)
@@ -520,7 +521,7 @@ def _prepare_step(
520521
# and undo our assumption if not...
521522
all_step_instances_done: bool = True
522523

523-
# If anything' still running we must leave.
524+
# If anything is still running we must leave.
524525
# If anything's failed we must 'fail' the running workflow.
525526
all_step_instances_successful: bool = True
526527
for status in response["status"]:
@@ -703,6 +704,7 @@ def _prepare_step(
703704
# 3 times, with each being given a different file as its input.
704705
iter_values: list[str] = []
705706
iter_variable: str | None = None
707+
iter_instance_id: str | None = None
706708
if not we_are_a_combiner:
707709
for p_step_name, connections in plumbing_of_prior_steps.items():
708710
# We need to get the Job definition for each step
@@ -731,6 +733,8 @@ def _prepare_step(
731733
)
732734
rwfs_id = response["id"]
733735
assert rwfs_id
736+
iter_instance_id = response["instance_id"]
737+
assert iter_instance_id
734738
result, _ = (
735739
self._wapi_adapter.get_running_workflow_step_output_values_for_output(
736740
running_workflow_step_id=rwfs_id,
@@ -778,6 +782,7 @@ def _prepare_step(
778782
replicas=num_step_instances,
779783
replica_variable=iter_variable,
780784
replica_values=iter_values,
785+
replica_instance_id=iter_instance_id,
781786
dependent_instances=dependent_instances,
782787
outputs=outputs,
783788
inputs=inputs,
@@ -823,15 +828,20 @@ def _launch(
823828
assert step_preparation_response.replica_values
824829
iter_value: str = step_preparation_response.replica_values[replica]
825830
_LOGGER.info(
826-
"Replicating step: %s replica=%s variable=%s value=%s",
831+
"Replicating step: %s replica=%s variable=%s value=%s origin=%s",
827832
step_name,
828833
replica,
829834
step_preparation_response.replica_variable,
830835
iter_value,
836+
step_preparation_response.replica_instance_id,
831837
)
832838
# Over-write the replicating variable
833839
# and set the replication number to a unique +ve non-zero value...
834-
variables[step_preparation_response.replica_variable] = iter_value
840+
variables[step_preparation_response.replica_variable] = (
841+
f"{self._instance_id_dir_prefix}"
842+
f"{step_preparation_response.replica_instance_id}"
843+
f"/{iter_value}"
844+
)
835845

836846
_LOGGER.info(
837847
"Launching step: %s RunningWorkflow=%s (name=%s)"

0 commit comments

Comments
 (0)