Skip to content

Commit 622edae

Browse files
authored
IWF-628: Add setting search attributes (#84)
* IWF-628: Add set search attributes methods * IWF-628: Add unit tests * IWF-628: Fix * IWF-628: Fix test * IWF-628: Fix * IWF-628: Testing * IWF-628: Fix test * IWF-628: Fix test * IWF-628: Fix test * IWF-628: Add comment * IWF-628: Change test * IWF-628: Lint * IWF-628: Simplify test * IWF-628: Refactor * IWF-628: Change comment * IWF-628: Change timeout * IWF-628: Testing * IWF-628: Unskip tests * IWF-628: Edit comment * IWF-628: Increase wait
1 parent 36db192 commit 622edae

File tree

9 files changed

+427
-54
lines changed

9 files changed

+427
-54
lines changed

.devcontainer/.env

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ CASSANDRA_VERSION=3.11.9
22
ELASTICSEARCH_VERSION=7.16.2
33
MYSQL_VERSION=8
44
POSTGRESQL_VERSION=13
5-
TEMPORAL_VERSION=1.20.2
6-
TEMPORAL_UI_VERSION=2.15.0
5+
TEMPORAL_VERSION=1.25
6+
TEMPORAL_ADMIN_TOOLS_VERSION=1.25.2-tctl-1.18.1-cli-1.1.1
7+
TEMPORAL_UI_VERSION=2.31.2

iwf/client.py

Lines changed: 81 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,10 @@ def get_all_search_attributes(
225225
workflow_id: str,
226226
workflow_run_id: Optional[str] = None,
227227
):
228+
run_id = workflow_run_id if workflow_run_id is not None else ""
229+
228230
return self._do_get_workflow_search_attributes(
229-
workflow_class, workflow_id, workflow_run_id
231+
workflow_class, workflow_id, run_id
230232
)
231233

232234
def get_workflow_search_attributes(
@@ -240,24 +242,27 @@ def get_workflow_search_attributes(
240242
raise ValueError(
241243
"attribute_keys must contain at least one entry, or use get_all_search_attributes API to get all"
242244
)
245+
246+
run_id = workflow_run_id if workflow_run_id is not None else ""
247+
243248
return self._do_get_workflow_search_attributes(
244-
workflow_class, workflow_id, workflow_run_id, attribute_keys
249+
workflow_class, workflow_id, run_id, attribute_keys
245250
)
246251

247252
def _do_get_workflow_search_attributes(
248253
self,
249254
workflow_class: type[ObjectWorkflow],
250255
workflow_id: str,
251-
workflow_run_id: Optional[str],
256+
workflow_run_id: str,
252257
attribute_keys: Optional[list[str]] = None,
253258
):
254259
wf_type = get_workflow_type_by_class(workflow_class)
255260
self._registry.get_workflow_with_check(wf_type)
256261

257262
search_attribute_types = self._registry.get_search_attribute_types(wf_type)
258263

259-
# if attribute keys is None or empty, iwf server will return all search attributes
260-
if attribute_keys is not None and attribute_keys:
264+
# if attribute keys is None, will fetch all registered search attributes from the server
265+
if attribute_keys:
261266
non_existing_search_attribute_list: list[str] = []
262267
for attribute_key in attribute_keys:
263268
if attribute_key not in search_attribute_types:
@@ -277,10 +282,8 @@ def _do_get_workflow_search_attributes(
277282
sa_type = search_attribute_types[attribute_key]
278283
key_and_types.append(SearchAttributeKeyAndType(attribute_key, sa_type))
279284

280-
run_id = workflow_run_id if workflow_run_id is not None else ""
281-
282285
response = self._unregistered_client.get_workflow_search_attributes(
283-
workflow_id, run_id, key_and_types
286+
workflow_id, workflow_run_id, key_and_types
284287
)
285288

286289
response_sas = response.search_attributes
@@ -296,28 +299,78 @@ def _do_get_workflow_search_attributes(
296299
if response_sa_key is None:
297300
raise RuntimeError("search attribute key is None")
298301
response_sa_type = search_attribute_types[response_sa_key]
299-
value = self.get_search_attribute_value(response_sa_type, response_sa)
302+
value = get_search_attribute_value(response_sa_type, response_sa)
300303
result[response_sa_key] = value
301304

302305
return result
303306

304-
@staticmethod
305-
def get_search_attribute_value(
306-
sa_type: SearchAttributeValueType, attribute: SearchAttribute
307+
def set_workflow_search_attributes(
308+
self,
309+
workflow_class: type[ObjectWorkflow],
310+
workflow_id: str,
311+
search_attributes: list[SearchAttribute],
312+
workflow_run_id: Optional[str] = None,
307313
):
308-
if (
309-
sa_type == SearchAttributeValueType.KEYWORD
310-
or sa_type == SearchAttributeValueType.DATETIME
311-
or sa_type == SearchAttributeValueType.TEXT
312-
):
313-
return unset_to_none(attribute.string_value)
314-
elif sa_type == SearchAttributeValueType.INT:
315-
return unset_to_none(attribute.integer_value)
316-
elif sa_type == SearchAttributeValueType.DOUBLE:
317-
return unset_to_none(attribute.double_value)
318-
elif sa_type == SearchAttributeValueType.BOOL:
319-
return unset_to_none(attribute.bool_value)
320-
elif sa_type == SearchAttributeValueType.KEYWORD_ARRAY:
321-
return unset_to_none(attribute.string_array_value)
322-
else:
323-
raise ValueError(f"not supported search attribute value type, {sa_type}")
314+
run_id = workflow_run_id if workflow_run_id is not None else ""
315+
316+
return self._do_set_workflow_search_attributes(
317+
workflow_class, workflow_id, run_id, search_attributes
318+
)
319+
320+
def _do_set_workflow_search_attributes(
321+
self,
322+
workflow_class: type[ObjectWorkflow],
323+
workflow_id: str,
324+
workflow_run_id: str,
325+
search_attributes: list[SearchAttribute],
326+
):
327+
wf_type = get_workflow_type_by_class(workflow_class)
328+
self._registry.get_workflow_with_check(wf_type)
329+
330+
search_attribute_types = self._registry.get_search_attribute_types(wf_type)
331+
332+
# Check that the requested sa type is registered to the key
333+
for search_attribute in search_attributes:
334+
sa_key = unset_to_none(search_attribute.key)
335+
if sa_key is None:
336+
raise RuntimeError("search attribute key is None")
337+
if sa_key not in search_attribute_types:
338+
raise InvalidArgumentError(f"Search attribute not registered: {sa_key}")
339+
registered_value_type = search_attribute_types[sa_key]
340+
341+
sa_value_type = unset_to_none(search_attribute.value_type)
342+
if sa_value_type is None:
343+
raise RuntimeError("search value type is None")
344+
345+
if (
346+
sa_value_type is not None
347+
and registered_value_type != sa_value_type.value
348+
):
349+
raise ValueError(
350+
f"Search attribute key, {sa_key} is registered to type {registered_value_type}, but tried to add search attribute type {sa_value_type.value}"
351+
)
352+
353+
self._unregistered_client.set_workflow_search_attributes(
354+
workflow_id, workflow_run_id, search_attributes
355+
)
356+
357+
358+
def get_search_attribute_value(
359+
sa_type: SearchAttributeValueType, attribute: SearchAttribute
360+
):
361+
if (
362+
sa_type == SearchAttributeValueType.KEYWORD
363+
or sa_type == SearchAttributeValueType.DATETIME
364+
or sa_type == SearchAttributeValueType.TEXT
365+
):
366+
return unset_to_none(attribute.string_value)
367+
elif sa_type == SearchAttributeValueType.INT:
368+
return unset_to_none(attribute.integer_value)
369+
elif sa_type == SearchAttributeValueType.DOUBLE:
370+
return unset_to_none(attribute.double_value)
371+
elif sa_type == SearchAttributeValueType.BOOL:
372+
return unset_to_none(attribute.bool_value)
373+
elif sa_type == SearchAttributeValueType.KEYWORD_ARRAY:
374+
return unset_to_none(attribute.string_array_value)
375+
else:
376+
raise ValueError(f"not supported search attribute value type, {sa_type}")

iwf/persistence.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,41 +25,43 @@ def set_data_attribute(self, key: str, value: Any):
2525
def get_search_attribute_int64(self, key: str) -> Union[None, int]:
2626
return self._search_attributes.get_search_attribute_int64(key)
2727

28-
def set_search_attribute_int64(self, key: str, value: int):
28+
def set_search_attribute_int64(self, key: str, value: Union[None, int]):
2929
self._search_attributes.set_search_attribute_int64(key, value)
3030

3131
def get_search_attribute_double(self, key: str) -> Union[None, float]:
3232
return self._search_attributes.get_search_attribute_double(key)
3333

34-
def set_search_attribute_double(self, key: str, value: float):
34+
def set_search_attribute_double(self, key: str, value: Union[None, float]):
3535
self._search_attributes.set_search_attribute_double(key, value)
3636

3737
def get_search_attribute_boolean(self, key: str) -> Union[None, bool]:
3838
return self._search_attributes.get_search_attribute_boolean(key)
3939

40-
def set_search_attribute_boolean(self, key: str, value: bool):
40+
def set_search_attribute_boolean(self, key: str, value: Union[None, bool]):
4141
self._search_attributes.set_search_attribute_boolean(key, value)
4242

4343
def get_search_attribute_keyword(self, key: str) -> Union[None, str]:
4444
return self._search_attributes.get_search_attribute_keyword(key)
4545

46-
def set_search_attribute_keyword(self, key: str, value: str):
46+
def set_search_attribute_keyword(self, key: str, value: Union[None, str]):
4747
self._search_attributes.set_search_attribute_keyword(key, value)
4848

4949
def get_search_attribute_text(self, key: str) -> Union[None, str]:
5050
return self._search_attributes.get_search_attribute_text(key)
5151

52-
def set_search_attribute_text(self, key: str, value: str):
52+
def set_search_attribute_text(self, key: str, value: Union[None, str]):
5353
self._search_attributes.set_search_attribute_text(key, value)
5454

5555
def get_search_attribute_datetime(self, key: str) -> Union[None, str]:
5656
return self._search_attributes.get_search_attribute_datetime(key)
5757

58-
def set_search_attribute_datetime(self, key: str, value: str):
58+
def set_search_attribute_datetime(self, key: str, value: Union[None, str]):
5959
self._search_attributes.set_search_attribute_datetime(key, value)
6060

6161
def get_search_attribute_keyword_array(self, key: str) -> Union[None, list[str]]:
6262
return self._search_attributes.get_search_attribute_keyword_array(key)
6363

64-
def set_search_attribute_keyword_array(self, key: str, value: list[str]):
64+
def set_search_attribute_keyword_array(
65+
self, key: str, value: Union[None, list[str]]
66+
):
6567
self._search_attributes.set_search_attribute_keyword_array(key, value)

iwf/search_attributes.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def __init__(
7272
def get_search_attribute_int64(self, key: str) -> Union[int, None]:
7373
return self._int64_attribute_map.get(key)
7474

75-
def set_search_attribute_int64(self, key: str, value: int):
75+
def set_search_attribute_int64(self, key: str, value: Union[int, None]):
7676
if (
7777
key not in self._key_to_type_map
7878
or self._key_to_type_map[key] != SearchAttributeValueType.INT
@@ -84,7 +84,7 @@ def set_search_attribute_int64(self, key: str, value: int):
8484
def get_search_attribute_double(self, key: str) -> Union[float, None]:
8585
return self._double_attribute_map.get(key)
8686

87-
def set_search_attribute_double(self, key: str, value: float):
87+
def set_search_attribute_double(self, key: str, value: Union[float, None]):
8888
if (
8989
key not in self._key_to_type_map
9090
or self._key_to_type_map[key] != SearchAttributeValueType.DOUBLE
@@ -96,7 +96,7 @@ def set_search_attribute_double(self, key: str, value: float):
9696
def get_search_attribute_boolean(self, key: str) -> Union[bool, None]:
9797
return self._bool_attribute_map.get(key)
9898

99-
def set_search_attribute_boolean(self, key: str, value: bool):
99+
def set_search_attribute_boolean(self, key: str, value: Union[bool, None]):
100100
if (
101101
key not in self._key_to_type_map
102102
or self._key_to_type_map[key] != SearchAttributeValueType.BOOL
@@ -108,7 +108,7 @@ def set_search_attribute_boolean(self, key: str, value: bool):
108108
def get_search_attribute_keyword(self, key: str) -> Union[str, None]:
109109
return self._string_attribute_map.get(key)
110110

111-
def set_search_attribute_keyword(self, key: str, value: str):
111+
def set_search_attribute_keyword(self, key: str, value: Union[str, None]):
112112
if (
113113
key not in self._key_to_type_map
114114
or self._key_to_type_map[key] != SearchAttributeValueType.KEYWORD
@@ -120,7 +120,7 @@ def set_search_attribute_keyword(self, key: str, value: str):
120120
def get_search_attribute_text(self, key: str) -> Union[str, None]:
121121
return self._string_attribute_map.get(key)
122122

123-
def set_search_attribute_text(self, key: str, value: str):
123+
def set_search_attribute_text(self, key: str, value: Union[str, None]):
124124
if (
125125
key not in self._key_to_type_map
126126
or self._key_to_type_map[key] != SearchAttributeValueType.TEXT
@@ -132,7 +132,7 @@ def set_search_attribute_text(self, key: str, value: str):
132132
def get_search_attribute_datetime(self, key: str) -> Union[str, None]:
133133
return self._string_attribute_map.get(key)
134134

135-
def set_search_attribute_datetime(self, key: str, value: str):
135+
def set_search_attribute_datetime(self, key: str, value: Union[str, None]):
136136
if (
137137
key not in self._key_to_type_map
138138
or self._key_to_type_map[key] != SearchAttributeValueType.DATETIME
@@ -144,7 +144,9 @@ def set_search_attribute_datetime(self, key: str, value: str):
144144
def get_search_attribute_keyword_array(self, key: str) -> Union[list[str], None]:
145145
return self._string_array_attribute_map.get(key)
146146

147-
def set_search_attribute_keyword_array(self, key: str, value: list[str]):
147+
def set_search_attribute_keyword_array(
148+
self, key: str, value: Union[list[str], None]
149+
):
148150
if (
149151
key not in self._key_to_type_map
150152
or self._key_to_type_map[key] != SearchAttributeValueType.KEYWORD_ARRAY

iwf/tests/iwf-service-env/docker-compose-init.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ for run in {1..120}; do
3030
sleep 0.1
3131
temporal operator search-attribute create --name CustomStringField --type Text
3232
sleep 0.1
33+
temporal operator search-attribute create --name CustomKeywordArrayField --type KeywordList
34+
sleep 0.1
3335

34-
if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField"; then
36+
if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField" && checkExists "CustomKeywordArrayField"; then
3537
echo "All search attributes are registered"
3638
break
3739
fi

iwf/tests/test_persistence.py renamed to iwf/tests/test_persistence_data_attributes.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def execute(
6363
return StateDecision.graceful_complete_workflow()
6464

6565

66-
class PersistenceWorkflow(ObjectWorkflow):
66+
class PersistenceDataAttributesWorkflow(ObjectWorkflow):
6767
def get_workflow_states(self) -> StateSchema:
6868
return StateSchema.with_starting_state(DataAttributeRWState())
6969

@@ -85,14 +85,14 @@ def test_persistence_read(self, pers: Persistence):
8585
)
8686

8787

88-
class TestPersistence(unittest.TestCase):
88+
class TestPersistenceDataAttributes(unittest.TestCase):
8989
@classmethod
9090
def setUpClass(cls):
91-
wf = PersistenceWorkflow()
91+
wf = PersistenceDataAttributesWorkflow()
9292
registry.add_workflow(wf)
9393
cls.client = Client(registry)
9494

95-
def test_persistence_workflow(self):
95+
def test_persistence_data_attributes_workflow(self):
9696
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
9797

9898
start_options = WorkflowOptions(
@@ -102,10 +102,14 @@ def test_persistence_workflow(self):
102102
},
103103
)
104104

105-
self.client.start_workflow(PersistenceWorkflow, wf_id, 100, None, start_options)
106-
self.client.get_simple_workflow_result_with_wait(wf_id, None)
105+
self.client.start_workflow(
106+
PersistenceDataAttributesWorkflow, wf_id, 100, None, start_options
107+
)
108+
self.client.wait_for_workflow_completion(wf_id, None)
107109

108-
res = self.client.invoke_rpc(wf_id, PersistenceWorkflow.test_persistence_read)
110+
res = self.client.invoke_rpc(
111+
wf_id, PersistenceDataAttributesWorkflow.test_persistence_read
112+
)
109113
assert res == [
110114
final_initial_da_value_1,
111115
final_initial_da_value_2,

0 commit comments

Comments
 (0)