Skip to content

Commit 7d6eaec

Browse files
committed
respond to comments
Signed-off-by: Tim Li <[email protected]>
1 parent 9f9c289 commit 7d6eaec

File tree

5 files changed

+65
-164
lines changed

5 files changed

+65
-164
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
@dataclass
1212
class DecisionResult:
1313
decisions: list[Decision]
14-
force_create_new_decision_task: bool = False
15-
query_results: Optional[dict] = None
1614

1715
class WorkflowEngine:
1816
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Optional[Callable[..., Any]] = None):

cadence/worker/_decision_task_handler.py

Lines changed: 40 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def __init__(self, client: Client, task_list: str, registry: Registry, identity:
3636
"""
3737
super().__init__(client, task_list, identity, **options)
3838
self._registry = registry
39-
self._workflow_engines: Dict[str, WorkflowEngine] = {}
39+
self._workflow_engine: WorkflowEngine
4040

4141

4242
async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -> None:
@@ -51,7 +51,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
5151
workflow_type = task.workflow_type
5252

5353
if not workflow_execution or not workflow_type:
54-
logger.error("Decision task missing workflow execution or type")
54+
logger.error("Decision task missing workflow execution or type. Task: %r", task)
5555
raise ValueError("Missing workflow execution or type")
5656

5757
workflow_id = workflow_execution.workflow_id
@@ -60,34 +60,27 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
6060

6161
logger.info(f"Processing decision task for workflow {workflow_id} (type: {workflow_type_name})")
6262

63-
# Get or create workflow engine for this workflow execution
64-
engine_key = f"{workflow_id}:{run_id}"
65-
if engine_key not in self._workflow_engines:
66-
# Get the workflow function from registry
67-
try:
68-
workflow_func = self._registry.get_workflow(workflow_type_name)
69-
except KeyError:
70-
logger.error(f"Workflow type '{workflow_type_name}' not found in registry")
71-
raise KeyError(f"Workflow type '{workflow_type_name}' not found")
72-
73-
# Create workflow info and engine
74-
workflow_info = WorkflowInfo(
75-
workflow_type=workflow_type_name,
76-
workflow_domain=self._client.domain,
77-
workflow_id=workflow_id,
78-
workflow_run_id=run_id
79-
)
80-
81-
self._workflow_engines[engine_key] = WorkflowEngine(
82-
info=workflow_info,
83-
client=self._client,
84-
workflow_func=workflow_func
85-
)
63+
try:
64+
workflow_func = self._registry.get_workflow(workflow_type_name)
65+
except KeyError:
66+
logger.error(f"Workflow type '{workflow_type_name}' not found in registry")
67+
raise KeyError(f"Workflow type '{workflow_type_name}' not found")
68+
69+
# Create workflow info and engine
70+
workflow_info = WorkflowInfo(
71+
workflow_type=workflow_type_name,
72+
workflow_domain=self._client.domain,
73+
workflow_id=workflow_id,
74+
workflow_run_id=run_id
75+
)
8676

87-
# Create workflow context and execute with it active
88-
workflow_engine = self._workflow_engines[engine_key]
77+
self._workflow_engine = WorkflowEngine(
78+
info=workflow_info,
79+
client=self._client,
80+
workflow_func=workflow_func
81+
)
8982

90-
decision_result = await workflow_engine.process_decision(task)
83+
decision_result = await self._workflow_engine.process_decision(task)
9184

9285
# Respond with the decisions
9386
await self._respond_decision_task_completed(task, decision_result)
@@ -102,21 +95,22 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex
10295
task: The task that failed
10396
error: The exception that occurred
10497
"""
98+
logger.error(f"Decision task failed: {error}")
99+
100+
# Determine the failure cause
101+
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION
102+
if isinstance(error, KeyError):
103+
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
104+
elif isinstance(error, ValueError):
105+
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES
106+
107+
# Create error details
108+
# TODO: Use a data converter for error details serialization
109+
error_message = str(error).encode('utf-8')
110+
details = Payload(data=error_message)
111+
112+
# Respond with failure
105113
try:
106-
logger.error(f"Decision task failed: {error}")
107-
108-
# Determine the failure cause
109-
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION
110-
if isinstance(error, KeyError):
111-
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
112-
elif isinstance(error, ValueError):
113-
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES
114-
115-
# Create error details
116-
error_message = str(error).encode('utf-8')
117-
details = Payload(data=error_message)
118-
119-
# Respond with failure
120114
await self._client.worker_stub.RespondDecisionTaskFailed(
121115
RespondDecisionTaskFailedRequest(
122116
task_token=task.task_token,
@@ -125,11 +119,10 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex
125119
details=details
126120
)
127121
)
128-
129122
logger.info("Decision task failure response sent")
130-
131123
except Exception:
132-
logger.exception("Error handling decision task failure")
124+
logger.exception("Error responding to decision task failure")
125+
133126

134127
async def _respond_decision_task_completed(self, task: PollForDecisionTaskResponse, decision_result: DecisionResult) -> None:
135128
"""
@@ -144,30 +137,13 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon
144137
task_token=task.task_token,
145138
decisions=decision_result.decisions,
146139
identity=self._identity,
147-
return_new_decision_task=decision_result.force_create_new_decision_task,
148-
force_create_new_decision_task=decision_result.force_create_new_decision_task
140+
return_new_decision_task=True,
141+
force_create_new_decision_task=False
149142
)
150143

151-
# Add query results if present
152-
if decision_result.query_results:
153-
request.query_results.update(decision_result.query_results)
154-
155144
await self._client.worker_stub.RespondDecisionTaskCompleted(request)
156145
logger.debug(f"Decision task completed with {len(decision_result.decisions)} decisions")
157146

158147
except Exception:
159148
logger.exception("Error responding to decision task completion")
160149
raise
161-
162-
def cleanup_workflow_engine(self, workflow_id: str, run_id: str) -> None:
163-
"""
164-
Clean up a workflow engine when workflow execution is complete.
165-
166-
Args:
167-
workflow_id: The workflow ID
168-
run_id: The run ID
169-
"""
170-
engine_key = f"{workflow_id}:{run_id}"
171-
if engine_key in self._workflow_engines:
172-
del self._workflow_engines[engine_key]
173-
logger.debug(f"Cleaned up workflow engine for {workflow_id}:{run_id}")

tests/cadence/_internal/test_decision_state_machine.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,3 @@ def test_manager_aggregates_and_routes():
439439
),
440440
)
441441
)
442-
443-
assert a.status is DecisionState.COMPLETED
444-
assert t.status is DecisionState.COMPLETED
445-
assert c.status is DecisionState.COMPLETED

tests/cadence/worker/test_decision_task_handler.py

Lines changed: 10 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ def test_initialization(self, mock_client, mock_registry):
7575
assert handler._identity == "test_identity"
7676
assert handler._registry == mock_registry
7777
assert handler._options == {"option1": "value1"}
78-
assert isinstance(handler._workflow_engines, dict)
79-
assert len(handler._workflow_engines) == 0
8078

8179
@pytest.mark.asyncio
8280
async def test_handle_task_implementation_success(self, handler, sample_decision_task, mock_registry):
@@ -139,8 +137,8 @@ async def test_handle_task_implementation_workflow_not_found(self, handler, samp
139137
await handler._handle_task_implementation(sample_decision_task)
140138

141139
@pytest.mark.asyncio
142-
async def test_handle_task_implementation_reuses_existing_engine(self, handler, sample_decision_task, mock_registry):
143-
"""Test that decision task handler reuses existing workflow engine."""
140+
async def test_handle_task_implementation_creates_new_engine(self, handler, sample_decision_task, mock_registry):
141+
"""Test that decision task handler creates new workflow engine for each task."""
144142
# Mock workflow function
145143
mock_workflow_func = Mock()
146144
mock_registry.get_workflow.return_value = mock_workflow_func
@@ -153,23 +151,19 @@ async def test_handle_task_implementation_reuses_existing_engine(self, handler,
153151
mock_decision_result.query_results = {}
154152
mock_engine.process_decision = AsyncMock(return_value=mock_decision_result)
155153

156-
with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine):
154+
with patch('cadence.worker._decision_task_handler.WorkflowEngine', return_value=mock_engine) as mock_engine_class:
157155
# First call - should create new engine
158156
await handler._handle_task_implementation(sample_decision_task)
159157

160-
# Second call - should reuse existing engine
158+
# Second call - should create another new engine
161159
await handler._handle_task_implementation(sample_decision_task)
162160

163-
# Registry should only be called once
164-
mock_registry.get_workflow.assert_called_once_with("TestWorkflow")
161+
# Registry should be called for each task
162+
assert mock_registry.get_workflow.call_count == 2
165163

166-
# Engine should be called twice
164+
# Engine should be created twice and called twice
165+
assert mock_engine_class.call_count == 2
167166
assert mock_engine.process_decision.call_count == 2
168-
169-
# Should have one engine in the cache
170-
assert len(handler._workflow_engines) == 1
171-
engine_key = "test_workflow_id:test_run_id"
172-
assert engine_key in handler._workflow_engines
173167

174168
@pytest.mark.asyncio
175169
async def test_handle_task_failure_keyerror(self, handler, sample_decision_task):
@@ -237,8 +231,6 @@ async def test_respond_decision_task_completed_success(self, handler, sample_dec
237231
"""Test successful decision task completion response."""
238232
decision_result = Mock(spec=DecisionResult)
239233
decision_result.decisions = [Decision(), Decision()]
240-
decision_result.force_create_new_decision_task = True
241-
decision_result.query_results = None # Test without query results first
242234

243235
await handler._respond_decision_task_completed(sample_decision_task, decision_result)
244236

@@ -248,62 +240,33 @@ async def test_respond_decision_task_completed_success(self, handler, sample_dec
248240
assert call_args.task_token == sample_decision_task.task_token
249241
assert call_args.identity == handler._identity
250242
assert call_args.return_new_decision_task
251-
assert call_args.force_create_new_decision_task
243+
assert not call_args.force_create_new_decision_task
252244
assert len(call_args.decisions) == 2
253-
# query_results should not be set when None
254-
assert not hasattr(call_args, 'query_results') or len(call_args.query_results) == 0
255245

256246
@pytest.mark.asyncio
257247
async def test_respond_decision_task_completed_no_query_results(self, handler, sample_decision_task):
258248
"""Test decision task completion response without query results."""
259249
decision_result = Mock(spec=DecisionResult)
260250
decision_result.decisions = []
261-
decision_result.force_create_new_decision_task = False
262-
decision_result.query_results = None
263251

264252
await handler._respond_decision_task_completed(sample_decision_task, decision_result)
265253

266254
call_args = handler._client.worker_stub.RespondDecisionTaskCompleted.call_args[0][0]
267-
assert not call_args.return_new_decision_task
255+
assert call_args.return_new_decision_task
268256
assert not call_args.force_create_new_decision_task
269257
assert len(call_args.decisions) == 0
270-
# query_results should not be set when None
271-
assert not hasattr(call_args, 'query_results') or len(call_args.query_results) == 0
272258

273259
@pytest.mark.asyncio
274260
async def test_respond_decision_task_completed_error(self, handler, sample_decision_task):
275261
"""Test decision task completion response error handling."""
276262
decision_result = Mock(spec=DecisionResult)
277263
decision_result.decisions = []
278-
decision_result.force_create_new_decision_task = False
279-
decision_result.query_results = {}
280264

281265
handler._client.worker_stub.RespondDecisionTaskCompleted.side_effect = Exception("Respond failed")
282266

283267
with pytest.raises(Exception, match="Respond failed"):
284268
await handler._respond_decision_task_completed(sample_decision_task, decision_result)
285269

286-
def test_cleanup_workflow_engine(self, handler):
287-
"""Test workflow engine cleanup."""
288-
# Add some mock engines
289-
handler._workflow_engines["workflow1:run1"] = Mock()
290-
handler._workflow_engines["workflow2:run2"] = Mock()
291-
292-
# Clean up one engine
293-
handler.cleanup_workflow_engine("workflow1", "run1")
294-
295-
# Verify only one engine was removed
296-
assert len(handler._workflow_engines) == 1
297-
assert "workflow1:run1" not in handler._workflow_engines
298-
assert "workflow2:run2" in handler._workflow_engines
299-
300-
def test_cleanup_workflow_engine_not_found(self, handler):
301-
"""Test cleanup of non-existent workflow engine."""
302-
# Should not raise error
303-
handler.cleanup_workflow_engine("nonexistent", "run")
304-
305-
# Should not affect existing engines
306-
assert len(handler._workflow_engines) == 0
307270

308271
@pytest.mark.asyncio
309272
async def test_workflow_engine_creation_with_workflow_info(self, handler, sample_decision_task, mock_registry):

0 commit comments

Comments
 (0)