Skip to content

Commit c6b1b93

Browse files
committed
respond to comment 2
Signed-off-by: Tim Li <[email protected]>
1 parent 01fa7a3 commit c6b1b93

File tree

4 files changed

+51
-22
lines changed

4 files changed

+51
-22
lines changed

cadence/_internal/workflow/workflow_engine.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ async def process_decision(self, decision_task: PollForDecisionTaskResponse) ->
4848
"workflow_type": self._context.info().workflow_type,
4949
"workflow_id": self._context.info().workflow_id,
5050
"run_id": self._context.info().workflow_run_id,
51-
"started_event_id": getattr(decision_task, 'started_event_id', None),
52-
"attempt": getattr(decision_task, 'attempt', None)
51+
"started_event_id": decision_task.started_event_id,
52+
"attempt": decision_task.attempt
5353
}
5454
)
5555

@@ -74,7 +74,7 @@ async def process_decision(self, decision_task: PollForDecisionTaskResponse) ->
7474
"workflow_type": self._context.info().workflow_type,
7575
"workflow_id": self._context.info().workflow_id,
7676
"run_id": self._context.info().workflow_run_id,
77-
"started_event_id": getattr(decision_task, 'started_event_id', None),
77+
"started_event_id": decision_task.started_event_id,
7878
"decisions_count": len(decisions),
7979
"replay_mode": self._context.is_replay_mode()
8080
}
@@ -90,14 +90,14 @@ async def process_decision(self, decision_task: PollForDecisionTaskResponse) ->
9090
"workflow_type": self._context.info().workflow_type,
9191
"workflow_id": self._context.info().workflow_id,
9292
"run_id": self._context.info().workflow_run_id,
93-
"started_event_id": getattr(decision_task, 'started_event_id', None),
94-
"attempt": getattr(decision_task, 'attempt', None),
93+
"started_event_id": decision_task.started_event_id,
94+
"attempt": decision_task.attempt,
9595
"error_type": type(e).__name__
9696
},
9797
exc_info=True
9898
)
99-
# Return empty decisions on error - the task will be failed by the handler
100-
return DecisionResult(decisions=[])
99+
# Re-raise the exception so the handler can properly handle the failure
100+
raise
101101

102102
async def _process_decision_events(self, events_iterator: DecisionEventsIterator, decision_task: PollForDecisionTaskResponse) -> None:
103103
"""

cadence/worker/_decision_task_handler.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
6969
"workflow_type": workflow_type_name,
7070
"workflow_id": workflow_id,
7171
"run_id": run_id,
72-
"started_event_id": getattr(task, 'started_event_id', None),
73-
"attempt": getattr(task, 'attempt', None),
72+
"started_event_id": task.started_event_id,
73+
"attempt": task.attempt,
7474
"task_token": task.task_token[:16].hex() if task.task_token else None # Log partial token for debugging
7575
}
7676
)
@@ -134,7 +134,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
134134
"workflow_type": workflow_type_name,
135135
"workflow_id": workflow_id,
136136
"run_id": run_id,
137-
"started_event_id": getattr(task, 'started_event_id', None)
137+
"started_event_id": task.started_event_id
138138
}
139139
)
140140

@@ -159,8 +159,8 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex
159159
"workflow_type": workflow_type,
160160
"workflow_id": workflow_id,
161161
"run_id": run_id,
162-
"started_event_id": getattr(task, 'started_event_id', None),
163-
"attempt": getattr(task, 'attempt', None),
162+
"started_event_id": task.started_event_id,
163+
"attempt": task.attempt,
164164
"error_type": type(error).__name__,
165165
"error_message": str(error)
166166
},
@@ -209,7 +209,6 @@ async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Ex
209209
},
210210
exc_info=True
211211
)
212-
213212

214213
async def _respond_decision_task_completed(self, task: PollForDecisionTaskResponse, decision_result: DecisionResult) -> None:
215214
"""
@@ -237,7 +236,7 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon
237236
"workflow_type": task.workflow_type.name if task.workflow_type else "unknown",
238237
"workflow_id": workflow_execution.workflow_id if workflow_execution else "unknown",
239238
"run_id": workflow_execution.run_id if workflow_execution else "unknown",
240-
"started_event_id": getattr(task, 'started_event_id', None),
239+
"started_event_id": task.started_event_id,
241240
"decisions_count": len(decision_result.decisions),
242241
"return_new_decision_task": True,
243242
"task_token": task.task_token[:16].hex() if task.task_token else None
@@ -252,7 +251,7 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon
252251
"workflow_type": task.workflow_type.name if task.workflow_type else "unknown",
253252
"workflow_id": workflow_execution.workflow_id if workflow_execution else "unknown",
254253
"run_id": workflow_execution.run_id if workflow_execution else "unknown",
255-
"started_event_id": getattr(task, 'started_event_id', None),
254+
"started_event_id": task.started_event_id,
256255
"decisions_count": len(decision_result.decisions),
257256
"error_type": type(e).__name__
258257
},

tests/cadence/worker/test_decision_task_handler.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ def sample_decision_task(self):
5858
task.workflow_execution.run_id = "test_run_id"
5959
task.workflow_type = Mock()
6060
task.workflow_type.name = "TestWorkflow"
61+
# Add the missing attributes that are now accessed directly
62+
task.started_event_id = 1
63+
task.attempt = 1
6164
return task
6265

6366
def test_initialization(self, mock_client, mock_registry):
@@ -178,6 +181,8 @@ async def test_handle_task_implementation_different_executions_get_separate_engi
178181
task1.workflow_execution.run_id = "run_1"
179182
task1.workflow_type = Mock()
180183
task1.workflow_type.name = "TestWorkflow"
184+
task1.started_event_id = 1
185+
task1.attempt = 1
181186

182187
task2 = Mock(spec=PollForDecisionTaskResponse)
183188
task2.task_token = b"test_task_token_2"
@@ -186,6 +191,8 @@ async def test_handle_task_implementation_different_executions_get_separate_engi
186191
task2.workflow_execution.run_id = "run_2" # Different run
187192
task2.workflow_type = Mock()
188193
task2.workflow_type.name = "TestWorkflow"
194+
task2.started_event_id = 2
195+
task2.attempt = 1
189196

190197
# Mock workflow engine
191198
mock_engine = Mock(spec=WorkflowEngine)

tests/cadence/worker/test_task_handler_integration.py

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,18 @@ def sample_decision_task(self):
5353
task.workflow_execution.run_id = "test_run_id"
5454
task.workflow_type = Mock()
5555
task.workflow_type.name = "TestWorkflow"
56+
# Add the missing attributes that are now accessed directly
57+
task.started_event_id = 1
58+
task.attempt = 1
5659
return task
5760

5861
@pytest.mark.asyncio
5962
async def test_full_task_handling_flow_success(self, handler, sample_decision_task, mock_registry):
6063
"""Test the complete task handling flow from base handler through decision handler."""
6164
# Mock workflow function
62-
mock_workflow_func = Mock()
65+
def mock_workflow_func(input_data):
66+
return f"processed: {input_data}"
67+
6368
mock_registry.get_workflow.return_value = mock_workflow_func
6469

6570
# Mock workflow engine
@@ -81,7 +86,9 @@ async def test_full_task_handling_flow_success(self, handler, sample_decision_ta
8186
async def test_full_task_handling_flow_with_error(self, handler, sample_decision_task, mock_registry):
8287
"""Test the complete task handling flow when an error occurs."""
8388
# Mock workflow function
84-
mock_workflow_func = Mock()
89+
def mock_workflow_func(input_data):
90+
return f"processed: {input_data}"
91+
8592
mock_registry.get_workflow.return_value = mock_workflow_func
8693

8794
# Mock workflow engine to raise an error
@@ -102,7 +109,9 @@ async def test_full_task_handling_flow_with_error(self, handler, sample_decision
102109
async def test_context_activation_integration(self, handler, sample_decision_task, mock_registry):
103110
"""Test that context activation works correctly in the integration."""
104111
# Mock workflow function
105-
mock_workflow_func = Mock()
112+
def mock_workflow_func(input_data):
113+
return f"processed: {input_data}"
114+
106115
mock_registry.get_workflow.return_value = mock_workflow_func
107116

108117
# Mock workflow engine
@@ -133,7 +142,9 @@ def track_context_activation():
133142
async def test_multiple_workflow_executions(self, handler, mock_registry):
134143
"""Test handling multiple workflow executions creates new engines for each."""
135144
# Mock workflow function
136-
mock_workflow_func = Mock()
145+
def mock_workflow_func(input_data):
146+
return f"processed: {input_data}"
147+
137148
mock_registry.get_workflow.return_value = mock_workflow_func
138149

139150
# Create multiple decision tasks for different workflows
@@ -144,6 +155,8 @@ async def test_multiple_workflow_executions(self, handler, mock_registry):
144155
task1.workflow_execution.run_id = "run1"
145156
task1.workflow_type = Mock()
146157
task1.workflow_type.name = "TestWorkflow"
158+
task1.started_event_id = 1
159+
task1.attempt = 1
147160

148161
task2 = Mock(spec=PollForDecisionTaskResponse)
149162
task2.task_token = b"task2_token"
@@ -152,6 +165,8 @@ async def test_multiple_workflow_executions(self, handler, mock_registry):
152165
task2.workflow_execution.run_id = "run2"
153166
task2.workflow_type = Mock()
154167
task2.workflow_type.name = "TestWorkflow"
168+
task2.started_event_id = 2
169+
task2.attempt = 1
155170

156171
# Mock workflow engine
157172
mock_engine = Mock(spec=WorkflowEngine)
@@ -176,7 +191,9 @@ async def test_multiple_workflow_executions(self, handler, mock_registry):
176191
async def test_workflow_engine_creation_integration(self, handler, sample_decision_task, mock_registry):
177192
"""Test workflow engine creation integration."""
178193
# Mock workflow function
179-
mock_workflow_func = Mock()
194+
def mock_workflow_func(input_data):
195+
return f"processed: {input_data}"
196+
180197
mock_registry.get_workflow.return_value = mock_workflow_func
181198

182199
# Mock workflow engine
@@ -197,7 +214,9 @@ async def test_workflow_engine_creation_integration(self, handler, sample_decisi
197214
async def test_error_handling_with_context_cleanup(self, handler, sample_decision_task, mock_registry):
198215
"""Test that context cleanup happens even when errors occur."""
199216
# Mock workflow function
200-
mock_workflow_func = Mock()
217+
def mock_workflow_func(input_data):
218+
return f"processed: {input_data}"
219+
201220
mock_registry.get_workflow.return_value = mock_workflow_func
202221

203222
# Mock workflow engine to raise an error
@@ -231,7 +250,9 @@ async def test_concurrent_task_handling(self, handler, mock_registry):
231250
import asyncio
232251

233252
# Mock workflow function
234-
mock_workflow_func = Mock()
253+
def mock_workflow_func(input_data):
254+
return f"processed: {input_data}"
255+
235256
mock_registry.get_workflow.return_value = mock_workflow_func
236257

237258
# Create multiple tasks
@@ -244,6 +265,8 @@ async def test_concurrent_task_handling(self, handler, mock_registry):
244265
task.workflow_execution.run_id = f"run{i}"
245266
task.workflow_type = Mock()
246267
task.workflow_type.name = "TestWorkflow"
268+
task.started_event_id = i + 1
269+
task.attempt = 1
247270
tasks.append(task)
248271

249272
# Mock workflow engine

0 commit comments

Comments
 (0)