Skip to content
Merged
171 changes: 167 additions & 4 deletions cadence/_internal/workflow/workflow_engine.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,186 @@
import asyncio
import logging
from dataclasses import dataclass
from typing import Optional, Callable, Any
from typing import Callable, Any

from cadence._internal.workflow.context import Context
from cadence.api.v1.decision_pb2 import Decision
from cadence.client import Client
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
from cadence.workflow import WorkflowInfo
from cadence._internal.decision_state_machine import DecisionManager

logger = logging.getLogger(__name__)


@dataclass
class DecisionResult:
decisions: list[Decision]

class WorkflowEngine:
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Optional[Callable[..., Any]] = None):
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
self._context = Context(client, info)
self._workflow_func = workflow_func
self._decision_manager = DecisionManager()
self._is_workflow_complete = False

# TODO: Implement this
async def process_decision(self, decision_task: PollForDecisionTaskResponse) -> DecisionResult:
with self._context._activate():
"""
Process a decision task and generate decisions.
Args:
decision_task: The PollForDecisionTaskResponse from the service
Returns:
DecisionResult containing the list of decisions
"""
try:
logger.info(f"Processing decision task for workflow {self._context.info().workflow_id}")

# Process workflow history to update decision state machines
if decision_task.history:
self._process_workflow_history(decision_task.history)

# Execute workflow function to generate new decisions
if not self._is_workflow_complete:
await self._execute_workflow_function(decision_task)

# Collect all pending decisions from state machines
decisions = self._decision_manager.collect_pending_decisions()

# Close decider's event loop
self._close_event_loop()

logger.info(f"Generated {len(decisions)} decisions for workflow {self._context.info().workflow_id}")

return DecisionResult(decisions=decisions)

except Exception:
logger.exception(f"Error processing decision task for workflow {self._context.info().workflow_id}")
# Return empty decisions on error - the task will be failed by the handler
return DecisionResult(decisions=[])

def _process_workflow_history(self, history) -> None:
"""
Process workflow history events to update decision state machines.
Args:
history: The workflow history from the decision task
"""
if not history or not hasattr(history, 'events'):
return

logger.debug(f"Processing {len(history.events)} history events")

for event in history.events:
try:
self._decision_manager.handle_history_event(event)
except Exception as e:
logger.warning(f"Error processing history event: {e}")

async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
"""
Execute the workflow function to generate new decisions.
This blocks until the workflow schedules an activity or completes.
Args:
decision_task: The decision task containing workflow context
"""
try:
with self._context._activate():
# Execute the workflow function
# The workflow function should block until it schedules an activity
workflow_func = self._workflow_func
if workflow_func is None:
logger.warning(f"No workflow function available for workflow {self._context.info().workflow_id}")
return

# Extract workflow input from history
workflow_input = await self._extract_workflow_input(decision_task)

# Execute workflow function
result = self._execute_workflow_function_sync(workflow_func, workflow_input)

# Check if workflow is complete
if result is not None:
self._is_workflow_complete = True
logger.info(f"Workflow {self._context.info().workflow_id} completed")

except Exception:
logger.exception(f"Error executing workflow function for {self._context.info().workflow_id}")
raise

async def _extract_workflow_input(self, decision_task: PollForDecisionTaskResponse) -> Any:
"""
Extract workflow input from the decision task history.
Args:
decision_task: The decision task containing workflow history
Returns:
The workflow input data, or None if not found
"""
if not decision_task.history or not hasattr(decision_task.history, 'events'):
logger.warning("No history events found in decision task")
return None

# Look for WorkflowExecutionStarted event
for event in decision_task.history.events:
if hasattr(event, 'workflow_execution_started_event_attributes'):
started_attrs = event.workflow_execution_started_event_attributes
if started_attrs and hasattr(started_attrs, 'input'):
# Deserialize the input using the client's data converter
try:
# Use from_data method with a single type hint of None (no type conversion)
input_data_list = await self._context.client().data_converter.from_data(started_attrs.input, [None])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type hints are needed here. I recommend creating a WorkflowDefinition class similar to activitydefinition. And pass it from registry.

input_data = input_data_list[0] if input_data_list else None
logger.debug(f"Extracted workflow input: {input_data}")
return input_data
except Exception as e:
logger.warning(f"Failed to deserialize workflow input: {e}")
return None

logger.warning("No WorkflowExecutionStarted event found in history")
return None

def _execute_workflow_function_sync(self, workflow_func: Callable, workflow_input: Any) -> Any:
"""
Execute the workflow function synchronously.
Args:
workflow_func: The workflow function to execute
workflow_input: The input data for the workflow function
Returns:
The result of the workflow function execution
"""
logger.debug(f"Executing workflow function with input: {workflow_input}")
result = workflow_func(workflow_input)

# If the workflow function is async, we need to handle it properly
if asyncio.iscoroutine(result):
# Create a simple event loop for async workflow functions
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(result)
finally:
loop.close()
asyncio.set_event_loop(None)

return result

def _close_event_loop(self) -> None:
"""
Close the decider's event loop.
"""
try:
# Get the current event loop
loop = asyncio.get_event_loop()
if loop.is_running():
# Schedule the loop to stop
loop.call_soon_threadsafe(loop.stop)
logger.debug("Scheduled event loop to stop")
except Exception as e:
logger.warning(f"Error closing event loop: {e}")
20 changes: 8 additions & 12 deletions cadence/worker/_decision.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import asyncio
from typing import Optional

from cadence.api.v1.common_pb2 import Payload
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse, \
RespondDecisionTaskFailedRequest
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind
from cadence.api.v1.workflow_pb2 import DecisionTaskFailedCause
from cadence.client import Client
from cadence.worker._poller import Poller
from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT
from cadence.worker._decision_task_handler import DecisionTaskHandler
from cadence.worker._registry import Registry


class DecisionWorker:
def __init__(self, client: Client, task_list: str, options: WorkerOptions) -> None:
def __init__(self, client: Client, task_list: str, registry: Registry, options: WorkerOptions) -> None:
self._client = client
self._task_list = task_list
self._registry = registry
self._identity = options["identity"]
permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"])
self._decision_handler = DecisionTaskHandler(client, task_list, registry, **options)
self._poller = Poller[PollForDecisionTaskResponse](options["decision_task_pollers"], permits, self._poll, self._execute)
# TODO: Sticky poller, actually running workflows, etc.

Expand All @@ -30,17 +31,12 @@ async def _poll(self) -> Optional[PollForDecisionTaskResponse]:
identity=self._identity,
), timeout=_LONG_POLL_TIMEOUT)

if task.task_token:
if task and task.task_token:
return task
else:
return None


async def _execute(self, task: PollForDecisionTaskResponse) -> None:
await self._client.worker_stub.RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest(
task_token=task.task_token,
cause=DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION,
identity=self._identity,
details=Payload(data=b'not implemented')
))
await self._decision_handler.handle_task(task)

3 changes: 1 addition & 2 deletions cadence/worker/_decision_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ async def _respond_decision_task_completed(self, task: PollForDecisionTaskRespon
task_token=task.task_token,
decisions=decision_result.decisions,
identity=self._identity,
return_new_decision_task=True,
force_create_new_decision_task=False
return_new_decision_task=True
)

await self._client.worker_stub.RespondDecisionTaskCompleted(request)
Expand Down
2 changes: 1 addition & 1 deletion cadence/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __init__(self, client: Client, task_list: str, registry: Registry, **kwargs:
_validate_and_copy_defaults(client, task_list, options)
self._options = options
self._activity_worker = ActivityWorker(client, task_list, registry, options)
self._decision_worker = DecisionWorker(client, task_list, options)
self._decision_worker = DecisionWorker(client, task_list, registry, options)


async def run(self) -> None:
Expand Down
Loading