Skip to content

Commit 35d3d4a

Browse files
N-giveNathan Givens
andauthored
IWF-682 add wait_until API failure policy (#97)
Co-authored-by: Nathan Givens <[email protected]>
1 parent 534e582 commit 35d3d4a

File tree

2 files changed

+52
-9
lines changed

2 files changed

+52
-9
lines changed

iwf/tests/test_state_failure_recovery.py

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from iwf.command_results import CommandResults
77
from iwf.communication import Communication
88
from iwf.iwf_api.models import RetryPolicy
9+
from iwf.iwf_api.models.wait_until_api_failure_policy import WaitUntilApiFailurePolicy
910
from iwf.persistence import Persistence
1011
from iwf.state_decision import StateDecision
1112
from iwf.state_schema import StateSchema
@@ -16,7 +17,35 @@
1617
from iwf.workflow_state_options import WorkflowStateOptions
1718

1819

19-
class FailState(WorkflowState[None]):
20+
class FailWaitUntilState(WorkflowState[None]):
21+
def wait_until(
22+
self,
23+
ctx: WorkflowContext,
24+
input: T,
25+
persistence: Persistence,
26+
communication: Communication,
27+
):
28+
raise RuntimeError("failed wait_until")
29+
30+
def execute(
31+
self,
32+
ctx: WorkflowContext,
33+
input: T,
34+
command_results: CommandResults,
35+
persistence: Persistence,
36+
communication: Communication,
37+
):
38+
return StateDecision.single_next_state(FailExecuteState)
39+
40+
def get_state_options(self) -> WorkflowStateOptions:
41+
return WorkflowStateOptions(
42+
execute_api_retry_policy=RetryPolicy(maximum_attempts=1),
43+
wait_until_api_retry_policy=RetryPolicy(maximum_attempts=1),
44+
proceed_to_execute_when_wait_until_retry_exhausted=WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE,
45+
)
46+
47+
48+
class FailExecuteState(WorkflowState[None]):
2049
def execute(
2150
self,
2251
ctx: WorkflowContext,
@@ -48,17 +77,20 @@ def execute(
4877

4978
class RecoveryWorkflow(ObjectWorkflow):
5079
def get_workflow_states(self) -> StateSchema:
51-
return StateSchema.with_starting_state(FailState(), RecoveryState())
52-
53-
54-
wf = RecoveryWorkflow()
55-
registry.add_workflow(wf)
56-
client = Client(registry)
80+
return StateSchema.with_starting_state(
81+
FailWaitUntilState(), FailExecuteState(), RecoveryState()
82+
)
5783

5884

5985
class Test(unittest.TestCase):
86+
@classmethod
87+
def setUpClass(cls):
88+
wf = RecoveryWorkflow()
89+
registry.add_workflow(wf)
90+
cls.client = Client(registry)
91+
6092
def test_workflow_recovery(self):
6193
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
62-
client.start_workflow(RecoveryWorkflow, wf_id, 10)
63-
result = client.get_simple_workflow_result_with_wait(wf_id, str)
94+
self.client.start_workflow(RecoveryWorkflow, wf_id, 10)
95+
result = self.client.wait_for_workflow_completion(wf_id, str)
6496
assert result == "done"

iwf/workflow_state_options.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,17 @@ def _to_idl_state_options(
108108
)
109109
if options.wait_until_api_retry_policy is not None:
110110
res.wait_until_api_retry_policy = options.wait_until_api_retry_policy
111+
if options.proceed_to_execute_when_wait_until_retry_exhausted is not None:
112+
if options.proceed_to_execute_when_wait_until_retry_exhausted:
113+
res.wait_until_api_failure_policy = (
114+
WaitUntilApiFailurePolicy.PROCEED_ON_FAILURE
115+
)
116+
else:
117+
res.wait_until_api_failure_policy = (
118+
WaitUntilApiFailurePolicy.FAIL_WORKFLOW_ON_FAILURE
119+
)
120+
121+
pass
111122
if options.wait_until_api_timeout_seconds is not None:
112123
res.wait_until_api_timeout_seconds = options.wait_until_api_timeout_seconds
113124
if options.execute_api_retry_policy is not None:

0 commit comments

Comments
 (0)