Skip to content
Merged
File renamed without changes.
43 changes: 34 additions & 9 deletions cadence/worker/_decision_task_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import threading
from typing import Dict, Tuple

from cadence.api.v1.common_pb2 import Payload
from cadence.api.v1.service_worker_pb2 import (
Expand All @@ -19,7 +21,8 @@ class DecisionTaskHandler(BaseTaskHandler[PollForDecisionTaskResponse]):
"""
Task handler for processing decision tasks.

This handler processes decision tasks and generates decisions using the workflow engine.
This handler processes decision tasks and generates decisions using workflow engines.
Uses a thread-safe cache to hold workflow engines for concurrent decision task handling.
"""

def __init__(self, client: Client, task_list: str, registry: Registry, identity: str = "unknown", **options):
Expand All @@ -35,7 +38,9 @@ def __init__(self, client: Client, task_list: str, registry: Registry, identity:
"""
super().__init__(client, task_list, identity, **options)
self._registry = registry
self._workflow_engine: WorkflowEngine
# Thread-safe cache to hold workflow engines keyed by (workflow_id, run_id)
self._workflow_engines: Dict[Tuple[str, str], WorkflowEngine] = {}
self._cache_lock = threading.RLock()


async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -> None:
Expand Down Expand Up @@ -84,21 +89,41 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
)
raise KeyError(f"Workflow type '{workflow_type_name}' not found")

# Create workflow info and engine
# Create workflow info and get or create workflow engine from cache
workflow_info = WorkflowInfo(
workflow_type=workflow_type_name,
workflow_domain=self._client.domain,
workflow_id=workflow_id,
workflow_run_id=run_id
)

self._workflow_engine = WorkflowEngine(
info=workflow_info,
client=self._client,
workflow_func=workflow_func
)
# Use thread-safe cache to get or create workflow engine
cache_key = (workflow_id, run_id)
with self._cache_lock:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current cache is unbounded. We need an LFU cache here. Maybe add a TODO for the next PR.

workflow_engine = self._workflow_engines.get(cache_key)
if workflow_engine is None:
workflow_engine = WorkflowEngine(
info=workflow_info,
client=self._client,
workflow_func=workflow_func
)
self._workflow_engines[cache_key] = workflow_engine

decision_result = await self._workflow_engine.process_decision(task)
decision_result = await workflow_engine.process_decision(task)

# Clean up completed workflows from cache to prevent memory leaks
# Use getattr with default False to handle mocked engines in tests
if getattr(workflow_engine, '_is_workflow_complete', False):
with self._cache_lock:
self._workflow_engines.pop(cache_key, None)
logger.debug(
"Removed completed workflow from cache",
extra={
"workflow_id": workflow_id,
"run_id": run_id,
"cache_size": len(self._workflow_engines)
}
)

# Respond with the decisions
await self._respond_decision_task_completed(task, decision_result)
Expand Down
2 changes: 1 addition & 1 deletion cadence/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from cadence.client import Client
from cadence.worker._registry import Registry
from cadence.worker._activity import ActivityWorker
from cadence.worker._decision_worker import DecisionWorker
from cadence.worker._decision import DecisionWorker
from cadence.worker._types import WorkerOptions, _DEFAULT_WORKER_OPTIONS


Expand Down
54 changes: 50 additions & 4 deletions tests/cadence/worker/test_decision_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp
await handler._handle_task_implementation(sample_decision_task)

@pytest.mark.asyncio
async def test_handle_task_implementation_creates_new_engines(self, handler, sample_decision_task, mock_registry):
"""Test that decision task handler creates new workflow engines for each task."""
async def test_handle_task_implementation_caches_engines(self, handler, sample_decision_task, mock_registry):
"""Test that decision task handler caches workflow engines for same workflow execution."""
# Mock workflow function
mock_workflow_func = Mock()
mock_registry.get_workflow.return_value = mock_workflow_func
Expand All @@ -151,14 +151,60 @@ async def test_handle_task_implementation_creates_new_engines(self, handler, sam
# First call - should create new engine
await handler._handle_task_implementation(sample_decision_task)

# Second call - should create another new engine
# Second call with same workflow_id and run_id - should reuse cached engine
await handler._handle_task_implementation(sample_decision_task)

# Registry should be called for each task (to get workflow function)
assert mock_registry.get_workflow.call_count == 2

# Engine should be created only once (cached for second call)
assert mock_engine_class.call_count == 1

# But process_decision should be called twice
assert mock_engine.process_decision.call_count == 2

@pytest.mark.asyncio
async def test_handle_task_implementation_different_executions_get_separate_engines(self, handler, mock_registry):
"""Test that different workflow executions get separate engines."""
# Mock workflow function
mock_workflow_func = Mock()
mock_registry.get_workflow.return_value = mock_workflow_func

# Create two different decision tasks
task1 = Mock(spec=PollForDecisionTaskResponse)
task1.task_token = b"test_task_token_1"
task1.workflow_execution = Mock()
task1.workflow_execution.workflow_id = "workflow_1"
task1.workflow_execution.run_id = "run_1"
task1.workflow_type = Mock()
task1.workflow_type.name = "TestWorkflow"

task2 = Mock(spec=PollForDecisionTaskResponse)
task2.task_token = b"test_task_token_2"
task2.workflow_execution = Mock()
task2.workflow_execution.workflow_id = "workflow_2" # Different workflow
task2.workflow_execution.run_id = "run_2" # Different run
task2.workflow_type = Mock()
task2.workflow_type.name = "TestWorkflow"

# Mock workflow engine
mock_engine = Mock(spec=WorkflowEngine)
mock_decision_result = Mock(spec=DecisionResult)
mock_decision_result.decisions = []
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)

with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_engine_class:
# Process different workflow executions
await handler._handle_task_implementation(task1)
await handler._handle_task_implementation(task2)

# Registry should be called for each task
assert mock_registry.get_workflow.call_count == 2

# Engine should be created twice and called twice
# Engine should be created twice (different executions)
assert mock_engine_class.call_count == 2

# process_decision should be called twice (once per execution)
assert mock_engine.process_decision.call_count == 2

@pytest.mark.asyncio
Expand Down
2 changes: 1 addition & 1 deletion tests/cadence/worker/test_decision_worker_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType
from cadence.api.v1.history_pb2 import History, HistoryEvent, WorkflowExecutionStartedEventAttributes
from cadence.worker._decision_worker import DecisionWorker
from cadence.worker._decision import DecisionWorker
from cadence.worker._registry import Registry
from cadence.client import Client

Expand Down