Skip to content

Commit c14ba05

Browse files
timl3136claude
andcommitted
Add workflow engine integration and fix test compatibility
Cherry-picked workflow engine improvements and added comprehensive integration tests for decision task handling. Updated test mocks and expectations to match the simplified workflow engine implementation that creates new engines per task rather than caching. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent aa34615 commit c14ba05

File tree

8 files changed

+1027
-6
lines changed

8 files changed

+1027
-6
lines changed
Lines changed: 169 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,188 @@
1+
import asyncio
2+
import logging
13
from dataclasses import dataclass
2-
from typing import Optional, Callable, Any
4+
from typing import Callable, Optional, Dict, Any
35

46
from cadence._internal.workflow.context import Context
57
from cadence.api.v1.decision_pb2 import Decision
68
from cadence.client import Client
79
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
810
from cadence.workflow import WorkflowInfo
11+
from cadence._internal.decision_state_machine import DecisionManager
12+
13+
logger = logging.getLogger(__name__)
914

1015

1116
@dataclass
1217
class DecisionResult:
1318
decisions: list[Decision]
19+
force_create_new_decision_task: bool = False
20+
query_results: Optional[Dict[str, Any]] = None
1421

1522
class WorkflowEngine:
16-
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Optional[Callable[..., Any]] = None):
23+
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
1724
self._context = Context(client, info)
1825
self._workflow_func = workflow_func
26+
self._decision_manager = DecisionManager()
27+
self._is_workflow_complete = False
1928

20-
# TODO: Implement this
2129
async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult:
22-
with self._context._activate():
30+
"""
31+
Process a decision task and generate decisions.
32+
33+
Args:
34+
decision_task: The PollForDecisionTaskResponse from the service
35+
36+
Returns:
37+
DecisionResult containing the list of decisions
38+
"""
39+
try:
40+
logger.info(f"Processing decision task for workflow {self._context.info().workflow_id}")
41+
42+
# Process workflow history to update decision state machines
43+
if decision_task.history:
44+
self._process_workflow_history(decision_task.history)
45+
46+
# Execute workflow function to generate new decisions
47+
if not self._is_workflow_complete:
48+
await self._execute_workflow_function(decision_task)
49+
50+
# Collect all pending decisions from state machines
51+
decisions = self._decision_manager.collect_pending_decisions()
52+
53+
# Close decider's event loop
54+
self._close_event_loop()
55+
56+
logger.info(f"Generated {len(decisions)} decisions for workflow {self._context.info().workflow_id}")
57+
58+
return DecisionResult(decisions=decisions)
59+
60+
except Exception:
61+
logger.exception(f"Error processing decision task for workflow {self._context.info().workflow_id}")
62+
# Return empty decisions on error - the task will be failed by the handler
2363
return DecisionResult(decisions=[])
64+
65+
def _process_workflow_history(self, history) -> None:
66+
"""
67+
Process workflow history events to update decision state machines.
68+
69+
Args:
70+
history: The workflow history from the decision task
71+
"""
72+
if not history or not hasattr(history, 'events'):
73+
return
74+
75+
logger.debug(f"Processing {len(history.events)} history events")
76+
77+
for event in history.events:
78+
try:
79+
self._decision_manager.handle_history_event(event)
80+
except Exception as e:
81+
logger.warning(f"Error processing history event: {e}")
82+
83+
async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
84+
"""
85+
Execute the workflow function to generate new decisions.
86+
87+
This blocks until the workflow schedules an activity or completes.
88+
89+
Args:
90+
decision_task: The decision task containing workflow context
91+
"""
92+
try:
93+
with self._context._activate():
94+
# Execute the workflow function
95+
# The workflow function should block until it schedules an activity
96+
workflow_func = self._workflow_func
97+
if workflow_func is None:
98+
logger.warning(f"No workflow function available for workflow {self._context.info().workflow_id}")
99+
return
100+
101+
# Extract workflow input from history
102+
workflow_input = await self._extract_workflow_input(decision_task)
103+
104+
# Execute workflow function
105+
result = self._execute_workflow_function_sync(workflow_func, workflow_input)
106+
107+
# Check if workflow is complete
108+
if result is not None:
109+
self._is_workflow_complete = True
110+
logger.info(f"Workflow {self._context.info().workflow_id} completed")
111+
112+
except Exception:
113+
logger.exception(f"Error executing workflow function for {self._context.info().workflow_id}")
114+
raise
115+
116+
async def _extract_workflow_input(self, decision_task: PollForDecisionTaskResponse) -> Any:
117+
"""
118+
Extract workflow input from the decision task history.
119+
120+
Args:
121+
decision_task: The decision task containing workflow history
122+
123+
Returns:
124+
The workflow input data, or None if not found
125+
"""
126+
if not decision_task.history or not hasattr(decision_task.history, 'events'):
127+
logger.warning("No history events found in decision task")
128+
return None
129+
130+
# Look for WorkflowExecutionStarted event
131+
for event in decision_task.history.events:
132+
if hasattr(event, 'workflow_execution_started_event_attributes'):
133+
started_attrs = event.workflow_execution_started_event_attributes
134+
if started_attrs and hasattr(started_attrs, 'input'):
135+
# Deserialize the input using the client's data converter
136+
try:
137+
# Use from_data method with a single type hint of None (no type conversion)
138+
input_data_list = await self._context.client().data_converter.from_data(started_attrs.input, [None])
139+
input_data = input_data_list[0] if input_data_list else None
140+
logger.debug(f"Extracted workflow input: {input_data}")
141+
return input_data
142+
except Exception as e:
143+
logger.warning(f"Failed to deserialize workflow input: {e}")
144+
return None
145+
146+
logger.warning("No WorkflowExecutionStarted event found in history")
147+
return None
148+
149+
def _execute_workflow_function_sync(self, workflow_func: Callable, workflow_input: Any) -> Any:
150+
"""
151+
Execute the workflow function synchronously.
152+
153+
Args:
154+
workflow_func: The workflow function to execute
155+
workflow_input: The input data for the workflow function
156+
157+
Returns:
158+
The result of the workflow function execution
159+
"""
160+
logger.debug(f"Executing workflow function with input: {workflow_input}")
161+
result = workflow_func(workflow_input)
162+
163+
# If the workflow function is async, we need to handle it properly
164+
if asyncio.iscoroutine(result):
165+
# Create a simple event loop for async workflow functions
166+
try:
167+
loop = asyncio.new_event_loop()
168+
asyncio.set_event_loop(loop)
169+
result = loop.run_until_complete(result)
170+
finally:
171+
loop.close()
172+
asyncio.set_event_loop(None)
173+
174+
return result
175+
176+
def _close_event_loop(self) -> None:
177+
"""
178+
Close the decider's event loop.
179+
"""
180+
try:
181+
# Get the current event loop
182+
loop = asyncio.get_event_loop()
183+
if loop.is_running():
184+
# Schedule the loop to stop
185+
loop.call_soon_threadsafe(loop.stop)
186+
logger.debug("Scheduled event loop to stop")
187+
except Exception as e:
188+
logger.warning(f"Error closing event loop: {e}")

cadence/worker/_decision.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class DecisionWorker:
1414
def __init__(self, client: Client, task_list: str, registry: Registry, options: WorkerOptions) -> None:
1515
self._client = client
1616
self._task_list = task_list
17+
self._registry = registry
1718
self._identity = options["identity"]
1819
permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"])
1920
self._decision_handler = DecisionTaskHandler(client, task_list, registry, **options)

cadence/worker/_decision_task_handler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from typing import Dict, Any
23

34
from cadence.api.v1.common_pb2 import Payload
45
from cadence.api.v1.service_worker_pb2 import (

0 commit comments

Comments
 (0)