Skip to content

Commit 890fe83

Browse files
authored
feat: Remove WorkflowEngine Caching and add basic Activity/Workflow integration tests (#55)
<!-- Describe what has changed in this PR --> **What changed?** - Remove WorkflowEngine caching. It's required for sticky TaskLists, which we haven't implemented yet. We can reintroduce similar logic during that process. - Suppress logs on destroyed pending tasks. This is the expected behavior for our event loop. - Add utilities to scaffold integration tests - Add integration tests for basic workflow functionality - Add support for keeping the docker container running or using an alternative docker configuration, making it faster to iterate on integration tests. <!-- Tell your future self why have you made these changes --> **Why?** - Get core workflow execution working <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** - Integration tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent 6e7630c commit 890fe83

File tree

8 files changed

+319
-44
lines changed

8 files changed

+319
-44
lines changed

cadence/_internal/workflow/deterministic_event_loop.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,12 @@ def create_task(
124124
"eager_start in create_task is not supported for deterministic event loop"
125125
)
126126

127-
return tasks.Task(coro, loop=self, **kwargs)
127+
task = tasks.Task(coro, loop=self, **kwargs)
128+
# We intentionally destroy pending tasks when shutting down the event loop.
129+
# If our asyncio implementation supports it, disable the logs
130+
if hasattr(task, "_log_destroy_pending"):
131+
setattr(task, "_log_destroy_pending", False)
132+
return task
128133

129134
def create_future(self) -> Future[Any]:
130135
return futures.Future(loop=self)

cadence/worker/_decision_task_handler.py

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import asyncio
22
from concurrent.futures import ThreadPoolExecutor
33
import logging
4-
import threading
5-
from typing import Dict, Optional, Tuple
4+
from typing import Optional
65

76
from cadence._internal.workflow.history_event_iterator import iterate_history_events
87
from cadence.api.v1.common_pb2 import Payload
@@ -50,9 +49,6 @@ def __init__(
5049
"""
5150
super().__init__(client, task_list, identity, **options)
5251
self._registry = registry
53-
# Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
54-
self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
55-
self._cache_lock = threading.RLock() # TODO: reevaluate if this is still needed
5652
self._executor = executor
5753

5854
async def _handle_task_implementation(
@@ -123,34 +119,15 @@ async def _handle_task_implementation(
123119
data_converter=self._client.data_converter,
124120
)
125121

126-
# Use thread-safe cache to get or create workflow engine
127-
cache_key = (workflow_id, run_id)
128-
with self._cache_lock:
129-
workflow_engine = self._workflow_engines.get(cache_key)
130-
if workflow_engine is None:
131-
workflow_engine = WorkflowEngine(
132-
info=workflow_info,
133-
workflow_definition=workflow_definition,
134-
)
135-
self._workflow_engines[cache_key] = workflow_engine
122+
workflow_engine = WorkflowEngine(
123+
info=workflow_info,
124+
workflow_definition=workflow_definition,
125+
)
136126

137127
decision_result = await asyncio.get_running_loop().run_in_executor(
138128
self._executor, workflow_engine.process_decision, workflow_events
139129
)
140130

141-
# Clean up completed workflows from cache to prevent memory leaks
142-
if workflow_engine.is_done():
143-
with self._cache_lock:
144-
self._workflow_engines.pop(cache_key, None)
145-
logger.debug(
146-
"Removed completed workflow from cache",
147-
extra={
148-
"workflow_id": workflow_id,
149-
"run_id": run_id,
150-
"cache_size": len(self._workflow_engines),
151-
},
152-
)
153-
154131
# Respond with the decisions
155132
await self._respond_decision_task_completed(task, decision_result)
156133

cadence/worker/_worker.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ def __init__(
2626
self._activity_worker = ActivityWorker(client, task_list, registry, options)
2727
self._decision_worker = DecisionWorker(client, task_list, registry, options)
2828

29+
@property
30+
def client(self) -> Client:
31+
return self._client
32+
33+
@property
34+
def task_list(self) -> str:
35+
return self._task_list
36+
2937
async def run(self) -> None:
3038
async with asyncio.TaskGroup() as tg:
3139
if not self._options["disable_workflow_worker"]:

tests/conftest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
ENABLE_INTEGRATION_TESTS = "--integration-tests"
2+
KEEP_CADENCE_ALIVE = "--keep-cadence-alive"
23

34

45
# Need to define the option in the root conftest.py file
@@ -8,3 +9,8 @@ def pytest_addoption(parser):
89
action="store_true",
910
help="enables running integration tests, which rely on docker and docker-compose",
1011
)
12+
parser.addoption(
13+
KEEP_CADENCE_ALIVE,
14+
action="store_true",
15+
help="skips tearing down the docker-compose project used by the integration tests so it can be reused to quickly iterate on tests",
16+
)
Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
import os
34
from datetime import timedelta
45

@@ -9,10 +10,14 @@
910

1011
from cadence.api.v1.service_domain_pb2 import RegisterDomainRequest
1112
from cadence.client import ClientOptions
12-
from tests.conftest import ENABLE_INTEGRATION_TESTS
13+
from cadence.error import DomainAlreadyExistsError
14+
from tests.conftest import ENABLE_INTEGRATION_TESTS, KEEP_CADENCE_ALIVE
1315
from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME
1416

1517

18+
logger = logging.getLogger(__name__)
19+
20+
1621
# Run tests in this directory and lower only if integration tests are enabled
1722
def pytest_runtest_setup(item):
1823
if not item.config.getoption(ENABLE_INTEGRATION_TESTS):
@@ -26,6 +31,20 @@ def docker_compose_file(pytestconfig):
2631
)
2732

2833

34+
# Use a consistent project name so it can be reused or replaced by a manually created one
35+
@pytest.fixture(scope="session")
36+
def docker_compose_project_name() -> str:
37+
return "pytest-cadence"
38+
39+
40+
@pytest.fixture(scope="session")
41+
def docker_cleanup(pytestconfig):
42+
if pytestconfig.getoption(KEEP_CADENCE_ALIVE):
43+
return False
44+
else:
45+
return ["down -v"]
46+
47+
2948
@pytest.fixture(scope="session")
3049
def client_options(docker_ip: str, docker_services: Services) -> ClientOptions:
3150
return ClientOptions(
@@ -34,22 +53,37 @@ def client_options(docker_ip: str, docker_services: Services) -> ClientOptions:
3453
)
3554

3655

37-
# We can't pass around Client objects between tests/fixtures without changing our pytest-asyncio version
38-
# to ensure that they use the same event loop.
39-
# Instead, we can wait for the server to be ready, create the common domain, and then provide a helper capable
40-
# of creating additional clients within each test as needed
41-
@pytest.fixture(scope="session")
42-
async def helper(client_options: ClientOptions) -> CadenceHelper:
43-
helper = CadenceHelper(client_options)
56+
@pytest.fixture(scope="session", autouse=True)
57+
async def create_test_domain(client_options: ClientOptions) -> None:
58+
helper = CadenceHelper(client_options, "create_test_domain")
4459
async with helper.client() as client:
60+
logging.info("Connecting to service...")
4561
# It takes around a minute for the Cadence server to start up with Cassandra
4662
async with asyncio.timeout(120):
4763
await client.ready()
4864

49-
await client.domain_stub.RegisterDomain(
50-
RegisterDomainRequest(
51-
name=DOMAIN_NAME,
52-
workflow_execution_retention_period=from_timedelta(timedelta(days=1)),
65+
try:
66+
logging.info("Creating domain %s...", DOMAIN_NAME)
67+
await client.domain_stub.RegisterDomain(
68+
RegisterDomainRequest(
69+
name=DOMAIN_NAME,
70+
workflow_execution_retention_period=from_timedelta(
71+
timedelta(days=1)
72+
),
73+
)
5374
)
54-
)
55-
return CadenceHelper(client_options)
75+
logging.info("Done creating domain %s", DOMAIN_NAME)
76+
except DomainAlreadyExistsError:
77+
logging.info("Domain %s already exists", DOMAIN_NAME)
78+
return None
79+
80+
81+
# We can't pass around Client objects between tests/fixtures without changing our pytest-asyncio version
82+
# to ensure that they use the same event loop.
83+
# Instead, we can wait for the server to be ready, create the common domain, and then provide a helper capable
84+
# of creating additional clients within each test as needed
85+
@pytest.fixture
86+
async def helper(
87+
client_options: ClientOptions, request: pytest.FixtureRequest
88+
) -> CadenceHelper:
89+
return CadenceHelper(client_options, request.node.name)

tests/integration_tests/helper.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,32 @@
1+
import asyncio
2+
from contextlib import asynccontextmanager
3+
from typing import AsyncGenerator
4+
5+
from cadence import Registry
16
from cadence.client import ClientOptions, Client
7+
from cadence.worker import WorkerOptions, Worker
28

39
DOMAIN_NAME = "test-domain"
410

511

612
class CadenceHelper:
7-
def __init__(self, options: ClientOptions):
13+
def __init__(self, options: ClientOptions, test_name: str) -> None:
814
self.options = options
15+
self.test_name = test_name
16+
17+
@asynccontextmanager
18+
async def worker(
19+
self, registry: Registry, **kwargs: WorkerOptions
20+
) -> AsyncGenerator[Worker, None]:
21+
async with self.client() as client:
22+
worker = Worker(client, self.test_name, registry, **kwargs)
23+
task = asyncio.create_task(worker.run())
24+
yield worker
25+
task.cancel()
26+
try:
27+
await task
28+
except asyncio.CancelledError:
29+
pass
930

1031
def client(self):
1132
return Client(**self.options)
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
5+
from cadence import workflow, Registry
6+
from cadence.api.v1.history_pb2 import EventFilterType
7+
from cadence.api.v1.service_workflow_pb2 import (
8+
GetWorkflowExecutionHistoryRequest,
9+
GetWorkflowExecutionHistoryResponse,
10+
)
11+
from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME
12+
13+
reg = Registry()
14+
15+
16+
@reg.activity()
17+
async def echo(message: str) -> str:
18+
return message
19+
20+
21+
@reg.workflow()
22+
class SingleActivity:
23+
@workflow.run
24+
async def run(self, message: str) -> str:
25+
return await echo.with_options(
26+
schedule_to_close_timeout=timedelta(seconds=10)
27+
).execute(message)
28+
29+
30+
@reg.workflow()
31+
class MultipleActivities:
32+
@workflow.run
33+
async def run(self, message: str) -> str:
34+
await echo.with_options(
35+
schedule_to_close_timeout=timedelta(seconds=10)
36+
).execute("first")
37+
38+
second = await echo.with_options(
39+
schedule_to_close_timeout=timedelta(seconds=10)
40+
).execute(message)
41+
42+
await echo.with_options(
43+
schedule_to_close_timeout=timedelta(seconds=10)
44+
).execute("third")
45+
46+
return second
47+
48+
49+
@reg.workflow()
50+
class ParallelActivities:
51+
@workflow.run
52+
async def run(self, message: str) -> str:
53+
first = echo.with_options(
54+
schedule_to_close_timeout=timedelta(seconds=10)
55+
).execute("first")
56+
57+
second = echo.with_options(
58+
schedule_to_close_timeout=timedelta(seconds=10)
59+
).execute(message)
60+
61+
first_res, second_res = await asyncio.gather(
62+
first, second, return_exceptions=True
63+
)
64+
65+
return second_res
66+
67+
68+
async def test_single_activity(helper: CadenceHelper):
69+
async with helper.worker(reg) as worker:
70+
execution = await worker.client.start_workflow(
71+
"SingleActivity",
72+
"hello world",
73+
task_list=worker.task_list,
74+
execution_start_to_close_timeout=timedelta(seconds=10),
75+
)
76+
77+
response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
78+
GetWorkflowExecutionHistoryRequest(
79+
domain=DOMAIN_NAME,
80+
workflow_execution=execution,
81+
wait_for_new_event=True,
82+
history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
83+
skip_archival=True,
84+
)
85+
)
86+
87+
assert (
88+
'"hello world"'
89+
== response.history.events[
90+
-1
91+
].workflow_execution_completed_event_attributes.result.data.decode()
92+
)
93+
94+
95+
async def test_multiple_activities(helper: CadenceHelper):
96+
async with helper.worker(reg) as worker:
97+
execution = await worker.client.start_workflow(
98+
"MultipleActivities",
99+
"hello world",
100+
task_list=worker.task_list,
101+
execution_start_to_close_timeout=timedelta(seconds=10),
102+
)
103+
104+
response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
105+
GetWorkflowExecutionHistoryRequest(
106+
domain=DOMAIN_NAME,
107+
workflow_execution=execution,
108+
wait_for_new_event=True,
109+
history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
110+
skip_archival=True,
111+
)
112+
)
113+
114+
assert (
115+
'"hello world"'
116+
== response.history.events[
117+
-1
118+
].workflow_execution_completed_event_attributes.result.data.decode()
119+
)
120+
121+
122+
async def test_parallel_activities(helper: CadenceHelper):
123+
async with helper.worker(reg) as worker:
124+
execution = await worker.client.start_workflow(
125+
"ParallelActivities",
126+
"hello world",
127+
task_list=worker.task_list,
128+
execution_start_to_close_timeout=timedelta(seconds=10),
129+
)
130+
131+
response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
132+
GetWorkflowExecutionHistoryRequest(
133+
domain=DOMAIN_NAME,
134+
workflow_execution=execution,
135+
wait_for_new_event=True,
136+
history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
137+
skip_archival=True,
138+
)
139+
)
140+
141+
assert (
142+
'"hello world"'
143+
== response.history.events[
144+
-1
145+
].workflow_execution_completed_event_attributes.result.data.decode()
146+
)

0 commit comments

Comments
 (0)