Skip to content

Commit e620472

Browse files
authored
feat: ✨ New handler registration logic
1 parent def82b8 commit e620472

File tree

10 files changed

+228
-220
lines changed

10 files changed

+228
-220
lines changed

cq/__init__.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
QueryBus,
1313
command_handler,
1414
event_handler,
15-
get_command_bus,
16-
get_event_bus,
17-
get_query_bus,
15+
new_command_bus,
16+
new_event_bus,
17+
new_query_bus,
1818
query_handler,
1919
)
2020
from ._core.middleware import Middleware, MiddlewareResult
@@ -39,8 +39,8 @@
3939
"RelatedEvents",
4040
"command_handler",
4141
"event_handler",
42-
"get_command_bus",
43-
"get_event_bus",
44-
"get_query_bus",
42+
"new_command_bus",
43+
"new_event_bus",
44+
"new_query_bus",
4545
"query_handler",
4646
)

cq/_core/dispatcher/bus.py

Lines changed: 28 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,20 @@
11
from abc import ABC, abstractmethod
2-
from collections import defaultdict
32
from collections.abc import Awaitable, Callable
4-
from dataclasses import dataclass, field
5-
from inspect import getmro, isclass
6-
from types import GenericAlias
7-
from typing import Any, Protocol, Self, TypeAliasType, runtime_checkable
3+
from typing import Any, Protocol, Self, runtime_checkable
84

95
import anyio
10-
import injection
6+
from anyio.abc import TaskGroup
117

128
from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
13-
14-
type HandlerType[**P, T] = type[Handler[P, T]]
15-
type HandlerFactory[**P, T] = Callable[..., Awaitable[Handler[P, T]]]
9+
from cq._core.handler import (
10+
HandlerFactory,
11+
HandlerManager,
12+
MultipleHandlerManager,
13+
SingleHandlerManager,
14+
)
1615

1716
type Listener[T] = Callable[[T], Awaitable[Any]]
1817

19-
type BusType[I, O] = type[Bus[I, O]]
20-
21-
22-
@runtime_checkable
23-
class Handler[**P, T](Protocol):
24-
__slots__ = ()
25-
26-
@abstractmethod
27-
async def handle(self, *args: P.args, **kwargs: P.kwargs) -> T:
28-
raise NotImplementedError
29-
3018

3119
@runtime_checkable
3220
class Bus[I, O](Dispatcher[I, O], Protocol):
@@ -41,27 +29,6 @@ def add_listeners(self, *listeners: Listener[I]) -> Self:
4129
raise NotImplementedError
4230

4331

44-
@dataclass(eq=False, frozen=True, slots=True)
45-
class SubscriberDecorator[I, O]:
46-
bus_type: BusType[I, O] | TypeAliasType | GenericAlias
47-
injection_module: injection.Module = field(default_factory=injection.mod)
48-
49-
def __call__(self, first_input_type: type[I], /, *input_types: type[I]) -> Any:
50-
def decorator(wrapped: type[Handler[[I], O]]) -> type[Handler[[I], O]]:
51-
if not isclass(wrapped) or not issubclass(wrapped, Handler):
52-
raise TypeError(f"`{wrapped}` isn't a valid handler.")
53-
54-
bus = self.injection_module.find_instance(self.bus_type)
55-
factory = self.injection_module.make_async_factory(wrapped)
56-
57-
for input_type in (first_input_type, *input_types):
58-
bus.subscribe(input_type, factory)
59-
60-
return wrapped
61-
62-
return decorator
63-
64-
6532
class BaseBus[I, O](BaseDispatcher[I, O], Bus[I, O], ABC):
6633
__slots__ = ("__listeners",)
6734

@@ -75,81 +42,47 @@ def add_listeners(self, *listeners: Listener[I]) -> Self:
7542
self.__listeners.extend(listeners)
7643
return self
7744

78-
async def _trigger_listeners(self, input_value: I, /) -> None:
79-
listeners = self.__listeners
80-
81-
if not listeners:
82-
return
83-
84-
async with anyio.create_task_group() as task_group:
85-
for listener in listeners:
86-
task_group.start_soon(listener, input_value)
87-
88-
@staticmethod
89-
def _make_handle_function(
90-
handler_factory: HandlerFactory[[I], O],
91-
) -> Callable[[I], Awaitable[O]]:
92-
async def handle(input_value: I) -> O:
93-
handler = await handler_factory()
94-
return await handler.handle(input_value)
95-
96-
return handle
45+
def _trigger_listeners(self, input_value: I, /, task_group: TaskGroup) -> None:
46+
for listener in self.__listeners:
47+
task_group.start_soon(listener, input_value)
9748

9849

9950
class SimpleBus[I, O](BaseBus[I, O]):
100-
__slots__ = ("__handlers",)
51+
__slots__ = ("__manager",)
10152

102-
__handlers: dict[type[I], HandlerFactory[[I], O]]
53+
__manager: HandlerManager[I, O]
10354

104-
def __init__(self) -> None:
55+
def __init__(self, manager: HandlerManager[I, O] | None = None) -> None:
10556
super().__init__()
106-
self.__handlers = {}
57+
self.__manager = manager or SingleHandlerManager()
10758

10859
async def dispatch(self, input_value: I, /) -> O:
109-
await self._trigger_listeners(input_value)
110-
111-
for input_type in getmro(type(input_value)):
112-
if handler_factory := self.__handlers.get(input_type):
113-
break
60+
async with anyio.create_task_group() as task_group:
61+
self._trigger_listeners(input_value, task_group)
11462

115-
else:
116-
return NotImplemented
63+
for handler in self.__manager.handlers_from(type(input_value)):
64+
return await self._invoke_with_middlewares(handler, input_value)
11765

118-
handler = self._make_handle_function(handler_factory)
119-
return await self._invoke_with_middlewares(handler, input_value)
66+
return NotImplemented
12067

12168
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
122-
if input_type in self.__handlers:
123-
raise RuntimeError(
124-
f"A handler is already registered for the input type: `{input_type}`."
125-
)
126-
127-
self.__handlers[input_type] = factory
69+
self.__manager.subscribe(input_type, factory)
12870
return self
12971

13072

13173
class TaskBus[I](BaseBus[I, None]):
132-
__slots__ = ("__handlers",)
74+
__slots__ = ("__manager",)
13375

134-
__handlers: dict[type[I], list[HandlerFactory[[I], None]]]
76+
__manager: HandlerManager[I, None]
13577

136-
def __init__(self) -> None:
78+
def __init__(self, manager: HandlerManager[I, None] | None = None) -> None:
13779
super().__init__()
138-
self.__handlers = defaultdict(list)
80+
self.__manager = manager or MultipleHandlerManager()
13981

14082
async def dispatch(self, input_value: I, /) -> None:
141-
await self._trigger_listeners(input_value)
142-
143-
for input_type in getmro(type(input_value)):
144-
if handler_factories := self.__handlers.get(input_type):
145-
break
146-
147-
else:
148-
return
149-
15083
async with anyio.create_task_group() as task_group:
151-
for handler_factory in handler_factories:
152-
handler = self._make_handle_function(handler_factory)
84+
self._trigger_listeners(input_value, task_group)
85+
for handler in self.__manager.handlers_from(type(input_value)):
15386
task_group.start_soon(
15487
self._invoke_with_middlewares,
15588
handler,
@@ -161,5 +94,5 @@ def subscribe(
16194
input_type: type[I],
16295
factory: HandlerFactory[[I], None],
16396
) -> Self:
164-
self.__handlers[input_type].append(factory)
97+
self.__manager.subscribe(input_type, factory)
16598
return self

cq/_core/handler.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from abc import abstractmethod
2+
from collections import defaultdict
3+
from collections.abc import Awaitable, Callable, Iterator
4+
from dataclasses import dataclass, field
5+
from functools import partial
6+
from inspect import getmro, isclass
7+
from typing import Any, Protocol, Self, runtime_checkable
8+
9+
import injection
10+
11+
type HandlerType[**P, T] = type[Handler[P, T]]
12+
type HandlerFactory[**P, T] = Callable[..., Awaitable[Handler[P, T]]]
13+
14+
15+
@runtime_checkable
16+
class Handler[**P, T](Protocol):
17+
__slots__ = ()
18+
19+
@abstractmethod
20+
async def handle(self, *args: P.args, **kwargs: P.kwargs) -> T:
21+
raise NotImplementedError
22+
23+
24+
@runtime_checkable
25+
class HandlerManager[I, O](Protocol):
26+
__slots__ = ()
27+
28+
@abstractmethod
29+
def handlers_from(
30+
self,
31+
input_type: type[I],
32+
) -> Iterator[Callable[[I], Awaitable[O]]]:
33+
raise NotImplementedError
34+
35+
@abstractmethod
36+
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
37+
raise NotImplementedError
38+
39+
40+
@dataclass(eq=False, frozen=True, slots=True)
41+
class MultipleHandlerManager[I, O](HandlerManager[I, O]):
42+
__factories: dict[type[I], list[HandlerFactory[[I], O]]] = field(
43+
default_factory=partial(defaultdict, list),
44+
init=False,
45+
)
46+
47+
def handlers_from(
48+
self,
49+
input_type: type[I],
50+
) -> Iterator[Callable[[I], Awaitable[O]]]:
51+
for it in getmro(input_type):
52+
for factory in self.__factories.get(it, ()):
53+
yield _make_handle_function(factory)
54+
55+
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
56+
self.__factories[input_type].append(factory)
57+
return self
58+
59+
60+
@dataclass(eq=False, frozen=True, slots=True)
61+
class SingleHandlerManager[I, O](HandlerManager[I, O]):
62+
__factories: dict[type[I], HandlerFactory[[I], O]] = field(
63+
default_factory=dict,
64+
init=False,
65+
)
66+
67+
def handlers_from(
68+
self,
69+
input_type: type[I],
70+
) -> Iterator[Callable[[I], Awaitable[O]]]:
71+
for it in getmro(input_type):
72+
factory = self.__factories.get(it, None)
73+
if factory is not None:
74+
yield _make_handle_function(factory)
75+
76+
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
77+
if input_type in self.__factories:
78+
raise RuntimeError(
79+
f"A handler is already registered for the input type: `{input_type}`."
80+
)
81+
82+
self.__factories[input_type] = factory
83+
return self
84+
85+
86+
@dataclass(eq=False, frozen=True, slots=True)
87+
class HandlerDecorator[I, O]:
88+
manager: HandlerManager[I, O]
89+
injection_module: injection.Module = field(default_factory=injection.mod)
90+
91+
def __call__(self, input_type: type[I], /) -> Any:
92+
def decorator(wrapped: type[Handler[[I], O]]) -> type[Handler[[I], O]]:
93+
if not isclass(wrapped) or not issubclass(wrapped, Handler):
94+
raise TypeError(f"`{wrapped}` isn't a valid handler.")
95+
96+
factory = self.injection_module.make_async_factory(wrapped)
97+
self.manager.subscribe(input_type, factory)
98+
return wrapped
99+
100+
return decorator
101+
102+
103+
def _make_handle_function[I, O](
104+
handler_factory: HandlerFactory[[I], O],
105+
) -> Callable[[I], Awaitable[O]]:
106+
return partial(__handle, factory=handler_factory)
107+
108+
109+
async def __handle[I, O](input_value: I, factory: HandlerFactory[[I], O]) -> O:
110+
handler = await factory()
111+
return await handler.handle(input_value)

cq/_core/message.py

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,13 @@
33

44
import injection
55

6-
from cq._core.dispatcher.bus import Bus, SimpleBus, SubscriberDecorator, TaskBus
6+
from cq._core.dispatcher.bus import Bus, SimpleBus, TaskBus
77
from cq._core.dto import DTO
8+
from cq._core.handler import (
9+
HandlerDecorator,
10+
MultipleHandlerManager,
11+
SingleHandlerManager,
12+
)
813
from cq._core.scope import CQScope
914
from cq.middlewares.scope import InjectionScopeMiddleware
1015

@@ -32,42 +37,42 @@ class Query(Message, ABC):
3237
AnyCommandBus = CommandBus[Any]
3338

3439

35-
command_handler: SubscriberDecorator[Command, Any] = SubscriberDecorator(CommandBus)
36-
event_handler: SubscriberDecorator[Event, None] = SubscriberDecorator(EventBus)
37-
query_handler: SubscriberDecorator[Query, Any] = SubscriberDecorator(QueryBus)
38-
39-
40-
@injection.inject
41-
def get_command_bus[T](bus: CommandBus[T] = NotImplemented, /) -> CommandBus[T]:
42-
return bus
40+
command_handler: HandlerDecorator[Command, Any] = HandlerDecorator(
41+
SingleHandlerManager(),
42+
)
43+
event_handler: HandlerDecorator[Event, None] = HandlerDecorator(
44+
MultipleHandlerManager(),
45+
)
46+
query_handler: HandlerDecorator[Query, Any] = HandlerDecorator(
47+
SingleHandlerManager(),
48+
)
4349

4450

51+
@injection.singleton(
52+
on=CommandBus,
53+
ignore_type_hint=True, # type: ignore[call-arg]
54+
inject=False,
55+
mode="fallback",
56+
)
4557
def new_command_bus[T]() -> CommandBus[T]:
46-
bus: CommandBus[T] = SimpleBus()
47-
bus.add_middlewares(
58+
return SimpleBus(command_handler.manager).add_middlewares(
4859
InjectionScopeMiddleware(CQScope.ON_COMMAND),
4960
)
50-
return bus
51-
52-
53-
@injection.inject
54-
def get_event_bus(bus: EventBus = NotImplemented, /) -> EventBus:
55-
return bus
5661

5762

63+
@injection.singleton(
64+
inject=False,
65+
mode="fallback",
66+
)
5867
def new_event_bus() -> EventBus:
59-
return TaskBus()
60-
61-
62-
@injection.inject
63-
def get_query_bus[T](bus: QueryBus[T] = NotImplemented, /) -> QueryBus[T]:
64-
return bus
68+
return TaskBus(event_handler.manager)
6569

6670

71+
@injection.singleton(
72+
on=QueryBus,
73+
ignore_type_hint=True, # type: ignore[call-arg]
74+
inject=False,
75+
mode="fallback",
76+
)
6777
def new_query_bus[T]() -> QueryBus[T]:
68-
return SimpleBus()
69-
70-
71-
injection.set_constant(new_command_bus(), CommandBus, alias=True)
72-
injection.set_constant(new_event_bus(), EventBus, alias=True)
73-
injection.set_constant(new_query_bus(), QueryBus, alias=True)
78+
return SimpleBus(query_handler.manager)

0 commit comments

Comments
 (0)