Skip to content

Commit 561c1cf

Browse files
N-giveNathan Givens
andauthored
IWF-467 add decider trigger type: ANY_COMMAND_COMBINATION (#90)
* IWF-467 add decider trigger type: ANY_COMMAND_COMBINATION --------- Co-authored-by: Nathan Givens <[email protected]>
1 parent 0c7f0bd commit 561c1cf

File tree

2 files changed

+93
-12
lines changed

2 files changed

+93
-12
lines changed

iwf/command_request.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from iwf.errors import WorkflowDefinitionError
55
from iwf.iwf_api.models import CommandWaitingType
6+
from iwf.iwf_api.models.command_combination import CommandCombination
67
from iwf.iwf_api.models.command_request import (
78
CommandRequest as IdlCommandRequest,
89
)
@@ -21,8 +22,7 @@ class TimerCommand:
2122
@classmethod
2223
def by_seconds(cls, duration_seconds: int, command_id: Optional[str] = None):
2324
return TimerCommand(
24-
command_id if command_id is not None else "",
25-
duration_seconds,
25+
command_id if command_id is not None else "", duration_seconds
2626
)
2727

2828

@@ -46,7 +46,8 @@ class SignalChannelCommand:
4646
@classmethod
4747
def by_name(cls, channel_name: str, command_id: Optional[str] = None):
4848
return SignalChannelCommand(
49-
command_id if command_id is not None else "", channel_name
49+
command_id if command_id is not None else "",
50+
channel_name,
5051
)
5152

5253

@@ -57,20 +58,31 @@ def by_name(cls, channel_name: str, command_id: Optional[str] = None):
5758
class CommandRequest:
5859
commands: list[BaseCommand]
5960
command_waiting_type: CommandWaitingType
61+
command_combinations: list[CommandCombination]
6062

6163
@classmethod
6264
def for_any_command_completed(cls, *commands: BaseCommand):
6365
bc = [c for c in commands]
64-
return CommandRequest(bc, CommandWaitingType.ANY_COMPLETED)
66+
return CommandRequest(bc, CommandWaitingType.ANY_COMPLETED, [])
6567

6668
@classmethod
6769
def for_all_command_completed(cls, *commands: BaseCommand):
6870
bc = [c for c in commands]
69-
return CommandRequest(bc, CommandWaitingType.ALL_COMPLETED)
71+
return CommandRequest(bc, CommandWaitingType.ALL_COMPLETED, [])
72+
73+
@classmethod
74+
def for_any_command_combination_completed(
75+
cls, command_combinations_list: list[list[str]], *commands: BaseCommand
76+
):
77+
return CommandRequest(
78+
list(commands),
79+
CommandWaitingType.ANY_COMBINATION_COMPLETED,
80+
[CommandCombination(c) for c in command_combinations_list],
81+
)
7082

7183
@classmethod
7284
def empty(cls):
73-
return CommandRequest(list(), CommandWaitingType.ALL_COMPLETED)
85+
return CommandRequest(list(), CommandWaitingType.ALL_COMPLETED, [])
7486

7587

7688
def _to_idl_command_request(request: CommandRequest) -> IdlCommandRequest:
@@ -99,4 +111,6 @@ def _to_idl_command_request(request: CommandRequest) -> IdlCommandRequest:
99111
req.inter_state_channel_commands = internal_channel_commands
100112
if len(signal_commands) > 0:
101113
req.signal_commands = signal_commands
114+
if len(request.command_combinations) > 0:
115+
req.command_combinations = request.command_combinations
102116
return req

iwf/tests/test_signal.py

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@
33
import unittest
44

55
from iwf.client import Client
6+
from iwf.iwf_api.models.timer_result import TimerStatus
67
from iwf.command_request import (
78
CommandRequest,
89
SignalChannelCommand,
10+
TimerCommand,
11+
)
12+
from iwf.command_results import (
13+
CommandResults,
14+
SignalChannelCommandResult,
15+
TimerCommandResult,
916
)
10-
from iwf.command_results import CommandResults, SignalChannelCommandResult
1117
from iwf.communication import Communication
1218
from iwf.communication_schema import CommunicationMethod, CommunicationSchema
1319
from iwf.iwf_api.models import ChannelRequestStatus
@@ -25,8 +31,14 @@
2531
test_channel_str = "test-str"
2632
test_idle_channel_none = "test-idle"
2733

34+
test_channel1 = "test-channel1"
35+
test_channel1_id = "test-channel1-id"
36+
test_channel2 = "test-channel2"
37+
test_channel2_id = "test-channel2-id"
38+
test_timer_id = "test-timer-id"
39+
2840

29-
class WaitState(WorkflowState[None]):
41+
class WaitState1(WorkflowState[None]):
3042
def wait_until(
3143
self,
3244
ctx: WorkflowContext,
@@ -61,7 +73,58 @@ def execute(
6173
assert sig3 == SignalChannelCommandResult(
6274
test_channel_str, "abc", ChannelRequestStatus.RECEIVED, ""
6375
)
64-
return StateDecision.graceful_complete_workflow(sig3.value)
76+
return StateDecision.single_next_state(WaitState2, sig3.value)
77+
78+
79+
class WaitState2(WorkflowState[str]):
80+
def wait_until(
81+
self,
82+
ctx: WorkflowContext,
83+
input: str,
84+
persistence: Persistence,
85+
communication: Communication,
86+
) -> CommandRequest:
87+
return CommandRequest.for_any_command_combination_completed(
88+
[
89+
[
90+
test_channel1_id,
91+
test_timer_id,
92+
]
93+
],
94+
SignalChannelCommand.by_name(test_channel1, test_channel1_id),
95+
SignalChannelCommand.by_name(test_channel2, test_channel2_id),
96+
TimerCommand.by_seconds(1, test_timer_id),
97+
)
98+
99+
def execute(
100+
self,
101+
ctx: WorkflowContext,
102+
input: str,
103+
command_results: CommandResults,
104+
persistence: Persistence,
105+
communication: Communication,
106+
) -> StateDecision:
107+
assert len(command_results.signal_channel_commands) == 2
108+
assert (
109+
len(
110+
[
111+
r
112+
for r in command_results.signal_channel_commands
113+
if r.status == ChannelRequestStatus.RECEIVED
114+
]
115+
)
116+
== 1
117+
)
118+
sig1 = command_results.signal_channel_commands[0]
119+
tim1 = command_results.timer_commands[0]
120+
assert sig1 == SignalChannelCommandResult(
121+
test_channel1, None, ChannelRequestStatus.RECEIVED, test_channel1_id
122+
)
123+
assert tim1 == TimerCommandResult(
124+
TimerStatus.FIRED,
125+
test_timer_id,
126+
)
127+
return StateDecision.graceful_complete_workflow(input)
65128

66129

67130
class WaitSignalWorkflow(ObjectWorkflow):
@@ -71,10 +134,12 @@ def get_communication_schema(self) -> CommunicationSchema:
71134
CommunicationMethod.signal_channel_def(test_channel_none, type(None)),
72135
CommunicationMethod.signal_channel_def(test_channel_str, str),
73136
CommunicationMethod.signal_channel_def(test_idle_channel_none, type(None)),
137+
CommunicationMethod.signal_channel_def(test_channel1, type(None)),
138+
CommunicationMethod.signal_channel_def(test_channel2, type(None)),
74139
)
75140

76141
def get_workflow_states(self) -> StateSchema:
77-
return StateSchema.with_starting_state(WaitState())
142+
return StateSchema.with_starting_state(WaitState1(), WaitState2())
78143

79144
@rpc()
80145
def get_idle_signal_channel_size(self, com: Communication):
@@ -90,11 +155,13 @@ def setUpClass(cls):
90155

91156
def test_signal(self):
92157
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
93-
self.client.start_workflow(WaitSignalWorkflow, wf_id, 1)
158+
self.client.start_workflow(WaitSignalWorkflow, wf_id, 10)
94159
self.client.signal_workflow(wf_id, test_channel_int, 123)
95160
self.client.signal_workflow(wf_id, test_channel_str, "abc")
96161
self.client.signal_workflow(wf_id, test_channel_none)
97-
res = self.client.get_simple_workflow_result_with_wait(wf_id)
162+
163+
self.client.signal_workflow(wf_id, test_channel1)
164+
res = self.client.wait_for_workflow_completion(wf_id, str)
98165
assert res == "abc"
99166

100167
def test_signal_channel_size(self):

0 commit comments

Comments
 (0)