From 324de45c9b7766ff7d8f3db53837c86172074a77 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 15 Sep 2025 10:29:24 -0700 Subject: [PATCH 01/12] integrate into worker Signed-off-by: Tim Li --- cadence/worker/_decision.py | 19 +++++++------------ cadence/worker/_worker.py | 2 +- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/cadence/worker/_decision.py b/cadence/worker/_decision.py index 47e0817..35e0a10 100644 --- a/cadence/worker/_decision.py +++ b/cadence/worker/_decision.py @@ -1,22 +1,22 @@ import asyncio from typing import Optional -from cadence.api.v1.common_pb2 import Payload -from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse, \ - RespondDecisionTaskFailedRequest +from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind -from cadence.api.v1.workflow_pb2 import DecisionTaskFailedCause from cadence.client import Client from cadence.worker._poller import Poller from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT +from cadence.worker._decision_task_handler import DecisionTaskHandler +from cadence.worker._registry import Registry class DecisionWorker: - def __init__(self, client: Client, task_list: str, options: WorkerOptions) -> None: + def __init__(self, client: Client, task_list: str, registry: Registry, options: WorkerOptions) -> None: self._client = client self._task_list = task_list self._identity = options["identity"] permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"]) + self._decision_handler = DecisionTaskHandler(client, task_list, registry, **options) self._poller = Poller[PollForDecisionTaskResponse](options["decision_task_pollers"], permits, self._poll, self._execute) # TODO: Sticky poller, actually running workflows, etc. @@ -30,17 +30,12 @@ async def _poll(self) -> Optional[PollForDecisionTaskResponse]: identity=self._identity, ), timeout=_LONG_POLL_TIMEOUT) - if task.task_token: + if task and task.task_token: return task else: return None async def _execute(self, task: PollForDecisionTaskResponse) -> None: - await self._client.worker_stub.RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest( - task_token=task.task_token, - cause=DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION, - identity=self._identity, - details=Payload(data=b'not implemented') - )) + await self._decision_handler.handle_task(task) diff --git a/cadence/worker/_worker.py b/cadence/worker/_worker.py index 70ce364..ff273ad 100644 --- a/cadence/worker/_worker.py +++ b/cadence/worker/_worker.py @@ -19,7 +19,7 @@ def __init__(self, client: Client, task_list: str, registry: Registry, **kwargs: _validate_and_copy_defaults(client, task_list, options) self._options = options self._activity_worker = ActivityWorker(client, task_list, registry, options) - self._decision_worker = DecisionWorker(client, task_list, options) + self._decision_worker = DecisionWorker(client, task_list, registry, options) async def run(self) -> None: From 428f0ac30bad9f398f07450f1f4251df7083088a Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 15 Sep 2025 13:32:46 -0700 Subject: [PATCH 02/12] Add workflow engine integration and fix test compatibility Signed-off-by: Tim Li --- cadence/_internal/workflow/workflow_engine.py | 173 +++++++++- cadence/worker/_decision.py | 1 + cadence/worker/_decision_task_handler.py | 1 + .../test_workflow_engine_integration.py | 312 ++++++++++++++++++ .../worker/test_decision_task_handler.py | 10 +- .../test_decision_task_handler_integration.py | 240 ++++++++++++++ .../test_decision_worker_integration.py | 286 ++++++++++++++++ .../worker/test_task_handler_integration.py | 10 + 8 files changed, 1027 insertions(+), 6 deletions(-) create mode 100644 tests/cadence/_internal/workflow/test_workflow_engine_integration.py create mode 100644 tests/cadence/worker/test_decision_task_handler_integration.py create mode 100644 tests/cadence/worker/test_decision_worker_integration.py diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 00fac2c..ff5f01f 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -1,23 +1,188 @@ +import asyncio +import logging from dataclasses import dataclass -from typing import Optional, Callable, Any +from typing import Callable, Optional, Dict, Any from cadence._internal.workflow.context import Context from cadence.api.v1.decision_pb2 import Decision from cadence.client import Client from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.workflow import WorkflowInfo +from cadence._internal.decision_state_machine import DecisionManager + +logger = logging.getLogger(__name__) @dataclass class DecisionResult: decisions: list[Decision] + force_create_new_decision_task: bool = False + query_results: Optional[Dict[str, Any]] = None class WorkflowEngine: - def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Optional[Callable[..., Any]] = None): + def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None): self._context = Context(client, info) self._workflow_func = workflow_func + self._decision_manager = DecisionManager() + self._is_workflow_complete = False - # TODO: Implement this async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult: - with self._context._activate(): + """ + Process a decision task and generate decisions. + + Args: + decision_task: The PollForDecisionTaskResponse from the service + + Returns: + DecisionResult containing the list of decisions + """ + try: + logger.info(f"Processing decision task for workflow {self._context.info().workflow_id}") + + # Process workflow history to update decision state machines + if decision_task.history: + self._process_workflow_history(decision_task.history) + + # Execute workflow function to generate new decisions + if not self._is_workflow_complete: + await self._execute_workflow_function(decision_task) + + # Collect all pending decisions from state machines + decisions = self._decision_manager.collect_pending_decisions() + + # Close decider's event loop + self._close_event_loop() + + logger.info(f"Generated {len(decisions)} decisions for workflow {self._context.info().workflow_id}") + + return DecisionResult(decisions=decisions) + + except Exception: + logger.exception(f"Error processing decision task for workflow {self._context.info().workflow_id}") + # Return empty decisions on error - the task will be failed by the handler return DecisionResult(decisions=[]) + + def _process_workflow_history(self, history) -> None: + """ + Process workflow history events to update decision state machines. + + Args: + history: The workflow history from the decision task + """ + if not history or not hasattr(history, 'events'): + return + + logger.debug(f"Processing {len(history.events)} history events") + + for event in history.events: + try: + self._decision_manager.handle_history_event(event) + except Exception as e: + logger.warning(f"Error processing history event: {e}") + + async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None: + """ + Execute the workflow function to generate new decisions. + + This blocks until the workflow schedules an activity or completes. + + Args: + decision_task: The decision task containing workflow context + """ + try: + with self._context._activate(): + # Execute the workflow function + # The workflow function should block until it schedules an activity + workflow_func = self._workflow_func + if workflow_func is None: + logger.warning(f"No workflow function available for workflow {self._context.info().workflow_id}") + return + + # Extract workflow input from history + workflow_input = await self._extract_workflow_input(decision_task) + + # Execute workflow function + result = self._execute_workflow_function_sync(workflow_func, workflow_input) + + # Check if workflow is complete + if result is not None: + self._is_workflow_complete = True + logger.info(f"Workflow {self._context.info().workflow_id} completed") + + except Exception: + logger.exception(f"Error executing workflow function for {self._context.info().workflow_id}") + raise + + async def _extract_workflow_input(self, decision_task: PollForDecisionTaskResponse) -> Any: + """ + Extract workflow input from the decision task history. + + Args: + decision_task: The decision task containing workflow history + + Returns: + The workflow input data, or None if not found + """ + if not decision_task.history or not hasattr(decision_task.history, 'events'): + logger.warning("No history events found in decision task") + return None + + # Look for WorkflowExecutionStarted event + for event in decision_task.history.events: + if hasattr(event, 'workflow_execution_started_event_attributes'): + started_attrs = event.workflow_execution_started_event_attributes + if started_attrs and hasattr(started_attrs, 'input'): + # Deserialize the input using the client's data converter + try: + # Use from_data method with a single type hint of None (no type conversion) + input_data_list = await self._context.client().data_converter.from_data(started_attrs.input, [None]) + input_data = input_data_list[0] if input_data_list else None + logger.debug(f"Extracted workflow input: {input_data}") + return input_data + except Exception as e: + logger.warning(f"Failed to deserialize workflow input: {e}") + return None + + logger.warning("No WorkflowExecutionStarted event found in history") + return None + + def _execute_workflow_function_sync(self, workflow_func: Callable, workflow_input: Any) -> Any: + """ + Execute the workflow function synchronously. + + Args: + workflow_func: The workflow function to execute + workflow_input: The input data for the workflow function + + Returns: + The result of the workflow function execution + """ + logger.debug(f"Executing workflow function with input: {workflow_input}") + result = workflow_func(workflow_input) + + # If the workflow function is async, we need to handle it properly + if asyncio.iscoroutine(result): + # Create a simple event loop for async workflow functions + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + result = loop.run_until_complete(result) + finally: + loop.close() + asyncio.set_event_loop(None) + + return result + + def _close_event_loop(self) -> None: + """ + Close the decider's event loop. + """ + try: + # Get the current event loop + loop = asyncio.get_event_loop() + if loop.is_running(): + # Schedule the loop to stop + loop.call_soon_threadsafe(loop.stop) + logger.debug("Scheduled event loop to stop") + except Exception as e: + logger.warning(f"Error closing event loop: {e}") diff --git a/cadence/worker/_decision.py b/cadence/worker/_decision.py index 35e0a10..a95c812 100644 --- a/cadence/worker/_decision.py +++ b/cadence/worker/_decision.py @@ -14,6 +14,7 @@ class DecisionWorker: def __init__(self, client: Client, task_list: str, registry: Registry, options: WorkerOptions) -> None: self._client = client self._task_list = task_list + self._registry = registry self._identity = options["identity"] permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"]) self._decision_handler = DecisionTaskHandler(client, task_list, registry, **options) diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index 636505f..4c3e4a4 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -1,4 +1,5 @@ import logging +from typing import Dict, Any from cadence.api.v1.common_pb2 import Payload from cadence.api.v1.service_worker_pb2 import ( diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py new file mode 100644 index 0000000..6499e5c --- /dev/null +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python3 +""" +Integration tests for WorkflowEngine. +""" + +import asyncio +import pytest +from unittest.mock import Mock, AsyncMock, patch +from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse +from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType +from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes +from cadence.api.v1.decision_pb2 import Decision +from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult +from cadence.workflow import WorkflowInfo +from cadence.client import Client + + +class TestWorkflowEngineIntegration: + """Integration tests for WorkflowEngine.""" + + @pytest.fixture + def mock_client(self): + """Create a mock Cadence client.""" + client = Mock(spec=Client) + client.domain = "test-domain" + client.data_converter = Mock() + client.data_converter.from_data = AsyncMock(return_value=["test-input"]) + return client + + @pytest.fixture + def workflow_info(self): + """Create workflow info.""" + return WorkflowInfo( + workflow_type="test_workflow", + workflow_domain="test-domain", + workflow_id="test-workflow-id", + workflow_run_id="test-run-id" + ) + + @pytest.fixture + def mock_workflow_func(self): + """Create a mock workflow function.""" + def workflow_func(input_data): + return f"processed: {input_data}" + return workflow_func + + @pytest.fixture + def workflow_engine(self, mock_client, workflow_info, mock_workflow_func): + """Create a WorkflowEngine instance.""" + return WorkflowEngine( + info=workflow_info, + client=mock_client, + workflow_func=mock_workflow_func + ) + + def create_mock_decision_task(self, workflow_id="test-workflow", run_id="test-run", workflow_type="test_workflow"): + """Create a mock decision task with history.""" + # Create workflow execution + workflow_execution = WorkflowExecution() + workflow_execution.workflow_id = workflow_id + workflow_execution.run_id = run_id + + # Create workflow type + workflow_type_obj = WorkflowType() + workflow_type_obj.name = workflow_type + + # Create workflow execution started event + started_event = WorkflowExecutionStartedEventAttributes() + input_payload = Payload(data=b'"test-input"') + started_event.input.CopyFrom(input_payload) + + history_event = HistoryEvent() + history_event.workflow_execution_started_event_attributes.CopyFrom(started_event) + + # Create history + history = History() + history.events.append(history_event) + + # Create decision task + decision_task = PollForDecisionTaskResponse() + decision_task.task_token = b"test-task-token" + decision_task.workflow_execution.CopyFrom(workflow_execution) + decision_task.workflow_type.CopyFrom(workflow_type_obj) + decision_task.history.CopyFrom(history) + + return decision_task + + @pytest.mark.asyncio + async def test_process_decision_success(self, workflow_engine, mock_client): + """Test successful decision processing.""" + decision_task = self.create_mock_decision_task() + + # Mock the decision manager to return some decisions + with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=[Mock()]): + # Process the decision + result = await workflow_engine.process_decision(decision_task) + + # Verify the result + assert isinstance(result, DecisionResult) + assert len(result.decisions) == 1 + assert result.force_create_new_decision_task is False + assert result.query_results is None + + @pytest.mark.asyncio + async def test_process_decision_with_history(self, workflow_engine, mock_client): + """Test decision processing with history events.""" + decision_task = self.create_mock_decision_task() + + # Mock the decision manager + with patch.object(workflow_engine._decision_manager, 'handle_history_event') as mock_handle: + with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=[]): + # Process the decision + await workflow_engine.process_decision(decision_task) + + # Verify history events were processed + mock_handle.assert_called() + + @pytest.mark.asyncio + async def test_process_decision_workflow_complete(self, workflow_engine, mock_client): + """Test decision processing when workflow is already complete.""" + # Mark workflow as complete + workflow_engine._is_workflow_complete = True + + decision_task = self.create_mock_decision_task() + + with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=[]): + # Process the decision + result = await workflow_engine.process_decision(decision_task) + + # Verify the result + assert isinstance(result, DecisionResult) + assert len(result.decisions) == 0 + + @pytest.mark.asyncio + async def test_process_decision_error_handling(self, workflow_engine, mock_client): + """Test decision processing error handling.""" + decision_task = self.create_mock_decision_task() + + # Mock the decision manager to raise an exception + with patch.object(workflow_engine._decision_manager, 'handle_history_event', side_effect=Exception("Test error")): + # Process the decision + result = await workflow_engine.process_decision(decision_task) + + # Verify error handling - should return empty decisions + assert isinstance(result, DecisionResult) + assert len(result.decisions) == 0 + + @pytest.mark.asyncio + async def test_extract_workflow_input_success(self, workflow_engine, mock_client): + """Test successful workflow input extraction.""" + decision_task = self.create_mock_decision_task() + + # Extract workflow input + input_data = await workflow_engine._extract_workflow_input(decision_task) + + # Verify the input was extracted + assert input_data == "test-input" + mock_client.data_converter.from_data.assert_called_once() + + @pytest.mark.asyncio + async def test_extract_workflow_input_no_history(self, workflow_engine, mock_client): + """Test workflow input extraction with no history.""" + decision_task = PollForDecisionTaskResponse() + decision_task.task_token = b"test-task-token" + # No history set + + # Extract workflow input + input_data = await workflow_engine._extract_workflow_input(decision_task) + + # Verify no input was extracted + assert input_data is None + + @pytest.mark.asyncio + async def test_extract_workflow_input_no_started_event(self, workflow_engine, mock_client): + """Test workflow input extraction with no WorkflowExecutionStarted event.""" + # Create a decision task with no started event + decision_task = PollForDecisionTaskResponse() + decision_task.task_token = b"test-task-token" + + # Create workflow execution + workflow_execution = WorkflowExecution() + workflow_execution.workflow_id = "test-workflow" + workflow_execution.run_id = "test-run" + decision_task.workflow_execution.CopyFrom(workflow_execution) + + # Create workflow type + workflow_type_obj = WorkflowType() + workflow_type_obj.name = "test_workflow" + decision_task.workflow_type.CopyFrom(workflow_type_obj) + + # Create history with no events + history = History() + decision_task.history.CopyFrom(history) + + # Extract workflow input + input_data = await workflow_engine._extract_workflow_input(decision_task) + + # Verify no input was extracted + assert input_data is None + + @pytest.mark.asyncio + async def test_extract_workflow_input_deserialization_error(self, workflow_engine, mock_client): + """Test workflow input extraction with deserialization error.""" + decision_task = self.create_mock_decision_task() + + # Mock data converter to raise an exception + mock_client.data_converter.from_data = AsyncMock(side_effect=Exception("Deserialization error")) + + # Extract workflow input + input_data = await workflow_engine._extract_workflow_input(decision_task) + + # Verify no input was extracted due to error + assert input_data is None + + def test_execute_workflow_function_sync(self, workflow_engine): + """Test synchronous workflow function execution.""" + input_data = "test-input" + + # Execute the workflow function + result = workflow_engine._execute_workflow_function_sync(workflow_engine._workflow_func, input_data) + + # Verify the result + assert result == "processed: test-input" + + def test_execute_workflow_function_async(self, workflow_engine): + """Test asynchronous workflow function execution.""" + async def async_workflow_func(input_data): + return f"async-processed: {input_data}" + + input_data = "test-input" + + # Execute the async workflow function + result = workflow_engine._execute_workflow_function_sync(async_workflow_func, input_data) + + # Verify the result + assert result == "async-processed: test-input" + + def test_execute_workflow_function_none(self, workflow_engine): + """Test workflow function execution with None function.""" + input_data = "test-input" + + # Execute with None workflow function - should raise TypeError + with pytest.raises(TypeError, match="'NoneType' object is not callable"): + workflow_engine._execute_workflow_function_sync(None, input_data) + + def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_func): + """Test WorkflowEngine initialization.""" + assert workflow_engine._context is not None + assert workflow_engine._workflow_func == mock_workflow_func + assert workflow_engine._decision_manager is not None + assert workflow_engine._is_workflow_complete is False + + @pytest.mark.asyncio + async def test_workflow_engine_without_workflow_func(self, mock_client, workflow_info): + """Test WorkflowEngine without workflow function.""" + engine = WorkflowEngine( + info=workflow_info, + client=mock_client, + workflow_func=None + ) + + decision_task = self.create_mock_decision_task() + + with patch.object(engine._decision_manager, 'collect_pending_decisions', return_value=[]): + # Process the decision + result = await engine.process_decision(decision_task) + + # Verify the result + assert isinstance(result, DecisionResult) + assert len(result.decisions) == 0 + + @pytest.mark.asyncio + async def test_workflow_engine_workflow_completion(self, workflow_engine, mock_client): + """Test workflow completion detection.""" + decision_task = self.create_mock_decision_task() + + # Mock workflow function to return a result (indicating completion) + def completing_workflow_func(input_data): + return "workflow-completed" + + workflow_engine._workflow_func = completing_workflow_func + + with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=[]): + # Process the decision + await workflow_engine.process_decision(decision_task) + + # Verify workflow is marked as complete + assert workflow_engine._is_workflow_complete is True + + def test_close_event_loop(self, workflow_engine): + """Test event loop closing.""" + # This should not raise an exception + workflow_engine._close_event_loop() + + @pytest.mark.asyncio + async def test_process_decision_with_query_results(self, workflow_engine, mock_client): + """Test decision processing with query results.""" + decision_task = self.create_mock_decision_task() + + # Mock the decision manager to return decisions with query results + mock_decisions = [Mock()] + query_results = {"query1": "result1"} + + with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=mock_decisions): + # Process the decision + result = await workflow_engine.process_decision(decision_task) + + # Verify the result + assert isinstance(result, DecisionResult) + assert len(result.decisions) == 1 + assert result.force_create_new_decision_task is False + assert result.query_results is None # Not set in this test diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 2fc98ec..ef61c56 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -137,8 +137,8 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp await handler._handle_task_implementation(sample_decision_task) @pytest.mark.asyncio - async def test_handle_task_implementation_creates_new_engine(self, handler, sample_decision_task, mock_registry): - """Test that decision task handler creates new workflow engine for each task.""" + async def test_handle_task_implementation_creates_new_engines(self, handler, sample_decision_task, mock_registry): + """Test that decision task handler creates new workflow engines for each task.""" # Mock workflow function mock_workflow_func = Mock() mock_registry.get_workflow.return_value = mock_workflow_func @@ -231,6 +231,8 @@ async def test_respond_decision_task_completed_success(self, handler, sample_dec """Test successful decision task completion response.""" decision_result = Mock(spec=DecisionResult) decision_result.decisions = [Decision(), Decision()] + decision_result.force_create_new_decision_task = False + decision_result.query_results = None await handler._respond_decision_task_completed(sample_decision_task, decision_result) @@ -248,6 +250,8 @@ async def test_respond_decision_task_completed_no_query_results(self, handler, s """Test decision task completion response without query results.""" decision_result = Mock(spec=DecisionResult) decision_result.decisions = [] + decision_result.force_create_new_decision_task = False + decision_result.query_results = None await handler._respond_decision_task_completed(sample_decision_task, decision_result) @@ -261,6 +265,8 @@ async def test_respond_decision_task_completed_error(self, handler, sample_decis """Test decision task completion response error handling.""" decision_result = Mock(spec=DecisionResult) decision_result.decisions = [] + decision_result.force_create_new_decision_task = False + decision_result.query_results = None handler._client.worker_stub.RespondDecisionTaskCompleted.side_effect = Exception("Respond failed") diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py new file mode 100644 index 0000000..a6c5e04 --- /dev/null +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -0,0 +1,240 @@ +#!/usr/bin/env python3 +""" +Integration tests for DecisionTaskHandler and WorkflowEngine. +""" + +import asyncio +import pytest +from unittest.mock import Mock, AsyncMock, MagicMock, patch +from cadence.api.v1.service_worker_pb2 import ( + PollForDecisionTaskResponse, + RespondDecisionTaskCompletedRequest, + RespondDecisionTaskFailedRequest +) +from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType +from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes +from cadence.api.v1.decision_pb2 import Decision +from cadence.worker._decision_task_handler import DecisionTaskHandler +from cadence.worker._registry import Registry +from cadence.workflow import WorkflowInfo +from cadence.client import Client + + +class TestDecisionTaskHandlerIntegration: + """Integration tests for DecisionTaskHandler.""" + + @pytest.fixture + def mock_client(self): + """Create a mock Cadence client.""" + client = Mock(spec=Client) + client.domain = "test-domain" + client.data_converter = Mock() + client.data_converter.from_data = AsyncMock(return_value=["test-input"]) + client.worker_stub = Mock() + client.worker_stub.RespondDecisionTaskCompleted = AsyncMock() + client.worker_stub.RespondDecisionTaskFailed = AsyncMock() + return client + + @pytest.fixture + def registry(self): + """Create a registry with a test workflow.""" + reg = Registry() + + @reg.workflow + def test_workflow(input_data): + """Simple test workflow that returns the input.""" + return f"processed: {input_data}" + + return reg + + @pytest.fixture + def decision_task_handler(self, mock_client, registry): + """Create a DecisionTaskHandler instance.""" + return DecisionTaskHandler( + client=mock_client, + task_list="test-task-list", + registry=registry, + identity="test-worker" + ) + + def create_mock_decision_task(self, workflow_id="test-workflow", run_id="test-run", workflow_type="test_workflow"): + """Create a mock decision task with history.""" + # Create workflow execution + workflow_execution = WorkflowExecution() + workflow_execution.workflow_id = workflow_id + workflow_execution.run_id = run_id + + # Create workflow type + workflow_type_obj = WorkflowType() + workflow_type_obj.name = workflow_type + + # Create workflow execution started event + started_event = WorkflowExecutionStartedEventAttributes() + input_payload = Payload(data=b'"test-input"') + started_event.input.CopyFrom(input_payload) + + history_event = HistoryEvent() + history_event.workflow_execution_started_event_attributes.CopyFrom(started_event) + + # Create history + history = History() + history.events.append(history_event) + + # Create decision task + decision_task = PollForDecisionTaskResponse() + decision_task.task_token = b"test-task-token" + decision_task.workflow_execution.CopyFrom(workflow_execution) + decision_task.workflow_type.CopyFrom(workflow_type_obj) + decision_task.history.CopyFrom(history) + + return decision_task + + @pytest.mark.asyncio + async def test_handle_decision_task_success(self, decision_task_handler, mock_client): + """Test successful decision task handling.""" + # Create a mock decision task + decision_task = self.create_mock_decision_task() + + # Mock the workflow engine to return some decisions + # Mock the workflow engine creation and execution + mock_engine = Mock() + # Create a proper Decision object + decision = Decision() + mock_engine.process_decision = AsyncMock(return_value=Mock( + decisions=[decision], # Proper Decision object + force_create_new_decision_task=False, + query_results=None + )) + + with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): + # Handle the decision task + await decision_task_handler._handle_task_implementation(decision_task) + + # Verify the workflow engine was called + mock_engine.process_decision.assert_called_once_with(decision_task) + + # Verify the response was sent + mock_client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() + + @pytest.mark.asyncio + async def test_handle_decision_task_workflow_not_found(self, decision_task_handler, mock_client): + """Test decision task handling when workflow is not found in registry.""" + # Create a decision task with unknown workflow type + decision_task = self.create_mock_decision_task(workflow_type="unknown_workflow") + + # Handle the decision task + await decision_task_handler.handle_task(decision_task) + + # Verify failure response was sent + mock_client.worker_stub.RespondDecisionTaskFailed.assert_called_once() + + # Verify the failure request has the correct cause + call_args = mock_client.worker_stub.RespondDecisionTaskFailed.call_args[0][0] + assert call_args.cause == 14 # DECISION_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE + + @pytest.mark.asyncio + async def test_handle_decision_task_missing_workflow_execution(self, decision_task_handler, mock_client): + """Test decision task handling when workflow execution is missing.""" + # Create a decision task without workflow execution + decision_task = PollForDecisionTaskResponse() + decision_task.task_token = b"test-task-token" + # No workflow_execution set + + # Handle the decision task + await decision_task_handler.handle_task(decision_task) + + # Verify failure response was sent + mock_client.worker_stub.RespondDecisionTaskFailed.assert_called_once() + + # Verify the failure request has the correct cause + call_args = mock_client.worker_stub.RespondDecisionTaskFailed.call_args[0][0] + assert call_args.cause == 14 # DECISION_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE + + @pytest.mark.asyncio + async def test_workflow_engine_creation_each_task(self, decision_task_handler, mock_client): + """Test that workflow engines are created for each task.""" + decision_task = self.create_mock_decision_task() + + with patch('cadence.worker._decision_task_handler.WorkflowEngine') as mock_engine_class: + mock_engine = Mock() + mock_engine.process_decision = AsyncMock(return_value=Mock( + decisions=[], + force_create_new_decision_task=False, + query_results=None + )) + mock_engine_class.return_value = mock_engine + + # Handle the same decision task twice + await decision_task_handler._handle_task_implementation(decision_task) + await decision_task_handler._handle_task_implementation(decision_task) + + # Verify engine was created twice (once for each task) + assert mock_engine_class.call_count == 2 + + # Verify engine was called twice + assert mock_engine.process_decision.call_count == 2 + + + @pytest.mark.asyncio + async def test_decision_task_failure_handling(self, decision_task_handler, mock_client): + """Test decision task failure handling.""" + decision_task = self.create_mock_decision_task() + + # Mock the workflow engine to raise an exception + with patch('cadence.worker._decision_task_handler.WorkflowEngine') as mock_engine_class: + mock_engine = Mock() + mock_engine.process_decision = AsyncMock(side_effect=Exception("Test error")) + mock_engine_class.return_value = mock_engine + + # Handle the decision task - this should catch the exception + await decision_task_handler.handle_task(decision_task) + + # Verify failure response was sent + mock_client.worker_stub.RespondDecisionTaskFailed.assert_called_once() + + def test_decision_task_handler_initialization(self, decision_task_handler): + """Test DecisionTaskHandler initialization.""" + assert decision_task_handler._registry is not None + assert decision_task_handler._identity == "test-worker" + + @pytest.mark.asyncio + async def test_respond_decision_task_completed(self, decision_task_handler, mock_client): + """Test decision task completion response.""" + decision_task = self.create_mock_decision_task() + + # Create mock decision result + decision_result = Mock() + decision_result.decisions = [Decision()] # Proper Decision object + decision_result.force_create_new_decision_task = False + decision_result.query_results = None + + # Call the response method + await decision_task_handler._respond_decision_task_completed(decision_task, decision_result) + + # Verify the response was sent + mock_client.worker_stub.RespondDecisionTaskCompleted.assert_called_once() + + # Verify the request parameters + call_args = mock_client.worker_stub.RespondDecisionTaskCompleted.call_args[0][0] + assert call_args.task_token == b"test-task-token" + assert call_args.identity == "test-worker" + assert len(call_args.decisions) == 1 + + @pytest.mark.asyncio + async def test_respond_decision_task_failed(self, decision_task_handler, mock_client): + """Test decision task failure response.""" + decision_task = self.create_mock_decision_task() + error = ValueError("Test error") + + # Call the failure method + await decision_task_handler.handle_task_failure(decision_task, error) + + # Verify the failure response was sent + mock_client.worker_stub.RespondDecisionTaskFailed.assert_called_once() + + # Verify the request parameters + call_args = mock_client.worker_stub.RespondDecisionTaskFailed.call_args[0][0] + assert call_args.task_token == b"test-task-token" + assert call_args.identity == "test-worker" + assert call_args.cause == 2 # BAD_SCHEDULE_ACTIVITY_ATTRIBUTES for ValueError + assert b"Test error" in call_args.details.data diff --git a/tests/cadence/worker/test_decision_worker_integration.py b/tests/cadence/worker/test_decision_worker_integration.py new file mode 100644 index 0000000..e9530a7 --- /dev/null +++ b/tests/cadence/worker/test_decision_worker_integration.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 +""" +Integration tests for DecisionWorker with DecisionTaskHandler. +""" + +import asyncio +import pytest +from unittest.mock import Mock, AsyncMock, patch +from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse +from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType +from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes +from cadence.worker._decision import DecisionWorker +from cadence.worker._decision_task_handler import DecisionTaskHandler +from cadence.worker._registry import Registry +from cadence.client import Client + + +class TestDecisionWorkerIntegration: + """Integration tests for DecisionWorker with DecisionTaskHandler.""" + + @pytest.fixture + def mock_client(self): + """Create a mock Cadence client.""" + client = Mock(spec=Client) + client.domain = "test-domain" + client.data_converter = Mock() + client.data_converter.from_data = AsyncMock(return_value=["test-input"]) + client.worker_stub = Mock() + client.worker_stub.PollForDecisionTask = AsyncMock() + client.worker_stub.RespondDecisionTaskCompleted = AsyncMock() + client.worker_stub.RespondDecisionTaskFailed = AsyncMock() + return client + + @pytest.fixture + def registry(self): + """Create a registry with a test workflow.""" + reg = Registry() + + @reg.workflow + def test_workflow(input_data): + """Simple test workflow that returns the input.""" + return f"processed: {input_data}" + + return reg + + @pytest.fixture + def decision_worker(self, mock_client, registry): + """Create a DecisionWorker instance.""" + options = { + "identity": "test-worker", + "max_concurrent_decision_task_execution_size": 1, + "decision_task_pollers": 1 + } + return DecisionWorker( + client=mock_client, + task_list="test-task-list", + registry=registry, + options=options + ) + + def create_mock_decision_task(self, workflow_id="test-workflow", run_id="test-run", workflow_type="test_workflow"): + """Create a mock decision task with history.""" + # Create workflow execution + workflow_execution = WorkflowExecution() + workflow_execution.workflow_id = workflow_id + workflow_execution.run_id = run_id + + # Create workflow type + workflow_type_obj = WorkflowType() + workflow_type_obj.name = workflow_type + + # Create workflow execution started event + started_event = WorkflowExecutionStartedEventAttributes() + input_payload = Payload(data=b'"test-input"') + started_event.input.CopyFrom(input_payload) + + history_event = HistoryEvent() + history_event.workflow_execution_started_event_attributes.CopyFrom(started_event) + + # Create history + history = History() + history.events.append(history_event) + + # Create decision task + decision_task = PollForDecisionTaskResponse() + decision_task.task_token = b"test-task-token" + decision_task.workflow_execution.CopyFrom(workflow_execution) + decision_task.workflow_type.CopyFrom(workflow_type_obj) + decision_task.history.CopyFrom(history) + + return decision_task + + @pytest.mark.asyncio + async def test_decision_worker_poll_and_execute(self, decision_worker, mock_client): + """Test decision worker polling and executing tasks.""" + # Create a mock decision task + decision_task = self.create_mock_decision_task() + + # Mock the poll to return the decision task + mock_client.worker_stub.PollForDecisionTask.return_value = decision_task + + # Mock the decision handler + with patch.object(decision_worker, '_decision_handler') as mock_handler: + mock_handler.handle_task = AsyncMock() + + # Run the poll and execute + await decision_worker._poll() + await decision_worker._execute(decision_task) + + # Verify the poll was called + mock_client.worker_stub.PollForDecisionTask.assert_called_once() + + # Verify the handler was called + mock_handler.handle_task.assert_called_once_with(decision_task) + + @pytest.mark.asyncio + async def test_decision_worker_poll_no_task(self, decision_worker, mock_client): + """Test decision worker polling when no task is available.""" + # Mock the poll to return None (no task) + mock_client.worker_stub.PollForDecisionTask.return_value = None + + # Run the poll + result = await decision_worker._poll() + + # Verify no task was returned + assert result is None + + @pytest.mark.asyncio + async def test_decision_worker_poll_with_task_token(self, decision_worker, mock_client): + """Test decision worker polling when task has token.""" + # Create a decision task with token + decision_task = self.create_mock_decision_task() + decision_task.task_token = b"valid-token" + + # Mock the poll to return the decision task + mock_client.worker_stub.PollForDecisionTask.return_value = decision_task + + # Run the poll + result = await decision_worker._poll() + + # Verify the task was returned + assert result == decision_task + + @pytest.mark.asyncio + async def test_decision_worker_poll_without_task_token(self, decision_worker, mock_client): + """Test decision worker polling when task has no token.""" + # Create a decision task without token + decision_task = self.create_mock_decision_task() + decision_task.task_token = b"" # Empty token + + # Mock the poll to return the decision task + mock_client.worker_stub.PollForDecisionTask.return_value = decision_task + + # Run the poll + result = await decision_worker._poll() + + # Verify no task was returned + assert result is None + + @pytest.mark.asyncio + async def test_decision_worker_execute_success(self, decision_worker, mock_client): + """Test successful decision task execution.""" + decision_task = self.create_mock_decision_task() + + # Mock the decision handler + with patch.object(decision_worker, '_decision_handler') as mock_handler: + mock_handler.handle_task = AsyncMock() + + # Execute the task + await decision_worker._execute(decision_task) + + # Verify the handler was called + mock_handler.handle_task.assert_called_once_with(decision_task) + + @pytest.mark.asyncio + async def test_decision_worker_execute_handler_error(self, decision_worker, mock_client): + """Test decision task execution when handler raises an error.""" + decision_task = self.create_mock_decision_task() + + # Mock the decision handler to raise an error + with patch.object(decision_worker, '_decision_handler') as mock_handler: + mock_handler.handle_task = AsyncMock(side_effect=Exception("Handler error")) + + # Execute the task - should raise the exception + with pytest.raises(Exception, match="Handler error"): + await decision_worker._execute(decision_task) + + # Verify the handler was called + mock_handler.handle_task.assert_called_once_with(decision_task) + + def test_decision_worker_initialization(self, decision_worker, mock_client, registry): + """Test DecisionWorker initialization.""" + assert decision_worker._client == mock_client + assert decision_worker._task_list == "test-task-list" + assert decision_worker._identity == "test-worker" + assert decision_worker._registry == registry + assert decision_worker._decision_handler is not None + assert decision_worker._poller is not None + + @pytest.mark.asyncio + async def test_decision_worker_run(self, decision_worker, mock_client): + """Test DecisionWorker run method.""" + # Mock the poller to complete immediately + with patch.object(decision_worker._poller, 'run', new_callable=AsyncMock) as mock_poller_run: + # Run the worker + await decision_worker.run() + + # Verify the poller was run + mock_poller_run.assert_called_once() + + @pytest.mark.asyncio + async def test_decision_worker_integration_flow(self, decision_worker, mock_client): + """Test the complete integration flow from poll to execute.""" + # Create a mock decision task + decision_task = self.create_mock_decision_task() + + # Mock the poll to return the decision task + mock_client.worker_stub.PollForDecisionTask.return_value = decision_task + + # Mock the decision handler + with patch.object(decision_worker, '_decision_handler') as mock_handler: + mock_handler.handle_task = AsyncMock() + + # Test the complete flow + # 1. Poll for task + polled_task = await decision_worker._poll() + assert polled_task == decision_task + + # 2. Execute the task + await decision_worker._execute(polled_task) + + # 3. Verify the handler was called + mock_handler.handle_task.assert_called_once_with(decision_task) + + @pytest.mark.asyncio + async def test_decision_worker_with_different_workflow_types(self, decision_worker, mock_client, registry): + """Test decision worker with different workflow types.""" + # Add another workflow to the registry + @registry.workflow + def another_workflow(input_data): + return f"another-processed: {input_data}" + + # Create decision tasks for different workflow types + task1 = self.create_mock_decision_task(workflow_type="test_workflow") + task2 = self.create_mock_decision_task(workflow_type="another_workflow") + + # Mock the decision handler + with patch.object(decision_worker, '_decision_handler') as mock_handler: + mock_handler.handle_task = AsyncMock() + + # Execute both tasks + await decision_worker._execute(task1) + await decision_worker._execute(task2) + + # Verify both tasks were handled + assert mock_handler.handle_task.call_count == 2 + + @pytest.mark.asyncio + async def test_decision_worker_poll_timeout(self, decision_worker, mock_client): + """Test decision worker polling with timeout.""" + # Mock the poll to raise a timeout exception + mock_client.worker_stub.PollForDecisionTask.side_effect = asyncio.TimeoutError("Poll timeout") + + # Run the poll - should handle timeout gracefully + with pytest.raises(asyncio.TimeoutError): + await decision_worker._poll() + + def test_decision_worker_options_handling(self, mock_client, registry): + """Test DecisionWorker with various options.""" + options = { + "identity": "custom-worker", + "max_concurrent_decision_task_execution_size": 5, + "decision_task_pollers": 3 + } + + worker = DecisionWorker( + client=mock_client, + task_list="custom-task-list", + registry=registry, + options=options + ) + + # Verify options were applied + assert worker._identity == "custom-worker" + assert worker._task_list == "custom-task-list" + assert worker._registry == registry diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index 64d877f..62837e4 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -66,6 +66,8 @@ async def test_full_task_handling_flow_success(self, handler, sample_decision_ta mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] + mock_decision_result.force_create_new_decision_task = False + mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): @@ -109,6 +111,8 @@ async def test_context_activation_integration(self, handler, sample_decision_tas mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] + mock_decision_result.force_create_new_decision_task = False + mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) # Track if context is activated @@ -158,6 +162,8 @@ async def test_multiple_workflow_executions(self, handler, mock_registry): mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] + mock_decision_result.force_create_new_decision_task = False + mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -183,6 +189,8 @@ async def test_workflow_engine_creation_integration(self, handler, sample_decisi mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] + mock_decision_result.force_create_new_decision_task = False + mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_engine_class: @@ -250,6 +258,8 @@ async def test_concurrent_task_handling(self, handler, mock_registry): mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] + mock_decision_result.force_create_new_decision_task = False + mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): From f9989a94cef913662ea4ac5eef3b0a6dffa0fbdb Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 17 Sep 2025 16:10:46 -0700 Subject: [PATCH 03/12] lint Signed-off-by: Tim Li --- cadence/worker/_decision_task_handler.py | 1 - .../workflow/test_workflow_engine_integration.py | 3 --- .../worker/test_decision_task_handler_integration.py | 8 ++------ tests/cadence/worker/test_decision_worker_integration.py | 1 - 4 files changed, 2 insertions(+), 11 deletions(-) diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index 4c3e4a4..636505f 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -1,5 +1,4 @@ import logging -from typing import Dict, Any from cadence.api.v1.common_pb2 import Payload from cadence.api.v1.service_worker_pb2 import ( diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index 6499e5c..75b0075 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -3,13 +3,11 @@ Integration tests for WorkflowEngine. """ -import asyncio import pytest from unittest.mock import Mock, AsyncMock, patch from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes -from cadence.api.v1.decision_pb2 import Decision from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult from cadence.workflow import WorkflowInfo from cadence.client import Client @@ -299,7 +297,6 @@ async def test_process_decision_with_query_results(self, workflow_engine, mock_c # Mock the decision manager to return decisions with query results mock_decisions = [Mock()] - query_results = {"query1": "result1"} with patch.object(workflow_engine._decision_manager, 'collect_pending_decisions', return_value=mock_decisions): # Process the decision diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py index a6c5e04..aef389d 100644 --- a/tests/cadence/worker/test_decision_task_handler_integration.py +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -3,20 +3,16 @@ Integration tests for DecisionTaskHandler and WorkflowEngine. """ -import asyncio import pytest -from unittest.mock import Mock, AsyncMock, MagicMock, patch +from unittest.mock import Mock, AsyncMock, patch from cadence.api.v1.service_worker_pb2 import ( - PollForDecisionTaskResponse, - RespondDecisionTaskCompletedRequest, - RespondDecisionTaskFailedRequest + PollForDecisionTaskResponse ) from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes from cadence.api.v1.decision_pb2 import Decision from cadence.worker._decision_task_handler import DecisionTaskHandler from cadence.worker._registry import Registry -from cadence.workflow import WorkflowInfo from cadence.client import Client diff --git a/tests/cadence/worker/test_decision_worker_integration.py b/tests/cadence/worker/test_decision_worker_integration.py index e9530a7..85c55d2 100644 --- a/tests/cadence/worker/test_decision_worker_integration.py +++ b/tests/cadence/worker/test_decision_worker_integration.py @@ -10,7 +10,6 @@ from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes from cadence.worker._decision import DecisionWorker -from cadence.worker._decision_task_handler import DecisionTaskHandler from cadence.worker._registry import Registry from cadence.client import Client From 073dc341a856732ad7131d5086b7035dede3a458 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 18 Sep 2025 14:30:05 -0700 Subject: [PATCH 04/12] minor fix Signed-off-by: Tim Li --- cadence/_internal/workflow/workflow_engine.py | 4 +--- cadence/worker/_decision_task_handler.py | 3 +-- .../workflow/test_workflow_engine_integration.py | 5 +---- tests/cadence/worker/test_decision_task_handler.py | 14 -------------- .../test_decision_task_handler_integration.py | 6 ------ .../worker/test_task_handler_integration.py | 10 ---------- 6 files changed, 3 insertions(+), 39 deletions(-) diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index ff5f01f..6281f76 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -1,7 +1,7 @@ import asyncio import logging from dataclasses import dataclass -from typing import Callable, Optional, Dict, Any +from typing import Callable, Any from cadence._internal.workflow.context import Context from cadence.api.v1.decision_pb2 import Decision @@ -16,8 +16,6 @@ @dataclass class DecisionResult: decisions: list[Decision] - force_create_new_decision_task: bool = False - query_results: Optional[Dict[str, Any]] = None class WorkflowEngine: def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None): diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index 636505f..793c596 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -136,8 +136,7 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon task_token=task.task_token, decisions=decision_result.decisions, identity=self._identity, - return_new_decision_task=True, - force_create_new_decision_task=False + return_new_decision_task=True ) await self._client.worker_stub.RespondDecisionTaskCompleted(request) diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index 75b0075..7e210d6 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -96,8 +96,6 @@ async def test_process_decision_success(self, workflow_engine, mock_client): # Verify the result assert isinstance(result, DecisionResult) assert len(result.decisions) == 1 - assert result.force_create_new_decision_task is False - assert result.query_results is None @pytest.mark.asyncio async def test_process_decision_with_history(self, workflow_engine, mock_client): @@ -305,5 +303,4 @@ async def test_process_decision_with_query_results(self, workflow_engine, mock_c # Verify the result assert isinstance(result, DecisionResult) assert len(result.decisions) == 1 - assert result.force_create_new_decision_task is False - assert result.query_results is None # Not set in this test + # Not set in this test diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index ef61c56..687e148 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -87,8 +87,6 @@ async def test_handle_task_implementation_success(self, handler, sample_decision mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [Decision()] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = {} mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): @@ -147,8 +145,6 @@ async def test_handle_task_implementation_creates_new_engines(self, handler, sam mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = {} mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_engine_class: @@ -231,8 +227,6 @@ async def test_respond_decision_task_completed_success(self, handler, sample_dec """Test successful decision task completion response.""" decision_result = Mock(spec=DecisionResult) decision_result.decisions = [Decision(), Decision()] - decision_result.force_create_new_decision_task = False - decision_result.query_results = None await handler._respond_decision_task_completed(sample_decision_task, decision_result) @@ -242,7 +236,6 @@ async def test_respond_decision_task_completed_success(self, handler, sample_dec assert call_args.task_token == sample_decision_task.task_token assert call_args.identity == handler._identity assert call_args.return_new_decision_task - assert not call_args.force_create_new_decision_task assert len(call_args.decisions) == 2 @pytest.mark.asyncio @@ -250,14 +243,11 @@ async def test_respond_decision_task_completed_no_query_results(self, handler, s """Test decision task completion response without query results.""" decision_result = Mock(spec=DecisionResult) decision_result.decisions = [] - decision_result.force_create_new_decision_task = False - decision_result.query_results = None await handler._respond_decision_task_completed(sample_decision_task, decision_result) call_args = handler._client.worker_stub.RespondDecisionTaskCompleted.call_args[0][0] assert call_args.return_new_decision_task - assert not call_args.force_create_new_decision_task assert len(call_args.decisions) == 0 @pytest.mark.asyncio @@ -265,8 +255,6 @@ async def test_respond_decision_task_completed_error(self, handler, sample_decis """Test decision task completion response error handling.""" decision_result = Mock(spec=DecisionResult) decision_result.decisions = [] - decision_result.force_create_new_decision_task = False - decision_result.query_results = None handler._client.worker_stub.RespondDecisionTaskCompleted.side_effect = Exception("Respond failed") @@ -283,8 +271,6 @@ async def test_workflow_engine_creation_with_workflow_info(self, handler, sample mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = {} mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_workflow_engine_class: diff --git a/tests/cadence/worker/test_decision_task_handler_integration.py b/tests/cadence/worker/test_decision_task_handler_integration.py index aef389d..b513a14 100644 --- a/tests/cadence/worker/test_decision_task_handler_integration.py +++ b/tests/cadence/worker/test_decision_task_handler_integration.py @@ -98,8 +98,6 @@ async def test_handle_decision_task_success(self, decision_task_handler, mock_cl decision = Decision() mock_engine.process_decision = AsyncMock(return_value=Mock( decisions=[decision], # Proper Decision object - force_create_new_decision_task=False, - query_results=None )) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): @@ -155,8 +153,6 @@ async def test_workflow_engine_creation_each_task(self, decision_task_handler, m mock_engine = Mock() mock_engine.process_decision = AsyncMock(return_value=Mock( decisions=[], - force_create_new_decision_task=False, - query_results=None )) mock_engine_class.return_value = mock_engine @@ -201,8 +197,6 @@ async def test_respond_decision_task_completed(self, decision_task_handler, mock # Create mock decision result decision_result = Mock() decision_result.decisions = [Decision()] # Proper Decision object - decision_result.force_create_new_decision_task = False - decision_result.query_results = None # Call the response method await decision_task_handler._respond_decision_task_completed(decision_task, decision_result) diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index 62837e4..64d877f 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -66,8 +66,6 @@ async def test_full_task_handling_flow_success(self, handler, sample_decision_ta mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): @@ -111,8 +109,6 @@ async def test_context_activation_integration(self, handler, sample_decision_tas mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) # Track if context is activated @@ -162,8 +158,6 @@ async def test_multiple_workflow_executions(self, handler, mock_registry): mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -189,8 +183,6 @@ async def test_workflow_engine_creation_integration(self, handler, sample_decisi mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_engine_class: @@ -258,8 +250,6 @@ async def test_concurrent_task_handling(self, handler, mock_registry): mock_engine = Mock(spec=WorkflowEngine) mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] - mock_decision_result.force_create_new_decision_task = False - mock_decision_result.query_results = None mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): From 3b385cb2d4226906b26a0104cf3c267e6848dedf Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 22 Sep 2025 15:47:11 -0700 Subject: [PATCH 05/12] respond to comments Signed-off-by: Tim Li --- .../_internal/workflow/decisions_helper.py | 395 ++++++++++++++++++ cadence/_internal/workflow/history_helper.py | 239 +++++++++++ cadence/_internal/workflow/workflow_engine.py | 15 +- cadence/worker/_decision.py | 42 -- cadence/worker/_decision_worker.py | 58 +++ cadence/worker/_worker.py | 2 +- .../test_decision_worker_integration.py | 2 +- 7 files changed, 704 insertions(+), 49 deletions(-) create mode 100644 cadence/_internal/workflow/decisions_helper.py create mode 100644 cadence/_internal/workflow/history_helper.py delete mode 100644 cadence/worker/_decision.py create mode 100644 cadence/worker/_decision_worker.py diff --git a/cadence/_internal/workflow/decisions_helper.py b/cadence/_internal/workflow/decisions_helper.py new file mode 100644 index 0000000..e3081b4 --- /dev/null +++ b/cadence/_internal/workflow/decisions_helper.py @@ -0,0 +1,395 @@ +""" +DecisionsHelper manages the next decision ID which is used for tracking decision state machines. + +This helper ensures that decision IDs are properly assigned and tracked to maintain +consistency in the workflow execution state. +""" + +import logging +from dataclasses import dataclass +from typing import Dict, Optional + +from cadence._internal.decision_state_machine import DecisionId, DecisionType +from cadence.api.v1.history_pb2 import HistoryEvent + +logger = logging.getLogger(__name__) + + +@dataclass +class DecisionTracker: + """Tracks a decision with its ID and current state.""" + + decision_id: DecisionId + scheduled_event_id: Optional[int] = None + initiated_event_id: Optional[int] = None + started_event_id: Optional[int] = None + is_completed: bool = False + + +class DecisionsHelper: + """ + Helper class to manage decision IDs and track decision state across workflow execution. + + This class ensures that each decision gets a unique ID and tracks the lifecycle + of decisions through the workflow execution. + """ + + def __init__(self): + """Initialize the DecisionsHelper.""" + self._next_decision_counters: Dict[DecisionType, int] = {} + self._tracked_decisions: Dict[str, DecisionTracker] = {} + self._decision_id_to_key: Dict[str, str] = {} + logger.debug("DecisionsHelper initialized") + + def _get_next_counter(self, decision_type: DecisionType) -> int: + """ + Get the next counter value for a given decision type. + + Args: + decision_type: The type of decision + + Returns: + The next counter value + """ + if decision_type not in self._next_decision_counters: + self._next_decision_counters[decision_type] = 1 + else: + self._next_decision_counters[decision_type] += 1 + + return self._next_decision_counters[decision_type] + + def generate_activity_id(self, activity_name: str) -> str: + """ + Generate a unique activity ID. + + Args: + activity_name: The name of the activity + + Returns: + A unique activity ID + """ + counter = self._get_next_counter(DecisionType.ACTIVITY) + activity_id = f"{activity_name}_{counter}" + + # Track this decision + decision_id = DecisionId(DecisionType.ACTIVITY, activity_id) + tracker = DecisionTracker(decision_id) + self._tracked_decisions[activity_id] = tracker + self._decision_id_to_key[str(decision_id)] = activity_id + + logger.debug(f"Generated activity ID: {activity_id}") + return activity_id + + def generate_timer_id(self, timer_name: str = "timer") -> str: + """ + Generate a unique timer ID. + + Args: + timer_name: The name/prefix for the timer + + Returns: + A unique timer ID + """ + counter = self._get_next_counter(DecisionType.TIMER) + timer_id = f"{timer_name}_{counter}" + + # Track this decision + decision_id = DecisionId(DecisionType.TIMER, timer_id) + tracker = DecisionTracker(decision_id) + self._tracked_decisions[timer_id] = tracker + self._decision_id_to_key[str(decision_id)] = timer_id + + logger.debug(f"Generated timer ID: {timer_id}") + return timer_id + + def generate_child_workflow_id(self, workflow_name: str) -> str: + """ + Generate a unique child workflow ID. + + Args: + workflow_name: The name of the child workflow + + Returns: + A unique child workflow ID + """ + counter = self._get_next_counter(DecisionType.CHILD_WORKFLOW) + workflow_id = f"{workflow_name}_{counter}" + + # Track this decision + decision_id = DecisionId(DecisionType.CHILD_WORKFLOW, workflow_id) + tracker = DecisionTracker(decision_id) + self._tracked_decisions[workflow_id] = tracker + self._decision_id_to_key[str(decision_id)] = workflow_id + + logger.debug(f"Generated child workflow ID: {workflow_id}") + return workflow_id + + def generate_marker_id(self, marker_name: str) -> str: + """ + Generate a unique marker ID. + + Args: + marker_name: The name of the marker + + Returns: + A unique marker ID + """ + counter = self._get_next_counter(DecisionType.MARKER) + marker_id = f"{marker_name}_{counter}" + + # Track this decision + decision_id = DecisionId(DecisionType.MARKER, marker_id) + tracker = DecisionTracker(decision_id) + self._tracked_decisions[marker_id] = tracker + self._decision_id_to_key[str(decision_id)] = marker_id + + logger.debug(f"Generated marker ID: {marker_id}") + return marker_id + + def get_decision_tracker(self, decision_key: str) -> Optional[DecisionTracker]: + """ + Get the decision tracker for a given decision key. + + Args: + decision_key: The decision key (activity_id, timer_id, etc.) + + Returns: + The DecisionTracker if found, None otherwise + """ + return self._tracked_decisions.get(decision_key) + + def update_decision_scheduled( + self, decision_key: str, scheduled_event_id: int + ) -> None: + """ + Update a decision tracker when it gets scheduled. + + Args: + decision_key: The decision key + scheduled_event_id: The event ID when the decision was scheduled + """ + tracker = self._tracked_decisions.get(decision_key) + if tracker: + tracker.scheduled_event_id = scheduled_event_id + logger.debug( + f"Updated decision {decision_key} with scheduled event ID {scheduled_event_id}" + ) + else: + logger.warning(f"No tracker found for decision key: {decision_key}") + + def update_decision_initiated( + self, decision_key: str, initiated_event_id: int + ) -> None: + """ + Update a decision tracker when it gets initiated. + + Args: + decision_key: The decision key + initiated_event_id: The event ID when the decision was initiated + """ + tracker = self._tracked_decisions.get(decision_key) + if tracker: + tracker.initiated_event_id = initiated_event_id + logger.debug( + f"Updated decision {decision_key} with initiated event ID {initiated_event_id}" + ) + else: + logger.warning(f"No tracker found for decision key: {decision_key}") + + def update_decision_started(self, decision_key: str, started_event_id: int) -> None: + """ + Update a decision tracker when it gets started. + + Args: + decision_key: The decision key + started_event_id: The event ID when the decision was started + """ + tracker = self._tracked_decisions.get(decision_key) + if tracker: + tracker.started_event_id = started_event_id + logger.debug( + f"Updated decision {decision_key} with started event ID {started_event_id}" + ) + else: + logger.warning(f"No tracker found for decision key: {decision_key}") + + def update_decision_completed(self, decision_key: str) -> None: + """ + Mark a decision as completed. + + Args: + decision_key: The decision key + """ + tracker = self._tracked_decisions.get(decision_key) + if tracker: + tracker.is_completed = True + logger.debug(f"Marked decision {decision_key} as completed") + else: + logger.warning(f"No tracker found for decision key: {decision_key}") + + def process_history_event(self, event: HistoryEvent) -> None: + """ + Process a history event and update decision trackers accordingly. + + Args: + event: The history event to process + """ + attr = event.WhichOneof("attributes") + if not attr: + return + + # Handle activity events + if attr == "activity_task_scheduled_event_attributes": + attrs = event.activity_task_scheduled_event_attributes + if hasattr(attrs, "activity_id"): + self.update_decision_scheduled(attrs.activity_id, event.event_id) + + elif attr == "activity_task_started_event_attributes": + attrs = event.activity_task_started_event_attributes + if hasattr(attrs, "scheduled_event_id"): + # Find the decision by scheduled event ID + decision_key = self._find_decision_by_scheduled_event_id( + attrs.scheduled_event_id + ) + if decision_key: + self.update_decision_started(decision_key, event.event_id) + + elif attr in [ + "activity_task_completed_event_attributes", + "activity_task_failed_event_attributes", + "activity_task_timed_out_event_attributes", + ]: + attrs = getattr(event, attr) + if hasattr(attrs, "scheduled_event_id"): + # Find the decision by scheduled event ID + decision_key = self._find_decision_by_scheduled_event_id( + attrs.scheduled_event_id + ) + if decision_key: + self.update_decision_completed(decision_key) + + # Handle timer events + elif attr == "timer_started_event_attributes": + attrs = event.timer_started_event_attributes + if hasattr(attrs, "timer_id"): + self.update_decision_initiated(attrs.timer_id, event.event_id) + + elif attr == "timer_fired_event_attributes": + attrs = event.timer_fired_event_attributes + if hasattr(attrs, "started_event_id"): + # Find the decision by started event ID + decision_key = self._find_decision_by_started_event_id( + attrs.started_event_id + ) + if decision_key: + self.update_decision_completed(decision_key) + + # Handle child workflow events + elif attr == "start_child_workflow_execution_initiated_event_attributes": + attrs = event.start_child_workflow_execution_initiated_event_attributes + if hasattr(attrs, "workflow_id"): + self.update_decision_initiated(attrs.workflow_id, event.event_id) + + elif attr == "child_workflow_execution_started_event_attributes": + attrs = event.child_workflow_execution_started_event_attributes + if hasattr(attrs, "initiated_event_id"): + # Find the decision by initiated event ID + decision_key = self._find_decision_by_initiated_event_id( + attrs.initiated_event_id + ) + if decision_key: + self.update_decision_started(decision_key, event.event_id) + + elif attr in [ + "child_workflow_execution_completed_event_attributes", + "child_workflow_execution_failed_event_attributes", + "child_workflow_execution_timed_out_event_attributes", + ]: + attrs = getattr(event, attr) + if hasattr(attrs, "initiated_event_id"): + # Find the decision by initiated event ID + decision_key = self._find_decision_by_initiated_event_id( + attrs.initiated_event_id + ) + if decision_key: + self.update_decision_completed(decision_key) + + def _find_decision_by_scheduled_event_id( + self, scheduled_event_id: int + ) -> Optional[str]: + """Find a decision key by its scheduled event ID.""" + for key, tracker in self._tracked_decisions.items(): + if tracker.scheduled_event_id == scheduled_event_id: + return key + return None + + def _find_decision_by_initiated_event_id( + self, initiated_event_id: int + ) -> Optional[str]: + """Find a decision key by its initiated event ID.""" + for key, tracker in self._tracked_decisions.items(): + if tracker.initiated_event_id == initiated_event_id: + return key + return None + + def _find_decision_by_started_event_id( + self, started_event_id: int + ) -> Optional[str]: + """Find a decision key by its started event ID.""" + for key, tracker in self._tracked_decisions.items(): + if tracker.started_event_id == started_event_id: + return key + return None + + def get_pending_decisions_count(self) -> int: + """ + Get the count of decisions that are not yet completed. + + Returns: + The number of pending decisions + """ + return sum( + 1 + for tracker in self._tracked_decisions.values() + if not tracker.is_completed + ) + + def get_completed_decisions_count(self) -> int: + """ + Get the count of decisions that have been completed. + + Returns: + The number of completed decisions + """ + return sum( + 1 for tracker in self._tracked_decisions.values() if tracker.is_completed + ) + + def reset(self) -> None: + """Reset all decision tracking state.""" + self._next_decision_counters.clear() + self._tracked_decisions.clear() + self._decision_id_to_key.clear() + logger.debug("DecisionsHelper reset") + + def get_stats(self) -> Dict[str, int]: + """ + Get statistics about tracked decisions. + + Returns: + Dictionary with decision statistics + """ + stats = { + "total_decisions": len(self._tracked_decisions), + "pending_decisions": self.get_pending_decisions_count(), + "completed_decisions": self.get_completed_decisions_count(), + } + + # Add per-type counts + for decision_type in DecisionType: + type_name = decision_type.name.lower() + stats[f"{type_name}_count"] = self._next_decision_counters.get( + decision_type, 0 + ) + + return stats diff --git a/cadence/_internal/workflow/history_helper.py b/cadence/_internal/workflow/history_helper.py new file mode 100644 index 0000000..9194b79 --- /dev/null +++ b/cadence/_internal/workflow/history_helper.py @@ -0,0 +1,239 @@ +""" +HistoryHelper manages the addition of workflow history when a decision task is responded. + +This helper ensures that the workflow history is properly tracked and updated +when decisions are made and responses are sent back to the Cadence service. +""" + +import logging +from typing import List, Optional + +from cadence.api.v1.decision_pb2 import Decision +from cadence.api.v1.history_pb2 import History, HistoryEvent + +logger = logging.getLogger(__name__) + + +class HistoryHelper: + """ + Helper class to manage workflow history updates when decisions are responded. + + This class tracks the current history state and adds new events as decisions + are processed and responses are generated. + """ + + def __init__(self, initial_history: Optional[History] = None): + """ + Initialize the HistoryHelper with optional initial history. + + Args: + initial_history: The initial workflow history to start with + """ + self._current_history = initial_history or History() + self._next_event_id = self._calculate_next_event_id() + logger.debug( + f"HistoryHelper initialized with {len(self._current_history.events)} events, next event ID: {self._next_event_id}" + ) + + def _calculate_next_event_id(self) -> int: + """ + Calculate the next event ID based on the current history. + + Returns: + The next event ID to use for new events + """ + if not self._current_history.events: + return 1 + + # Find the highest event ID in the current history + max_event_id = max( + event.event_id for event in self._current_history.events if event.event_id + ) + return max_event_id + 1 + + def get_current_history(self) -> History: + """ + Get the current workflow history. + + Returns: + The current History object + """ + return self._current_history + + def get_event_count(self) -> int: + """ + Get the current number of events in the history. + + Returns: + The number of events in the current history + """ + return len(self._current_history.events) + + def get_next_event_id(self) -> int: + """ + Get the next event ID that would be assigned to a new event. + + Returns: + The next event ID + """ + return self._next_event_id + + def add_decision_task_started_event( + self, task_token: bytes, identity: str + ) -> HistoryEvent: + """ + Add a DecisionTaskStarted event to the history. + + Args: + task_token: The decision task token + identity: The worker identity + + Returns: + The created HistoryEvent + """ + event = HistoryEvent() + event.event_id = self._next_event_id + event.decision_task_started_event_attributes.identity = identity + # Note: task_token would typically be stored in the decision task context + + self._current_history.events.append(event) + self._next_event_id += 1 + + logger.debug(f"Added DecisionTaskStarted event with ID {event.event_id}") + return event + + def add_decision_task_completed_event( + self, decisions: List[Decision], execution_context: bytes = b"" + ) -> HistoryEvent: + """ + Add a DecisionTaskCompleted event to the history. + + Args: + decisions: The list of decisions that were made + execution_context: Optional execution context + + Returns: + The created HistoryEvent + """ + event = HistoryEvent() + event.event_id = self._next_event_id + event.decision_task_completed_event_attributes.execution_context = ( + execution_context + ) + # Note: decisions would be processed and their effects would generate subsequent events + + self._current_history.events.append(event) + self._next_event_id += 1 + + logger.debug( + f"Added DecisionTaskCompleted event with ID {event.event_id} for {len(decisions)} decisions" + ) + return event + + def add_decision_task_failed_event( + self, cause: str, details: str = "" + ) -> HistoryEvent: + """ + Add a DecisionTaskFailed event to the history. + + Args: + cause: The cause of the failure + details: Additional failure details + + Returns: + The created HistoryEvent + """ + event = HistoryEvent() + event.event_id = self._next_event_id + event.decision_task_failed_event_attributes.cause = cause + event.decision_task_failed_event_attributes.details = details + + self._current_history.events.append(event) + self._next_event_id += 1 + + logger.debug( + f"Added DecisionTaskFailed event with ID {event.event_id}, cause: {cause}" + ) + return event + + def update_from_new_history(self, new_history: History) -> None: + """ + Update the current history with new events from a received history. + + This is typically called when a new decision task is received with + updated history that includes events that happened since the last task. + + Args: + new_history: The new history received from the service + """ + if not new_history or not new_history.events: + logger.debug("No new history events to process") + return + + # Find events that are newer than our current history + current_max_event_id = 0 + if self._current_history.events: + current_max_event_id = max( + event.event_id + for event in self._current_history.events + if event.event_id + ) + + new_events = [ + event + for event in new_history.events + if event.event_id > current_max_event_id + ] + + if new_events: + self._current_history.events.extend(new_events) + self._next_event_id = self._calculate_next_event_id() + logger.debug( + f"Added {len(new_events)} new events, next event ID: {self._next_event_id}" + ) + else: + logger.debug("No new events found in the provided history") + + def reset_to_history(self, history: History) -> None: + """ + Reset the current history to the provided history. + + This completely replaces the current history state. + + Args: + history: The new history to use + """ + self._current_history = history or History() + self._next_event_id = self._calculate_next_event_id() + logger.debug( + f"History reset to {len(self._current_history.events)} events, next event ID: {self._next_event_id}" + ) + + def find_event_by_id(self, event_id: int) -> Optional[HistoryEvent]: + """ + Find a specific event by its ID. + + Args: + event_id: The event ID to search for + + Returns: + The HistoryEvent if found, None otherwise + """ + for event in self._current_history.events: + if event.event_id == event_id: + return event + return None + + def get_events_since(self, event_id: int) -> List[HistoryEvent]: + """ + Get all events that occurred after the specified event ID. + + Args: + event_id: The event ID to search from + + Returns: + List of HistoryEvents that occurred after the specified ID + """ + return [ + event for event in self._current_history.events if event.event_id > event_id + ] diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 6281f76..6cc6dbe 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -4,6 +4,9 @@ from typing import Callable, Any from cadence._internal.workflow.context import Context +from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop +from cadence._internal.workflow.history_helper import HistoryHelper +from cadence._internal.workflow.decisions_helper import DecisionsHelper from cadence.api.v1.decision_pb2 import Decision from cadence.client import Client from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse @@ -22,6 +25,8 @@ def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[ self._context = Context(client, info) self._workflow_func = workflow_func self._decision_manager = DecisionManager() + self._history_helper = HistoryHelper() + self._decisions_helper = DecisionsHelper() self._is_workflow_complete = False async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult: @@ -37,8 +42,9 @@ async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> try: logger.info(f"Processing decision task for workflow {self._context.info().workflow_id}") - # Process workflow history to update decision state machines + # Update history helper and process workflow history to update decision state machines if decision_task.history: + self._history_helper.update_from_new_history(decision_task.history) self._process_workflow_history(decision_task.history) # Execute workflow function to generate new decisions @@ -75,6 +81,7 @@ def _process_workflow_history(self, history) -> None: for event in history.events: try: self._decision_manager.handle_history_event(event) + self._decisions_helper.process_history_event(event) except Exception as e: logger.warning(f"Error processing history event: {e}") @@ -160,14 +167,12 @@ def _execute_workflow_function_sync(self, workflow_func: Callable, workflow_inpu # If the workflow function is async, we need to handle it properly if asyncio.iscoroutine(result): - # Create a simple event loop for async workflow functions + # Create a deterministic event loop for async workflow functions try: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + loop = DeterministicEventLoop() result = loop.run_until_complete(result) finally: loop.close() - asyncio.set_event_loop(None) return result diff --git a/cadence/worker/_decision.py b/cadence/worker/_decision.py deleted file mode 100644 index a95c812..0000000 --- a/cadence/worker/_decision.py +++ /dev/null @@ -1,42 +0,0 @@ -import asyncio -from typing import Optional - -from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse -from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind -from cadence.client import Client -from cadence.worker._poller import Poller -from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT -from cadence.worker._decision_task_handler import DecisionTaskHandler -from cadence.worker._registry import Registry - - -class DecisionWorker: - def __init__(self, client: Client, task_list: str, registry: Registry, options: WorkerOptions) -> None: - self._client = client - self._task_list = task_list - self._registry = registry - self._identity = options["identity"] - permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"]) - self._decision_handler = DecisionTaskHandler(client, task_list, registry, **options) - self._poller = Poller[PollForDecisionTaskResponse](options["decision_task_pollers"], permits, self._poll, self._execute) - # TODO: Sticky poller, actually running workflows, etc. - - async def run(self) -> None: - await self._poller.run() - - async def _poll(self) -> Optional[PollForDecisionTaskResponse]: - task: PollForDecisionTaskResponse = await self._client.worker_stub.PollForDecisionTask(PollForDecisionTaskRequest( - domain=self._client.domain, - task_list=TaskList(name=self._task_list,kind=TaskListKind.TASK_LIST_KIND_NORMAL), - identity=self._identity, - ), timeout=_LONG_POLL_TIMEOUT) - - if task and task.task_token: - return task - else: - return None - - - async def _execute(self, task: PollForDecisionTaskResponse) -> None: - await self._decision_handler.handle_task(task) - diff --git a/cadence/worker/_decision_worker.py b/cadence/worker/_decision_worker.py new file mode 100644 index 0000000..64f31c7 --- /dev/null +++ b/cadence/worker/_decision_worker.py @@ -0,0 +1,58 @@ +import asyncio +from typing import Optional + +from cadence.api.v1.service_worker_pb2 import ( + PollForDecisionTaskRequest, + PollForDecisionTaskResponse, +) +from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind +from cadence.client import Client +from cadence.worker._decision_task_handler import DecisionTaskHandler +from cadence.worker._poller import Poller +from cadence.worker._registry import Registry +from cadence.worker._types import _LONG_POLL_TIMEOUT, WorkerOptions + + +class DecisionWorker: + def __init__( + self, client: Client, task_list: str, registry: Registry, options: WorkerOptions + ) -> None: + self._client = client + self._task_list = task_list + self._registry = registry + self._identity = options["identity"] + permits = asyncio.Semaphore( + options["max_concurrent_decision_task_execution_size"] + ) + self._decision_handler = DecisionTaskHandler( + client, task_list, registry, **options + ) + self._poller = Poller[PollForDecisionTaskResponse]( + options["decision_task_pollers"], permits, self._poll, self._execute + ) + # TODO: Sticky poller, actually running workflows, etc. + + async def run(self) -> None: + await self._poller.run() + + async def _poll(self) -> Optional[PollForDecisionTaskResponse]: + task: PollForDecisionTaskResponse = ( + await self._client.worker_stub.PollForDecisionTask( + PollForDecisionTaskRequest( + domain=self._client.domain, + task_list=TaskList( + name=self._task_list, kind=TaskListKind.TASK_LIST_KIND_NORMAL + ), + identity=self._identity, + ), + timeout=_LONG_POLL_TIMEOUT, + ) + ) + + if task and task.task_token: + return task + else: + return None + + async def _execute(self, task: PollForDecisionTaskResponse) -> None: + await self._decision_handler.handle_task(task) diff --git a/cadence/worker/_worker.py b/cadence/worker/_worker.py index ff273ad..9eb39fb 100644 --- a/cadence/worker/_worker.py +++ b/cadence/worker/_worker.py @@ -5,7 +5,7 @@ from cadence.client import Client from cadence.worker._registry import Registry from cadence.worker._activity import ActivityWorker -from cadence.worker._decision import DecisionWorker +from cadence.worker._decision_worker import DecisionWorker from cadence.worker._types import WorkerOptions, _DEFAULT_WORKER_OPTIONS diff --git a/tests/cadence/worker/test_decision_worker_integration.py b/tests/cadence/worker/test_decision_worker_integration.py index 85c55d2..d38ebe4 100644 --- a/tests/cadence/worker/test_decision_worker_integration.py +++ b/tests/cadence/worker/test_decision_worker_integration.py @@ -9,7 +9,7 @@ from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes -from cadence.worker._decision import DecisionWorker +from cadence.worker._decision_worker import DecisionWorker from cadence.worker._registry import Registry from cadence.client import Client From a8432b6b1530af341c7f58f4a38b55392c5bc1f2 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 1 Oct 2025 16:17:19 -0700 Subject: [PATCH 06/12] integrate with event iterator Signed-off-by: Tim Li --- cadence/_internal/workflow/context.py | 19 ++ .../_internal/workflow/decisions_helper.py | 107 +----- cadence/_internal/workflow/history_helper.py | 239 ------------- cadence/_internal/workflow/workflow_engine.py | 321 ++++++++++++++---- cadence/worker/_decision_task_handler.py | 109 +++++- .../worker/test_decision_task_handler.py | 3 +- 6 files changed, 391 insertions(+), 407 deletions(-) delete mode 100644 cadence/_internal/workflow/history_helper.py diff --git a/cadence/_internal/workflow/context.py b/cadence/_internal/workflow/context.py index 5038e48..87184d8 100644 --- a/cadence/_internal/workflow/context.py +++ b/cadence/_internal/workflow/context.py @@ -1,3 +1,4 @@ +from typing import Optional from cadence.client import Client from cadence.workflow import WorkflowContext, WorkflowInfo @@ -7,9 +8,27 @@ class Context(WorkflowContext): def __init__(self, client: Client, info: WorkflowInfo): self._client = client self._info = info + self._replay_mode = True + self._replay_current_time_milliseconds: Optional[int] = None def info(self) -> WorkflowInfo: return self._info def client(self) -> Client: return self._client + + def set_replay_mode(self, replay: bool) -> None: + """Set whether the workflow is currently in replay mode.""" + self._replay_mode = replay + + def is_replay_mode(self) -> bool: + """Check if the workflow is currently in replay mode.""" + return self._replay_mode + + def set_replay_current_time_milliseconds(self, time_millis: int) -> None: + """Set the current replay time in milliseconds.""" + self._replay_current_time_milliseconds = time_millis + + def get_replay_current_time_milliseconds(self) -> Optional[int]: + """Get the current replay time in milliseconds.""" + return self._replay_current_time_milliseconds diff --git a/cadence/_internal/workflow/decisions_helper.py b/cadence/_internal/workflow/decisions_helper.py index e3081b4..4099150 100644 --- a/cadence/_internal/workflow/decisions_helper.py +++ b/cadence/_internal/workflow/decisions_helper.py @@ -9,8 +9,7 @@ from dataclasses import dataclass from typing import Dict, Optional -from cadence._internal.decision_state_machine import DecisionId, DecisionType -from cadence.api.v1.history_pb2 import HistoryEvent +from cadence._internal.decision_state_machine import DecisionId, DecisionType, DecisionManager logger = logging.getLogger(__name__) @@ -28,18 +27,24 @@ class DecisionTracker: class DecisionsHelper: """ - Helper class to manage decision IDs and track decision state across workflow execution. + Helper class to manage decision IDs and work with DecisionManager state machines. - This class ensures that each decision gets a unique ID and tracks the lifecycle - of decisions through the workflow execution. + This class generates unique decision IDs and integrates with the DecisionManager + state machines for proper decision lifecycle tracking. """ - def __init__(self): - """Initialize the DecisionsHelper.""" + def __init__(self, decision_manager: DecisionManager): + """ + Initialize the DecisionsHelper with a DecisionManager reference. + + Args: + decision_manager: The DecisionManager containing the state machines + """ self._next_decision_counters: Dict[DecisionType, int] = {} self._tracked_decisions: Dict[str, DecisionTracker] = {} self._decision_id_to_key: Dict[str, str] = {} - logger.debug("DecisionsHelper initialized") + self._decision_manager = decision_manager + logger.debug("DecisionsHelper initialized with DecisionManager integration") def _get_next_counter(self, decision_type: DecisionType) -> int: """ @@ -227,92 +232,6 @@ def update_decision_completed(self, decision_key: str) -> None: else: logger.warning(f"No tracker found for decision key: {decision_key}") - def process_history_event(self, event: HistoryEvent) -> None: - """ - Process a history event and update decision trackers accordingly. - - Args: - event: The history event to process - """ - attr = event.WhichOneof("attributes") - if not attr: - return - - # Handle activity events - if attr == "activity_task_scheduled_event_attributes": - attrs = event.activity_task_scheduled_event_attributes - if hasattr(attrs, "activity_id"): - self.update_decision_scheduled(attrs.activity_id, event.event_id) - - elif attr == "activity_task_started_event_attributes": - attrs = event.activity_task_started_event_attributes - if hasattr(attrs, "scheduled_event_id"): - # Find the decision by scheduled event ID - decision_key = self._find_decision_by_scheduled_event_id( - attrs.scheduled_event_id - ) - if decision_key: - self.update_decision_started(decision_key, event.event_id) - - elif attr in [ - "activity_task_completed_event_attributes", - "activity_task_failed_event_attributes", - "activity_task_timed_out_event_attributes", - ]: - attrs = getattr(event, attr) - if hasattr(attrs, "scheduled_event_id"): - # Find the decision by scheduled event ID - decision_key = self._find_decision_by_scheduled_event_id( - attrs.scheduled_event_id - ) - if decision_key: - self.update_decision_completed(decision_key) - - # Handle timer events - elif attr == "timer_started_event_attributes": - attrs = event.timer_started_event_attributes - if hasattr(attrs, "timer_id"): - self.update_decision_initiated(attrs.timer_id, event.event_id) - - elif attr == "timer_fired_event_attributes": - attrs = event.timer_fired_event_attributes - if hasattr(attrs, "started_event_id"): - # Find the decision by started event ID - decision_key = self._find_decision_by_started_event_id( - attrs.started_event_id - ) - if decision_key: - self.update_decision_completed(decision_key) - - # Handle child workflow events - elif attr == "start_child_workflow_execution_initiated_event_attributes": - attrs = event.start_child_workflow_execution_initiated_event_attributes - if hasattr(attrs, "workflow_id"): - self.update_decision_initiated(attrs.workflow_id, event.event_id) - - elif attr == "child_workflow_execution_started_event_attributes": - attrs = event.child_workflow_execution_started_event_attributes - if hasattr(attrs, "initiated_event_id"): - # Find the decision by initiated event ID - decision_key = self._find_decision_by_initiated_event_id( - attrs.initiated_event_id - ) - if decision_key: - self.update_decision_started(decision_key, event.event_id) - - elif attr in [ - "child_workflow_execution_completed_event_attributes", - "child_workflow_execution_failed_event_attributes", - "child_workflow_execution_timed_out_event_attributes", - ]: - attrs = getattr(event, attr) - if hasattr(attrs, "initiated_event_id"): - # Find the decision by initiated event ID - decision_key = self._find_decision_by_initiated_event_id( - attrs.initiated_event_id - ) - if decision_key: - self.update_decision_completed(decision_key) def _find_decision_by_scheduled_event_id( self, scheduled_event_id: int diff --git a/cadence/_internal/workflow/history_helper.py b/cadence/_internal/workflow/history_helper.py deleted file mode 100644 index 9194b79..0000000 --- a/cadence/_internal/workflow/history_helper.py +++ /dev/null @@ -1,239 +0,0 @@ -""" -HistoryHelper manages the addition of workflow history when a decision task is responded. - -This helper ensures that the workflow history is properly tracked and updated -when decisions are made and responses are sent back to the Cadence service. -""" - -import logging -from typing import List, Optional - -from cadence.api.v1.decision_pb2 import Decision -from cadence.api.v1.history_pb2 import History, HistoryEvent - -logger = logging.getLogger(__name__) - - -class HistoryHelper: - """ - Helper class to manage workflow history updates when decisions are responded. - - This class tracks the current history state and adds new events as decisions - are processed and responses are generated. - """ - - def __init__(self, initial_history: Optional[History] = None): - """ - Initialize the HistoryHelper with optional initial history. - - Args: - initial_history: The initial workflow history to start with - """ - self._current_history = initial_history or History() - self._next_event_id = self._calculate_next_event_id() - logger.debug( - f"HistoryHelper initialized with {len(self._current_history.events)} events, next event ID: {self._next_event_id}" - ) - - def _calculate_next_event_id(self) -> int: - """ - Calculate the next event ID based on the current history. - - Returns: - The next event ID to use for new events - """ - if not self._current_history.events: - return 1 - - # Find the highest event ID in the current history - max_event_id = max( - event.event_id for event in self._current_history.events if event.event_id - ) - return max_event_id + 1 - - def get_current_history(self) -> History: - """ - Get the current workflow history. - - Returns: - The current History object - """ - return self._current_history - - def get_event_count(self) -> int: - """ - Get the current number of events in the history. - - Returns: - The number of events in the current history - """ - return len(self._current_history.events) - - def get_next_event_id(self) -> int: - """ - Get the next event ID that would be assigned to a new event. - - Returns: - The next event ID - """ - return self._next_event_id - - def add_decision_task_started_event( - self, task_token: bytes, identity: str - ) -> HistoryEvent: - """ - Add a DecisionTaskStarted event to the history. - - Args: - task_token: The decision task token - identity: The worker identity - - Returns: - The created HistoryEvent - """ - event = HistoryEvent() - event.event_id = self._next_event_id - event.decision_task_started_event_attributes.identity = identity - # Note: task_token would typically be stored in the decision task context - - self._current_history.events.append(event) - self._next_event_id += 1 - - logger.debug(f"Added DecisionTaskStarted event with ID {event.event_id}") - return event - - def add_decision_task_completed_event( - self, decisions: List[Decision], execution_context: bytes = b"" - ) -> HistoryEvent: - """ - Add a DecisionTaskCompleted event to the history. - - Args: - decisions: The list of decisions that were made - execution_context: Optional execution context - - Returns: - The created HistoryEvent - """ - event = HistoryEvent() - event.event_id = self._next_event_id - event.decision_task_completed_event_attributes.execution_context = ( - execution_context - ) - # Note: decisions would be processed and their effects would generate subsequent events - - self._current_history.events.append(event) - self._next_event_id += 1 - - logger.debug( - f"Added DecisionTaskCompleted event with ID {event.event_id} for {len(decisions)} decisions" - ) - return event - - def add_decision_task_failed_event( - self, cause: str, details: str = "" - ) -> HistoryEvent: - """ - Add a DecisionTaskFailed event to the history. - - Args: - cause: The cause of the failure - details: Additional failure details - - Returns: - The created HistoryEvent - """ - event = HistoryEvent() - event.event_id = self._next_event_id - event.decision_task_failed_event_attributes.cause = cause - event.decision_task_failed_event_attributes.details = details - - self._current_history.events.append(event) - self._next_event_id += 1 - - logger.debug( - f"Added DecisionTaskFailed event with ID {event.event_id}, cause: {cause}" - ) - return event - - def update_from_new_history(self, new_history: History) -> None: - """ - Update the current history with new events from a received history. - - This is typically called when a new decision task is received with - updated history that includes events that happened since the last task. - - Args: - new_history: The new history received from the service - """ - if not new_history or not new_history.events: - logger.debug("No new history events to process") - return - - # Find events that are newer than our current history - current_max_event_id = 0 - if self._current_history.events: - current_max_event_id = max( - event.event_id - for event in self._current_history.events - if event.event_id - ) - - new_events = [ - event - for event in new_history.events - if event.event_id > current_max_event_id - ] - - if new_events: - self._current_history.events.extend(new_events) - self._next_event_id = self._calculate_next_event_id() - logger.debug( - f"Added {len(new_events)} new events, next event ID: {self._next_event_id}" - ) - else: - logger.debug("No new events found in the provided history") - - def reset_to_history(self, history: History) -> None: - """ - Reset the current history to the provided history. - - This completely replaces the current history state. - - Args: - history: The new history to use - """ - self._current_history = history or History() - self._next_event_id = self._calculate_next_event_id() - logger.debug( - f"History reset to {len(self._current_history.events)} events, next event ID: {self._next_event_id}" - ) - - def find_event_by_id(self, event_id: int) -> Optional[HistoryEvent]: - """ - Find a specific event by its ID. - - Args: - event_id: The event ID to search for - - Returns: - The HistoryEvent if found, None otherwise - """ - for event in self._current_history.events: - if event.event_id == event_id: - return event - return None - - def get_events_since(self, event_id: int) -> List[HistoryEvent]: - """ - Get all events that occurred after the specified event ID. - - Args: - event_id: The event ID to search from - - Returns: - List of HistoryEvents that occurred after the specified ID - """ - return [ - event for event in self._current_history.events if event.event_id > event_id - ] diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 6cc6dbe..48a4d3f 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -4,9 +4,8 @@ from typing import Callable, Any from cadence._internal.workflow.context import Context -from cadence._internal.workflow.deterministic_event_loop import DeterministicEventLoop -from cadence._internal.workflow.history_helper import HistoryHelper from cadence._internal.workflow.decisions_helper import DecisionsHelper +from cadence._internal.workflow.decision_events_iterator import DecisionEventsIterator from cadence.api.v1.decision_pb2 import Decision from cadence.client import Client from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse @@ -25,65 +24,228 @@ def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[ self._context = Context(client, info) self._workflow_func = workflow_func self._decision_manager = DecisionManager() - self._history_helper = HistoryHelper() - self._decisions_helper = DecisionsHelper() + self._decisions_helper = DecisionsHelper(self._decision_manager) self._is_workflow_complete = False async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult: """ - Process a decision task and generate decisions. - + Process a decision task and generate decisions using DecisionEventsIterator. + + This method follows the Java client pattern of using DecisionEventsIterator + to drive the decision processing pipeline with proper replay handling. + Args: decision_task: The PollForDecisionTaskResponse from the service - + Returns: DecisionResult containing the list of decisions """ try: - logger.info(f"Processing decision task for workflow {self._context.info().workflow_id}") - - # Update history helper and process workflow history to update decision state machines - if decision_task.history: - self._history_helper.update_from_new_history(decision_task.history) - self._process_workflow_history(decision_task.history) - - # Execute workflow function to generate new decisions - if not self._is_workflow_complete: - await self._execute_workflow_function(decision_task) - - # Collect all pending decisions from state machines - decisions = self._decision_manager.collect_pending_decisions() - - # Close decider's event loop - self._close_event_loop() - - logger.info(f"Generated {len(decisions)} decisions for workflow {self._context.info().workflow_id}") - - return DecisionResult(decisions=decisions) - - except Exception: - logger.exception(f"Error processing decision task for workflow {self._context.info().workflow_id}") + # Log decision task processing start with full context (matches Java ReplayDecisionTaskHandler) + logger.info( + "Processing decision task for workflow", + extra={ + "workflow_type": self._context.info().workflow_type, + "workflow_id": self._context.info().workflow_id, + "run_id": self._context.info().workflow_run_id, + "started_event_id": getattr(decision_task, 'started_event_id', None), + "attempt": getattr(decision_task, 'attempt', None) + } + ) + + # Activate workflow context for the entire decision processing + with self._context._activate(): + # Create DecisionEventsIterator for structured event processing + events_iterator = DecisionEventsIterator(decision_task, self._context.client()) + + # Process decision events using iterator-driven approach + await self._process_decision_events(events_iterator, decision_task) + + # Collect all pending decisions from state machines + decisions = self._decision_manager.collect_pending_decisions() + + # Close decider's event loop + self._close_event_loop() + + # Log decision task completion with metrics (matches Java ReplayDecisionTaskHandler) + logger.debug( + "Decision task completed", + extra={ + "workflow_type": self._context.info().workflow_type, + "workflow_id": self._context.info().workflow_id, + "run_id": self._context.info().workflow_run_id, + "started_event_id": getattr(decision_task, 'started_event_id', None), + "decisions_count": len(decisions), + "replay_mode": self._context.is_replay_mode() + } + ) + + return DecisionResult(decisions=decisions) + + except Exception as e: + # Log decision task failure with full context (matches Java ReplayDecisionTaskHandler) + logger.error( + "Decision task processing failed", + extra={ + "workflow_type": self._context.info().workflow_type, + "workflow_id": self._context.info().workflow_id, + "run_id": self._context.info().workflow_run_id, + "started_event_id": getattr(decision_task, 'started_event_id', None), + "attempt": getattr(decision_task, 'attempt', None), + "error_type": type(e).__name__ + }, + exc_info=True + ) # Return empty decisions on error - the task will be failed by the handler return DecisionResult(decisions=[]) - def _process_workflow_history(self, history) -> None: + async def _process_decision_events(self, events_iterator: DecisionEventsIterator, decision_task: PollForDecisionTaskResponse) -> None: """ - Process workflow history events to update decision state machines. - + Process decision events using the iterator-driven approach similar to Java client. + + This method implements the three-phase event processing pattern: + 1. Process markers first (for deterministic replay) + 2. Process regular events (trigger workflow state changes) + 3. Execute workflow logic + 4. Process decision events from previous decisions + + Args: + events_iterator: The DecisionEventsIterator for structured event processing + decision_task: The original decision task + """ + # Track if we processed any decision events + processed_any_decision_events = False + + # Check if there are any decision events to process + while await events_iterator.has_next_decision_events(): + decision_events = await events_iterator.next_decision_events() + processed_any_decision_events = True + + # Log decision events batch processing (matches Go client patterns) + logger.debug( + "Processing decision events batch", + extra={ + "workflow_id": self._context.info().workflow_id, + "events_count": len(decision_events.get_events()), + "markers_count": len(decision_events.get_markers()), + "replay_mode": decision_events.is_replay(), + "replay_time": decision_events.replay_current_time_milliseconds + } + ) + + # Update context with replay information + self._context.set_replay_mode(decision_events.is_replay()) + if decision_events.replay_current_time_milliseconds: + self._context.set_replay_current_time_milliseconds(decision_events.replay_current_time_milliseconds) + + # Phase 1: Process markers first for deterministic replay + for marker_event in decision_events.get_markers(): + try: + logger.debug( + "Processing marker event", + extra={ + "workflow_id": self._context.info().workflow_id, + "marker_name": getattr(marker_event, 'marker_name', 'unknown'), + "event_id": getattr(marker_event, 'event_id', None), + "replay_mode": self._context.is_replay_mode() + } + ) + # Process through state machines (DecisionsHelper now delegates to DecisionManager) + self._decision_manager.handle_history_event(marker_event) + except Exception as e: + # Warning for unexpected markers (matches Java ClockDecisionContext) + logger.warning( + "Unexpected marker event encountered", + extra={ + "workflow_id": self._context.info().workflow_id, + "marker_name": getattr(marker_event, 'marker_name', 'unknown'), + "event_id": getattr(marker_event, 'event_id', None), + "error_type": type(e).__name__ + }, + exc_info=True + ) + + # Phase 2: Process regular events to update workflow state + for event in decision_events.get_events(): + try: + logger.debug( + "Processing history event", + extra={ + "workflow_id": self._context.info().workflow_id, + "event_type": getattr(event, 'event_type', 'unknown'), + "event_id": getattr(event, 'event_id', None), + "replay_mode": self._context.is_replay_mode() + } + ) + # Process through state machines (DecisionsHelper now delegates to DecisionManager) + self._decision_manager.handle_history_event(event) + except Exception as e: + logger.warning( + "Error processing history event", + extra={ + "workflow_id": self._context.info().workflow_id, + "event_type": getattr(event, 'event_type', 'unknown'), + "event_id": getattr(event, 'event_id', None), + "error_type": type(e).__name__ + }, + exc_info=True + ) + + # Phase 3: Execute workflow logic if not in replay mode + if not decision_events.is_replay() and not self._is_workflow_complete: + await self._execute_workflow_function(decision_task) + + # If no decision events were processed but we have history, fall back to direct processing + # This handles edge cases where the iterator doesn't find decision events + if not processed_any_decision_events and decision_task.history and hasattr(decision_task.history, 'events'): + logger.debug( + "No decision events found by iterator, falling back to direct history processing", + extra={ + "workflow_id": self._context.info().workflow_id, + "history_events_count": len(decision_task.history.events) if decision_task.history else 0 + } + ) + self._fallback_process_workflow_history(decision_task.history) + if not self._is_workflow_complete: + await self._execute_workflow_function(decision_task) + + + def _fallback_process_workflow_history(self, history) -> None: + """ + Fallback method to process workflow history events directly. + + This is used when DecisionEventsIterator doesn't find decision events, + maintaining backward compatibility. + Args: history: The workflow history from the decision task """ if not history or not hasattr(history, 'events'): return - - logger.debug(f"Processing {len(history.events)} history events") - + + logger.debug( + "Processing history events in fallback mode", + extra={ + "workflow_id": self._context.info().workflow_id, + "events_count": len(history.events) + } + ) + for event in history.events: try: + # Process through state machines (DecisionsHelper now delegates to DecisionManager) self._decision_manager.handle_history_event(event) - self._decisions_helper.process_history_event(event) except Exception as e: - logger.warning(f"Error processing history event: {e}") + logger.warning( + "Error processing history event in fallback mode", + extra={ + "workflow_id": self._context.info().workflow_id, + "event_type": getattr(event, 'event_type', 'unknown'), + "event_id": getattr(event, 'event_id', None), + "error_type": type(e).__name__ + }, + exc_info=True + ) async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None: """ @@ -95,27 +257,51 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes decision_task: The decision task containing workflow context """ try: - with self._context._activate(): - # Execute the workflow function - # The workflow function should block until it schedules an activity - workflow_func = self._workflow_func - if workflow_func is None: - logger.warning(f"No workflow function available for workflow {self._context.info().workflow_id}") - return - - # Extract workflow input from history - workflow_input = await self._extract_workflow_input(decision_task) - - # Execute workflow function - result = self._execute_workflow_function_sync(workflow_func, workflow_input) - - # Check if workflow is complete - if result is not None: - self._is_workflow_complete = True - logger.info(f"Workflow {self._context.info().workflow_id} completed") + # Execute the workflow function + # The workflow function should block until it schedules an activity + workflow_func = self._workflow_func + if workflow_func is None: + logger.warning( + "No workflow function available", + extra={ + "workflow_type": self._context.info().workflow_type, + "workflow_id": self._context.info().workflow_id, + "run_id": self._context.info().workflow_run_id + } + ) + return + + # Extract workflow input from history + workflow_input = await self._extract_workflow_input(decision_task) + + # Execute workflow function + result = self._execute_workflow_function_sync(workflow_func, workflow_input) + + # Check if workflow is complete + if result is not None: + self._is_workflow_complete = True + # Log workflow completion (matches Go client patterns) + logger.info( + "Workflow execution completed", + extra={ + "workflow_type": self._context.info().workflow_type, + "workflow_id": self._context.info().workflow_id, + "run_id": self._context.info().workflow_run_id, + "completion_type": "success" + } + ) - except Exception: - logger.exception(f"Error executing workflow function for {self._context.info().workflow_id}") + except Exception as e: + logger.error( + "Error executing workflow function", + extra={ + "workflow_type": self._context.info().workflow_type, + "workflow_id": self._context.info().workflow_id, + "run_id": self._context.info().workflow_run_id, + "error_type": type(e).__name__ + }, + exc_info=True + ) raise async def _extract_workflow_input(self, decision_task: PollForDecisionTaskResponse) -> Any: @@ -167,12 +353,21 @@ def _execute_workflow_function_sync(self, workflow_func: Callable, workflow_inpu # If the workflow function is async, we need to handle it properly if asyncio.iscoroutine(result): - # Create a deterministic event loop for async workflow functions + # For now, use asyncio.run for async workflow functions + # TODO: Implement proper deterministic event loop for workflow execution try: - loop = DeterministicEventLoop() - result = loop.run_until_complete(result) - finally: - loop.close() + result = asyncio.run(result) + except RuntimeError: + # If we're already in an event loop, create a new task + loop = asyncio.get_event_loop() + if loop.is_running(): + # We can't use asyncio.run inside a running loop + # For now, just get the result (this may not be deterministic) + logger.warning("Async workflow function called within running event loop - may not be deterministic") + # This is a workaround - in a real implementation, we'd need proper task scheduling + result = None + else: + result = loop.run_until_complete(result) return result diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index 793c596..70e1aa2 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -57,12 +57,31 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - run_id = workflow_execution.run_id workflow_type_name = workflow_type.name - logger.info(f"Processing decision task for workflow {workflow_id} (type: {workflow_type_name})") + # This log matches the WorkflowEngine but at task handler level (like Java ReplayDecisionTaskHandler) + logger.info( + "Received decision task for workflow", + extra={ + "workflow_type": workflow_type_name, + "workflow_id": workflow_id, + "run_id": run_id, + "started_event_id": getattr(task, 'started_event_id', None), + "attempt": getattr(task, 'attempt', None), + "task_token": task.task_token[:16].hex() if task.task_token else None # Log partial token for debugging + } + ) try: workflow_func = self._registry.get_workflow(workflow_type_name) except KeyError: - logger.error(f"Workflow type '{workflow_type_name}' not found in registry") + logger.error( + "Workflow type not found in registry", + extra={ + "workflow_type": workflow_type_name, + "workflow_id": workflow_id, + "run_id": run_id, + "error_type": "workflow_not_registered" + } + ) raise KeyError(f"Workflow type '{workflow_type_name}' not found") # Create workflow info and engine @@ -84,7 +103,15 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - # Respond with the decisions await self._respond_decision_task_completed(task, decision_result) - logger.info(f"Successfully processed decision task for workflow {workflow_id}") + logger.info( + "Successfully processed decision task", + extra={ + "workflow_type": workflow_type_name, + "workflow_id": workflow_id, + "run_id": run_id, + "started_event_id": getattr(task, 'started_event_id', None) + } + ) async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Exception) -> None: """ @@ -94,7 +121,26 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex task: The task that failed error: The exception that occurred """ - logger.error(f"Decision task failed: {error}") + # Extract workflow context for error logging (matches Java ReplayDecisionTaskHandler error patterns) + workflow_execution = task.workflow_execution + workflow_id = workflow_execution.workflow_id if workflow_execution else "unknown" + run_id = workflow_execution.run_id if workflow_execution else "unknown" + workflow_type = task.workflow_type.name if task.workflow_type else "unknown" + + # Log task failure with full context (matches Java error logging) + logger.error( + "Decision task processing failure", + extra={ + "workflow_type": workflow_type, + "workflow_id": workflow_id, + "run_id": run_id, + "started_event_id": getattr(task, 'started_event_id', None), + "attempt": getattr(task, 'attempt', None), + "error_type": type(error).__name__, + "error_message": str(error) + }, + exc_info=True + ) # Determine the failure cause cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION @@ -118,9 +164,26 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex details=details ) ) - logger.info("Decision task failure response sent") - except Exception: - logger.exception("Error responding to decision task failure") + logger.info( + "Decision task failure response sent", + extra={ + "workflow_id": workflow_id, + "run_id": run_id, + "cause": cause, + "task_token": task.task_token[:16].hex() if task.task_token else None + } + ) + except Exception as e: + logger.error( + "Error responding to decision task failure", + extra={ + "workflow_id": workflow_id, + "run_id": run_id, + "original_error": type(error).__name__, + "response_error": type(e).__name__ + }, + exc_info=True + ) async def _respond_decision_task_completed(self, task: PollForDecisionTaskResponse, decision_result: DecisionResult) -> None: @@ -140,8 +203,34 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon ) await self._client.worker_stub.RespondDecisionTaskCompleted(request) - logger.debug(f"Decision task completed with {len(decision_result.decisions)} decisions") + + # Log completion response (matches Java ReplayDecisionTaskHandler trace/debug patterns) + workflow_execution = task.workflow_execution + logger.debug( + "Decision task completion response sent", + extra={ + "workflow_type": task.workflow_type.name if task.workflow_type else "unknown", + "workflow_id": workflow_execution.workflow_id if workflow_execution else "unknown", + "run_id": workflow_execution.run_id if workflow_execution else "unknown", + "started_event_id": getattr(task, 'started_event_id', None), + "decisions_count": len(decision_result.decisions), + "return_new_decision_task": True, + "task_token": task.task_token[:16].hex() if task.task_token else None + } + ) - except Exception: - logger.exception("Error responding to decision task completion") + except Exception as e: + workflow_execution = task.workflow_execution + logger.error( + "Error responding to decision task completion", + extra={ + "workflow_type": task.workflow_type.name if task.workflow_type else "unknown", + "workflow_id": workflow_execution.workflow_id if workflow_execution else "unknown", + "run_id": workflow_execution.run_id if workflow_execution else "unknown", + "started_event_id": getattr(task, 'started_event_id', None), + "decisions_count": len(decision_result.decisions), + "error_type": type(e).__name__ + }, + exc_info=True + ) raise diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 687e148..44e440e 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -220,7 +220,8 @@ async def test_handle_task_failure_respond_error(self, handler, sample_decision_ # Should not raise exception, but should log error with patch('cadence.worker._decision_task_handler.logger') as mock_logger: await handler.handle_task_failure(sample_decision_task, error) - mock_logger.exception.assert_called_once() + # Now uses logger.error with exc_info=True instead of logger.exception + mock_logger.error.assert_called() @pytest.mark.asyncio async def test_respond_decision_task_completed_success(self, handler, sample_decision_task): From c6f1165b240741cb25d8f357500750aa6e7c2d8a Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 2 Oct 2025 09:33:49 -0700 Subject: [PATCH 07/12] minor rename Signed-off-by: Tim Li --- cadence/_internal/workflow/workflow_engine.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index 48a4d3f..f1fb0d6 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -275,7 +275,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes workflow_input = await self._extract_workflow_input(decision_task) # Execute workflow function - result = self._execute_workflow_function_sync(workflow_func, workflow_input) + result = self._execute_workflow_function_once(workflow_func, workflow_input) # Check if workflow is complete if result is not None: @@ -337,14 +337,14 @@ async def _extract_workflow_input(self, decision_task: PollForDecisionTaskRespon logger.warning("No WorkflowExecutionStarted event found in history") return None - def _execute_workflow_function_sync(self, workflow_func: Callable, workflow_input: Any) -> Any: + def _execute_workflow_function_once(self, workflow_func: Callable, workflow_input: Any) -> Any: """ - Execute the workflow function synchronously. - + Execute the workflow function once (not during replay). + Args: workflow_func: The workflow function to execute workflow_input: The input data for the workflow function - + Returns: The result of the workflow function execution """ From 17a73164d763782aee3098b7e57f0f913f5e5f77 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 2 Oct 2025 09:54:49 -0700 Subject: [PATCH 08/12] fix test Signed-off-by: Tim Li --- .../_internal/workflow/test_workflow_engine_integration.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index 7e210d6..cb1f449 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -213,7 +213,7 @@ def test_execute_workflow_function_sync(self, workflow_engine): input_data = "test-input" # Execute the workflow function - result = workflow_engine._execute_workflow_function_sync(workflow_engine._workflow_func, input_data) + result = workflow_engine._execute_workflow_function_once(workflow_engine._workflow_func, input_data) # Verify the result assert result == "processed: test-input" @@ -226,7 +226,7 @@ async def async_workflow_func(input_data): input_data = "test-input" # Execute the async workflow function - result = workflow_engine._execute_workflow_function_sync(async_workflow_func, input_data) + result = workflow_engine._execute_workflow_function_once(async_workflow_func, input_data) # Verify the result assert result == "async-processed: test-input" @@ -237,7 +237,7 @@ def test_execute_workflow_function_none(self, workflow_engine): # Execute with None workflow function - should raise TypeError with pytest.raises(TypeError, match="'NoneType' object is not callable"): - workflow_engine._execute_workflow_function_sync(None, input_data) + workflow_engine._execute_workflow_function_once(None, input_data) def test_workflow_engine_initialization(self, workflow_engine, workflow_info, mock_client, mock_workflow_func): """Test WorkflowEngine initialization.""" From 01fa7a3c4bd597648e4847550f8828c1848d20d8 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 2 Oct 2025 13:28:19 -0700 Subject: [PATCH 09/12] respond to comment 1 Signed-off-by: Tim Li --- .../{_decision_worker.py => _decision.py} | 0 cadence/worker/_decision_task_handler.py | 43 +++++++++++---- cadence/worker/_worker.py | 2 +- .../worker/test_decision_task_handler.py | 54 +++++++++++++++++-- .../test_decision_worker_integration.py | 2 +- 5 files changed, 86 insertions(+), 15 deletions(-) rename cadence/worker/{_decision_worker.py => _decision.py} (100%) diff --git a/cadence/worker/_decision_worker.py b/cadence/worker/_decision.py similarity index 100% rename from cadence/worker/_decision_worker.py rename to cadence/worker/_decision.py diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index 70e1aa2..cca14b3 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -1,4 +1,6 @@ import logging +import threading +from typing import Dict, Tuple from cadence.api.v1.common_pb2 import Payload from cadence.api.v1.service_worker_pb2 import ( @@ -19,7 +21,8 @@ class DecisionTaskHandler(BaseTaskHandler[PollForDecisionTaskResponse]): """ Task handler for processing decision tasks. - This handler processes decision tasks and generates decisions using the workflow engine. + This handler processes decision tasks and generates decisions using workflow engines. + Uses a thread-safe cache to hold workflow engines for concurrent decision task handling. """ def __init__(self, client: Client, task_list: str, registry: Registry, identity: str = "unknown", **options): @@ -35,7 +38,9 @@ def __init__(self, client: Client, task_list: str, registry: Registry, identity: """ super().__init__(client, task_list, identity, **options) self._registry = registry - self._workflow_engine: WorkflowEngine + # Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id) + self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {} + self._cache_lock = threading.RLock() async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -> None: @@ -84,7 +89,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - ) raise KeyError(f"Workflow type '{workflow_type_name}' not found") - # Create workflow info and engine + # Create workflow info and get or create workflow engine from cache workflow_info = WorkflowInfo( workflow_type=workflow_type_name, workflow_domain=self._client.domain, @@ -92,13 +97,33 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - workflow_run_id=run_id ) - self._workflow_engine = WorkflowEngine( - info=workflow_info, - client=self._client, - workflow_func=workflow_func - ) + # Use thread-safe cache to get or create workflow engine + cache_key = (workflow_id, run_id) + with self._cache_lock: + workflow_engine = self._workflow_engines.get(cache_key) + if workflow_engine is None: + workflow_engine = WorkflowEngine( + info=workflow_info, + client=self._client, + workflow_func=workflow_func + ) + self._workflow_engines[cache_key] = workflow_engine - decision_result = await self._workflow_engine.process_decision(task) + decision_result = await workflow_engine.process_decision(task) + + # Clean up completed workflows from cache to prevent memory leaks + # Use getattr with default False to handle mocked engines in tests + if getattr(workflow_engine, '_is_workflow_complete', False): + with self._cache_lock: + self._workflow_engines.pop(cache_key, None) + logger.debug( + "Removed completed workflow from cache", + extra={ + "workflow_id": workflow_id, + "run_id": run_id, + "cache_size": len(self._workflow_engines) + } + ) # Respond with the decisions await self._respond_decision_task_completed(task, decision_result) diff --git a/cadence/worker/_worker.py b/cadence/worker/_worker.py index 9eb39fb..ff273ad 100644 --- a/cadence/worker/_worker.py +++ b/cadence/worker/_worker.py @@ -5,7 +5,7 @@ from cadence.client import Client from cadence.worker._registry import Registry from cadence.worker._activity import ActivityWorker -from cadence.worker._decision_worker import DecisionWorker +from cadence.worker._decision import DecisionWorker from cadence.worker._types import WorkerOptions, _DEFAULT_WORKER_OPTIONS diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 44e440e..57e0175 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -135,8 +135,8 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp await handler._handle_task_implementation(sample_decision_task) @pytest.mark.asyncio - async def test_handle_task_implementation_creates_new_engines(self, handler, sample_decision_task, mock_registry): - """Test that decision task handler creates new workflow engines for each task.""" + async def test_handle_task_implementation_caches_engines(self, handler, sample_decision_task, mock_registry): + """Test that decision task handler caches workflow engines for same workflow execution.""" # Mock workflow function mock_workflow_func = Mock() mock_registry.get_workflow.return_value = mock_workflow_func @@ -151,14 +151,60 @@ async def test_handle_task_implementation_creates_new_engines(self, handler, sam # First call - should create new engine await handler._handle_task_implementation(sample_decision_task) - # Second call - should create another new engine + # Second call with same workflow_id and run_id - should reuse cached engine await handler._handle_task_implementation(sample_decision_task) + # Registry should be called for each task (to get workflow function) + assert mock_registry.get_workflow.call_count == 2 + + # Engine should be created only once (cached for second call) + assert mock_engine_class.call_count == 1 + + # But process_decision should be called twice + assert mock_engine.process_decision.call_count == 2 + + @pytest.mark.asyncio + async def test_handle_task_implementation_different_executions_get_separate_engines(self, handler, mock_registry): + """Test that different workflow executions get separate engines.""" + # Mock workflow function + mock_workflow_func = Mock() + mock_registry.get_workflow.return_value = mock_workflow_func + + # Create two different decision tasks + task1 = Mock(spec=PollForDecisionTaskResponse) + task1.task_token = b"test_task_token_1" + task1.workflow_execution = Mock() + task1.workflow_execution.workflow_id = "workflow_1" + task1.workflow_execution.run_id = "run_1" + task1.workflow_type = Mock() + task1.workflow_type.name = "TestWorkflow" + + task2 = Mock(spec=PollForDecisionTaskResponse) + task2.task_token = b"test_task_token_2" + task2.workflow_execution = Mock() + task2.workflow_execution.workflow_id = "workflow_2" # Different workflow + task2.workflow_execution.run_id = "run_2" # Different run + task2.workflow_type = Mock() + task2.workflow_type.name = "TestWorkflow" + + # Mock workflow engine + mock_engine = Mock(spec=WorkflowEngine) + mock_decision_result = Mock(spec=DecisionResult) + mock_decision_result.decisions = [] + mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) + + with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_engine_class: + # Process different workflow executions + await handler._handle_task_implementation(task1) + await handler._handle_task_implementation(task2) + # Registry should be called for each task assert mock_registry.get_workflow.call_count == 2 - # Engine should be created twice and called twice + # Engine should be created twice (different executions) assert mock_engine_class.call_count == 2 + + # Process_decision should be called twice assert mock_engine.process_decision.call_count == 2 @pytest.mark.asyncio diff --git a/tests/cadence/worker/test_decision_worker_integration.py b/tests/cadence/worker/test_decision_worker_integration.py index d38ebe4..85c55d2 100644 --- a/tests/cadence/worker/test_decision_worker_integration.py +++ b/tests/cadence/worker/test_decision_worker_integration.py @@ -9,7 +9,7 @@ from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes -from cadence.worker._decision_worker import DecisionWorker +from cadence.worker._decision import DecisionWorker from cadence.worker._registry import Registry from cadence.client import Client From c6b1b93f7f8c20643b4e3ab3cfc2d04eef00db16 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 2 Oct 2025 14:23:41 -0700 Subject: [PATCH 10/12] respond to comment 2 Signed-off-by: Tim Li --- cadence/_internal/workflow/workflow_engine.py | 14 +++---- cadence/worker/_decision_task_handler.py | 15 ++++---- .../worker/test_decision_task_handler.py | 7 ++++ .../worker/test_task_handler_integration.py | 37 +++++++++++++++---- 4 files changed, 51 insertions(+), 22 deletions(-) diff --git a/cadence/_internal/workflow/workflow_engine.py b/cadence/_internal/workflow/workflow_engine.py index f1fb0d6..2456cc1 100644 --- a/cadence/_internal/workflow/workflow_engine.py +++ b/cadence/_internal/workflow/workflow_engine.py @@ -48,8 +48,8 @@ async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> "workflow_type": self._context.info().workflow_type, "workflow_id": self._context.info().workflow_id, "run_id": self._context.info().workflow_run_id, - "started_event_id": getattr(decision_task, 'started_event_id', None), - "attempt": getattr(decision_task, 'attempt', None) + "started_event_id": decision_task.started_event_id, + "attempt": decision_task.attempt } ) @@ -74,7 +74,7 @@ async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> "workflow_type": self._context.info().workflow_type, "workflow_id": self._context.info().workflow_id, "run_id": self._context.info().workflow_run_id, - "started_event_id": getattr(decision_task, 'started_event_id', None), + "started_event_id": decision_task.started_event_id, "decisions_count": len(decisions), "replay_mode": self._context.is_replay_mode() } @@ -90,14 +90,14 @@ async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> "workflow_type": self._context.info().workflow_type, "workflow_id": self._context.info().workflow_id, "run_id": self._context.info().workflow_run_id, - "started_event_id": getattr(decision_task, 'started_event_id', None), - "attempt": getattr(decision_task, 'attempt', None), + "started_event_id": decision_task.started_event_id, + "attempt": decision_task.attempt, "error_type": type(e).__name__ }, exc_info=True ) - # Return empty decisions on error - the task will be failed by the handler - return DecisionResult(decisions=[]) + # Re-raise the exception so the handler can properly handle the failure + raise async def _process_decision_events(self, events_iterator: DecisionEventsIterator, decision_task: PollForDecisionTaskResponse) -> None: """ diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index cca14b3..98ce338 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -69,8 +69,8 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - "workflow_type": workflow_type_name, "workflow_id": workflow_id, "run_id": run_id, - "started_event_id": getattr(task, 'started_event_id', None), - "attempt": getattr(task, 'attempt', None), + "started_event_id": task.started_event_id, + "attempt": task.attempt, "task_token": task.task_token[:16].hex() if task.task_token else None # Log partial token for debugging } ) @@ -134,7 +134,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - "workflow_type": workflow_type_name, "workflow_id": workflow_id, "run_id": run_id, - "started_event_id": getattr(task, 'started_event_id', None) + "started_event_id": task.started_event_id } ) @@ -159,8 +159,8 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex "workflow_type": workflow_type, "workflow_id": workflow_id, "run_id": run_id, - "started_event_id": getattr(task, 'started_event_id', None), - "attempt": getattr(task, 'attempt', None), + "started_event_id": task.started_event_id, + "attempt": task.attempt, "error_type": type(error).__name__, "error_message": str(error) }, @@ -209,7 +209,6 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex }, exc_info=True ) - async def _respond_decision_task_completed(self, task: PollForDecisionTaskResponse, decision_result: DecisionResult) -> None: """ @@ -237,7 +236,7 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon "workflow_type": task.workflow_type.name if task.workflow_type else "unknown", "workflow_id": workflow_execution.workflow_id if workflow_execution else "unknown", "run_id": workflow_execution.run_id if workflow_execution else "unknown", - "started_event_id": getattr(task, 'started_event_id', None), + "started_event_id": task.started_event_id, "decisions_count": len(decision_result.decisions), "return_new_decision_task": True, "task_token": task.task_token[:16].hex() if task.task_token else None @@ -252,7 +251,7 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon "workflow_type": task.workflow_type.name if task.workflow_type else "unknown", "workflow_id": workflow_execution.workflow_id if workflow_execution else "unknown", "run_id": workflow_execution.run_id if workflow_execution else "unknown", - "started_event_id": getattr(task, 'started_event_id', None), + "started_event_id": task.started_event_id, "decisions_count": len(decision_result.decisions), "error_type": type(e).__name__ }, diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 57e0175..6187499 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -58,6 +58,9 @@ def sample_decision_task(self): task.workflow_execution.run_id = "test_run_id" task.workflow_type = Mock() task.workflow_type.name = "TestWorkflow" + # Add the missing attributes that are now accessed directly + task.started_event_id = 1 + task.attempt = 1 return task def test_initialization(self, mock_client, mock_registry): @@ -178,6 +181,8 @@ async def test_handle_task_implementation_different_executions_get_separate_engi task1.workflow_execution.run_id = "run_1" task1.workflow_type = Mock() task1.workflow_type.name = "TestWorkflow" + task1.started_event_id = 1 + task1.attempt = 1 task2 = Mock(spec=PollForDecisionTaskResponse) task2.task_token = b"test_task_token_2" @@ -186,6 +191,8 @@ async def test_handle_task_implementation_different_executions_get_separate_engi task2.workflow_execution.run_id = "run_2" # Different run task2.workflow_type = Mock() task2.workflow_type.name = "TestWorkflow" + task2.started_event_id = 2 + task2.attempt = 1 # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index 64d877f..6202813 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -53,13 +53,18 @@ def sample_decision_task(self): task.workflow_execution.run_id = "test_run_id" task.workflow_type = Mock() task.workflow_type.name = "TestWorkflow" + # Add the missing attributes that are now accessed directly + task.started_event_id = 1 + task.attempt = 1 return task @pytest.mark.asyncio async def test_full_task_handling_flow_success(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow from base handler through decision handler.""" # Mock workflow function - mock_workflow_func = Mock() + def mock_workflow_func(input_data): + return f"processed: {input_data}" + mock_registry.get_workflow.return_value = mock_workflow_func # Mock workflow engine @@ -81,7 +86,9 @@ async def test_full_task_handling_flow_success(self, handler, sample_decision_ta async def test_full_task_handling_flow_with_error(self, handler, sample_decision_task, mock_registry): """Test the complete task handling flow when an error occurs.""" # Mock workflow function - mock_workflow_func = Mock() + def mock_workflow_func(input_data): + return f"processed: {input_data}" + mock_registry.get_workflow.return_value = mock_workflow_func # Mock workflow engine to raise an error @@ -102,7 +109,9 @@ async def test_full_task_handling_flow_with_error(self, handler, sample_decision async def test_context_activation_integration(self, handler, sample_decision_task, mock_registry): """Test that context activation works correctly in the integration.""" # Mock workflow function - mock_workflow_func = Mock() + def mock_workflow_func(input_data): + return f"processed: {input_data}" + mock_registry.get_workflow.return_value = mock_workflow_func # Mock workflow engine @@ -133,7 +142,9 @@ def track_context_activation(): async def test_multiple_workflow_executions(self, handler, mock_registry): """Test handling multiple workflow executions creates new engines for each.""" # Mock workflow function - mock_workflow_func = Mock() + def mock_workflow_func(input_data): + return f"processed: {input_data}" + mock_registry.get_workflow.return_value = mock_workflow_func # Create multiple decision tasks for different workflows @@ -144,6 +155,8 @@ async def test_multiple_workflow_executions(self, handler, mock_registry): task1.workflow_execution.run_id = "run1" task1.workflow_type = Mock() task1.workflow_type.name = "TestWorkflow" + task1.started_event_id = 1 + task1.attempt = 1 task2 = Mock(spec=PollForDecisionTaskResponse) task2.task_token = b"task2_token" @@ -152,6 +165,8 @@ async def test_multiple_workflow_executions(self, handler, mock_registry): task2.workflow_execution.run_id = "run2" task2.workflow_type = Mock() task2.workflow_type.name = "TestWorkflow" + task2.started_event_id = 2 + task2.attempt = 1 # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) @@ -176,7 +191,9 @@ async def test_multiple_workflow_executions(self, handler, mock_registry): async def test_workflow_engine_creation_integration(self, handler, sample_decision_task, mock_registry): """Test workflow engine creation integration.""" # Mock workflow function - mock_workflow_func = Mock() + def mock_workflow_func(input_data): + return f"processed: {input_data}" + mock_registry.get_workflow.return_value = mock_workflow_func # Mock workflow engine @@ -197,7 +214,9 @@ async def test_workflow_engine_creation_integration(self, handler, sample_decisi async def test_error_handling_with_context_cleanup(self, handler, sample_decision_task, mock_registry): """Test that context cleanup happens even when errors occur.""" # Mock workflow function - mock_workflow_func = Mock() + def mock_workflow_func(input_data): + return f"processed: {input_data}" + mock_registry.get_workflow.return_value = mock_workflow_func # Mock workflow engine to raise an error @@ -231,7 +250,9 @@ async def test_concurrent_task_handling(self, handler, mock_registry): import asyncio # Mock workflow function - mock_workflow_func = Mock() + def mock_workflow_func(input_data): + return f"processed: {input_data}" + mock_registry.get_workflow.return_value = mock_workflow_func # Create multiple tasks @@ -244,6 +265,8 @@ async def test_concurrent_task_handling(self, handler, mock_registry): task.workflow_execution.run_id = f"run{i}" task.workflow_type = Mock() task.workflow_type.name = "TestWorkflow" + task.started_event_id = i + 1 + task.attempt = 1 tasks.append(task) # Mock workflow engine From de5583209572c8a85224ffd3d391e99858a3258b Mon Sep 17 00:00:00 2001 From: Tim Li Date: Fri, 3 Oct 2025 09:50:56 -0700 Subject: [PATCH 11/12] fix comment Signed-off-by: Tim Li --- cadence/_internal/workflow/decision_events_iterator.py | 8 ++++---- cadence/worker/_decision_task_handler.py | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cadence/_internal/workflow/decision_events_iterator.py b/cadence/_internal/workflow/decision_events_iterator.py index 0758588..cb0020b 100644 --- a/cadence/_internal/workflow/decision_events_iterator.py +++ b/cadence/_internal/workflow/decision_events_iterator.py @@ -157,10 +157,10 @@ async def next_decision_events(self) -> DecisionEvents: decision_events.events.append(decision_task_started) # Update replay time if available - if hasattr(decision_task_started, 'event_time') and decision_task_started.event_time: - self._replay_current_time_milliseconds = getattr( - decision_task_started.event_time, 'seconds', 0 - ) * 1000 + if decision_task_started.event_time: + self._replay_current_time_milliseconds = ( + decision_task_started.event_time.seconds * 1000 + ) decision_events.replay_current_time_milliseconds = self._replay_current_time_milliseconds # Process subsequent events until we find the corresponding DecisionTask completion diff --git a/cadence/worker/_decision_task_handler.py b/cadence/worker/_decision_task_handler.py index 98ce338..d35ee66 100644 --- a/cadence/worker/_decision_task_handler.py +++ b/cadence/worker/_decision_task_handler.py @@ -112,8 +112,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) - decision_result = await workflow_engine.process_decision(task) # Clean up completed workflows from cache to prevent memory leaks - # Use getattr with default False to handle mocked engines in tests - if getattr(workflow_engine, '_is_workflow_complete', False): + if workflow_engine._is_workflow_complete: with self._cache_lock: self._workflow_engines.pop(cache_key, None) logger.debug( From 1ac57c52bf5186aea870bd00b694e8b636c2b58d Mon Sep 17 00:00:00 2001 From: Tim Li Date: Fri, 3 Oct 2025 10:43:47 -0700 Subject: [PATCH 12/12] fix test Signed-off-by: Tim Li --- tests/cadence/worker/test_decision_task_handler.py | 5 +++++ tests/cadence/worker/test_task_handler_integration.py | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/tests/cadence/worker/test_decision_task_handler.py b/tests/cadence/worker/test_decision_task_handler.py index 6187499..cd2b210 100644 --- a/tests/cadence/worker/test_decision_task_handler.py +++ b/tests/cadence/worker/test_decision_task_handler.py @@ -88,6 +88,8 @@ async def test_handle_task_implementation_success(self, handler, sample_decision # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [Decision()] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -146,6 +148,7 @@ async def test_handle_task_implementation_caches_engines(self, handler, sample_d # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -196,6 +199,7 @@ async def test_handle_task_implementation_different_executions_get_separate_engi # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -323,6 +327,7 @@ async def test_workflow_engine_creation_with_workflow_info(self, handler, sample mock_registry.get_workflow.return_value = mock_workflow_func mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) diff --git a/tests/cadence/worker/test_task_handler_integration.py b/tests/cadence/worker/test_task_handler_integration.py index 6202813..8e6aef9 100644 --- a/tests/cadence/worker/test_task_handler_integration.py +++ b/tests/cadence/worker/test_task_handler_integration.py @@ -69,6 +69,7 @@ def mock_workflow_func(input_data): # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -93,6 +94,7 @@ def mock_workflow_func(input_data): # Mock workflow engine to raise an error mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_engine.process_decision = AsyncMock(side_effect=RuntimeError("Workflow processing failed")) with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine): @@ -116,6 +118,7 @@ def mock_workflow_func(input_data): # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -170,6 +173,7 @@ def mock_workflow_func(input_data): # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] @@ -198,6 +202,7 @@ def mock_workflow_func(input_data): # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result) @@ -221,6 +226,7 @@ def mock_workflow_func(input_data): # Mock workflow engine to raise an error mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_engine.process_decision = AsyncMock(side_effect=RuntimeError("Workflow processing failed")) # Track context cleanup @@ -271,6 +277,7 @@ def mock_workflow_func(input_data): # Mock workflow engine mock_engine = Mock(spec=WorkflowEngine) + mock_engine._is_workflow_complete = False # Add missing attribute mock_decision_result = Mock(spec=DecisionResult) mock_decision_result.decisions = [] mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)