Skip to content

Commit 668f7c3

Browse files
committed
add base and decision task handler
1 parent 016cc54 commit 668f7c3

File tree

2 files changed

+268
-0
lines changed

2 files changed

+268
-0
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import logging
2+
from abc import ABC, abstractmethod
3+
from typing import TypeVar, Generic
4+
5+
logger = logging.getLogger(__name__)
6+
7+
T = TypeVar('T')
8+
9+
class BaseTaskHandler(ABC, Generic[T]):
10+
"""
11+
Base task handler that provides common functionality for processing tasks.
12+
13+
This abstract class defines the interface and common behavior for task handlers
14+
that process different types of tasks (workflow decisions, activities, etc.).
15+
"""
16+
17+
def __init__(self, client, task_list: str, identity: str, **options):
18+
"""
19+
Initialize the base task handler.
20+
21+
Args:
22+
client: The Cadence client instance
23+
task_list: The task list name
24+
identity: Worker identity
25+
**options: Additional options for the handler
26+
"""
27+
self._client = client
28+
self._task_list = task_list
29+
self._identity = identity
30+
self._options = options
31+
32+
async def handle_task(self, task: T) -> None:
33+
"""
34+
Handle a single task.
35+
36+
This method provides the base implementation for task handling that includes:
37+
- Context propagation
38+
- Error handling
39+
- Cleanup
40+
41+
Args:
42+
task: The task to handle
43+
"""
44+
try:
45+
# Propagate context from task parameters
46+
await self._propagate_context(task)
47+
48+
# Handle the task
49+
await self._handle_task_implementation(task)
50+
51+
except Exception as e:
52+
logger.exception(f"Error handling task: {e}")
53+
await self.handle_task_failure(task, e)
54+
finally:
55+
# Clean up context
56+
await self._unset_current_context()
57+
58+
@abstractmethod
59+
async def _handle_task_implementation(self, task: T) -> None:
60+
"""
61+
Handle the actual task implementation.
62+
63+
Args:
64+
task: The task to handle
65+
"""
66+
pass
67+
68+
@abstractmethod
69+
async def handle_task_failure(self, task: T, error: Exception) -> None:
70+
"""
71+
Handle task processing failure.
72+
73+
Args:
74+
task: The task that failed
75+
error: The exception that occurred
76+
"""
77+
pass
78+
79+
async def _propagate_context(self, task: T) -> None:
80+
"""
81+
Propagate context from task parameters.
82+
83+
Args:
84+
task: The task containing context information
85+
"""
86+
# Default implementation - subclasses should override if needed
87+
pass
88+
89+
async def _unset_current_context(self) -> None:
90+
"""
91+
Unset the current context after task completion.
92+
"""
93+
# Default implementation - subclasses should override if needed
94+
pass
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import logging
2+
from typing import Dict, Any
3+
4+
from cadence.api.v1.common_pb2 import Payload
5+
from cadence.api.v1.service_worker_pb2 import (
6+
PollForDecisionTaskResponse,
7+
RespondDecisionTaskCompletedRequest,
8+
RespondDecisionTaskFailedRequest
9+
)
10+
from cadence.api.v1.workflow_pb2 import DecisionTaskFailedCause
11+
from cadence.client import Client
12+
from cadence.worker._base_task_handler import BaseTaskHandler
13+
from cadence._internal.workflow.workflow_engine import WorkflowEngine, DecisionResult
14+
from cadence.workflow import WorkflowInfo
15+
from cadence.worker._registry import Registry
16+
17+
logger = logging.getLogger(__name__)
18+
19+
class DecisionTaskHandler(BaseTaskHandler[PollForDecisionTaskResponse]):
20+
"""
21+
Task handler for processing decision tasks.
22+
23+
This handler processes decision tasks and generates decisions using the workflow engine.
24+
"""
25+
26+
def __init__(self, client: Client, task_list: str, registry: Registry, identity: str = "unknown", **options):
27+
"""
28+
Initialize the decision task handler.
29+
30+
Args:
31+
client: The Cadence client instance
32+
task_list: The task list name
33+
registry: Registry containing workflow functions
34+
identity: The worker identity
35+
**options: Additional options for the handler
36+
"""
37+
super().__init__(client, task_list, identity, **options)
38+
self._registry = registry
39+
self._workflow_engines: Dict[str, WorkflowEngine] = {}
40+
41+
42+
async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -> None:
43+
"""
44+
Handle a decision task implementation.
45+
46+
Args:
47+
task: The decision task to handle
48+
"""
49+
# Extract workflow execution info
50+
workflow_execution = task.workflow_execution
51+
workflow_type = task.workflow_type
52+
53+
if not workflow_execution or not workflow_type:
54+
logger.error("Decision task missing workflow execution or type")
55+
await self.handle_task_failure(task, ValueError("Missing workflow execution or type"))
56+
return
57+
58+
workflow_id = workflow_execution.workflow_id
59+
run_id = workflow_execution.run_id
60+
workflow_type_name = workflow_type.name
61+
62+
logger.info(f"Processing decision task for workflow {workflow_id} (type: {workflow_type_name})")
63+
64+
# Get or create workflow engine for this workflow execution
65+
engine_key = f"{workflow_id}:{run_id}"
66+
if engine_key not in self._workflow_engines:
67+
# Get the workflow function from registry
68+
try:
69+
workflow_func = self._registry.get_workflow(workflow_type_name)
70+
except KeyError:
71+
logger.error(f"Workflow type '{workflow_type_name}' not found in registry")
72+
await self.handle_task_failure(task, KeyError(f"Workflow type '{workflow_type_name}' not found"))
73+
return
74+
75+
# Create workflow info and engine
76+
workflow_info = WorkflowInfo(
77+
workflow_type=workflow_type_name,
78+
workflow_domain=self._client.domain,
79+
workflow_id=workflow_id,
80+
workflow_run_id=run_id
81+
)
82+
83+
self._workflow_engines[engine_key] = WorkflowEngine(
84+
info=workflow_info,
85+
client=self._client,
86+
workflow_func=workflow_func
87+
)
88+
89+
# Process the decision using the workflow engine
90+
workflow_engine = self._workflow_engines[engine_key]
91+
decision_result = await workflow_engine.process_decision(task)
92+
93+
# Respond with the decisions
94+
await self._respond_decision_task_completed(task, decision_result)
95+
96+
logger.info(f"Successfully processed decision task for workflow {workflow_id}")
97+
98+
async def handle_task_failure(self, task: PollForDecisionTaskResponse, error: Exception) -> None:
99+
"""
100+
Handle decision task processing failure.
101+
102+
Args:
103+
task: The task that failed
104+
error: The exception that occurred
105+
"""
106+
try:
107+
logger.error(f"Decision task failed: {error}")
108+
109+
# Determine the failure cause
110+
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION
111+
if isinstance(error, KeyError):
112+
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
113+
elif isinstance(error, ValueError):
114+
cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES
115+
116+
# Create error details
117+
error_message = str(error).encode('utf-8')
118+
details = Payload(data=error_message)
119+
120+
# Respond with failure
121+
await self._client.worker_stub.RespondDecisionTaskFailed(
122+
RespondDecisionTaskFailedRequest(
123+
task_token=task.task_token,
124+
cause=cause,
125+
identity=self._identity,
126+
details=details
127+
)
128+
)
129+
130+
logger.info("Decision task failure response sent")
131+
132+
except Exception:
133+
logger.exception("Error handling decision task failure")
134+
135+
async def _respond_decision_task_completed(self, task: PollForDecisionTaskResponse, decision_result: DecisionResult) -> None:
136+
"""
137+
Respond to the service that the decision task has been completed.
138+
139+
Args:
140+
task: The original decision task
141+
decision_result: The result containing decisions and query results
142+
"""
143+
try:
144+
request = RespondDecisionTaskCompletedRequest(
145+
task_token=task.task_token,
146+
decisions=decision_result.decisions,
147+
identity=self._identity,
148+
return_new_decision_task=decision_result.force_create_new_decision_task,
149+
force_create_new_decision_task=decision_result.force_create_new_decision_task
150+
)
151+
152+
# Add query results if present
153+
if decision_result.query_results:
154+
request.query_results.update(decision_result.query_results)
155+
156+
await self._client.worker_stub.RespondDecisionTaskCompleted(request)
157+
logger.debug(f"Decision task completed with {len(decision_result.decisions)} decisions")
158+
159+
except Exception:
160+
logger.exception("Error responding to decision task completion")
161+
raise
162+
163+
def cleanup_workflow_engine(self, workflow_id: str, run_id: str) -> None:
164+
"""
165+
Clean up a workflow engine when workflow execution is complete.
166+
167+
Args:
168+
workflow_id: The workflow ID
169+
run_id: The run ID
170+
"""
171+
engine_key = f"{workflow_id}:{run_id}"
172+
if engine_key in self._workflow_engines:
173+
del self._workflow_engines[engine_key]
174+
logger.debug(f"Cleaned up workflow engine for {workflow_id}:{run_id}")

0 commit comments

Comments
 (0)