Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions cadence/_internal/activity/_activity_executor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import inspect
from concurrent.futures import ThreadPoolExecutor
from logging import getLogger
from traceback import format_exception
Expand All @@ -7,7 +6,7 @@
from google.protobuf.timestamp import to_datetime

from cadence._internal.activity._context import _Context, _SyncContext
from cadence.activity import ActivityInfo
from cadence.activity import ActivityInfo, ActivityDefinition, ExecutionStrategy
from cadence.api.v1.common_pb2 import Failure
from cadence.api.v1.service_worker_pb2 import PollForActivityTaskResponse, RespondActivityTaskFailedRequest, \
RespondActivityTaskCompletedRequest
Expand All @@ -16,7 +15,7 @@
_logger = getLogger(__name__)

class ActivityExecutor:
def __init__(self, client: Client, task_list: str, identity: str, max_workers: int, registry: Callable[[str], Callable]):
def __init__(self, client: Client, task_list: str, identity: str, max_workers: int, registry: Callable[[str], ActivityDefinition]):
self._client = client
self._data_converter = client.data_converter
self._registry = registry
Expand All @@ -36,16 +35,16 @@ async def execute(self, task: PollForActivityTaskResponse):
def _create_context(self, task: PollForActivityTaskResponse) -> _Context:
activity_type = task.activity_type.name
try:
activity_fn = self._registry(activity_type)
activity_def = self._registry(activity_type)
except KeyError:
raise KeyError(f"Activity type not found: {activity_type}") from None

info = self._create_info(task)

if inspect.iscoroutinefunction(activity_fn):
return _Context(self._client, info, activity_fn)
if activity_def.strategy == ExecutionStrategy.ASYNC:
return _Context(self._client, info, activity_def)
else:
return _SyncContext(self._client, info, activity_fn, self._thread_pool)
return _SyncContext(self._client, info, activity_def, self._thread_pool)

async def _report_failure(self, task: PollForActivityTaskResponse, error: Exception):
try:
Expand Down
11 changes: 5 additions & 6 deletions cadence/_internal/activity/_context.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import asyncio
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Callable, Any
from typing import Any

from cadence import Client
from cadence._internal.type_utils import get_fn_parameters
from cadence.activity import ActivityInfo, ActivityContext
from cadence.activity import ActivityInfo, ActivityContext, ActivityDefinition
from cadence.api.v1.common_pb2 import Payload


class _Context(ActivityContext):
def __init__(self, client: Client, info: ActivityInfo, activity_fn: Callable[[Any], Any]):
def __init__(self, client: Client, info: ActivityInfo, activity_fn: ActivityDefinition[[Any], Any]):
self._client = client
self._info = info
self._activity_fn = activity_fn
Expand All @@ -20,7 +19,7 @@ async def execute(self, payload: Payload) -> Any:
return await self._activity_fn(*params)

async def _to_params(self, payload: Payload) -> list[Any]:
type_hints = get_fn_parameters(self._activity_fn)
type_hints = [param.type_hint for param in self._activity_fn.params]
return await self._client.data_converter.from_data(payload, type_hints)

def client(self) -> Client:
Expand All @@ -30,7 +29,7 @@ def info(self) -> ActivityInfo:
return self._info

class _SyncContext(_Context):
def __init__(self, client: Client, info: ActivityInfo, activity_fn: Callable[[Any], Any], executor: ThreadPoolExecutor):
def __init__(self, client: Client, info: ActivityInfo, activity_fn: ActivityDefinition[[Any], Any], executor: ThreadPoolExecutor):
super().__init__(client, info, activity_fn)
self._executor = executor

Expand Down
19 changes: 0 additions & 19 deletions cadence/_internal/type_utils.py

This file was deleted.

103 changes: 102 additions & 1 deletion cadence/activity.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import inspect
from abc import ABC, abstractmethod
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from datetime import timedelta, datetime
from typing import Iterator
from enum import Enum
from functools import update_wrapper
from inspect import signature, Parameter
from typing import Iterator, TypedDict, Unpack, Callable, Type, ParamSpec, TypeVar, Generic, get_type_hints, \
Any, overload

from cadence import Client

Expand Down Expand Up @@ -59,3 +64,99 @@ def is_set() -> bool:
@staticmethod
def get() -> 'ActivityContext':
return ActivityContext._var.get()


@dataclass(frozen=True)
class ActivityParameter:
name: str
type_hint: Type | None
default_value: Any | None

class ExecutionStrategy(Enum):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: naming, are we going to have more than 2 types of enum here? if not, perhaps a naming of isAsync might be more descriptive here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a chance we might add multiprocessing support in the future.

ASYNC = "async"
THREAD_POOL = "thread_pool"

class ActivityDefinitionOptions(TypedDict, total=False):
name: str

P = ParamSpec('P')
T = TypeVar('T')

class ActivityDefinition(Generic[P, T]):
def __init__(self, wrapped: Callable[P, T], name: str, strategy: ExecutionStrategy, params: list[ActivityParameter]):
self._wrapped = wrapped
self._name = name
self._strategy = strategy
self._params = params
update_wrapper(self, wrapped)

def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
return self._wrapped(*args, **kwargs)

@property
def name(self) -> str:
return self._name

@property
def strategy(self) -> ExecutionStrategy:
return self._strategy

@property
def params(self) -> list[ActivityParameter]:
return self._params

@staticmethod
def wrap(fn: Callable[P, T], opts: ActivityDefinitionOptions) -> 'ActivityDefinition[P, T]':
name = fn.__qualname__
if "name" in opts and opts["name"]:
name = opts["name"]

strategy = ExecutionStrategy.THREAD_POOL
if inspect.iscoroutinefunction(fn) or inspect.iscoroutinefunction(fn.__call__): # type: ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if fn is enforced as a callable, what's the difference between those two if conditions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It covers some really weird edge cases. This PR to CPython actually has good examples: https://github.com/python/cpython/pull/99247/files

        class Cl:
            async def __call__(self):
                pass

        self.assertFalse(inspect.iscoroutinefunction(Cl))
        # instances with async def __call__ are NOT recognised.
        self.assertFalse(inspect.iscoroutinefunction(Cl()))

I don't really know why it works this way, but it does. The only likely scenario for someone to run into an issue with this is if they have other decorators on the function.

strategy = ExecutionStrategy.ASYNC

params = _get_params(fn)
return ActivityDefinition(fn, name, strategy, params)


ActivityDecorator = Callable[[Callable[P, T]], ActivityDefinition[P, T]]

@overload
def defn(fn: Callable[P, T]) -> ActivityDefinition[P, T]:
...

@overload
def defn(**kwargs: Unpack[ActivityDefinitionOptions]) -> ActivityDecorator:
...

def defn(fn: Callable[P, T] | None = None, **kwargs: Unpack[ActivityDefinitionOptions]) -> ActivityDecorator | ActivityDefinition[P, T]:
options = ActivityDefinitionOptions(**kwargs)
def decorator(inner_fn: Callable[P, T]) -> ActivityDefinition[P, T]:
return ActivityDefinition.wrap(inner_fn, options)

if fn is not None:
return decorator(fn)

return decorator


def _get_params(fn: Callable) -> list[ActivityParameter]:
args = signature(fn).parameters
hints = get_type_hints(fn)
result = []
for name, param in args.items():
# "unbound functions" aren't a thing in the Python spec. Filter out the self parameter and hope they followed
# the convention.
if param.name == "self":
continue
default = None
if param.default != Parameter.empty:
default = param.default
if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD):
type_hint = hints.get(name, None)
result.append(ActivityParameter(name, type_hint, default))

else:
raise ValueError(f"Parameters must be positional. {name} is {param.kind}, and not valid")

return result
2 changes: 0 additions & 2 deletions cadence/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@
from ._registry import (
Registry,
RegisterWorkflowOptions,
RegisterActivityOptions,
)

__all__ = [
"Worker",
"WorkerOptions",
'Registry',
'RegisterWorkflowOptions',
'RegisterActivityOptions',
]
Loading