Skip to content

Commit fdce185

Browse files
authored
IWF-464: Add WaitForStateExecutionCompletion API (#85)
* IWF-464: Implement wait_for_state_execution_completion * IWF-464: Fix test * IWF-464: Add waitForKey feature * IWF-464: Fix test * IWF-464: Remove sleep from tests * IWF-464: Fix test * IWF-464: Change test * IWF-464: Lint * IWF-464: Simplify test * IWF-464: Add test complexity
1 parent 622edae commit fdce185

9 files changed

+395
-41
lines changed

iwf/client.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
WorkflowState,
2323
get_state_id,
2424
get_state_id_by_class,
25+
get_state_execution_id,
2526
should_skip_wait_until,
2627
)
2728
from iwf.workflow_state_options import _to_idl_state_options
@@ -86,6 +87,13 @@ def start_workflow(
8687

8788
unreg_opts.workflow_config_override = options.workflow_config_override
8889

90+
unreg_opts.wait_for_completion_state_ids = (
91+
options.wait_for_completion_state_ids
92+
)
93+
unreg_opts.wait_for_completion_state_execution_ids = (
94+
options.wait_for_completion_state_execution_ids
95+
)
96+
8997
# TODO: set initial search attributes here
9098

9199
starting_state_id = None
@@ -317,6 +325,47 @@ def set_workflow_search_attributes(
317325
workflow_class, workflow_id, run_id, search_attributes
318326
)
319327

328+
"""A long poll API to wait for the completion of the state.
329+
Note 1 The state_completion to wait for is needed to registered on starting workflow due to limitation in https://github.com/indeedeng/iwf/issues/349
330+
Note 2 The max polling time is configured in client_options (default to 10s)
331+
332+
Args:
333+
state_class the state class.
334+
workflow_id the workflowId
335+
state_execution_number the state execution number. E.g. if it's 2, it means the 2nd execution of the state
336+
"""
337+
338+
def wait_for_state_execution_completion_with_state_execution_id(
339+
self,
340+
state_class: type[WorkflowState],
341+
workflow_id: str,
342+
state_execution_number: int = 1,
343+
):
344+
state_execution_id = get_state_execution_id(state_class, state_execution_number)
345+
346+
self._unregistered_client.wait_for_state_execution_completion_with_state_execution_id(
347+
workflow_id, state_execution_id
348+
)
349+
350+
"""A long poll API to wait for the completion of the state.
351+
Note 1 The state_completion to wait for is needed to registered on starting workflow due to limitation in https://github.com/indeedeng/iwf/issues/349
352+
Note 2 The max polling time is configured in client_options (default to 10s)
353+
354+
Args:
355+
state_class the state class.
356+
workflow_id the workflowId
357+
wait_for_key key provided by the client and to identity workflow
358+
"""
359+
360+
def wait_for_state_execution_completion_with_wait_for_key(
361+
self, state_class: type[WorkflowState], workflow_id: str, wait_for_key: str
362+
):
363+
state_id = get_state_id_by_class(state_class)
364+
365+
self._unregistered_client.wait_for_state_execution_completion_with_wait_for_key(
366+
workflow_id, state_id, wait_for_key
367+
)
368+
320369
def _do_set_workflow_search_attributes(
321370
self,
322371
workflow_class: type[ObjectWorkflow],

iwf/client_options.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class ClientOptions:
99
worker_url: str
1010
object_encoder: ObjectEncoder
1111
api_timeout: int = 60
12+
long_poll_api_max_wait_time_seconds: int = 10
1213

1314
@classmethod
1415
def local_default(cls):

iwf/state_decision.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,14 @@ def single_next_state(
5454
state: Union[str, type[WorkflowState]],
5555
state_input: Any = None,
5656
state_options_override: Optional[WorkflowStateOptions] = None,
57+
wait_for_key: Optional[str] = None,
5758
) -> StateDecision:
5859
return StateDecision(
59-
[StateMovement.create(state, state_input, state_options_override)]
60+
[
61+
StateMovement.create(
62+
state, state_input, state_options_override, wait_for_key
63+
)
64+
]
6065
)
6166

6267
@classmethod

iwf/state_movement.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class StateMovement:
3636
state_id: str
3737
state_input: Any = None
3838
state_options_override: Optional[WorkflowStateOptions] = None
39+
wait_for_key: Optional[str] = None
3940

4041
dead_end: typing.ClassVar[StateMovement]
4142

@@ -57,6 +58,7 @@ def create(
5758
state: Union[str, type[WorkflowState]],
5859
state_input: Any = None,
5960
state_options_override: Optional[WorkflowStateOptions] = None,
61+
wait_for_key: Optional[str] = None,
6062
) -> StateMovement:
6163
if isinstance(state, str):
6264
state_id = state
@@ -68,7 +70,9 @@ def create(
6870
state_id = get_state_id_by_class(state)
6971
if state_id.startswith(reserved_state_id_prefix):
7072
raise WorkflowDefinitionError("cannot use reserved stateId")
71-
return StateMovement(state_id, state_input, state_options_override)
73+
return StateMovement(
74+
state_id, state_input, state_options_override, wait_for_key
75+
)
7276

7377

7478
StateMovement.dead_end = StateMovement(dead_end_sys_state_id)
@@ -99,4 +103,7 @@ def _to_idl_state_movement(
99103
)
100104

101105
idl_movement.state_options = idl_state_options
106+
107+
if movement.wait_for_key is not None:
108+
idl_movement.wait_for_key = movement.wait_for_key
102109
return idl_movement
Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import inspect
22
import time
33
import unittest
4+
from time import sleep
45

56
from iwf.client import Client
6-
from iwf.command_request import CommandRequest, TimerCommand
7+
from iwf.command_request import CommandRequest
78
from iwf.command_results import CommandResults
89
from iwf.communication import Communication
910
from iwf.iwf_api.models import SearchAttributeValueType
@@ -14,25 +15,24 @@
1415
from iwf.tests.worker_server import registry
1516
from iwf.workflow import ObjectWorkflow
1617
from iwf.workflow_context import WorkflowContext
18+
from iwf.workflow_options import WorkflowOptions
1719
from iwf.workflow_state import T, WorkflowState
1820

1921
sa_keyword_key = "CustomKeywordField"
20-
sa_text_key = "CustomTextField"
2122
sa_double_key = "CustomDoubleField"
2223
sa_int_key = "CustomIntField"
2324
sa_bool_key = "CustomBoolField"
2425
sa_datetime_key = "CustomDatetimeField"
2526
sa_keyword_array_key = "CustomKeywordArrayField"
2627

2728
sa_keyword: str = "keyword"
28-
sa_text: str = "text"
2929
sa_double: float = 2.34
3030
sa_int: int = 234
31+
sa_bool: bool = False
3132
sa_datetime: str = "2024-11-12T16:00:01.731455544-08:00"
3233
sa_keyword_array: list[str] = ["keyword-1", "keyword-2"]
3334

3435
final_sa_keyword: str = "final_keyword"
35-
final_sa_text = None
3636
final_sa_int: int = 567
3737
final_sa_bool: bool = False
3838
final_sa_datetime: str = "2024-12-13T16:00:01.731455544-08:00"
@@ -47,14 +47,6 @@ def wait_until(
4747
persistence: Persistence,
4848
communication: Communication,
4949
) -> CommandRequest:
50-
persistence.set_search_attribute_keyword(sa_keyword_key, sa_keyword)
51-
persistence.set_search_attribute_text(sa_text_key, sa_text)
52-
persistence.set_search_attribute_double(sa_double_key, sa_double)
53-
persistence.set_search_attribute_int64(sa_int_key, sa_int)
54-
persistence.set_search_attribute_datetime(sa_datetime_key, sa_datetime)
55-
persistence.set_search_attribute_keyword_array(
56-
sa_keyword_array_key, sa_keyword_array
57-
)
5850
return CommandRequest.empty()
5951

6052
def execute(
@@ -65,6 +57,14 @@ def execute(
6557
persistence: Persistence,
6658
communication: Communication,
6759
) -> StateDecision:
60+
persistence.set_search_attribute_keyword(sa_keyword_key, sa_keyword)
61+
persistence.set_search_attribute_double(sa_double_key, sa_double)
62+
persistence.set_search_attribute_boolean(sa_bool_key, sa_bool)
63+
persistence.set_search_attribute_keyword_array(
64+
sa_keyword_array_key, sa_keyword_array
65+
)
66+
persistence.set_search_attribute_int64(sa_int_key, sa_int)
67+
persistence.set_search_attribute_datetime(sa_datetime_key, sa_datetime)
6868
return StateDecision.single_next_state(SearchAttributeState2)
6969

7070

@@ -76,9 +76,7 @@ def wait_until(
7676
persistence: Persistence,
7777
communication: Communication,
7878
) -> CommandRequest:
79-
return CommandRequest.for_all_command_completed(
80-
TimerCommand.by_seconds(7),
81-
)
79+
return CommandRequest.empty()
8280

8381
def execute(
8482
self,
@@ -88,14 +86,15 @@ def execute(
8886
persistence: Persistence,
8987
communication: Communication,
9088
) -> StateDecision:
89+
# Delay updating search attributes to allow for the first assertion
90+
sleep(1)
9191
persistence.set_search_attribute_keyword(sa_keyword_key, final_sa_keyword)
92-
persistence.set_search_attribute_text(sa_text_key, final_sa_text)
93-
persistence.set_search_attribute_int64(sa_int_key, final_sa_int)
9492
persistence.set_search_attribute_boolean(sa_bool_key, final_sa_bool)
95-
persistence.set_search_attribute_datetime(sa_datetime_key, final_sa_datetime)
9693
persistence.set_search_attribute_keyword_array(
9794
sa_keyword_array_key, final_sa_keyword_array
9895
)
96+
persistence.set_search_attribute_int64(sa_int_key, final_sa_int)
97+
persistence.set_search_attribute_datetime(sa_datetime_key, final_sa_datetime)
9998
return StateDecision.graceful_complete_workflow()
10099

101100

@@ -110,23 +109,20 @@ def get_persistence_schema(self) -> PersistenceSchema:
110109
PersistenceField.search_attribute_def(
111110
sa_keyword_key, SearchAttributeValueType.KEYWORD
112111
),
113-
PersistenceField.search_attribute_def(
114-
sa_text_key, SearchAttributeValueType.TEXT
115-
),
116112
PersistenceField.search_attribute_def(
117113
sa_double_key, SearchAttributeValueType.DOUBLE
118114
),
119115
PersistenceField.search_attribute_def(
120-
sa_int_key, SearchAttributeValueType.INT
116+
sa_bool_key, SearchAttributeValueType.BOOL
121117
),
122118
PersistenceField.search_attribute_def(
123-
sa_bool_key, SearchAttributeValueType.BOOL
119+
sa_keyword_array_key, SearchAttributeValueType.KEYWORD_ARRAY
124120
),
125121
PersistenceField.search_attribute_def(
126-
sa_datetime_key, SearchAttributeValueType.DATETIME
122+
sa_int_key, SearchAttributeValueType.INT
127123
),
128124
PersistenceField.search_attribute_def(
129-
sa_keyword_array_key, SearchAttributeValueType.KEYWORD_ARRAY
125+
sa_datetime_key, SearchAttributeValueType.DATETIME
130126
),
131127
)
132128

@@ -141,22 +137,27 @@ def setUpClass(cls):
141137
def test_persistence_search_attributes_workflow(self):
142138
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
143139

144-
self.client.start_workflow(PersistenceSearchAttributesWorkflow, wf_id, 100)
140+
wf_opts = WorkflowOptions()
141+
wf_opts.add_wait_for_completion_state_ids(SearchAttributeState1)
142+
self.client.start_workflow(
143+
PersistenceSearchAttributesWorkflow, wf_id, 100, None, wf_opts
144+
)
145145

146-
# TODO: Should be replaced with wait_for_state_execution_completed once implemented
147-
# https://github.com/indeedeng/iwf-python-sdk/issues/48
148-
time.sleep(5)
146+
self.client.wait_for_state_execution_completion_with_state_execution_id(
147+
SearchAttributeState1, wf_id
148+
)
149149

150150
returned_search_attributes = self.client.get_all_search_attributes(
151-
PersistenceSearchAttributesWorkflow, wf_id
151+
PersistenceSearchAttributesWorkflow,
152+
wf_id,
152153
)
153154

154155
expected_search_attributes = dict()
155156
expected_search_attributes[sa_keyword_key] = sa_keyword
156-
expected_search_attributes[sa_text_key] = sa_text
157157
expected_search_attributes[sa_double_key] = sa_double
158-
expected_search_attributes[sa_int_key] = sa_int
158+
expected_search_attributes[sa_bool_key] = sa_bool
159159
expected_search_attributes[sa_keyword_array_key] = sa_keyword_array
160+
expected_search_attributes[sa_int_key] = sa_int
160161
expected_search_attributes[sa_datetime_key] = (
161162
"2024-11-13T00:00:01.731455544Z" # This is a bug. The iwf-server always returns utc time. See https://github.com/indeedeng/iwf/issues/261
162163
# "2024-11-12T18:00:01.731455544-06:00"
@@ -166,14 +167,8 @@ def test_persistence_search_attributes_workflow(self):
166167

167168
self.client.wait_for_workflow_completion(wf_id)
168169

169-
final_returned_search_attributes = self.client.get_all_search_attributes(
170-
PersistenceSearchAttributesWorkflow,
171-
wf_id,
172-
)
173-
174170
final_expected_search_attributes = dict()
175171
final_expected_search_attributes[sa_keyword_key] = final_sa_keyword
176-
final_expected_search_attributes[sa_text_key] = ""
177172
final_expected_search_attributes[sa_double_key] = sa_double
178173
final_expected_search_attributes[sa_int_key] = final_sa_int
179174
final_expected_search_attributes[sa_bool_key] = final_sa_bool
@@ -183,4 +178,9 @@ def test_persistence_search_attributes_workflow(self):
183178
# "2024-12-13T18:00:01.731455544-06:00"
184179
)
185180

181+
final_returned_search_attributes = self.client.get_all_search_attributes(
182+
PersistenceSearchAttributesWorkflow,
183+
wf_id,
184+
)
185+
186186
assert final_expected_search_attributes == final_returned_search_attributes

0 commit comments

Comments
 (0)