Skip to content

Commit 343d932

Browse files
authored
IWF-465: Initial search attributes (#88)
1 parent f000333 commit 343d932

File tree

5 files changed

+165
-26
lines changed

5 files changed

+165
-26
lines changed

iwf/client.py

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from typing_extensions import deprecated
55

66
from iwf.client_options import ClientOptions
7-
from iwf.errors import InvalidArgumentError
7+
from iwf.errors import InvalidArgumentError, WorkflowDefinitionError
88
from iwf.iwf_api.models import (
99
SearchAttribute,
1010
SearchAttributeKeyAndType,
@@ -16,6 +16,7 @@
1616
from iwf.stop_workflow_options import StopWorkflowOptions
1717
from iwf.unregistered_client import UnregisteredClient, UnregisteredWorkflowOptions
1818
from iwf.utils.iwf_typing import unset_to_none
19+
from iwf.utils.persistence_utils import get_search_attribute_value
1920
from iwf.workflow import ObjectWorkflow, get_workflow_type_by_class
2021
from iwf.workflow_options import WorkflowOptions
2122
from iwf.workflow_state import (
@@ -94,7 +95,12 @@ def start_workflow(
9495
options.wait_for_completion_state_execution_ids
9596
)
9697

97-
# TODO: set initial search attributes here
98+
if options.initial_search_attributes:
99+
sa_types = self._registry.get_search_attribute_types(wf_type)
100+
converted_sas = convert_to_sa_list(
101+
sa_types, options.initial_search_attributes
102+
)
103+
unreg_opts.initial_search_attributes = converted_sas
98104

99105
starting_state_id = None
100106

@@ -404,22 +410,55 @@ def _do_set_workflow_search_attributes(
404410
)
405411

406412

407-
def get_search_attribute_value(
408-
sa_type: SearchAttributeValueType, attribute: SearchAttribute
413+
def convert_to_sa_list(
414+
sa_types: dict[str, SearchAttributeValueType], initial_sas: dict[str, Any]
409415
):
410-
if (
411-
sa_type == SearchAttributeValueType.KEYWORD
412-
or sa_type == SearchAttributeValueType.DATETIME
413-
or sa_type == SearchAttributeValueType.TEXT
414-
):
415-
return unset_to_none(attribute.string_value)
416-
elif sa_type == SearchAttributeValueType.INT:
417-
return unset_to_none(attribute.integer_value)
418-
elif sa_type == SearchAttributeValueType.DOUBLE:
419-
return unset_to_none(attribute.double_value)
420-
elif sa_type == SearchAttributeValueType.BOOL:
421-
return unset_to_none(attribute.bool_value)
422-
elif sa_type == SearchAttributeValueType.KEYWORD_ARRAY:
423-
return unset_to_none(attribute.string_array_value)
424-
else:
425-
raise ValueError(f"not supported search attribute value type, {sa_type}")
416+
converted_sas: list[SearchAttribute] = []
417+
if initial_sas:
418+
for initial_sa_key, initial_sa_val in initial_sas.items():
419+
if initial_sa_key not in sa_types:
420+
raise WorkflowDefinitionError(
421+
f"key {initial_sa_key} is not defined as search attribute, all keys are: {','.join(sa_types)}"
422+
)
423+
424+
val_type = sa_types[initial_sa_key]
425+
new_sa = SearchAttribute(key=initial_sa_key, value_type=val_type)
426+
is_val_correct_type = False
427+
if val_type == SearchAttributeValueType.INT:
428+
if isinstance(initial_sa_val, int):
429+
new_sa.integer_value = initial_sa_val
430+
converted_sas.append(new_sa)
431+
is_val_correct_type = True
432+
elif val_type == SearchAttributeValueType.DOUBLE:
433+
if isinstance(initial_sa_val, float):
434+
new_sa.double_value = initial_sa_val
435+
converted_sas.append(new_sa)
436+
is_val_correct_type = True
437+
elif val_type == SearchAttributeValueType.BOOL:
438+
if isinstance(initial_sa_val, bool):
439+
new_sa.bool_value = initial_sa_val
440+
converted_sas.append(new_sa)
441+
is_val_correct_type = True
442+
elif (
443+
val_type == SearchAttributeValueType.KEYWORD
444+
or val_type == SearchAttributeValueType.TEXT
445+
or val_type == SearchAttributeValueType.DATETIME
446+
):
447+
if isinstance(initial_sa_val, str):
448+
new_sa.string_value = initial_sa_val
449+
converted_sas.append(new_sa)
450+
is_val_correct_type = True
451+
elif val_type == SearchAttributeValueType.KEYWORD_ARRAY:
452+
if isinstance(initial_sa_val, list):
453+
new_sa.string_array_value = initial_sa_val
454+
converted_sas.append(new_sa)
455+
is_val_correct_type = True
456+
else:
457+
raise ValueError("unsupported type")
458+
459+
if not is_val_correct_type:
460+
raise InvalidArgumentError(
461+
f"search attribute value is not set correctly for key {initial_sa_key} with value type {val_type}"
462+
)
463+
464+
return converted_sas

iwf/tests/test_persistence_search_attributes.py

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from time import sleep
55

66
from iwf.client import Client
7-
from iwf.command_request import CommandRequest
7+
from iwf.command_request import CommandRequest, TimerCommand
88
from iwf.command_results import CommandResults
99
from iwf.communication import Communication
1010
from iwf.iwf_api.models import SearchAttributeValueType
@@ -25,6 +25,13 @@
2525
sa_datetime_key = "CustomDatetimeField"
2626
sa_keyword_array_key = "CustomKeywordArrayField"
2727

28+
initial_sa_keyword: str = "initial_keyword"
29+
initial_sa_double: float = 1.11
30+
initial_sa_int: int = 1
31+
initial_sa_bool: bool = True
32+
initial_sa_datetime: str = "2024-11-09T16:00:01.731455544-08:00"
33+
initial_sa_keyword_array: list[str] = ["initial_keyword-1", "initial_keyword-2"]
34+
2835
sa_keyword: str = "keyword"
2936
sa_double: float = 2.34
3037
sa_int: int = 234
@@ -39,15 +46,17 @@
3946
final_sa_keyword_array: list[str] = ["final_keyword-1", "final_keyword-2"]
4047

4148

42-
class SearchAttributeState1(WorkflowState[None]):
49+
class SearchAttributeStateInit(WorkflowState[None]):
4350
def wait_until(
4451
self,
4552
ctx: WorkflowContext,
4653
input: T,
4754
persistence: Persistence,
4855
communication: Communication,
4956
) -> CommandRequest:
50-
return CommandRequest.empty()
57+
return CommandRequest.for_all_command_completed(
58+
TimerCommand.by_seconds(2),
59+
)
5160

5261
def execute(
5362
self,
@@ -65,6 +74,29 @@ def execute(
6574
)
6675
persistence.set_search_attribute_int64(sa_int_key, sa_int)
6776
persistence.set_search_attribute_datetime(sa_datetime_key, sa_datetime)
77+
return StateDecision.single_next_state(SearchAttributeState1)
78+
79+
80+
class SearchAttributeState1(WorkflowState[None]):
81+
def wait_until(
82+
self,
83+
ctx: WorkflowContext,
84+
input: T,
85+
persistence: Persistence,
86+
communication: Communication,
87+
) -> CommandRequest:
88+
return CommandRequest.for_all_command_completed(
89+
TimerCommand.by_seconds(2),
90+
)
91+
92+
def execute(
93+
self,
94+
ctx: WorkflowContext,
95+
input: T,
96+
command_results: CommandResults,
97+
persistence: Persistence,
98+
communication: Communication,
99+
) -> StateDecision:
68100
return StateDecision.single_next_state(SearchAttributeState2)
69101

70102

@@ -86,7 +118,7 @@ def execute(
86118
persistence: Persistence,
87119
communication: Communication,
88120
) -> StateDecision:
89-
# Delay updating search attributes to allow for the first assertion
121+
# Delay updating search attributes to allow for the previous assertion
90122
sleep(1)
91123
persistence.set_search_attribute_keyword(sa_keyword_key, final_sa_keyword)
92124
persistence.set_search_attribute_boolean(sa_bool_key, final_sa_bool)
@@ -101,7 +133,7 @@ def execute(
101133
class PersistenceSearchAttributesWorkflow(ObjectWorkflow):
102134
def get_workflow_states(self) -> StateSchema:
103135
return StateSchema.with_starting_state(
104-
SearchAttributeState1(), SearchAttributeState2()
136+
SearchAttributeStateInit(), SearchAttributeState1(), SearchAttributeState2()
105137
)
106138

107139
def get_persistence_schema(self) -> PersistenceSchema:
@@ -137,12 +169,41 @@ def setUpClass(cls):
137169
def test_persistence_search_attributes_workflow(self):
138170
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
139171

140-
wf_opts = WorkflowOptions()
172+
wf_opts = WorkflowOptions(
173+
initial_search_attributes={
174+
sa_keyword_key: initial_sa_keyword,
175+
sa_double_key: initial_sa_double,
176+
sa_int_key: initial_sa_int,
177+
sa_bool_key: initial_sa_bool,
178+
sa_datetime_key: initial_sa_datetime,
179+
sa_keyword_array_key: initial_sa_keyword_array,
180+
}
181+
)
141182
wf_opts.add_wait_for_completion_state_ids(SearchAttributeState1)
142183
self.client.start_workflow(
143184
PersistenceSearchAttributesWorkflow, wf_id, 100, None, wf_opts
144185
)
145186

187+
initial_returned_search_attributes = self.client.get_all_search_attributes(
188+
PersistenceSearchAttributesWorkflow,
189+
wf_id,
190+
)
191+
192+
initial_expected_search_attributes = dict()
193+
initial_expected_search_attributes[sa_keyword_key] = initial_sa_keyword
194+
initial_expected_search_attributes[sa_double_key] = initial_sa_double
195+
initial_expected_search_attributes[sa_bool_key] = initial_sa_bool
196+
initial_expected_search_attributes[sa_keyword_array_key] = (
197+
initial_sa_keyword_array
198+
)
199+
initial_expected_search_attributes[sa_int_key] = initial_sa_int
200+
initial_expected_search_attributes[sa_datetime_key] = (
201+
"2024-11-10T00:00:01.731455544Z" # This is a bug. The iwf-server always returns utc time. See https://github.com/indeedeng/iwf/issues/261
202+
# "2024-11-09T18:00:01.731455544-06:00"
203+
)
204+
205+
assert initial_expected_search_attributes == initial_returned_search_attributes
206+
146207
self.client.wait_for_state_execution_completion_with_state_execution_id(
147208
SearchAttributeState1, wf_id
148209
)

iwf/unregistered_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
parse_unexpected_error,
1313
process_http_error,
1414
process_workflow_abnormal_exit_error,
15+
InvalidArgumentError,
1516
)
1617
from iwf.iwf_api import Client, errors
1718
from iwf.iwf_api.api.default import (
@@ -62,6 +63,8 @@
6263
from iwf.iwf_api.types import Response
6364
from iwf.reset_workflow_type_and_options import ResetWorkflowTypeAndOptions
6465
from iwf.stop_workflow_options import StopWorkflowOptions
66+
from iwf.utils.iwf_typing import assert_not_unset
67+
from iwf.utils.persistence_utils import get_search_attribute_value
6568

6669

6770
@dataclass
@@ -76,6 +79,7 @@ class UnregisteredWorkflowOptions:
7679
initial_data_attributes: Optional[dict[str, Any]] = None
7780
wait_for_completion_state_execution_ids: Optional[list[str]] = None
7881
wait_for_completion_state_ids: Optional[list[str]] = None
82+
initial_search_attributes: Optional[list[SearchAttribute]] = None
7983

8084

8185
T = TypeVar("T")
@@ -157,6 +161,17 @@ def start_workflow(
157161
options.workflow_already_started_options
158162
)
159163

164+
if options.initial_search_attributes:
165+
for search_attribute in options.initial_search_attributes:
166+
val = get_search_attribute_value(
167+
assert_not_unset(search_attribute.value_type), search_attribute
168+
)
169+
if val is None:
170+
raise InvalidArgumentError(
171+
f"search attribute value is not set correctly for key {search_attribute.key} with value type {search_attribute.value_type}"
172+
)
173+
start_options.search_attributes = options.initial_search_attributes
174+
160175
if options.initial_data_attributes:
161176
das = []
162177
for key, value in options.initial_data_attributes.items():

iwf/utils/persistence_utils.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from iwf.iwf_api.models import SearchAttributeValueType, SearchAttribute
2+
from iwf.utils.iwf_typing import unset_to_none
3+
4+
5+
def get_search_attribute_value(
6+
sa_type: SearchAttributeValueType, attribute: SearchAttribute
7+
):
8+
if (
9+
sa_type == SearchAttributeValueType.KEYWORD
10+
or sa_type == SearchAttributeValueType.DATETIME
11+
or sa_type == SearchAttributeValueType.TEXT
12+
):
13+
return unset_to_none(attribute.string_value)
14+
elif sa_type == SearchAttributeValueType.INT:
15+
return unset_to_none(attribute.integer_value)
16+
elif sa_type == SearchAttributeValueType.DOUBLE:
17+
return unset_to_none(attribute.double_value)
18+
elif sa_type == SearchAttributeValueType.BOOL:
19+
return unset_to_none(attribute.bool_value)
20+
elif sa_type == SearchAttributeValueType.KEYWORD_ARRAY:
21+
return unset_to_none(attribute.string_array_value)
22+
else:
23+
raise ValueError(f"not supported search attribute value type, {sa_type}")

iwf/workflow_options.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class WorkflowOptions:
2525
initial_data_attributes: Optional[dict[str, Any]] = None
2626
_wait_for_completion_state_ids: list[str] = field(default_factory=list)
2727
_wait_for_completion_state_execution_ids: list[str] = field(default_factory=list)
28+
initial_search_attributes: Optional[dict[str, Any]] = None
2829

2930
@property
3031
def wait_for_completion_state_ids(self) -> Optional[list[str]]:

0 commit comments

Comments
 (0)