Skip to content

Commit 2a6b708

Browse files
committed
fix: stashing
1 parent d946546 commit 2a6b708

File tree

3 files changed

+31
-10
lines changed

3 files changed

+31
-10
lines changed

tests/test_workflow_engine_examples.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import time
33
from datetime import datetime, timezone
4+
from pprint import pprint
45
from typing import Any
56

67
import pytest
@@ -458,7 +459,8 @@ def test_workflow_engine_simple_python_fanout(basic_engine):
458459
# Additional, detailed checks...
459460
# Check we only have one RunningWorkflowStep, and it succeeded
460461
response = da.get_running_workflow_steps(running_workflow_id=r_wfid)
461-
print("response", response)
462+
print("response")
463+
pprint(response)
462464

463465
assert response["count"] == 2
464466
assert response["running_workflow_steps"][0]["done"]

workflow/decoder.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,13 @@ def set_step_variables(
280280

281281
result |= options
282282
return result
283+
284+
285+
def get_step_replication_param(*, step: dict[str, Any]) -> str | Any:
286+
"""Return step's replication info"""
287+
replicator = step.get("replicate", None)
288+
if replicator:
289+
# 'using' is a dict but there can be only single value for now
290+
replicator = list(replicator["using"].values())[0]
291+
292+
return replicator

workflow/workflow_engine.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import logging
2626
import sys
2727
from http import HTTPStatus
28+
from pprint import pprint
2829
from typing import Any, Dict, Optional
2930

3031
from decoder.decoder import TextEncoding, decode
@@ -40,6 +41,7 @@
4041
)
4142

4243
from .decoder import (
44+
get_step_replication_param,
4345
get_workflow_job_input_names_for_step,
4446
set_step_variables,
4547
workflow_step_has_outputs,
@@ -466,11 +468,13 @@ def _validate_step_command(
466468
running_wf, _ = self._wapi_adapter.get_running_workflow(
467469
running_workflow_id=running_wf_id
468470
)
469-
print("running wf", running_wf)
471+
print("running wf")
472+
pprint(running_wf)
470473
workflow_id = running_wf["workflow"]["id"]
471474
workflow, _ = self._wapi_adapter.get_workflow(workflow_id=workflow_id)
472475

473-
print("workflow", workflow)
476+
print("workflow")
477+
pprint(workflow)
474478

475479
# for step in workflow["steps"]:
476480
# if step["name"] in previous_step_names:
@@ -556,10 +560,14 @@ def _launch(
556560
wf_step_data, _ = self._wapi_adapter.get_workflow_steps_driving_this_step(
557561
running_workflow_step_id=rwfs_id,
558562
)
559-
print("wf_step_data", wf_step_data)
563+
print("wf_step_data")
564+
pprint(wf_step_data)
560565
assert wf_step_data["caller_step_index"] >= 0
561566
our_step_index: int = wf_step_data["caller_step_index"]
562567

568+
print("step in _launch:", step_name)
569+
pprint(step)
570+
563571
# Now check the step command can be executed
564572
# (by trying to decoding the Job command).
565573
#
@@ -585,11 +593,7 @@ def _launch(
585593
variables: dict[str, Any] = error_or_variables
586594
print("variables", variables)
587595
# find out if and by which parameter this step should be replicated
588-
replicator = ""
589-
if replicate := step.get("replicate", {}):
590-
if using := replicate.get("using", {}):
591-
# using is a dict but there can be only single value for now
592-
replicator = list(using.values())[0]
596+
replicator = get_step_replication_param(step=step)
593597

594598
_LOGGER.info(
595599
"Launching step: RunningWorkflow=%s RunningWorkflowStep=%s step=%s"
@@ -634,6 +638,10 @@ def _launch(
634638
#
635639
# 'running_workflow_step_inputs'
636640
# A list of Job input variable names
641+
642+
print("variables")
643+
pprint(variables)
644+
637645
inputs: list[str] = []
638646
inputs.extend(iter(get_workflow_job_input_names_for_step(wf, step_name)))
639647
if replicator:
@@ -645,7 +653,8 @@ def _launch(
645653
else:
646654
single_step_variables = [variables]
647655

648-
print("single step variables", single_step_variables)
656+
print("single step variables")
657+
pprint(single_step_variables)
649658

650659
for params in single_step_variables:
651660
lp: LaunchParameters = LaunchParameters(

0 commit comments

Comments
 (0)