Skip to content

Commit f336119

Browse files
author
Alan Christie
committed
feat: Minor work on combiner logic
1 parent 7d0363e commit f336119

File tree

1 file changed

+26
-5
lines changed

1 file changed

+26
-5
lines changed

workflow/workflow_engine.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -393,16 +393,19 @@ def _prepare_step(
393393
if step_name_being_combined:
394394
break
395395
if step_name_being_combined:
396+
print("*** COMBINER")
396397
response, _ = self._wapi_adapter.get_status_of_all_step_instances_by_name(
398+
name=step_name_being_combined,
397399
running_workflow_id=rwf_id,
398-
step_name=step_name_being_combined,
399400
)
400401
# Assume succes...
401-
all_step_instances_done: bool = True
402-
all_step_instances_successful: bool = True
403402
assert "count" in response
404-
assert response["count"] > 0
403+
num_being_combined: int = response["count"]
404+
assert num_being_combined > 0
405405
assert "status" in response
406+
407+
all_step_instances_done: bool = True
408+
all_step_instances_successful: bool = True
406409
for status in response["status"]:
407410
if not status["done"]:
408411
all_step_instances_done = False
@@ -412,15 +415,30 @@ def _prepare_step(
412415
break
413416
if not all_step_instances_done:
414417
# Can't move on - but other steps need to finish.
418+
_LOGGER.debug(
419+
"Assessing start of combiner step (%s)"
420+
" but not all steps (%s) to be combined are done",
421+
step_name,
422+
step_name_being_combined,
423+
)
415424
return StepPreparationResponse(iterations=0)
416425
elif not all_step_instances_successful:
417426
# Can't move on - all prior steps are done,
418427
# but at least one was in error.
428+
_LOGGER.debug(
429+
"Assessing start of combiner step (%s)"
430+
" but at least one step (%s) to be combined failed",
431+
step_name,
432+
step_name_being_combined,
433+
)
419434
return StepPreparationResponse(
420435
iterations=0,
421-
error_msg="A prior step 'step_name_being_combined' iteration has failed",
436+
error_msg=f"Prior instance of step '{step_name_being_combined}' has failed",
422437
)
423438

439+
if step_name_being_combined:
440+
print("*** COMBINER : Able to start")
441+
424442
# Now compile a set of variables for this step.
425443

426444
# Start with any variables provided in the step's specification.
@@ -447,6 +465,9 @@ def _prepare_step(
447465
# related to values used in prior steps. The decoder gives
448466
# us a map indexed by prior step name that's a list of "in" "out"
449467
# tuples as above.
468+
#
469+
# If this is a combiner step remember that we need to inspect
470+
# variables from all the prior steps.
450471
prior_step_plumbing: dict[str, list[Connector]] = get_step_prior_step_plumbing(
451472
step_definition=step_definition
452473
)

0 commit comments

Comments
 (0)