Skip to content

Commit bd6097b

Browse files
N-giveNathan Givens
andauthored
IWF-683 Add caching persistence data (#104)
* IWF-683 add persistence options and use memo Co-authored-by: Nathan Givens <[email protected]>
1 parent a47de61 commit bd6097b

12 files changed

+856
-4
lines changed

iwf/client.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ def start_workflow(
106106
)
107107
unreg_opts.initial_search_attributes = converted_sas
108108

109+
schema_options = self._registry.get_persistence_options(wf_type)
110+
if schema_options.enable_caching:
111+
unreg_opts.using_memo_for_data_attributes = schema_options.enable_caching
112+
109113
starting_state_id = None
110114

111115
if starting_state is not None:
@@ -188,8 +192,10 @@ def get_workflow_data_attributes(
188192
f"key {key} is not registered in workflow {wf_type}"
189193
)
190194

195+
schema_options = self._registry.get_persistence_options(wf_type)
196+
191197
response = self._unregistered_client.get_workflow_data_attributes(
192-
workflow_id, workflow_run_id, keys
198+
workflow_id, workflow_run_id, keys, schema_options.enable_caching
193199
)
194200

195201
if not response.objects:
@@ -237,6 +243,10 @@ def invoke_rpc(
237243
wf_type = get_workflow_type_by_rpc_method(rpc)
238244
rpc_name = rpc.__name__
239245
rpc_info = self._registry.get_rpc_infos(wf_type)[rpc_name]
246+
schema_options = self._registry.get_persistence_options(wf_type)
247+
use_memo = schema_options.enable_caching
248+
if rpc_info.bypass_caching_for_strong_consistency:
249+
use_memo = False
240250

241251
return self._unregistered_client.invoke_rpc(
242252
input=input,
@@ -245,6 +255,7 @@ def invoke_rpc(
245255
rpc_name=rpc_name,
246256
timeout_seconds=rpc_info.timeout_seconds,
247257
data_attribute_policy=rpc_info.data_attribute_loading_policy,
258+
use_memo_for_data_attributes=use_memo,
248259
all_defined_search_attribute_types=[],
249260
return_type_hint=return_type_hint,
250261
)

iwf/data_attributes.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,18 @@ def get_data_attribute(self, key: str) -> Any:
3838
def set_data_attribute(self, key: str, value: Any):
3939
is_registered = self._type_store.is_valid_name_or_prefix(key)
4040
if not is_registered:
41-
raise WorkflowDefinitionError(f"data attribute %s is not registered {key}")
41+
raise WorkflowDefinitionError(
42+
f"data attribute {key} is not registered {key}"
43+
)
4244

4345
registered_type = self._type_store.get_type(key)
44-
if registered_type is not None and not isinstance(value, registered_type):
46+
if (
47+
value is not None
48+
and registered_type is not None
49+
and not isinstance(value, registered_type)
50+
):
4551
raise WorkflowDefinitionError(
46-
f"data attribute %s is of the right type {registered_type}"
52+
f"data attribute {key} is of the right type {registered_type}"
4753
)
4854

4955
encoded_value = self._object_encoder.encode(value)

iwf/persistence_options.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from dataclasses import dataclass
2+
3+
4+
@dataclass
5+
class PersistenceOptions:
6+
enable_caching: bool
7+
8+
@classmethod
9+
def get_default(cls):
10+
return PersistenceOptions(False)

iwf/registry.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from iwf.communication_schema import CommunicationMethodType
44
from iwf.errors import InvalidArgumentError, WorkflowDefinitionError
55
from iwf.iwf_api.models import SearchAttributeValueType
6+
from iwf.persistence_options import PersistenceOptions
67
from iwf.persistence_schema import PersistenceFieldType
78
from iwf.rpc import RPCInfo
89
from iwf.type_store import TypeStore, Type
@@ -18,6 +19,7 @@ class Registry:
1819
_signal_channel_type_store: dict[str, dict[str, Optional[type]]]
1920
_data_attribute_types: dict[str, TypeStore]
2021
_search_attribute_types: dict[str, dict[str, SearchAttributeValueType]]
22+
_persistence_options: dict[str, PersistenceOptions]
2123
_rpc_infos: dict[str, dict[str, RPCInfo]]
2224

2325
def __init__(self):
@@ -28,6 +30,7 @@ def __init__(self):
2830
self._signal_channel_type_store = dict()
2931
self._data_attribute_types = dict()
3032
self._search_attribute_types = {}
33+
self._persistence_options = {}
3134
self._rpc_infos = dict()
3235

3336
def add_workflow(self, wf: ObjectWorkflow):
@@ -37,6 +40,7 @@ def add_workflow(self, wf: ObjectWorkflow):
3740
self._register_signal_channels(wf)
3841
self._register_data_attributes(wf)
3942
self._register_search_attributes(wf)
43+
self._register_persistence_options(wf)
4044
self._register_workflow_rpcs(wf)
4145

4246
def add_workflows(self, *wfs: ObjectWorkflow):
@@ -82,6 +86,9 @@ def get_search_attribute_types(
8286
) -> dict[str, SearchAttributeValueType]:
8387
return self._search_attribute_types[wf_type]
8488

89+
def get_persistence_options(self, wf_type: str) -> PersistenceOptions:
90+
return self._persistence_options[wf_type]
91+
8592
def get_rpc_infos(self, wf_type: str) -> dict[str, RPCInfo]:
8693
return self._rpc_infos[wf_type]
8794

@@ -141,6 +148,10 @@ def _register_search_attributes(self, wf: ObjectWorkflow):
141148
types[field.key] = sa_type
142149
self._search_attribute_types[wf_type] = types
143150

151+
def _register_persistence_options(self, wf: ObjectWorkflow):
152+
wf_type = get_workflow_type(wf)
153+
self._persistence_options[wf_type] = wf.get_persistence_options()
154+
144155
def _register_workflow_state(self, wf):
145156
wf_type = get_workflow_type(wf)
146157
state_map = {}

iwf/rpc.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class RPCInfo:
1616
params_order: Optional[list] = (
1717
None # store this so that the rpc can be invoked with correct parameters
1818
)
19+
bypass_caching_for_strong_consistency: bool = False
1920

2021

2122
rpc_definition_err = WorkflowDefinitionError(
@@ -27,6 +28,7 @@ class RPCInfo:
2728
def rpc(
2829
timeout_seconds: int = 10,
2930
data_attribute_loading_policy: Optional[PersistenceLoadingPolicy] = None,
31+
bypass_caching_for_strong_consistency: bool = False,
3032
):
3133
def decorator(func):
3234
# preserve the properties of the original function.
@@ -40,6 +42,7 @@ def wrapper(*args, **kwargs):
4042
method_func=func,
4143
timeout_seconds=timeout_seconds,
4244
data_attribute_loading_policy=data_attribute_loading_policy,
45+
bypass_caching_for_strong_consistency=bypass_caching_for_strong_consistency,
4346
)
4447
params = signature(func).parameters
4548

iwf/tests/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
from iwf.registry import Registry
2+
from iwf.tests.workflows.java_duplicate_rpc_memo_workflow import (
3+
JavaDuplicateRpcMemoWorkflow,
4+
)
25
from iwf.tests.workflows.abnormal_exit_workflow import AbnormalExitWorkflow
36
from iwf.tests.workflows.basic_workflow import BasicWorkflow
47
from iwf.tests.workflows.conditional_complete_workflow import (
@@ -19,6 +22,7 @@
1922
PersistenceStateExecutionLocalWorkflow,
2023
)
2124
from iwf.tests.workflows.recovery_workflow import RecoveryWorkflow
25+
from iwf.tests.workflows.rpc_memo_workflow import RpcMemoWorkflow
2226
from iwf.tests.workflows.rpc_workflow import RPCWorkflow
2327
from iwf.tests.workflows.state_options_override_workflow import (
2428
StateOptionsOverrideWorkflow,
@@ -43,10 +47,12 @@
4347
registry.add_workflow(DescribeWorkflow())
4448
registry.add_workflow(InternalChannelWorkflow())
4549
registry.add_workflow(InternalChannelWorkflowWithNoPrefixChannel())
50+
registry.add_workflow(JavaDuplicateRpcMemoWorkflow())
4651
registry.add_workflow(PersistenceDataAttributesWorkflow())
4752
registry.add_workflow(PersistenceSearchAttributesWorkflow())
4853
registry.add_workflow(PersistenceStateExecutionLocalWorkflow())
4954
registry.add_workflow(RecoveryWorkflow())
55+
registry.add_workflow(RpcMemoWorkflow())
5056
registry.add_workflow(RPCWorkflow())
5157
registry.add_workflow(TimerWorkflow())
5258
registry.add_workflow(StateOptionsOverrideWorkflow())

iwf/tests/test_rpc_with_memo.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import inspect
2+
import time
3+
import unittest
4+
5+
from iwf.client import Client
6+
from iwf.iwf_api.models.workflow_stop_type import WorkflowStopType
7+
from iwf.stop_workflow_options import StopWorkflowOptions
8+
from iwf.tests import registry
9+
from iwf.tests.workflows.rpc_memo_workflow import (
10+
RpcMemoWorkflow,
11+
TEST_DATA_OBJECT_KEY,
12+
TEST_SEARCH_ATTRIBUTE_INT,
13+
TEST_SEARCH_ATTRIBUTE_KEY,
14+
RPC_INPUT,
15+
RPC_OUTPUT,
16+
TEST_STR,
17+
TEST_DELAY,
18+
)
19+
20+
21+
class TestRpcWithMemo(unittest.TestCase):
22+
@classmethod
23+
def setUpClass(cls):
24+
cls.client = Client(registry)
25+
26+
def test_rpc_memo_workflow_func1(self):
27+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
28+
run_id = self.client.start_workflow(RpcMemoWorkflow, wf_id, 30, 999)
29+
30+
self.client.invoke_rpc(
31+
wf_id, RpcMemoWorkflow.test_rpc_set_data_attribute, TEST_STR
32+
)
33+
value = self.client.invoke_rpc(
34+
wf_id, RpcMemoWorkflow.test_rpc_get_data_attribute_strong_consistency
35+
)
36+
assert value == TEST_STR
37+
time.sleep(TEST_DELAY)
38+
value = self.client.invoke_rpc(
39+
wf_id, RpcMemoWorkflow.test_rpc_get_data_attribute, None, str
40+
)
41+
42+
self.client.invoke_rpc(wf_id, RpcMemoWorkflow.test_rpc_set_data_attribute, None)
43+
value = self.client.invoke_rpc(
44+
wf_id, RpcMemoWorkflow.test_rpc_get_data_attribute_strong_consistency
45+
)
46+
assert value is None
47+
time.sleep(TEST_DELAY)
48+
value = self.client.invoke_rpc(
49+
wf_id, RpcMemoWorkflow.test_rpc_get_data_attribute
50+
)
51+
assert value is None
52+
53+
self.client.invoke_rpc(wf_id, RpcMemoWorkflow.test_rpc_set_keyword, TEST_STR)
54+
value = self.client.invoke_rpc(
55+
wf_id, RpcMemoWorkflow.test_rpc_get_keyword_strong_consistency
56+
)
57+
assert value == TEST_STR
58+
time.sleep(TEST_DELAY)
59+
value = self.client.invoke_rpc(wf_id, RpcMemoWorkflow.test_rpc_get_keyword)
60+
assert value == TEST_STR
61+
62+
self.client.invoke_rpc(wf_id, RpcMemoWorkflow.test_rpc_set_keyword, None)
63+
value = self.client.invoke_rpc(
64+
wf_id, RpcMemoWorkflow.test_rpc_get_keyword_strong_consistency
65+
)
66+
assert value is None
67+
time.sleep(TEST_DELAY)
68+
value = self.client.invoke_rpc(wf_id, RpcMemoWorkflow.test_rpc_get_keyword)
69+
assert value is None
70+
71+
rpc_output = self.client.invoke_rpc(
72+
wf_id, RpcMemoWorkflow.test_rpc_func1, RPC_INPUT
73+
)
74+
assert RPC_OUTPUT == rpc_output
75+
76+
self.client.wait_for_workflow_completion(wf_id)
77+
78+
data_attributes = self.client.get_workflow_data_attributes(
79+
RpcMemoWorkflow, wf_id, run_id, [TEST_DATA_OBJECT_KEY]
80+
)
81+
assert TEST_DATA_OBJECT_KEY in data_attributes
82+
assert data_attributes[TEST_DATA_OBJECT_KEY] == RPC_INPUT
83+
84+
search_attributes = self.client.get_workflow_search_attributes(
85+
RpcMemoWorkflow,
86+
wf_id,
87+
[TEST_SEARCH_ATTRIBUTE_KEY, TEST_SEARCH_ATTRIBUTE_INT],
88+
run_id,
89+
)
90+
assert TEST_SEARCH_ATTRIBUTE_INT in search_attributes
91+
assert TEST_SEARCH_ATTRIBUTE_KEY in search_attributes
92+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_INT] == RPC_OUTPUT
93+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_KEY] == RPC_INPUT
94+
95+
def test_rpc_memo_workflow_func0(self):
96+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
97+
run_id = self.client.start_workflow(RpcMemoWorkflow, wf_id, 30, 999)
98+
99+
rpc_output = self.client.invoke_rpc(
100+
wf_id, RpcMemoWorkflow.test_rpc_func0, TEST_STR
101+
)
102+
assert RPC_OUTPUT == rpc_output
103+
104+
self.client.wait_for_workflow_completion(wf_id)
105+
106+
data_attributes = self.client.get_workflow_data_attributes(
107+
RpcMemoWorkflow, wf_id, run_id, [TEST_DATA_OBJECT_KEY]
108+
)
109+
assert TEST_DATA_OBJECT_KEY in data_attributes
110+
assert data_attributes[TEST_DATA_OBJECT_KEY] == TEST_STR
111+
112+
search_attributes = self.client.get_workflow_search_attributes(
113+
RpcMemoWorkflow,
114+
wf_id,
115+
[TEST_SEARCH_ATTRIBUTE_KEY, TEST_SEARCH_ATTRIBUTE_INT],
116+
run_id,
117+
)
118+
assert TEST_SEARCH_ATTRIBUTE_INT in search_attributes
119+
assert TEST_SEARCH_ATTRIBUTE_KEY in search_attributes
120+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_INT] == RPC_OUTPUT
121+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_KEY] == TEST_STR
122+
123+
def test_rpc_memo_workflow_proc1(self):
124+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
125+
run_id = self.client.start_workflow(RpcMemoWorkflow, wf_id, 30, 999)
126+
127+
self.client.invoke_rpc(wf_id, RpcMemoWorkflow.test_rpc_proc1, RPC_INPUT)
128+
self.client.wait_for_workflow_completion(wf_id)
129+
130+
data_attributes = self.client.get_workflow_data_attributes(
131+
RpcMemoWorkflow, wf_id, run_id, [TEST_DATA_OBJECT_KEY]
132+
)
133+
assert TEST_DATA_OBJECT_KEY in data_attributes
134+
assert data_attributes[TEST_DATA_OBJECT_KEY] == RPC_INPUT
135+
136+
search_attributes = self.client.get_workflow_search_attributes(
137+
RpcMemoWorkflow,
138+
wf_id,
139+
[TEST_SEARCH_ATTRIBUTE_KEY, TEST_SEARCH_ATTRIBUTE_INT],
140+
run_id,
141+
)
142+
assert TEST_SEARCH_ATTRIBUTE_INT in search_attributes
143+
assert TEST_SEARCH_ATTRIBUTE_KEY in search_attributes
144+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_INT] == RPC_OUTPUT
145+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_KEY] == RPC_INPUT
146+
147+
def test_rpc_memo_workflow_proc0(self):
148+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
149+
run_id = self.client.start_workflow(RpcMemoWorkflow, wf_id, 30, 999)
150+
151+
self.client.invoke_rpc(wf_id, RpcMemoWorkflow.test_rpc_proc0)
152+
self.client.wait_for_workflow_completion(wf_id)
153+
154+
data_attributes = self.client.get_workflow_data_attributes(
155+
RpcMemoWorkflow, wf_id, run_id, [TEST_DATA_OBJECT_KEY]
156+
)
157+
assert TEST_DATA_OBJECT_KEY in data_attributes
158+
assert data_attributes[TEST_DATA_OBJECT_KEY] == TEST_STR
159+
160+
search_attributes = self.client.get_workflow_search_attributes(
161+
RpcMemoWorkflow,
162+
wf_id,
163+
[TEST_SEARCH_ATTRIBUTE_KEY, TEST_SEARCH_ATTRIBUTE_INT],
164+
run_id,
165+
)
166+
assert TEST_SEARCH_ATTRIBUTE_INT in search_attributes
167+
assert TEST_SEARCH_ATTRIBUTE_KEY in search_attributes
168+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_INT] == RPC_OUTPUT
169+
assert search_attributes[TEST_SEARCH_ATTRIBUTE_KEY] == TEST_STR
170+
171+
def test_rpc_memo_workflow_func1_readonly(self):
172+
wf_id = f"{inspect.currentframe().f_code.co_name}-{time.time_ns()}"
173+
self.client.start_workflow(RpcMemoWorkflow, wf_id, 30, 999)
174+
175+
rpc_output = self.client.invoke_rpc(
176+
wf_id, RpcMemoWorkflow.test_rpc_func1_readonly, RPC_INPUT
177+
)
178+
assert RPC_OUTPUT == rpc_output
179+
180+
self.client.stop_workflow(
181+
wf_id,
182+
StopWorkflowOptions(
183+
workflow_stop_type=WorkflowStopType.FAIL,
184+
reason=TEST_STR,
185+
),
186+
)

0 commit comments

Comments
 (0)