Skip to content

Commit 36db192

Browse files
authored
IWF-607: Implement Search Attributes store and API (#83)
* IWF-607: Refactor Persistence * IWF-607: Lint * IWF-607: Fix types * IWF-607: Fix * IWF-607: Create client methods
1 parent b4ccfe3 commit 36db192

File tree

9 files changed

+455
-48
lines changed

9 files changed

+455
-48
lines changed

iwf-idl

iwf/client.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,17 @@
55

66
from iwf.client_options import ClientOptions
77
from iwf.errors import InvalidArgumentError
8+
from iwf.iwf_api.models import (
9+
SearchAttribute,
10+
SearchAttributeKeyAndType,
11+
SearchAttributeValueType,
12+
)
13+
from iwf.iwf_api.types import Unset
814
from iwf.registry import Registry
915
from iwf.reset_workflow_type_and_options import ResetWorkflowTypeAndOptions
1016
from iwf.stop_workflow_options import StopWorkflowOptions
1117
from iwf.unregistered_client import UnregisteredClient, UnregisteredWorkflowOptions
18+
from iwf.utils.iwf_typing import unset_to_none
1219
from iwf.workflow import ObjectWorkflow, get_workflow_type_by_class
1320
from iwf.workflow_options import WorkflowOptions
1421
from iwf.workflow_state import (
@@ -211,3 +218,106 @@ def skip_timer_at_command_index(
211218
state_execution_number,
212219
timer_command_index,
213220
)
221+
222+
def get_all_search_attributes(
223+
self,
224+
workflow_class: type[ObjectWorkflow],
225+
workflow_id: str,
226+
workflow_run_id: Optional[str] = None,
227+
):
228+
return self._do_get_workflow_search_attributes(
229+
workflow_class, workflow_id, workflow_run_id
230+
)
231+
232+
def get_workflow_search_attributes(
233+
self,
234+
workflow_class: type[ObjectWorkflow],
235+
workflow_id: str,
236+
attribute_keys: list[str],
237+
workflow_run_id: Optional[str] = None,
238+
):
239+
if not attribute_keys:
240+
raise ValueError(
241+
"attribute_keys must contain at least one entry, or use get_all_search_attributes API to get all"
242+
)
243+
return self._do_get_workflow_search_attributes(
244+
workflow_class, workflow_id, workflow_run_id, attribute_keys
245+
)
246+
247+
def _do_get_workflow_search_attributes(
248+
self,
249+
workflow_class: type[ObjectWorkflow],
250+
workflow_id: str,
251+
workflow_run_id: Optional[str],
252+
attribute_keys: Optional[list[str]] = None,
253+
):
254+
wf_type = get_workflow_type_by_class(workflow_class)
255+
self._registry.get_workflow_with_check(wf_type)
256+
257+
search_attribute_types = self._registry.get_search_attribute_types(wf_type)
258+
259+
# if attribute keys is None or empty, iwf server will return all search attributes
260+
if attribute_keys is not None and attribute_keys:
261+
non_existing_search_attribute_list: list[str] = []
262+
for attribute_key in attribute_keys:
263+
if attribute_key not in search_attribute_types:
264+
non_existing_search_attribute_list.append(attribute_key)
265+
266+
if non_existing_search_attribute_list:
267+
raise InvalidArgumentError(
268+
f"Search attributes not registered: {','.join(non_existing_search_attribute_list)}"
269+
)
270+
271+
key_and_types: list[SearchAttributeKeyAndType] = []
272+
if attribute_keys is None:
273+
for attribute_key, sa_type in search_attribute_types.items():
274+
key_and_types.append(SearchAttributeKeyAndType(attribute_key, sa_type))
275+
else:
276+
for attribute_key in attribute_keys:
277+
sa_type = search_attribute_types[attribute_key]
278+
key_and_types.append(SearchAttributeKeyAndType(attribute_key, sa_type))
279+
280+
run_id = workflow_run_id if workflow_run_id is not None else ""
281+
282+
response = self._unregistered_client.get_workflow_search_attributes(
283+
workflow_id, run_id, key_and_types
284+
)
285+
286+
response_sas = response.search_attributes
287+
288+
# TODO: troubleshoot why unset_to_none doesn't work as expected with lists
289+
if isinstance(response_sas, Unset) or response_sas is None:
290+
raise RuntimeError("search attributes not returned")
291+
292+
result: dict[str, Any] = {}
293+
294+
for response_sa in response_sas:
295+
response_sa_key = unset_to_none(response_sa.key)
296+
if response_sa_key is None:
297+
raise RuntimeError("search attribute key is None")
298+
response_sa_type = search_attribute_types[response_sa_key]
299+
value = self.get_search_attribute_value(response_sa_type, response_sa)
300+
result[response_sa_key] = value
301+
302+
return result
303+
304+
@staticmethod
305+
def get_search_attribute_value(
306+
sa_type: SearchAttributeValueType, attribute: SearchAttribute
307+
):
308+
if (
309+
sa_type == SearchAttributeValueType.KEYWORD
310+
or sa_type == SearchAttributeValueType.DATETIME
311+
or sa_type == SearchAttributeValueType.TEXT
312+
):
313+
return unset_to_none(attribute.string_value)
314+
elif sa_type == SearchAttributeValueType.INT:
315+
return unset_to_none(attribute.integer_value)
316+
elif sa_type == SearchAttributeValueType.DOUBLE:
317+
return unset_to_none(attribute.double_value)
318+
elif sa_type == SearchAttributeValueType.BOOL:
319+
return unset_to_none(attribute.bool_value)
320+
elif sa_type == SearchAttributeValueType.KEYWORD_ARRAY:
321+
return unset_to_none(attribute.string_array_value)
322+
else:
323+
raise ValueError(f"not supported search attribute value type, {sa_type}")

iwf/data_attributes.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from typing import Any, Optional, Union
2+
3+
from iwf.errors import WorkflowDefinitionError
4+
from iwf.iwf_api.models import EncodedObject
5+
from iwf.object_encoder import ObjectEncoder
6+
7+
8+
class DataAttributes:
9+
_type_store: dict[str, Optional[type]]
10+
_object_encoder: ObjectEncoder
11+
_current_values: dict[str, Union[EncodedObject, None]]
12+
_updated_values_to_return: dict[str, EncodedObject]
13+
14+
def __init__(
15+
self,
16+
type_store: dict[str, Optional[type]],
17+
object_encoder: ObjectEncoder,
18+
current_values: dict[str, Union[EncodedObject, None]],
19+
):
20+
self._object_encoder = object_encoder
21+
self._type_store = type_store
22+
self._current_values = current_values
23+
self._updated_values_to_return = {}
24+
25+
def get_data_attribute(self, key: str) -> Any:
26+
if key not in self._type_store:
27+
raise WorkflowDefinitionError(f"data attribute %s is not registered {key}")
28+
29+
encoded_object = self._current_values.get(key)
30+
if encoded_object is None:
31+
return None
32+
33+
registered_type = self._type_store[key]
34+
return self._object_encoder.decode(encoded_object, registered_type)
35+
36+
def set_data_attribute(self, key: str, value: Any):
37+
if key not in self._type_store:
38+
raise WorkflowDefinitionError(f"data attribute %s is not registered {key}")
39+
40+
registered_type = self._type_store[key]
41+
if registered_type is not None and not isinstance(value, registered_type):
42+
raise WorkflowDefinitionError(
43+
f"data attribute %s is of the right type {registered_type}"
44+
)
45+
46+
encoded_value = self._object_encoder.encode(value)
47+
self._current_values[key] = encoded_value
48+
self._updated_values_to_return[key] = encoded_value
49+
50+
def get_updated_values_to_return(self) -> dict[str, EncodedObject]:
51+
return self._updated_values_to_return

iwf/persistence.py

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,65 @@
1-
from typing import Any, Optional, Union
1+
from typing import Any, Union
22

3-
from iwf.errors import WorkflowDefinitionError
4-
from iwf.iwf_api.models import EncodedObject
5-
from iwf.object_encoder import ObjectEncoder
3+
from iwf.data_attributes import DataAttributes
4+
from iwf.search_attributes import SearchAttributes
65

76

87
class Persistence:
9-
_type_store: dict[str, Optional[type]]
10-
_object_encoder: ObjectEncoder
11-
_current_values: dict[str, Union[EncodedObject, None]]
12-
_updated_values_to_return: dict[str, EncodedObject]
8+
_data_attributes: DataAttributes
9+
_search_attributes: SearchAttributes
1310

1411
def __init__(
1512
self,
16-
type_store: dict[str, Optional[type]],
17-
object_encoder: ObjectEncoder,
18-
current_values: dict[str, Union[EncodedObject, None]],
13+
data_attributes: DataAttributes,
14+
search_attributes: SearchAttributes,
1915
):
20-
self._object_encoder = object_encoder
21-
self._type_store = type_store
22-
self._current_values = current_values
23-
self._updated_values_to_return = {}
16+
self._data_attributes = data_attributes
17+
self._search_attributes = search_attributes
2418

2519
def get_data_attribute(self, key: str) -> Any:
26-
if key not in self._type_store:
27-
raise WorkflowDefinitionError(f"data attribute %s is not registered {key}")
20+
return self._data_attributes.get_data_attribute(key)
2821

29-
encoded_object = self._current_values.get(key)
30-
if encoded_object is None:
31-
return None
22+
def set_data_attribute(self, key: str, value: Any):
23+
self._data_attributes.set_data_attribute(key, value)
3224

33-
registered_type = self._type_store[key]
34-
return self._object_encoder.decode(encoded_object, registered_type)
25+
def get_search_attribute_int64(self, key: str) -> Union[None, int]:
26+
return self._search_attributes.get_search_attribute_int64(key)
3527

36-
def set_data_attribute(self, key: str, value: Any):
37-
if key not in self._type_store:
38-
raise WorkflowDefinitionError(f"data attribute %s is not registered {key}")
28+
def set_search_attribute_int64(self, key: str, value: int):
29+
self._search_attributes.set_search_attribute_int64(key, value)
30+
31+
def get_search_attribute_double(self, key: str) -> Union[None, float]:
32+
return self._search_attributes.get_search_attribute_double(key)
33+
34+
def set_search_attribute_double(self, key: str, value: float):
35+
self._search_attributes.set_search_attribute_double(key, value)
36+
37+
def get_search_attribute_boolean(self, key: str) -> Union[None, bool]:
38+
return self._search_attributes.get_search_attribute_boolean(key)
39+
40+
def set_search_attribute_boolean(self, key: str, value: bool):
41+
self._search_attributes.set_search_attribute_boolean(key, value)
42+
43+
def get_search_attribute_keyword(self, key: str) -> Union[None, str]:
44+
return self._search_attributes.get_search_attribute_keyword(key)
45+
46+
def set_search_attribute_keyword(self, key: str, value: str):
47+
self._search_attributes.set_search_attribute_keyword(key, value)
48+
49+
def get_search_attribute_text(self, key: str) -> Union[None, str]:
50+
return self._search_attributes.get_search_attribute_text(key)
51+
52+
def set_search_attribute_text(self, key: str, value: str):
53+
self._search_attributes.set_search_attribute_text(key, value)
54+
55+
def get_search_attribute_datetime(self, key: str) -> Union[None, str]:
56+
return self._search_attributes.get_search_attribute_datetime(key)
3957

40-
registered_type = self._type_store[key]
41-
if registered_type is not None and not isinstance(value, registered_type):
42-
raise WorkflowDefinitionError(
43-
f"data attribute %s is of the right type {registered_type}"
44-
)
58+
def set_search_attribute_datetime(self, key: str, value: str):
59+
self._search_attributes.set_search_attribute_datetime(key, value)
4560

46-
encoded_value = self._object_encoder.encode(value)
47-
self._current_values[key] = encoded_value
48-
self._updated_values_to_return[key] = encoded_value
61+
def get_search_attribute_keyword_array(self, key: str) -> Union[None, list[str]]:
62+
return self._search_attributes.get_search_attribute_keyword_array(key)
4963

50-
def get_updated_values_to_return(self) -> dict[str, EncodedObject]:
51-
return self._updated_values_to_return
64+
def set_search_attribute_keyword_array(self, key: str, value: list[str]):
65+
self._search_attributes.set_search_attribute_keyword_array(key, value)

iwf/persistence_schema.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,30 @@
22
from enum import Enum
33
from typing import List, Optional
44

5+
from iwf.iwf_api.models import SearchAttributeValueType
6+
57

68
class PersistenceFieldType(Enum):
79
DataAttribute = 1
8-
# SearchAttribute = 2
10+
SearchAttribute = 2
911

1012

1113
@dataclass
1214
class PersistenceField:
1315
key: str
1416
field_type: PersistenceFieldType
1517
value_type: Optional[type]
16-
# search_attribute_type: Optional[SearchAttributeValueType] = None
18+
search_attribute_type: Optional[SearchAttributeValueType] = None
1719

1820
@classmethod
1921
def data_attribute_def(cls, key: str, value_type: Optional[type]):
2022
return PersistenceField(key, PersistenceFieldType.DataAttribute, value_type)
2123

22-
# @classmethod
23-
# def search_attribute_def(cls, key: str, sa_type: SearchAttributeValueType):
24-
# return PersistenceField(key, PersistenceFieldType.SearchAttribute, sa_type)
24+
@classmethod
25+
def search_attribute_def(cls, key: str, sa_type: SearchAttributeValueType):
26+
return PersistenceField(
27+
key, PersistenceFieldType.SearchAttribute, None, sa_type
28+
)
2529

2630

2731
@dataclass

iwf/registry.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from iwf.communication_schema import CommunicationMethodType
44
from iwf.errors import InvalidArgumentError, WorkflowDefinitionError
5+
from iwf.iwf_api.models import SearchAttributeValueType
56
from iwf.persistence_schema import PersistenceFieldType
67
from iwf.rpc import RPCInfo
78
from iwf.type_store import TypeStore, Type
@@ -16,6 +17,7 @@ class Registry:
1617
_internal_channel_type_store: dict[str, TypeStore]
1718
_signal_channel_type_store: dict[str, dict[str, Optional[type]]]
1819
_data_attribute_types: dict[str, dict[str, Optional[type]]]
20+
_search_attribute_types: dict[str, dict[str, SearchAttributeValueType]]
1921
_rpc_infos: dict[str, dict[str, RPCInfo]]
2022

2123
def __init__(self):
@@ -25,6 +27,7 @@ def __init__(self):
2527
self._internal_channel_type_store = dict()
2628
self._signal_channel_type_store = dict()
2729
self._data_attribute_types = dict()
30+
self._search_attribute_types = {}
2831
self._rpc_infos = dict()
2932

3033
def add_workflow(self, wf: ObjectWorkflow):
@@ -33,6 +36,7 @@ def add_workflow(self, wf: ObjectWorkflow):
3336
self._register_internal_channels(wf)
3437
self._register_signal_channels(wf)
3538
self._register_data_attributes(wf)
39+
self._register_search_attributes(wf)
3640
self._register_workflow_rpcs(wf)
3741

3842
def add_workflows(self, *wfs: ObjectWorkflow):
@@ -73,6 +77,11 @@ def get_signal_channel_types(self, wf_type: str) -> dict[str, Optional[type]]:
7377
def get_data_attribute_types(self, wf_type: str) -> dict[str, Optional[type]]:
7478
return self._data_attribute_types[wf_type]
7579

80+
def get_search_attribute_types(
81+
self, wf_type: str
82+
) -> dict[str, SearchAttributeValueType]:
83+
return self._search_attribute_types[wf_type]
84+
7685
def get_rpc_infos(self, wf_type: str) -> dict[str, RPCInfo]:
7786
return self._rpc_infos[wf_type]
7887

@@ -112,6 +121,23 @@ def _register_data_attributes(self, wf: ObjectWorkflow):
112121
types[field.key] = field.value_type
113122
self._data_attribute_types[wf_type] = types
114123

124+
def _register_search_attributes(self, wf: ObjectWorkflow):
125+
wf_type = get_workflow_type(wf)
126+
types: dict[str, SearchAttributeValueType] = {}
127+
for field in wf.get_persistence_schema().persistence_fields:
128+
if field.field_type == PersistenceFieldType.SearchAttribute:
129+
sa_type = field.search_attribute_type
130+
if sa_type is None:
131+
raise WorkflowDefinitionError(
132+
f"Found search attribute {field.key} with no type set"
133+
)
134+
if field.key in types:
135+
raise WorkflowDefinitionError(
136+
f"Search attribute {field.key} already exists"
137+
)
138+
types[field.key] = sa_type
139+
self._search_attribute_types[wf_type] = types
140+
115141
def _register_workflow_state(self, wf):
116142
wf_type = get_workflow_type(wf)
117143
state_map = {}

0 commit comments

Comments
 (0)