Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cadence/_internal/workflow/deterministic_event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ def create_task(
"eager_start in create_task is not supported for deterministic event loop"
)

return tasks.Task(coro, loop=self, **kwargs)
task = tasks.Task(coro, loop=self, **kwargs)
# We intentionally destroy pending tasks when shutting down the event loop.
# If our asyncio implementation supports it, disable the logs
if hasattr(task, "_log_destroy_pending"):
setattr(task, "_log_destroy_pending", False)
return task
Comment on lines +128 to +132
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually indicates a real data leak issue, but we can fix it later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. There isn't a good solution to proactively delete the tasks. Cancelling tasks will trigger the workflow coroutine logic and may have unwanted side effects.

These tasks are held only by weak references and will eventually be deleted by the GC, so we'll just keep suppressing the logging for now since it's expected behavior.


def create_future(self) -> Future[Any]:
    """Return a new ``futures.Future`` whose ``loop`` is this event loop."""
    return futures.Future(loop=self)
Expand Down
33 changes: 5 additions & 28 deletions cadence/worker/_decision_task_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
import threading
from typing import Dict, Optional, Tuple
from typing import Optional

from cadence._internal.workflow.history_event_iterator import iterate_history_events
from cadence.api.v1.common_pb2 import Payload
Expand Down Expand Up @@ -50,9 +49,6 @@ def __init__(
"""
super().__init__(client, task_list, identity, **options)
self._registry = registry
# Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
self._cache_lock = threading.RLock() # TODO: reevaluate if this is still needed
self._executor = executor

async def _handle_task_implementation(
Expand Down Expand Up @@ -123,34 +119,15 @@ async def _handle_task_implementation(
data_converter=self._client.data_converter,
)

# Use thread-safe cache to get or create workflow engine
cache_key = (workflow_id, run_id)
with self._cache_lock:
workflow_engine = self._workflow_engines.get(cache_key)
if workflow_engine is None:
workflow_engine = WorkflowEngine(
info=workflow_info,
workflow_definition=workflow_definition,
)
self._workflow_engines[cache_key] = workflow_engine
workflow_engine = WorkflowEngine(
info=workflow_info,
workflow_definition=workflow_definition,
)

decision_result = await asyncio.get_running_loop().run_in_executor(
self._executor, workflow_engine.process_decision, workflow_events
)

# Clean up completed workflows from cache to prevent memory leaks
if workflow_engine.is_done():
with self._cache_lock:
self._workflow_engines.pop(cache_key, None)
logger.debug(
"Removed completed workflow from cache",
extra={
"workflow_id": workflow_id,
"run_id": run_id,
"cache_size": len(self._workflow_engines),
},
)

# Respond with the decisions
await self._respond_decision_task_completed(task, decision_result)

Expand Down
8 changes: 8 additions & 0 deletions cadence/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ def __init__(
self._activity_worker = ActivityWorker(client, task_list, registry, options)
self._decision_worker = DecisionWorker(client, task_list, registry, options)

@property
def client(self) -> Client:
    """Read-only access to the value stored in ``self._client``."""
    return self._client

@property
def task_list(self) -> str:
    """Read-only access to the value stored in ``self._task_list``."""
    return self._task_list

async def run(self) -> None:
async with asyncio.TaskGroup() as tg:
if not self._options["disable_workflow_worker"]:
Expand Down
6 changes: 6 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
ENABLE_INTEGRATION_TESTS = "--integration-tests"
KEEP_CADENCE_ALIVE = "--keep-cadence-alive"


# Need to define the option in the root conftest.py file
Expand All @@ -8,3 +9,8 @@ def pytest_addoption(parser):
action="store_true",
help="enables running integration tests, which rely on docker and docker-compose",
)
parser.addoption(
KEEP_CADENCE_ALIVE,
action="store_true",
help="skips tearing down the docker-compose project used by the integration tests so it can be reused to quickly iterate on tests",
)
62 changes: 48 additions & 14 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
import os
from datetime import timedelta

Expand All @@ -9,10 +10,14 @@

from cadence.api.v1.service_domain_pb2 import RegisterDomainRequest
from cadence.client import ClientOptions
from tests.conftest import ENABLE_INTEGRATION_TESTS
from cadence.error import DomainAlreadyExistsError
from tests.conftest import ENABLE_INTEGRATION_TESTS, KEEP_CADENCE_ALIVE
from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME


logger = logging.getLogger(__name__)


# Run tests in this directory and lower only if integration tests are enabled
def pytest_runtest_setup(item):
if not item.config.getoption(ENABLE_INTEGRATION_TESTS):
Expand All @@ -26,6 +31,20 @@ def docker_compose_file(pytestconfig):
)


# Use a consistent project name so it can be reused or replaced by a manually created one
@pytest.fixture(scope="session")
def docker_compose_project_name() -> str:
    """Return the fixed docker-compose project name used for the whole test session."""
    return "pytest-cadence"


@pytest.fixture(scope="session")
def docker_cleanup(pytestconfig):
    """Choose the docker-compose clean-up action for the test session.

    Returns ``False`` when the ``KEEP_CADENCE_ALIVE`` command-line option is
    set, and ``["down -v"]`` otherwise.
    """
    keep_alive = pytestconfig.getoption(KEEP_CADENCE_ALIVE)
    return False if keep_alive else ["down -v"]


@pytest.fixture(scope="session")
def client_options(docker_ip: str, docker_services: Services) -> ClientOptions:
return ClientOptions(
Expand All @@ -34,22 +53,37 @@ def client_options(docker_ip: str, docker_services: Services) -> ClientOptions:
)


# We can't pass around Client objects between tests/fixtures without changing our pytest-asyncio version
# to ensure that they use the same event loop.
# Instead, we can wait for the server to be ready, create the common domain, and then provide a helper capable
# of creating additional clients within each test as needed
@pytest.fixture(scope="session")
async def helper(client_options: ClientOptions) -> CadenceHelper:
helper = CadenceHelper(client_options)
@pytest.fixture(scope="session", autouse=True)
async def create_test_domain(client_options: ClientOptions) -> None:
helper = CadenceHelper(client_options, "create_test_domain")
async with helper.client() as client:
logging.info("Connecting to service...")
# It takes around a minute for the Cadence server to start up with Cassandra
async with asyncio.timeout(120):
await client.ready()

await client.domain_stub.RegisterDomain(
RegisterDomainRequest(
name=DOMAIN_NAME,
workflow_execution_retention_period=from_timedelta(timedelta(days=1)),
try:
logging.info("Creating domain %s...", DOMAIN_NAME)
await client.domain_stub.RegisterDomain(
RegisterDomainRequest(
name=DOMAIN_NAME,
workflow_execution_retention_period=from_timedelta(
timedelta(days=1)
),
)
)
)
return CadenceHelper(client_options)
logging.info("Done creating domain %s", DOMAIN_NAME)
except DomainAlreadyExistsError:
logging.info("Domain %s already exists", DOMAIN_NAME)
return None


# We can't pass around Client objects between tests/fixtures without changing our pytest-asyncio version
# to ensure that they use the same event loop.
# Instead, we can wait for the server to be ready, create the common domain, and then provide a helper capable
# of creating additional clients within each test as needed
@pytest.fixture
async def helper(
    client_options: ClientOptions, request: pytest.FixtureRequest
) -> CadenceHelper:
    """Build a ``CadenceHelper`` named after the requesting test node."""
    test_name = request.node.name
    return CadenceHelper(client_options, test_name=test_name)
23 changes: 22 additions & 1 deletion tests/integration_tests/helper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from cadence import Registry
from cadence.client import ClientOptions, Client
from cadence.worker import WorkerOptions, Worker

DOMAIN_NAME = "test-domain"


class CadenceHelper:
def __init__(self, options: ClientOptions):
def __init__(self, options: ClientOptions, test_name: str) -> None:
self.options = options
self.test_name = test_name

@asynccontextmanager
async def worker(
    self, registry: Registry, **kwargs: WorkerOptions
) -> AsyncGenerator[Worker, None]:
    """Run a ``Worker`` in the background for the duration of the ``async with`` body.

    A client is opened via ``self.client()``, a ``Worker`` is created on it using
    ``self.test_name`` as its task list, and ``worker.run()`` is scheduled as an
    asyncio task. The worker is yielded to the caller.

    Args:
        registry: Registry passed through to the ``Worker`` constructor.
        **kwargs: Extra keyword arguments forwarded to the ``Worker`` constructor.

    Yields:
        The ``Worker`` whose ``run()`` coroutine is executing in the background.
    """
    async with self.client() as client:
        worker = Worker(client, self.test_name, registry, **kwargs)
        task = asyncio.create_task(worker.run())
        try:
            yield worker
        finally:
            # Always stop the background task, including when the body of the
            # ``async with`` raised (e.g. a failing assertion). Without the
            # ``finally`` the exception re-raised at ``yield`` would skip the
            # cancellation and leave the worker running.
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                # Expected outcome of the cancellation requested above.
                pass

def client(self):
return Client(**self.options)
146 changes: 146 additions & 0 deletions tests/integration_tests/workflow/test_activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import asyncio
from datetime import timedelta


from cadence import workflow, Registry
from cadence.api.v1.history_pb2 import EventFilterType
from cadence.api.v1.service_workflow_pb2 import (
GetWorkflowExecutionHistoryRequest,
GetWorkflowExecutionHistoryResponse,
)
from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME

# Registry that the activity and workflows below are registered on via
# the ``@reg.activity()`` / ``@reg.workflow()`` decorators.
reg = Registry()


@reg.activity()
async def echo(message: str) -> str:
    """Activity that returns ``message`` unchanged."""
    return message


@reg.workflow()
class SingleActivity:
    """Workflow that executes the ``echo`` activity once."""

    @workflow.run
    async def run(self, message: str) -> str:
        """Execute ``echo`` with ``message`` and return the activity's result."""
        activity = echo.with_options(schedule_to_close_timeout=timedelta(seconds=10))
        result = await activity.execute(message)
        return result


@reg.workflow()
class MultipleActivities:
    """Workflow that executes the ``echo`` activity three times, one after another."""

    @workflow.run
    async def run(self, message: str) -> str:
        """Run ``echo`` with "first", ``message`` and "third"; return the second result."""
        timeout = timedelta(seconds=10)

        await echo.with_options(schedule_to_close_timeout=timeout).execute("first")
        echoed = await echo.with_options(schedule_to_close_timeout=timeout).execute(
            message
        )
        await echo.with_options(schedule_to_close_timeout=timeout).execute("third")

        return echoed


@reg.workflow()
class ParallelActivities:
    """Workflow that starts two ``echo`` activities and awaits them together."""

    @workflow.run
    async def run(self, message: str) -> str:
        """Gather ``echo("first")`` and ``echo(message)``; return the second result.

        ``return_exceptions=True`` makes ``asyncio.gather`` hand back a raised
        exception as the corresponding result instead of propagating it.
        """
        timeout = timedelta(seconds=10)
        pending = [
            echo.with_options(schedule_to_close_timeout=timeout).execute("first"),
            echo.with_options(schedule_to_close_timeout=timeout).execute(message),
        ]

        results = await asyncio.gather(*pending, return_exceptions=True)

        return results[1]


async def test_single_activity(helper: CadenceHelper):
    """Start ``SingleActivity`` and check the result recorded in its last history event."""
    async with helper.worker(reg) as worker:
        client = worker.client
        execution = await client.start_workflow(
            "SingleActivity",
            "hello world",
            task_list=worker.task_list,
            execution_start_to_close_timeout=timedelta(seconds=10),
        )

        # Request only the close event, waiting for it if it is not there yet.
        request = GetWorkflowExecutionHistoryRequest(
            domain=DOMAIN_NAME,
            workflow_execution=execution,
            wait_for_new_event=True,
            history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
            skip_archival=True,
        )
        response: GetWorkflowExecutionHistoryResponse = (
            await client.workflow_stub.GetWorkflowExecutionHistory(request)
        )

        last_event = response.history.events[-1]
        completed = last_event.workflow_execution_completed_event_attributes
        assert '"hello world"' == completed.result.data.decode()


async def test_multiple_activities(helper: CadenceHelper):
    """Start ``MultipleActivities`` and check the result recorded in its last history event."""
    async with helper.worker(reg) as worker:
        client = worker.client
        execution = await client.start_workflow(
            "MultipleActivities",
            "hello world",
            task_list=worker.task_list,
            execution_start_to_close_timeout=timedelta(seconds=10),
        )

        # Request only the close event, waiting for it if it is not there yet.
        request = GetWorkflowExecutionHistoryRequest(
            domain=DOMAIN_NAME,
            workflow_execution=execution,
            wait_for_new_event=True,
            history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
            skip_archival=True,
        )
        response: GetWorkflowExecutionHistoryResponse = (
            await client.workflow_stub.GetWorkflowExecutionHistory(request)
        )

        last_event = response.history.events[-1]
        completed = last_event.workflow_execution_completed_event_attributes
        assert '"hello world"' == completed.result.data.decode()


async def test_parallel_activities(helper: CadenceHelper):
    """Start ``ParallelActivities`` and check the result recorded in its last history event."""
    async with helper.worker(reg) as worker:
        client = worker.client
        execution = await client.start_workflow(
            "ParallelActivities",
            "hello world",
            task_list=worker.task_list,
            execution_start_to_close_timeout=timedelta(seconds=10),
        )

        # Request only the close event, waiting for it if it is not there yet.
        request = GetWorkflowExecutionHistoryRequest(
            domain=DOMAIN_NAME,
            workflow_execution=execution,
            wait_for_new_event=True,
            history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
            skip_archival=True,
        )
        response: GetWorkflowExecutionHistoryResponse = (
            await client.workflow_stub.GetWorkflowExecutionHistory(request)
        )

        last_event = response.history.events[-1]
        completed = last_event.workflow_execution_completed_event_attributes
        assert '"hello world"' == completed.result.data.decode()
Loading