Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cadence/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Cadence Python Client

A Python framework for authoring workflows and activities for Cadence.
"""

# Import main client functionality
from .client import Client

__version__ = "0.1.0"

__all__ = [
"Client",
]
13 changes: 11 additions & 2 deletions cadence/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@
WorkerOptions
)

from ._registry import (
Registry,
RegisterWorkflowOptions,
RegisterActivityOptions,
)

__all__ = [
"Worker",
"WorkerOptions"
]
"WorkerOptions",
'Registry',
'RegisterWorkflowOptions',
'RegisterActivityOptions',
]
175 changes: 175 additions & 0 deletions cadence/worker/_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#!/usr/bin/env python3
"""
Workflow and Activity Registry for Cadence Python Client.
This module provides a registry system for managing workflows and activities,
similar to the Go client's registry.go implementation.
"""

import logging
from typing import Callable, Dict, Optional
from dataclasses import dataclass


logger = logging.getLogger(__name__)


@dataclass
class RegisterWorkflowOptions:
"""Options for registering a workflow."""
name: Optional[str] = None
alias: Optional[str] = None


@dataclass
class RegisterActivityOptions:
"""Options for registering an activity."""
name: Optional[str] = None
alias: Optional[str] = None


class Registry:
"""
Registry for managing workflows and activities.
This class provides functionality to register, retrieve, and manage
workflows and activities in a Cadence application.
"""

def __init__(self):
"""Initialize the registry."""
self._workflows: Dict[str, Callable] = {}
self._activities: Dict[str, Callable] = {}
self._workflow_aliases: Dict[str, str] = {} # alias -> name mapping
self._activity_aliases: Dict[str, str] = {} # alias -> name mapping

def workflow(
self,
func: Optional[Callable] = None,
**kwargs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: : Unpack[RegisterWorkflowOptions] to signify the typing of kwargs

) -> Callable:
"""
Register a workflow function.
This method can be used as a decorator or called directly.
Args:
func: The workflow function to register
**kwargs: Options for registration (name, alias)
Returns:
The decorated function or the function itself
Raises:
KeyError: If workflow name already exists
"""
options = RegisterWorkflowOptions(**kwargs)

def decorator(f: Callable) -> Callable:
workflow_name = options.name or f.__name__

if workflow_name in self._workflows:
raise KeyError(f"Workflow '{workflow_name}' is already registered")

self._workflows[workflow_name] = f

# Register alias if provided
if options.alias:
if options.alias in self._workflow_aliases:
raise KeyError(f"Workflow alias '{options.alias}' is already registered")
self._workflow_aliases[options.alias] = workflow_name

logger.info(f"Registered workflow '{workflow_name}'")
return f

if func is None:
return decorator
return decorator(func)

def activity(
self,
func: Optional[Callable] = None,
**kwargs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: : Unpack[RegisterActivityOptions] to signify the typing of kwargs.

) -> Callable:
"""
Register an activity function.
This method can be used as a decorator or called directly.
Args:
func: The activity function to register
**kwargs: Options for registration (name, alias)
Returns:
The decorated function or the function itself
Raises:
KeyError: If activity name already exists
"""
options = RegisterActivityOptions(**kwargs)

def decorator(f: Callable) -> Callable:
activity_name = options.name or f.__name__

if activity_name in self._activities:
raise KeyError(f"Activity '{activity_name}' is already registered")

self._activities[activity_name] = f

# Register alias if provided
if options.alias:
if options.alias in self._activity_aliases:
raise KeyError(f"Activity alias '{options.alias}' is already registered")
self._activity_aliases[options.alias] = activity_name

logger.info(f"Registered activity '{activity_name}'")
return f

if func is None:
return decorator
return decorator(func)

def get_workflow(self, name: str) -> Callable:
"""
Get a registered workflow by name.
Args:
name: Name or alias of the workflow
Returns:
The workflow function
Raises:
KeyError: If workflow is not found
"""
# Check if it's an alias
actual_name = self._workflow_aliases.get(name, name)

if actual_name not in self._workflows:
raise KeyError(f"Workflow '{name}' not found in registry")

return self._workflows[actual_name]

def get_activity(self, name: str) -> Callable:
"""
Get a registered activity by name.
Args:
name: Name or alias of the activity
Returns:
The activity function
Raises:
KeyError: If activity is not found
"""
# Check if it's an alias
actual_name = self._activity_aliases.get(name, name)

if actual_name not in self._activities:
raise KeyError(f"Activity '{name}' not found in registry")

return self._activities[actual_name]



1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
"grpcio-tools>=1.50.0",
"msgspec>=0.19.0",
"protobuf==5.29.1",
"pytest>=8.4.1",
"typing-extensions>=4.0.0",
]

Expand Down
Loading