Skip to content

Commit 8dd9308

Browse files
author
Alan Christie
committed
fix: replica always starts at 0
1 parent 94fd202 commit 8dd9308

File tree

3 files changed

+41
-39
lines changed

3 files changed

+41
-39
lines changed

tests/wapi_adapter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,11 +160,11 @@ def create_running_workflow_step(
160160
running_workflow_id: str,
161161
step: str,
162162
replica: int = 0,
163-
replicas: int = 0,
163+
replicas: int = 1,
164164
prior_running_workflow_step_id: str | None = None,
165165
) -> tuple[dict[str, Any], int]:
166-
if replica:
167-
assert replica > 0
166+
assert replica >= 0
167+
assert replicas > replica
168168

169169
UnitTestWorkflowAPIAdapter.lock.acquire()
170170
with open(_RUNNING_WORKFLOW_STEP_PICKLE_FILE, "rb") as pickle_file:

workflow/workflow_abc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ class LaunchParameters:
4545
# 1..'N'.
4646
step_replication_number: int = 0
4747
# The total number of replicas of this instance that are expected to be laucnhed.
48-
# if step_replication_number is set, this has to be set. It is 'N'.
49-
# If step_replication_number is zero this value is ignored.
50-
total_number_of_replicas: int = 0
48+
# This cannot be less than 1 and must be grater than any value of
49+
# 'step_replication_number' that will be used fo rthe same step.
50+
total_number_of_replicas: int = 1
5151
# The application ID (a custom resource name)
5252
# used to identify the 'type' of Instance to create.
5353
# For DM Jobs this will be 'datamanagerjobs.squonk.it'

workflow/workflow_engine.py

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,17 @@
5454

5555
@dataclass
5656
class StepPreparationResponse:
57-
"""Step preparation response object. Iterations is +ve (non-zero) if a step
57+
"""Step preparation response object. 'replicas' is +ve (non-zero) if a step
5858
can be launched - it's value indicates how many times. If a step can be launched
5959
'variables' will not be None. If a parallel set of steps can take place
60-
(even just one) 'iteration_variable' will be set and 'iteration_values'
61-
will be a list containing a value for each step. If prparation failed
62-
'error_msg' chould contain something useful."""
60+
(even just one) 'replica_variable' will be set and 'replica_values'
61+
will be a list containing a value for each step instance. If preparation fails
62+
'erro_num' wil be set, and 'error_msg' should contain something useful."""
6363

64-
iterations: int
64+
replicas: int
6565
variables: dict[str, Any] | None = None
66-
iteration_variable: str | None = None
67-
iteration_values: list[str] | None = None
66+
replica_variable: str | None = None
67+
replica_values: list[str] | None = None
6868
error_num: int = 0
6969
error_msg: str | None = None
7070

@@ -307,9 +307,10 @@ def _handle_pod_message(self, msg: PodMessage) -> None:
307307
sp_resp = self._prepare_step(
308308
wf=wf_response, step_definition=next_step, rwf=rwf_response
309309
)
310-
if sp_resp.iterations == 0:
310+
if sp_resp.replicas == 0:
311311
# Cannot prepare variables for this step,
312-
# it might be a combiner step and some prior steps may still
312+
# it might be a step dependent on more than one prior step
313+
# (like a 'combiner') and some prior steps may still
313314
# be running ... or something's gone wrong.
314315
if sp_resp.error_num:
315316
self._wapi_adapter.set_running_workflow_done(
@@ -443,7 +444,7 @@ def _prepare_step(
443444
step_name,
444445
step_name_being_combined,
445446
)
446-
return StepPreparationResponse(iterations=0)
447+
return StepPreparationResponse(replicas=0)
447448
elif not all_step_instances_successful:
448449
# Can't move on - all prior steps are done,
449450
# but at least one was not successful.
@@ -454,7 +455,7 @@ def _prepare_step(
454455
step_name_being_combined,
455456
)
456457
return StepPreparationResponse(
457-
iterations=0,
458+
replicas=0,
458459
error_num=1,
459460
error_msg=f"Prior instance of step '{step_name_being_combined}' has failed",
460461
)
@@ -482,20 +483,20 @@ def _prepare_step(
482483
assert connector.in_ in rwf_variables
483484
variables[connector.out] = rwf_variables[connector.in_]
484485

485-
# Now process variables (from the "plumbing" block)
486+
# Now process variables (in the "plumbing" block)
486487
# that relate to values used in prior steps.
487488
#
488489
# The decoder gives us a map indexed by prior step name that's a list of
489490
# "in" "out" connectors as above. If this is a combiner step remember
490-
# that we need to inspect variables from all the prior steps.
491+
# that the combiner_input_variable is a used as a list.
491492
prior_step_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
492493
step_definition=step_definition
493494
)
494495
for prior_step_name, connections in prior_step_plumbing.items():
495496
if step_is_combiner and prior_step_name == step_name_being_combined:
496497
assert combiner_input_variable
497498
input_source_list: list[str] = []
498-
for replica in range(1, num_step_recplicas_being_combined + 1):
499+
for replica in range(num_step_recplicas_being_combined):
499500
prior_step, _ = (
500501
self._wapi_adapter.get_running_workflow_step_by_name(
501502
name=prior_step_name,
@@ -538,7 +539,7 @@ def _prepare_step(
538539
if not success:
539540
msg = f"Failed command validation for step {step_name} error_msg={message}"
540541
_LOGGER.warning(msg)
541-
return StepPreparationResponse(iterations=0, error_num=2, error_msg=msg)
542+
return StepPreparationResponse(replicas=0, error_num=2, error_msg=msg)
542543

543544
# Do we replicate this step (run it more than once)?
544545
#
@@ -588,9 +589,9 @@ def _prepare_step(
588589
num_step_instances: int = max(1, len(iter_values))
589590
return StepPreparationResponse(
590591
variables=variables,
591-
iterations=num_step_instances,
592-
iteration_variable=iter_variable,
593-
iteration_values=iter_values,
592+
replicas=num_step_instances,
593+
replica_variable=iter_variable,
594+
replica_values=iter_values,
594595
)
595596

596597
def _launch(
@@ -607,30 +608,31 @@ def _launch(
607608
rwf_id: str = rwf["id"]
608609
project_id = rwf["project"]["id"]
609610

610-
# A step replication number,
611-
# used only for steps expected to run in parallel (even if just once)
612-
step_replication_number: int = 0
613-
total_replicas: int = step_preparation_response.iterations
611+
# Total replicas must be 1 or more
612+
total_replicas: int = step_preparation_response.replicas
613+
assert total_replicas >= 1
614+
614615
variables = step_preparation_response.variables
615616
assert variables is not None
616-
for iteration in range(step_preparation_response.iterations):
617+
for replica in range(step_preparation_response.replicas):
617618

618-
# If we are replicating this step then we must replace the step's variable
619+
# If we are replicating this step more than once
620+
# the 'replica_variable' will be set.
621+
# We must replace the step's variable
619622
# with a value expected for this iteration.
620-
if step_preparation_response.iteration_variable:
621-
assert step_preparation_response.iteration_values
622-
iter_value: str = step_preparation_response.iteration_values[iteration]
623+
if step_preparation_response.replica_variable:
624+
assert step_preparation_response.replica_values
625+
iter_value: str = step_preparation_response.replica_values[replica]
623626
_LOGGER.info(
624-
"Replicating step: %s iteration=%s variable=%s value=%s",
627+
"Replicating step: %s replica=%s variable=%s value=%s",
625628
step_name,
626-
iteration,
627-
step_preparation_response.iteration_variable,
629+
replica,
630+
step_preparation_response.replica_variable,
628631
iter_value,
629632
)
630633
# Over-write the replicating variable
631634
# and set the replication number to a unique +ve non-zero value...
632-
variables[step_preparation_response.iteration_variable] = iter_value
633-
step_replication_number = iteration + 1
635+
variables[step_preparation_response.replica_variable] = iter_value
634636

635637
_LOGGER.info(
636638
"Launching step: %s RunningWorkflow=%s (name=%s)"
@@ -652,7 +654,7 @@ def _launch(
652654
variables=variables,
653655
running_workflow_id=rwf_id,
654656
step_name=step_name,
655-
step_replication_number=step_replication_number,
657+
step_replication_number=replica,
656658
total_number_of_replicas=total_replicas,
657659
)
658660
lr: LaunchResult = self._instance_launcher.launch(launch_parameters=lp)

0 commit comments

Comments
 (0)