Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions cq/_core/dispatcher/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
from cq._core.handler import (
HandlerFactory,
HandlerManager,
MultipleHandlerManager,
SingleHandlerManager,
HandlerRegistry,
MultipleHandlerRegistry,
SingleHandlerRegistry,
)
from cq._core.middleware import Middleware

Expand All @@ -35,29 +35,29 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel


class BaseBus[I, O](BaseDispatcher[I, O], Bus[I, O], ABC):
__slots__ = ("__listeners", "__manager")
__slots__ = ("__listeners", "__registry")

__listeners: list[Listener[I]]
__manager: HandlerManager[I, O]
__registry: HandlerRegistry[I, O]

def __init__(self, manager: HandlerManager[I, O]) -> None:
def __init__(self, registry: HandlerRegistry[I, O], /) -> None:
super().__init__()
self.__listeners = []
self.__manager = manager
self.__registry = registry

def add_listeners(self, *listeners: Listener[I]) -> Self:
self.__listeners.extend(listeners)
return self

def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
self.__manager.subscribe(input_type, factory)
self.__registry.subscribe(input_type, factory)
return self

def _handlers_from(
self,
input_type: type[I],
) -> Iterator[Callable[[I], Awaitable[O]]]:
return self.__manager.handlers_from(input_type)
return self.__registry.handlers_from(input_type)

def _trigger_listeners(self, input_value: I, /, task_group: TaskGroup) -> None:
for listener in self.__listeners:
Expand All @@ -67,8 +67,8 @@ def _trigger_listeners(self, input_value: I, /, task_group: TaskGroup) -> None:
class SimpleBus[I, O](BaseBus[I, O]):
__slots__ = ()

def __init__(self, manager: HandlerManager[I, O] | None = None) -> None:
super().__init__(manager or SingleHandlerManager())
def __init__(self, registry: HandlerRegistry[I, O] | None = None, /) -> None:
super().__init__(registry or SingleHandlerRegistry())

async def dispatch(self, input_value: I, /) -> O:
async with anyio.create_task_group() as task_group:
Expand All @@ -83,8 +83,8 @@ async def dispatch(self, input_value: I, /) -> O:
class TaskBus[I](BaseBus[I, None]):
__slots__ = ()

def __init__(self, manager: HandlerManager[I, None] | None = None) -> None:
super().__init__(manager or MultipleHandlerManager())
def __init__(self, registry: HandlerRegistry[I, None] | None = None, /) -> None:
super().__init__(registry or MultipleHandlerRegistry())

async def dispatch(self, input_value: I, /) -> None:
async with anyio.create_task_group() as task_group:
Expand Down
10 changes: 5 additions & 5 deletions cq/_core/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def handle(self, *args: P.args, **kwargs: P.kwargs) -> T:


@runtime_checkable
class HandlerManager[I, O](Protocol):
class HandlerRegistry[I, O](Protocol):
__slots__ = ()

@abstractmethod
Expand All @@ -40,7 +40,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class MultipleHandlerManager[I, O](HandlerManager[I, O]):
class MultipleHandlerRegistry[I, O](HandlerRegistry[I, O]):
__factories: dict[type[I], list[HandlerFactory[[I], O]]] = field(
default_factory=partial(defaultdict, list),
init=False,
Expand All @@ -62,7 +62,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class SingleHandlerManager[I, O](HandlerManager[I, O]):
class SingleHandlerRegistry[I, O](HandlerRegistry[I, O]):
__factories: dict[type[I], HandlerFactory[[I], O]] = field(
default_factory=dict,
init=False,
Expand Down Expand Up @@ -92,7 +92,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel

@dataclass(repr=False, eq=False, frozen=True, slots=True)
class HandlerDecorator[I, O]:
manager: HandlerManager[I, O]
registry: HandlerRegistry[I, O]
injection_module: injection.Module = field(default_factory=injection.mod)

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -154,7 +154,7 @@ def __decorator(
) -> HandlerType[[I], O]:
factory = self.injection_module.make_async_factory(wrapped, threadsafe)
input_type = input_type or _resolve_input_type(wrapped)
self.manager.subscribe(input_type, factory)
self.registry.subscribe(input_type, factory)
return wrapped


Expand Down
16 changes: 8 additions & 8 deletions cq/_core/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from cq._core.dispatcher.bus import Bus, SimpleBus, TaskBus
from cq._core.handler import (
HandlerDecorator,
MultipleHandlerManager,
SingleHandlerManager,
MultipleHandlerRegistry,
SingleHandlerRegistry,
)
from cq._core.scope import CQScope
from cq.middlewares.scope import InjectionScopeMiddleware
Expand All @@ -24,13 +24,13 @@


command_handler: Final[HandlerDecorator[Command, Any]] = HandlerDecorator(
SingleHandlerManager(),
SingleHandlerRegistry(),
)
event_handler: Final[HandlerDecorator[Event, None]] = HandlerDecorator(
MultipleHandlerManager(),
MultipleHandlerRegistry(),
)
query_handler: Final[HandlerDecorator[Query, Any]] = HandlerDecorator(
SingleHandlerManager(),
SingleHandlerRegistry(),
)


Expand All @@ -41,7 +41,7 @@
mode="fallback",
)
def new_command_bus(*, threadsafe: bool | None = None) -> Bus[Command, Any]:
bus = SimpleBus(command_handler.manager)
bus = SimpleBus(command_handler.registry)
transaction_scope_middleware = InjectionScopeMiddleware(
CQScope.TRANSACTION,
exist_ok=True,
Expand All @@ -58,7 +58,7 @@ def new_command_bus(*, threadsafe: bool | None = None) -> Bus[Command, Any]:
mode="fallback",
)
def new_event_bus() -> Bus[Event, None]:
return TaskBus(event_handler.manager)
return TaskBus(event_handler.registry)


@injection.injectable(
Expand All @@ -68,4 +68,4 @@ def new_event_bus() -> Bus[Event, None]:
mode="fallback",
)
def new_query_bus() -> Bus[Query, Any]:
return SimpleBus(query_handler.manager)
return SimpleBus(query_handler.registry)
4 changes: 2 additions & 2 deletions tests/core/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pytest

from cq._core.handler import HandlerDecorator, SingleHandlerManager
from cq._core.handler import HandlerDecorator, SingleHandlerRegistry


class _Handler:
Expand All @@ -13,7 +13,7 @@ async def handle(self, input_value: str) -> NoReturn:
class TestHandlerDecorator:
@pytest.fixture(scope="function")
def handler_decorator(self) -> HandlerDecorator[Any, Any]:
return HandlerDecorator(SingleHandlerManager())
return HandlerDecorator(SingleHandlerRegistry())

def test_call_with_success_return_wrapped_type(
self,
Expand Down
Loading