diff --git a/cadence/_internal/workflow/deterministic_event_loop.py b/cadence/_internal/workflow/deterministic_event_loop.py
index 305759e..3ee4fe2 100644
--- a/cadence/_internal/workflow/deterministic_event_loop.py
+++ b/cadence/_internal/workflow/deterministic_event_loop.py
@@ -124,7 +124,12 @@ def create_task(
                 "eager_start in create_task is not supported for deterministic event loop"
             )
 
-        return tasks.Task(coro, loop=self, **kwargs)
+        task = tasks.Task(coro, loop=self, **kwargs)
+        # We intentionally destroy pending tasks when shutting down the event loop.
+        # If our asyncio implementation supports it, disable the logs
+        if hasattr(task, "_log_destroy_pending"):
+            setattr(task, "_log_destroy_pending", False)
+        return task
 
     def create_future(self) -> Future[Any]:
         return futures.Future(loop=self)
diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py
index e40bb13..eb32eb6 100644
--- a/cadence/worker/_decision_task_handler.py
+++ b/cadence/worker/_decision_task_handler.py
@@ -1,8 +1,7 @@
 import asyncio
 from concurrent.futures import ThreadPoolExecutor
 import logging
-import threading
-from typing import Dict, Optional, Tuple
+from typing import Optional
 
 from cadence._internal.workflow.history_event_iterator import iterate_history_events
 from cadence.api.v1.common_pb2 import Payload
@@ -50,9 +49,6 @@ def __init__(
         """
         super().__init__(client, task_list, identity, **options)
         self._registry = registry
-        # Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
-        self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
-        self._cache_lock = threading.RLock()  # TODO: reevaluate if this is still needed
         self._executor = executor
 
     async def _handle_task_implementation(
@@ -123,34 +119,15 @@ async def _handle_task_implementation(
             data_converter=self._client.data_converter,
         )
 
-        # Use thread-safe cache to get or create workflow engine
-        cache_key = (workflow_id, run_id)
-        with self._cache_lock:
-            workflow_engine = self._workflow_engines.get(cache_key)
-            if workflow_engine is None:
-                workflow_engine = WorkflowEngine(
-                    info=workflow_info,
-                    workflow_definition=workflow_definition,
-                )
-                self._workflow_engines[cache_key] = workflow_engine
+        workflow_engine = WorkflowEngine(
+            info=workflow_info,
+            workflow_definition=workflow_definition,
+        )
 
         decision_result = await asyncio.get_running_loop().run_in_executor(
             self._executor, workflow_engine.process_decision, workflow_events
         )
 
-        # Clean up completed workflows from cache to prevent memory leaks
-        if workflow_engine.is_done():
-            with self._cache_lock:
-                self._workflow_engines.pop(cache_key, None)
-                logger.debug(
-                    "Removed completed workflow from cache",
-                    extra={
-                        "workflow_id": workflow_id,
-                        "run_id": run_id,
-                        "cache_size": len(self._workflow_engines),
-                    },
-                )
-
         # Respond with the decisions
         await self._respond_decision_task_completed(task, decision_result)
 
diff --git a/cadence/worker/_worker.py b/cadence/worker/_worker.py
index 6d57d30..f540958 100644
--- a/cadence/worker/_worker.py
+++ b/cadence/worker/_worker.py
@@ -26,6 +26,14 @@ def __init__(
         self._activity_worker = ActivityWorker(client, task_list, registry, options)
         self._decision_worker = DecisionWorker(client, task_list, registry, options)
 
+    @property
+    def client(self) -> Client:
+        return self._client
+
+    @property
+    def task_list(self) -> str:
+        return self._task_list
+
     async def run(self) -> None:
         async with asyncio.TaskGroup() as tg:
             if not self._options["disable_workflow_worker"]:
diff --git a/tests/conftest.py b/tests/conftest.py
index 4983c71..371f1f1 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,4 +1,5 @@
 ENABLE_INTEGRATION_TESTS = "--integration-tests"
+KEEP_CADENCE_ALIVE = "--keep-cadence-alive"
 
 
 # Need to define the option in the root conftest.py file
@@ -8,3 +9,8 @@ def pytest_addoption(parser):
         action="store_true",
         help="enables running integration tests, which rely on docker and docker-compose",
     )
+    parser.addoption(
+        KEEP_CADENCE_ALIVE,
+        action="store_true",
+        help="skips tearing down the docker-compose project used by the integration tests so it can be reused to quickly iterate on tests",
+    )
diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py
index a4486eb..0c288dd 100644
--- a/tests/integration_tests/conftest.py
+++ b/tests/integration_tests/conftest.py
@@ -1,4 +1,5 @@
 import asyncio
+import logging
 import os
 from datetime import timedelta
 
@@ -9,10 +10,14 @@
 
 from cadence.api.v1.service_domain_pb2 import RegisterDomainRequest
 from cadence.client import ClientOptions
-from tests.conftest import ENABLE_INTEGRATION_TESTS
+from cadence.error import DomainAlreadyExistsError
+from tests.conftest import ENABLE_INTEGRATION_TESTS, KEEP_CADENCE_ALIVE
 from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME
 
 
+logger = logging.getLogger(__name__)
+
+
 # Run tests in this directory and lower only if integration tests are enabled
 def pytest_runtest_setup(item):
     if not item.config.getoption(ENABLE_INTEGRATION_TESTS):
@@ -26,6 +31,20 @@ def docker_compose_file(pytestconfig):
     )
 
 
+# Use a consistent project name so it can be reused or replaced by a manually created one
+@pytest.fixture(scope="session")
+def docker_compose_project_name() -> str:
+    return "pytest-cadence"
+
+
+@pytest.fixture(scope="session")
+def docker_cleanup(pytestconfig):
+    if pytestconfig.getoption(KEEP_CADENCE_ALIVE):
+        return False
+    else:
+        return ["down -v"]
+
+
 @pytest.fixture(scope="session")
 def client_options(docker_ip: str, docker_services: Services) -> ClientOptions:
     return ClientOptions(
@@ -34,22 +53,37 @@ def client_options(docker_ip: str, docker_services: Services) -> ClientOptions:
     )
 
 
-# We can't pass around Client objects between tests/fixtures without changing our pytest-asyncio version
-# to ensure that they use the same event loop.
-# Instead, we can wait for the server to be ready, create the common domain, and then provide a helper capable
-# of creating additional clients within each test as needed
-@pytest.fixture(scope="session")
-async def helper(client_options: ClientOptions) -> CadenceHelper:
-    helper = CadenceHelper(client_options)
+@pytest.fixture(scope="session", autouse=True)
+async def create_test_domain(client_options: ClientOptions) -> None:
+    helper = CadenceHelper(client_options, "create_test_domain")
     async with helper.client() as client:
+        logger.info("Connecting to service...")
         # It takes around a minute for the Cadence server to start up with Cassandra
         async with asyncio.timeout(120):
             await client.ready()
 
-        await client.domain_stub.RegisterDomain(
-            RegisterDomainRequest(
-                name=DOMAIN_NAME,
-                workflow_execution_retention_period=from_timedelta(timedelta(days=1)),
+        try:
+            logger.info("Creating domain %s...", DOMAIN_NAME)
+            await client.domain_stub.RegisterDomain(
+                RegisterDomainRequest(
+                    name=DOMAIN_NAME,
+                    workflow_execution_retention_period=from_timedelta(
+                        timedelta(days=1)
+                    ),
+                )
             )
-        )
-    return CadenceHelper(client_options)
+            logger.info("Done creating domain %s", DOMAIN_NAME)
+        except DomainAlreadyExistsError:
+            logger.info("Domain %s already exists", DOMAIN_NAME)
+    return None
+
+
+# We can't pass around Client objects between tests/fixtures without changing our pytest-asyncio version
+# to ensure that they use the same event loop.
+# Instead, create_test_domain waits for the server and registers the common domain once per session, and
+# each test gets a helper capable of creating additional clients (and workers) as needed
+@pytest.fixture
+async def helper(
+    client_options: ClientOptions, request: pytest.FixtureRequest
+) -> CadenceHelper:
+    return CadenceHelper(client_options, request.node.name)
diff --git a/tests/integration_tests/helper.py b/tests/integration_tests/helper.py
index 5dac03d..6664611 100644
--- a/tests/integration_tests/helper.py
+++ b/tests/integration_tests/helper.py
@@ -1,11 +1,36 @@
+import asyncio
+from contextlib import asynccontextmanager
+from typing import AsyncGenerator
+
+from cadence import Registry
 from cadence.client import ClientOptions, Client
+from cadence.worker import WorkerOptions, Worker
 
 DOMAIN_NAME = "test-domain"
 
 
 class CadenceHelper:
-    def __init__(self, options: ClientOptions):
+    def __init__(self, options: ClientOptions, test_name: str) -> None:
         self.options = options
+        self.test_name = test_name
+
+    @asynccontextmanager
+    async def worker(
+        self, registry: Registry, **kwargs: WorkerOptions
+    ) -> AsyncGenerator[Worker, None]:
+        """Start a Worker on a task list named after the test and stop it when the block exits."""
+        async with self.client() as client:
+            worker = Worker(client, self.test_name, registry, **kwargs)
+            task = asyncio.create_task(worker.run())
+            # Cancel the worker even if the test body raises, so no task outlives the client
+            try:
+                yield worker
+            finally:
+                task.cancel()
+                try:
+                    await task
+                except asyncio.CancelledError:
+                    pass
 
     def client(self):
         return Client(**self.options)
diff --git a/tests/integration_tests/workflow/test_activities.py b/tests/integration_tests/workflow/test_activities.py
new file mode 100644
index 0000000..7e6c0cc
--- /dev/null
+++ b/tests/integration_tests/workflow/test_activities.py
@@ -0,0 +1,146 @@
+import asyncio
+from datetime import timedelta
+
+
+from cadence import workflow, Registry
+from cadence.api.v1.history_pb2 import EventFilterType
+from cadence.api.v1.service_workflow_pb2 import (
+    GetWorkflowExecutionHistoryRequest,
+    GetWorkflowExecutionHistoryResponse,
+)
+from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME
+
+reg = Registry()
+
+
+@reg.activity()
+async def echo(message: str) -> str:
+    return message
+
+
+@reg.workflow()
+class SingleActivity:
+    @workflow.run
+    async def run(self, message: str) -> str:
+        return await echo.with_options(
+            schedule_to_close_timeout=timedelta(seconds=10)
+        ).execute(message)
+
+
+@reg.workflow()
+class MultipleActivities:
+    @workflow.run
+    async def run(self, message: str) -> str:
+        await echo.with_options(
+            schedule_to_close_timeout=timedelta(seconds=10)
+        ).execute("first")
+
+        second = await echo.with_options(
+            schedule_to_close_timeout=timedelta(seconds=10)
+        ).execute(message)
+
+        await echo.with_options(
+            schedule_to_close_timeout=timedelta(seconds=10)
+        ).execute("third")
+
+        return second
+
+
+@reg.workflow()
+class ParallelActivities:
+    @workflow.run
+    async def run(self, message: str) -> str:
+        first = echo.with_options(
+            schedule_to_close_timeout=timedelta(seconds=10)
+        ).execute("first")
+
+        second = echo.with_options(
+            schedule_to_close_timeout=timedelta(seconds=10)
+        ).execute(message)
+
+        first_res, second_res = await asyncio.gather(
+            first, second, return_exceptions=True
+        )
+
+        return second_res
+
+
+async def test_single_activity(helper: CadenceHelper):
+    async with helper.worker(reg) as worker:
+        execution = await worker.client.start_workflow(
+            "SingleActivity",
+            "hello world",
+            task_list=worker.task_list,
+            execution_start_to_close_timeout=timedelta(seconds=10),
+        )
+
+        response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
+            GetWorkflowExecutionHistoryRequest(
+                domain=DOMAIN_NAME,
+                workflow_execution=execution,
+                wait_for_new_event=True,
+                history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
+                skip_archival=True,
+            )
+        )
+
+        assert (
+            '"hello world"'
+            == response.history.events[
+                -1
+            ].workflow_execution_completed_event_attributes.result.data.decode()
+        )
+
+
+async def test_multiple_activities(helper: CadenceHelper):
+    async with helper.worker(reg) as worker:
+        execution = await worker.client.start_workflow(
+            "MultipleActivities",
+            "hello world",
+            task_list=worker.task_list,
+            execution_start_to_close_timeout=timedelta(seconds=10),
+        )
+
+        response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
+            GetWorkflowExecutionHistoryRequest(
+                domain=DOMAIN_NAME,
+                workflow_execution=execution,
+                wait_for_new_event=True,
+                history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
+                skip_archival=True,
+            )
+        )
+
+        assert (
+            '"hello world"'
+            == response.history.events[
+                -1
+            ].workflow_execution_completed_event_attributes.result.data.decode()
+        )
+
+
+async def test_parallel_activities(helper: CadenceHelper):
+    async with helper.worker(reg) as worker:
+        execution = await worker.client.start_workflow(
+            "ParallelActivities",
+            "hello world",
+            task_list=worker.task_list,
+            execution_start_to_close_timeout=timedelta(seconds=10),
+        )
+
+        response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
+            GetWorkflowExecutionHistoryRequest(
+                domain=DOMAIN_NAME,
+                workflow_execution=execution,
+                wait_for_new_event=True,
+                history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
+                skip_archival=True,
+            )
+        )
+
+        assert (
+            '"hello world"'
+            == response.history.events[
+                -1
+            ].workflow_execution_completed_event_attributes.result.data.decode()
+        )
diff --git a/tests/integration_tests/workflow/test_workflows.py b/tests/integration_tests/workflow/test_workflows.py
new file mode 100644
index 0000000..84ccf91
--- /dev/null
+++ b/tests/integration_tests/workflow/test_workflows.py
@@ -0,0 +1,78 @@
+from datetime import timedelta
+
+import pytest
+
+from cadence import Registry, workflow
+from cadence.api.v1.history_pb2 import EventFilterType
+from cadence.api.v1.service_workflow_pb2 import (
+    GetWorkflowExecutionHistoryResponse,
+    GetWorkflowExecutionHistoryRequest,
+)
+from tests.integration_tests.helper import CadenceHelper, DOMAIN_NAME
+
+reg = Registry()
+
+
+@reg.workflow()
+class EchoWorkflow:
+    def __init__(self):
+        pass
+
+    @workflow.run
+    async def echo(self, message: str) -> str:
+        return message
+
+
+async def test_simple_workflow(helper: CadenceHelper):
+    async with helper.worker(reg) as worker:
+        execution = await worker.client.start_workflow(
+            "EchoWorkflow",
+            "hello world",
+            task_list=worker.task_list,
+            execution_start_to_close_timeout=timedelta(seconds=10),
+        )
+
+        response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
+            GetWorkflowExecutionHistoryRequest(
+                domain=DOMAIN_NAME,
+                workflow_execution=execution,
+                wait_for_new_event=True,
+                history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
+                skip_archival=True,
+            )
+        )
+
+        assert (
+            '"hello world"'
+            == response.history.events[
+                -1
+            ].workflow_execution_completed_event_attributes.result.data.decode()
+        )
+
+
+@pytest.mark.skip(reason="Incorrect WorkflowType")
+async def test_workflow_fn(helper: CadenceHelper):
+    async with helper.worker(reg) as worker:
+        execution = await worker.client.start_workflow(
+            EchoWorkflow.echo,
+            "hello world",
+            task_list=worker.task_list,
+            execution_start_to_close_timeout=timedelta(seconds=10),
+        )
+
+        response: GetWorkflowExecutionHistoryResponse = await worker.client.workflow_stub.GetWorkflowExecutionHistory(
+            GetWorkflowExecutionHistoryRequest(
+                domain=DOMAIN_NAME,
+                workflow_execution=execution,
+                wait_for_new_event=True,
+                history_event_filter_type=EventFilterType.EVENT_FILTER_TYPE_CLOSE_EVENT,
+                skip_archival=True,
+            )
+        )
+
+        assert (
+            '"hello world"'
+            == response.history.events[
+                -1
+            ].workflow_execution_completed_event_attributes.result.data.decode()
+        )