Skip to content

Commit 0148752

Browse files
authored
refactoring: ♻️ improve naming consistency by renaming DeferredBus → DeferredDispatcher
1 parent 4b41a52 commit 0148752

File tree

6 files changed

+60
-35
lines changed

6 files changed

+60
-35
lines changed

cq/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from ._core.defer import DeferredBus
1+
from ._core.dispatcher.base import DeferredDispatcher, Dispatcher
22
from ._core.dispatcher.bus import Bus
33
from ._core.dispatcher.pipe import Pipe
44
from ._core.message import (
@@ -26,7 +26,8 @@
2626
"CQScope",
2727
"Command",
2828
"CommandBus",
29-
"DeferredBus",
29+
"DeferredDispatcher",
30+
"Dispatcher",
3031
"Event",
3132
"EventBus",
3233
"Middleware",

cq/_core/defer.py

Lines changed: 0 additions & 10 deletions
This file was deleted.

cq/_core/dispatcher/base.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,13 @@ class Dispatcher[I, O](Protocol):
1313
async def dispatch(self, input_value: I, /) -> O:
1414
raise NotImplementedError
1515

16+
17+
@runtime_checkable
18+
class DeferredDispatcher[I](Protocol):
19+
__slots__ = ()
20+
1621
@abstractmethod
17-
def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
22+
async def defer(self, input_value: I, /) -> None:
1823
raise NotImplementedError
1924

2025

cq/_core/dispatcher/bus.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
MultipleHandlerManager,
1313
SingleHandlerManager,
1414
)
15+
from cq._core.middleware import Middleware
1516

1617
type Listener[T] = Callable[[T], Awaitable[Any]]
1718

@@ -21,11 +22,15 @@ class Bus[I, O](Dispatcher[I, O], Protocol):
2122
__slots__ = ()
2223

2324
@abstractmethod
24-
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
25+
def add_listeners(self, *listeners: Listener[I]) -> Self:
2526
raise NotImplementedError
2627

2728
@abstractmethod
28-
def add_listeners(self, *listeners: Listener[I]) -> Self:
29+
def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
30+
raise NotImplementedError
31+
32+
@abstractmethod
33+
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
2934
raise NotImplementedError
3035

3136

cq/ext/fastapi.py

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,73 @@
11
from dataclasses import dataclass
2-
from typing import Annotated, Any
2+
from typing import TYPE_CHECKING, Annotated, Any
33

44
from fastapi import BackgroundTasks, Depends
55
from injection.ext.fastapi import Inject
66

7-
from cq import Bus, Command, CommandBus, DeferredBus, Event, EventBus, Query, QueryBus
7+
from cq import (
8+
Command,
9+
CommandBus,
10+
DeferredDispatcher,
11+
Dispatcher,
12+
Event,
13+
EventBus,
14+
Query,
15+
QueryBus,
16+
)
817

9-
__all__ = ("DeferredCommandBus", "DeferredEventBus", "DeferredQueryBus")
18+
__all__ = (
19+
"DeferredCommandBus",
20+
"DeferredEventBus",
21+
"DeferredQueryBus",
22+
"FastAPIDeferredDispatcher",
23+
)
1024

1125

1226
@dataclass(repr=False, eq=False, frozen=True, slots=True)
13-
class FastAPIDeferredBus[I](DeferredBus[I]):
27+
class FastAPIDeferredDispatcher[I](DeferredDispatcher[I]):
1428
background_tasks: BackgroundTasks
15-
bus: Bus[I, Any]
29+
dispatcher: Dispatcher[I, Any]
1630

1731
async def defer(self, input_value: I, /) -> None:
18-
self.background_tasks.add_task(self.bus.dispatch, input_value)
32+
self.background_tasks.add_task(self.dispatcher.dispatch, input_value)
1933

2034

2135
async def new_deferred_command_bus[T](
2236
background_tasks: BackgroundTasks,
2337
command_bus: Inject[CommandBus[T]],
24-
) -> DeferredBus[Command]:
25-
return FastAPIDeferredBus(background_tasks, command_bus)
38+
) -> DeferredDispatcher[Command]:
39+
return FastAPIDeferredDispatcher(background_tasks, command_bus)
2640

2741

2842
async def new_deferred_event_bus(
2943
background_tasks: BackgroundTasks,
3044
event_bus: Inject[EventBus],
31-
) -> DeferredBus[Event]:
32-
return FastAPIDeferredBus(background_tasks, event_bus)
45+
) -> DeferredDispatcher[Event]:
46+
return FastAPIDeferredDispatcher(background_tasks, event_bus)
3347

3448

3549
async def new_deferred_query_bus[T](
3650
background_tasks: BackgroundTasks,
3751
query_bus: Inject[QueryBus[T]],
38-
) -> DeferredBus[Query]:
39-
return FastAPIDeferredBus(background_tasks, query_bus)
52+
) -> DeferredDispatcher[Query]:
53+
return FastAPIDeferredDispatcher(background_tasks, query_bus)
4054

4155

42-
DeferredCommandBus = Annotated[DeferredBus[Command], Depends(new_deferred_command_bus)]
43-
DeferredEventBus = Annotated[DeferredBus[Event], Depends(new_deferred_event_bus)]
44-
DeferredQueryBus = Annotated[DeferredBus[Query], Depends(new_deferred_query_bus)]
56+
if TYPE_CHECKING:
57+
type DeferredCommandBus = DeferredDispatcher[Command]
58+
type DeferredEventBus = DeferredDispatcher[Event]
59+
type DeferredQueryBus = DeferredDispatcher[Query]
60+
61+
else:
62+
DeferredCommandBus = Annotated[
63+
DeferredDispatcher[Command],
64+
Depends(new_deferred_command_bus, use_cache=False),
65+
]
66+
DeferredEventBus = Annotated[
67+
DeferredDispatcher[Event],
68+
Depends(new_deferred_event_bus, use_cache=False),
69+
]
70+
DeferredQueryBus = Annotated[
71+
DeferredDispatcher[Query],
72+
Depends(new_deferred_query_bus, use_cache=False),
73+
]

cq/ext/fastapi.pyi

Lines changed: 0 additions & 5 deletions
This file was deleted.

0 commit comments

Comments
 (0)