Skip to content

Commit 428f0ac

Browse files
committed
Add workflow engine integration and fix test compatibility
Signed-off-by: Tim Li <[email protected]>
1 parent aa34615 commit 428f0ac

File tree

8 files changed

+1027
-6
lines changed

8 files changed

+1027
-6
lines changed
Lines changed: 169 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,188 @@
1+
import asyncio
2+
import logging
13
from dataclasses import dataclass
2-
from typing import Optional, Callable, Any
4+
from typing import Callable, Optional, Dict, Any
35

46
from cadence._internal.workflow.context import Context
57
from cadence.api.v1.decision_pb2 import Decision
68
from cadence.client import Client
79
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
810
from cadence.workflow import WorkflowInfo
11+
from cadence._internal.decision_state_machine import DecisionManager
12+
13+
logger = logging.getLogger(__name__)
914

1015

1116
@dataclass
1217
class DecisionResult:
1318
decisions: list[Decision]
19+
force_create_new_decision_task: bool = False
20+
query_results: Optional[Dict[str, Any]] = None
1421

1522
class WorkflowEngine:
16-
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Optional[Callable[..., Any]] = None):
23+
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
1724
self._context = Context(client, info)
1825
self._workflow_func = workflow_func
26+
self._decision_manager = DecisionManager()
27+
self._is_workflow_complete = False
1928

20-
# TODO: Implement this
2129
async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult:
22-
with self._context._activate():
30+
"""
31+
Process a decision task and generate decisions.
32+
33+
Args:
34+
decision_task: The PollForDecisionTaskResponse from the service
35+
36+
Returns:
37+
DecisionResult containing the list of decisions
38+
"""
39+
try:
40+
logger.info(f"Processing decision task for workflow {self._context.info().workflow_id}")
41+
42+
# Process workflow history to update decision state machines
43+
if decision_task.history:
44+
self._process_workflow_history(decision_task.history)
45+
46+
# Execute workflow function to generate new decisions
47+
if not self._is_workflow_complete:
48+
await self._execute_workflow_function(decision_task)
49+
50+
# Collect all pending decisions from state machines
51+
decisions = self._decision_manager.collect_pending_decisions()
52+
53+
# Close decider's event loop
54+
self._close_event_loop()
55+
56+
logger.info(f"Generated {len(decisions)} decisions for workflow {self._context.info().workflow_id}")
57+
58+
return DecisionResult(decisions=decisions)
59+
60+
except Exception:
61+
logger.exception(f"Error processing decision task for workflow {self._context.info().workflow_id}")
62+
# Return empty decisions on error - the task will be failed by the handler
2363
return DecisionResult(decisions=[])
64+
65+
def _process_workflow_history(self, history) -> None:
66+
"""
67+
Process workflow history events to update decision state machines.
68+
69+
Args:
70+
history: The workflow history from the decision task
71+
"""
72+
if not history or not hasattr(history, 'events'):
73+
return
74+
75+
logger.debug(f"Processing {len(history.events)} history events")
76+
77+
for event in history.events:
78+
try:
79+
self._decision_manager.handle_history_event(event)
80+
except Exception as e:
81+
logger.warning(f"Error processing history event: {e}")
82+
83+
async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
84+
"""
85+
Execute the workflow function to generate new decisions.
86+
87+
This blocks until the workflow schedules an activity or completes.
88+
89+
Args:
90+
decision_task: The decision task containing workflow context
91+
"""
92+
try:
93+
with self._context._activate():
94+
# Execute the workflow function
95+
# The workflow function should block until it schedules an activity
96+
workflow_func = self._workflow_func
97+
if workflow_func is None:
98+
logger.warning(f"No workflow function available for workflow {self._context.info().workflow_id}")
99+
return
100+
101+
# Extract workflow input from history
102+
workflow_input = await self._extract_workflow_input(decision_task)
103+
104+
# Execute workflow function
105+
result = self._execute_workflow_function_sync(workflow_func, workflow_input)
106+
107+
# Check if workflow is complete
108+
if result is not None:
109+
self._is_workflow_complete = True
110+
logger.info(f"Workflow {self._context.info().workflow_id} completed")
111+
112+
except Exception:
113+
logger.exception(f"Error executing workflow function for {self._context.info().workflow_id}")
114+
raise
115+
116+
async def _extract_workflow_input(self, decision_task: PollForDecisionTaskResponse) -> Any:
117+
"""
118+
Extract workflow input from the decision task history.
119+
120+
Args:
121+
decision_task: The decision task containing workflow history
122+
123+
Returns:
124+
The workflow input data, or None if not found
125+
"""
126+
if not decision_task.history or not hasattr(decision_task.history, 'events'):
127+
logger.warning("No history events found in decision task")
128+
return None
129+
130+
# Look for WorkflowExecutionStarted event
131+
for event in decision_task.history.events:
132+
if hasattr(event, 'workflow_execution_started_event_attributes'):
133+
started_attrs = event.workflow_execution_started_event_attributes
134+
if started_attrs and hasattr(started_attrs, 'input'):
135+
# Deserialize the input using the client's data converter
136+
try:
137+
# Use from_data method with a single type hint of None (no type conversion)
138+
input_data_list = await self._context.client().data_converter.from_data(started_attrs.input, [None])
139+
input_data = input_data_list[0] if input_data_list else None
140+
logger.debug(f"Extracted workflow input: {input_data}")
141+
return input_data
142+
except Exception as e:
143+
logger.warning(f"Failed to deserialize workflow input: {e}")
144+
return None
145+
146+
logger.warning("No WorkflowExecutionStarted event found in history")
147+
return None
148+
149+
def _execute_workflow_function_sync(self, workflow_func: Callable, workflow_input: Any) -> Any:
150+
"""
151+
Execute the workflow function synchronously.
152+
153+
Args:
154+
workflow_func: The workflow function to execute
155+
workflow_input: The input data for the workflow function
156+
157+
Returns:
158+
The result of the workflow function execution
159+
"""
160+
logger.debug(f"Executing workflow function with input: {workflow_input}")
161+
result = workflow_func(workflow_input)
162+
163+
# If the workflow function is async, we need to handle it properly
164+
if asyncio.iscoroutine(result):
165+
# Create a simple event loop for async workflow functions
166+
try:
167+
loop = asyncio.new_event_loop()
168+
asyncio.set_event_loop(loop)
169+
result = loop.run_until_complete(result)
170+
finally:
171+
loop.close()
172+
asyncio.set_event_loop(None)
173+
174+
return result
175+
176+
def _close_event_loop(self) -> None:
177+
"""
178+
Close the decider's event loop.
179+
"""
180+
try:
181+
# Get the current event loop
182+
loop = asyncio.get_event_loop()
183+
if loop.is_running():
184+
# Schedule the loop to stop
185+
loop.call_soon_threadsafe(loop.stop)
186+
logger.debug("Scheduled event loop to stop")
187+
except Exception as e:
188+
logger.warning(f"Error closing event loop: {e}")

cadence/worker/_decision.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class DecisionWorker:
1414
def __init__(self, client: Client, task_list: str, registry: Registry, options: WorkerOptions) -> None:
1515
self._client = client
1616
self._task_list = task_list
17+
self._registry = registry
1718
self._identity = options["identity"]
1819
permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"])
1920
self._decision_handler = DecisionTaskHandler(client, task_list, registry, **options)

cadence/worker/_decision_task_handler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from typing import Dict, Any
23

34
from cadence.api.v1.common_pb2 import Payload
45
from cadence.api.v1.service_worker_pb2 import (

0 commit comments

Comments
 (0)