Skip to content

Commit affa3c3

Browse files
authored
feat: ✨ Introduce ContextPipeline
1 parent 0148752 commit affa3c3

File tree

7 files changed

+518
-227
lines changed

7 files changed

+518
-227
lines changed

cq/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from ._core.dispatcher.base import DeferredDispatcher, Dispatcher
22
from ._core.dispatcher.bus import Bus
3-
from ._core.dispatcher.pipe import Pipe
3+
from ._core.dispatcher.lazy import LazyDispatcher
4+
from ._core.dispatcher.pipe import ContextPipeline, Pipe
45
from ._core.message import (
56
AnyCommandBus,
67
Command,
@@ -17,6 +18,7 @@
1718
query_handler,
1819
)
1920
from ._core.middleware import Middleware, MiddlewareResult
21+
from ._core.pipetools import ContextCommandPipeline
2022
from ._core.related_events import RelatedEvents
2123
from ._core.scope import CQScope
2224

@@ -26,10 +28,13 @@
2628
"CQScope",
2729
"Command",
2830
"CommandBus",
31+
"ContextCommandPipeline",
32+
"ContextPipeline",
2933
"DeferredDispatcher",
3034
"Dispatcher",
3135
"Event",
3236
"EventBus",
37+
"LazyDispatcher",
3338
"Middleware",
3439
"MiddlewareResult",
3540
"Pipe",

cq/_core/dispatcher/lazy.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from collections.abc import Awaitable
2+
from types import GenericAlias
3+
from typing import TypeAliasType
4+
5+
import injection
6+
7+
from cq._core.dispatcher.base import Dispatcher
8+
9+
10+
class LazyDispatcher[I, O](Dispatcher[I, O]):
11+
__slots__ = ("__value",)
12+
13+
__value: Awaitable[Dispatcher[I, O]]
14+
15+
def __init__(
16+
self,
17+
dispatcher_type: type[Dispatcher[I, O]] | TypeAliasType | GenericAlias,
18+
/,
19+
*,
20+
injection_module: injection.Module | None = None,
21+
threadsafe: bool | None = None,
22+
) -> None:
23+
module = injection_module or injection.mod()
24+
self.__value = module.aget_lazy_instance(dispatcher_type, threadsafe=threadsafe)
25+
26+
async def dispatch(self, input_value: I, /) -> O:
27+
dispatcher = await self.__value
28+
return await dispatcher.dispatch(input_value)

cq/_core/dispatcher/pipe.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
1+
from collections import deque
12
from collections.abc import Awaitable, Callable
23
from dataclasses import dataclass, field
3-
from typing import Any, Self, overload
4+
from typing import Any, Protocol, Self, overload
45

56
from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
7+
from cq._core.middleware import Middleware
68

79
type PipeConverter[I, O] = Callable[[O], Awaitable[I]]
810

911

12+
class PipeConverterMethod[I, O](Protocol):
13+
def __get__(
14+
self,
15+
instance: object,
16+
owner: type | None = ...,
17+
) -> PipeConverter[I, O]: ...
18+
19+
1020
@dataclass(repr=False, eq=False, frozen=True, slots=True)
1121
class PipeStep[I, O]:
1222
converter: PipeConverter[I, O]
@@ -77,6 +87,110 @@ async def __execute(self, input_value: I) -> O:
7787
for step in self.__steps:
7888
output_value = await dispatcher.dispatch(input_value)
7989
input_value = await step.converter(output_value)
90+
91+
if input_value is None:
92+
return NotImplemented
93+
8094
dispatcher = step.dispatcher or self.__dispatcher
8195

8296
return await dispatcher.dispatch(input_value)
97+
98+
99+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
100+
class ContextPipelineStep[I, O]:
101+
converter: PipeConverterMethod[I, O]
102+
dispatcher: Dispatcher[I, Any] | None = field(default=None)
103+
104+
105+
class ContextPipeline[I]:
106+
__slots__ = ("__dispatcher", "__middlewares", "__steps")
107+
108+
__dispatcher: Dispatcher[Any, Any]
109+
__middlewares: deque[Middleware[Any, Any]]
110+
__steps: list[ContextPipelineStep[Any, Any]]
111+
112+
def __init__(self, dispatcher: Dispatcher[Any, Any]) -> None:
113+
self.__dispatcher = dispatcher
114+
self.__middlewares = deque()
115+
self.__steps = []
116+
117+
@overload
118+
def __get__[O](
119+
self,
120+
instance: O,
121+
owner: type[O] | None = ...,
122+
) -> Dispatcher[I, O]: ...
123+
124+
@overload
125+
def __get__(self, instance: None = ..., owner: type | None = ...) -> Self: ...
126+
127+
def __get__[O](
128+
self,
129+
instance: O | None = None,
130+
owner: type[O] | None = None,
131+
) -> Self | Dispatcher[I, O]:
132+
if instance is None:
133+
return self
134+
135+
pipeline = self.__new_pipeline(instance, owner)
136+
return BoundContextPipeline(instance, pipeline)
137+
138+
def add_middlewares(self, *middlewares: Middleware[[I], Any]) -> Self:
139+
self.__middlewares.extendleft(reversed(middlewares))
140+
return self
141+
142+
@overload
143+
def step[T](
144+
self,
145+
wrapped: PipeConverterMethod[T, Any],
146+
/,
147+
*,
148+
dispatcher: Dispatcher[T, Any] | None = ...,
149+
) -> PipeConverterMethod[T, Any]: ...
150+
151+
@overload
152+
def step[T](
153+
self,
154+
wrapped: None = ...,
155+
/,
156+
*,
157+
dispatcher: Dispatcher[T, Any] | None = ...,
158+
) -> Callable[[PipeConverterMethod[T, Any]], PipeConverterMethod[T, Any]]: ...
159+
160+
def step[T](
161+
self,
162+
wrapped: PipeConverterMethod[T, Any] | None = None,
163+
/,
164+
*,
165+
dispatcher: Dispatcher[T, Any] | None = None,
166+
) -> Any:
167+
def decorator(wp: PipeConverterMethod[T, Any]) -> PipeConverterMethod[T, Any]:
168+
step = ContextPipelineStep(wp, dispatcher)
169+
self.__steps.append(step)
170+
return wp
171+
172+
return decorator(wrapped) if wrapped else decorator
173+
174+
def __new_pipeline[T](
175+
self,
176+
context: T,
177+
context_type: type[T] | None,
178+
) -> Pipe[I, Any]:
179+
pipeline: Pipe[I, Any] = Pipe(self.__dispatcher)
180+
pipeline.add_middlewares(*self.__middlewares)
181+
182+
for step in self.__steps:
183+
converter = step.converter.__get__(context, context_type)
184+
pipeline.step(converter, dispatcher=step.dispatcher)
185+
186+
return pipeline
187+
188+
189+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
190+
class BoundContextPipeline[I, O](Dispatcher[I, O]):
191+
context: O
192+
pipeline: Pipe[I, Any]
193+
194+
async def dispatch(self, input_value: I, /) -> O:
195+
await self.pipeline.dispatch(input_value)
196+
return self.context

cq/_core/pipetools.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import injection
2+
3+
from cq._core.dispatcher.lazy import LazyDispatcher
4+
from cq._core.dispatcher.pipe import ContextPipeline
5+
from cq._core.message import AnyCommandBus, Command
6+
from cq._core.scope import CQScope
7+
from cq.middlewares.scope import InjectionScopeMiddleware
8+
9+
10+
class ContextCommandPipeline[I: Command](ContextPipeline[I]):
11+
__slots__ = ()
12+
13+
def __init__(
14+
self,
15+
/,
16+
*,
17+
injection_module: injection.Module | None = None,
18+
threadsafe: bool | None = None,
19+
) -> None:
20+
dispatcher = LazyDispatcher(
21+
AnyCommandBus,
22+
injection_module=injection_module,
23+
threadsafe=threadsafe,
24+
)
25+
super().__init__(dispatcher)
26+
transaction_scope_middleware = InjectionScopeMiddleware(
27+
CQScope.TRANSACTION,
28+
exist_ok=True,
29+
threadsafe=threadsafe,
30+
)
31+
self.add_middlewares(transaction_scope_middleware)

documentation/pipeline.md

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
Pipelines are designed to execute several commands one after the other, while encapsulating them in middleware.
44

5+
## Pipe
6+
7+
A simple pipeline implementation.
8+
59
Example:
610

711
```python
@@ -12,14 +16,54 @@ async def pipeline_example(command_bus: AnyCommandBus) -> None:
1216
pipeline.add_middlewares(...) # You can add middleware to encapsulate the pipeline in a transaction, for example.
1317

1418
@pipeline.step
15-
async def converter_1(output_value: FirstResult) -> SecondCommand:
19+
async def _(result: FirstResult) -> SecondCommand:
1620
""" Transform the return value into a new command. """
1721

1822
@pipeline.step
19-
async def converter_2(output_value: SecondResult) -> ThirdCommand:
23+
async def _(result: SecondResult) -> ThirdCommand:
2024
""" Transform the return value into a new command. """
2125

2226
command = FirstCommand(...)
2327
output_value = await pipeline.dispatch(command)
2428
# ...
2529
```
30+
31+
## ContextPipeline
32+
33+
A limitation of `Pipe` is that it isn't possible to have values that go through the steps. Each converter in a `Pipe`
34+
receives only the output of the previous one, but the intermediate state, any contextual information, is lost between
35+
steps.
36+
37+
This makes it impossible to accumulate data or share context between converters without resorting to global variables.
38+
39+
To solve this limitation, `ContextPipeline` introduces a contextual layer: each stage operates within the same instance,
40+
allowing stateful processing and side effects to persist across the entire pipeline execution.
41+
42+
Example:
43+
44+
```python
45+
from cq import ContextCommandPipeline, ContextPipeline
46+
47+
class ContextExample:
48+
user_id: int
49+
50+
pipeline: ContextPipeline[FirstCommand] = ContextCommandPipeline()
51+
52+
@pipeline.step
53+
async def _(self, result: FirstResult) -> SecondCommand:
54+
self.user_id = result.user_id # set user_id in context
55+
return SecondCommand(...)
56+
57+
@pipeline.step
58+
async def _(self, result: SecondResult) -> ThirdCommand:
59+
""" Transform the return value into a new command. """
60+
61+
@pipeline.step
62+
async def _(self, result: ThirdResult) -> None:
63+
""" The last step is optional, but if you need it, you must return `None`. """
64+
65+
async def how_to_dispatch() -> None:
66+
command = FirstCommand(...)
67+
context = await ContextExample().pipeline.dispatch(command)
68+
# ...
69+
```
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from cq import ContextCommandPipeline, ContextPipeline, command_handler
2+
from tests.helpers.history import HistoryMiddleware
3+
4+
5+
class TestContextCommandPipeline:
6+
async def test_dispatch_with_success_return_any(
7+
self,
8+
history: HistoryMiddleware,
9+
) -> None:
10+
class Command1: ...
11+
12+
class Command2: ...
13+
14+
class Command3: ...
15+
16+
class Foo: ...
17+
18+
class Bar: ...
19+
20+
class Baz: ...
21+
22+
@command_handler
23+
class CommandHandler1:
24+
async def handle(self, command: Command1) -> Foo:
25+
return Foo()
26+
27+
@command_handler
28+
class CommandHandler2:
29+
async def handle(self, command: Command2) -> Bar:
30+
return Bar()
31+
32+
@command_handler
33+
class CommandHandler3:
34+
async def handle(self, command: Command3) -> Baz:
35+
return Baz()
36+
37+
class Context:
38+
foo: Foo
39+
bar: Bar
40+
baz: Baz
41+
42+
pipeline: ContextPipeline[Command1] = ContextCommandPipeline()
43+
44+
@pipeline.step
45+
async def _(self, foo: Foo) -> Command2:
46+
self.foo = foo
47+
return Command2()
48+
49+
@pipeline.step
50+
async def _(self, bar: Bar) -> Command3:
51+
self.bar = bar
52+
return Command3()
53+
54+
@pipeline.step
55+
async def _(self, baz: Baz) -> None:
56+
self.baz = baz
57+
58+
cmd = Command1()
59+
ctx = await Context().pipeline.dispatch(cmd)
60+
61+
assert isinstance(ctx, Context)
62+
assert isinstance(ctx.foo, Foo)
63+
assert isinstance(ctx.bar, Bar)
64+
assert isinstance(ctx.baz, Baz)
65+
assert len(history.records) == 3

0 commit comments

Comments
 (0)