Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cadence/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

# Import main client functionality
from .client import Client
from .worker import Registry
from .workflow import workflow

__version__ = "0.1.0"

__all__ = [
"Client",
"Registry",
"workflow",
]
24 changes: 14 additions & 10 deletions cadence/_internal/workflow/workflow_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ class DecisionResult:
decisions: list[Decision]

class WorkflowEngine:
def __init__(self, info: WorkflowInfo, client: Client, workflow_func: Callable[[Any], Any] | None = None):
def __init__(self, info: WorkflowInfo, client: Client, workflow_definition=None):
self._context = Context(client, info)
self._workflow_func = workflow_func
self._workflow_definition = workflow_definition
self._workflow_instance = None
if workflow_definition:
self._workflow_instance = workflow_definition.cls()
self._decision_manager = DecisionManager()
self._decisions_helper = DecisionsHelper(self._decision_manager)
self._is_workflow_complete = False
Expand Down Expand Up @@ -250,19 +253,17 @@ def _fallback_process_workflow_history(self, history) -> None:
async def _execute_workflow_function(self, decision_task: PollForDecisionTaskResponse) -> None:
"""
Execute the workflow function to generate new decisions.

This blocks until the workflow schedules an activity or completes.

Args:
decision_task: The decision task containing workflow context
"""
try:
# Execute the workflow function
# The workflow function should block until it schedules an activity
workflow_func = self._workflow_func
if workflow_func is None:
# Execute the workflow function from the workflow instance
if self._workflow_definition is None or self._workflow_instance is None:
logger.warning(
"No workflow function available",
"No workflow definition or instance available",
extra={
"workflow_type": self._context.info().workflow_type,
"workflow_id": self._context.info().workflow_id,
Expand All @@ -271,6 +272,9 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
)
return

# Get the workflow run method from the instance
workflow_func = self._workflow_definition.get_run_method(self._workflow_instance)

# Extract workflow input from history
workflow_input = await self._extract_workflow_input(decision_task)

Expand All @@ -290,7 +294,7 @@ async def _execute_workflow_function(self, decision_task: PollForDecisionTaskRes
"completion_type": "success"
}
)

except Exception as e:
logger.error(
"Error executing workflow function",
Expand Down
8 changes: 4 additions & 4 deletions cadence/worker/_decision_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
)

try:
workflow_func = self._registry.get_workflow(workflow_type_name)
workflow_definition = self._registry.get_workflow(workflow_type_name)
except KeyError:
logger.error(
"Workflow type not found in registry",
Expand All @@ -103,9 +103,9 @@ async def _handle_task_implementation(self, task: PollForDecisionTaskResponse) -
workflow_engine = self._workflow_engines.get(cache_key)
if workflow_engine is None:
workflow_engine = WorkflowEngine(
info=workflow_info,
client=self._client,
workflow_func=workflow_func
info=workflow_info,
client=self._client,
workflow_definition=workflow_definition
)
self._workflow_engines[cache_key] = workflow_engine

Expand Down
66 changes: 36 additions & 30 deletions cadence/worker/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
"""

import logging
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload
from typing import Callable, Dict, Optional, Unpack, TypedDict, Sequence, overload, Type, Union
from cadence.activity import ActivityDefinitionOptions, ActivityDefinition, ActivityDecorator, P, T
from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions

logger = logging.getLogger(__name__)

Expand All @@ -28,53 +29,58 @@ class Registry:

def __init__(self) -> None:
"""Initialize the registry."""
self._workflows: Dict[str, Callable] = {}
self._workflows: Dict[str, WorkflowDefinition] = {}
self._activities: Dict[str, ActivityDefinition] = {}
self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping

def workflow(
self,
func: Optional[Callable] = None,
cls: Optional[Type] = None,
**kwargs: Unpack[RegisterWorkflowOptions]
) -> Callable:
) -> Union[Type, Callable[[Type], Type]]:
"""
Register a workflow function.
Register a workflow class.

This method can be used as a decorator or called directly.

Only supports class-based workflows.

Args:
func: The workflow function to register
cls: The workflow class to register
**kwargs: Options for registration (name, alias)

Returns:
The decorated function or the function itself
The decorated class

Raises:
KeyError: If workflow name already exists
ValueError: If class workflow is invalid
"""
options = RegisterWorkflowOptions(**kwargs)
def decorator(f: Callable) -> Callable:
workflow_name = options.get('name') or f.__name__

def decorator(target: Type) -> Type:
workflow_name = options.get('name') or target.__name__

if workflow_name in self._workflows:
raise KeyError(f"Workflow '{workflow_name}' is already registered")

self._workflows[workflow_name] = f


# Create WorkflowDefinition with type information
workflow_opts = WorkflowDefinitionOptions(name=workflow_name)
workflow_def = WorkflowDefinition.wrap(target, workflow_opts)
self._workflows[workflow_name] = workflow_def

# Register alias if provided
alias = options.get('alias')
if alias:
if alias in self._workflow_aliases:
raise KeyError(f"Workflow alias '{alias}' is already registered")
self._workflow_aliases[alias] = workflow_name

logger.info(f"Registered workflow '{workflow_name}'")
return f
if func is None:
return target

if cls is None:
return decorator
return decorator(func)
return decorator(cls)

@overload
def activity(self, func: Callable[P, T]) -> ActivityDefinition[P, T]:
Expand Down Expand Up @@ -135,25 +141,25 @@ def _register_activity(self, defn: ActivityDefinition) -> None:
self._activities[defn.name] = defn


def get_workflow(self, name: str) -> Callable:
def get_workflow(self, name: str) -> WorkflowDefinition:
"""
Get a registered workflow by name.

Args:
name: Name or alias of the workflow

Returns:
The workflow function
The workflow definition

Raises:
KeyError: If workflow is not found
"""
# Check if it's an alias
actual_name = self._workflow_aliases.get(name, name)

if actual_name not in self._workflows:
raise KeyError(f"Workflow '{name}' not found in registry")

return self._workflows[actual_name]

def get_activity(self, name: str) -> ActivityDefinition:
Expand Down
110 changes: 109 additions & 1 deletion cadence/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,118 @@
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Iterator
from typing import Iterator, Callable, TypeVar, TypedDict, Type, cast, Any
from functools import wraps

from cadence.client import Client

T = TypeVar('T')


class WorkflowDefinitionOptions(TypedDict, total=False):
"""Options for defining a workflow."""
name: str


class WorkflowDefinition:
"""
Definition of a workflow class with metadata.

Similar to ActivityDefinition but for workflow classes.
Provides type safety and metadata for workflow classes.
"""

def __init__(self, cls: Type, name: str):
self._cls = cls
self._name = name

@property
def name(self) -> str:
"""Get the workflow name."""
return self._name

@property
def cls(self) -> Type:
"""Get the workflow class."""
return self._cls

def get_run_method(self, instance: Any) -> Callable:
"""Get the workflow run method from an instance of the workflow class."""
for attr_name in dir(instance):
if attr_name.startswith('_'):
continue
attr = getattr(instance, attr_name)
if callable(attr) and hasattr(attr, '_workflow_run'):
return cast(Callable, attr)
raise ValueError(f"No @workflow.run method found in class {self._cls.__name__}")

@staticmethod
def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> 'WorkflowDefinition':
"""
Wrap a class as a WorkflowDefinition.

Args:
cls: The workflow class to wrap
opts: Options for the workflow definition

Returns:
A WorkflowDefinition instance

Raises:
ValueError: If no run method is found or multiple run methods exist
"""
name = cls.__name__
if "name" in opts and opts["name"]:
name = opts["name"]

# Validate that the class has exactly one run method
run_method_count = 0
for attr_name in dir(cls):
if attr_name.startswith('_'):
continue

attr = getattr(cls, attr_name)
if not callable(attr):
continue

# Check for workflow run method
if hasattr(attr, '_workflow_run'):
run_method_count += 1

if run_method_count == 0:
raise ValueError(f"No @workflow.run method found in class {cls.__name__}")
elif run_method_count > 1:
raise ValueError(f"Multiple @workflow.run methods found in class {cls.__name__}")

return WorkflowDefinition(cls, name)


def run(func: Callable[..., T]) -> Callable[..., T]:
"""
Decorator to mark a method as the main workflow run method.

Args:
func: The method to mark as the workflow run method

Returns:
The decorated method with workflow run metadata
"""
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)

# Attach metadata to the function
wrapper._workflow_run = True # type: ignore
return wrapper


# Create a simple namespace object for the workflow decorators
class _WorkflowNamespace:
run = staticmethod(run)

workflow = _WorkflowNamespace()


@dataclass
class WorkflowInfo:
workflow_type: str
Expand Down
Loading