Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:

steps:
- name: Run checkout
uses: actions/checkout@v5
uses: actions/checkout@v6

- name: Set up environment
uses: ./.github/actions/environment
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:

steps:
- name: Run checkout
uses: actions/checkout@v5
uses: actions/checkout@v6

- name: Set up environment
uses: ./.github/actions/environment
Expand Down
29 changes: 29 additions & 0 deletions .github/workflows/pages.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: GitHub Pages

on:
push:
branches:
- prod

jobs:
delivery:
name: Delivery
runs-on: ubuntu-latest
permissions:
contents: write

steps:
- name: Run checkout
uses: actions/checkout@v6

- name: Configure Git
run: |
git config --global user.name "github-pages[bot]"
git config --global user.email "github-pages[bot]@users.noreply.github.com"

- name: Set up environment
uses: ./.github/actions/environment

- name: Deploy MkDocs
shell: bash
run: uv run mkdocs gh-deploy --force
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ mypy:

pytest:
uv run pytest

mkdocs:
uv run mkdocs serve
13 changes: 2 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,13 @@
[![PyPI - Downloads](https://img.shields.io/pypi/dm/python-cq.svg?color=blue)](https://pypistats.org/packages/python-cq)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

Lightweight library for separating Python code according to **Command and Query Responsibility Segregation** principles.
Documentation: https://python-cq.remimd.dev

Dependency injection is handled by [python-injection](https://github.com/100nm/python-injection).

Easy to use with [FastAPI](https://github.com/fastapi/fastapi).
Python package designed to organize your code following CQRS principles. It builds on top of [python-injection](https://github.com/100nm/python-injection) for dependency injection.

## Installation

⚠️ _Requires Python 3.12 or higher_

```bash
pip install python-cq
```

## Resources

* [**Writing Application Layer**](https://github.com/100nm/python-cq/tree/prod/documentation/writing-application-layer.md)
* [**Pipeline**](https://github.com/100nm/python-cq/tree/prod/documentation/pipeline.md)
* [**FastAPI Example**](https://github.com/100nm/python-cq/tree/prod/documentation/fastapi-example.md)
18 changes: 11 additions & 7 deletions cq/_core/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel
return self


class _Decorator(Protocol):
def __call__[T](self, wrapped: T, /) -> T: ...


@dataclass(repr=False, eq=False, frozen=True, slots=True)
class HandlerDecorator[I, O]:
registry: HandlerRegistry[I, O]
Expand All @@ -104,16 +108,16 @@ def __call__(
/,
*,
threadsafe: bool | None = ...,
) -> Callable[[HandlerType[[I], O]], HandlerType[[I], O]]: ...
) -> _Decorator: ...

@overload
def __call__(
def __call__[T](
self,
input_or_handler_type: HandlerType[[I], O],
input_or_handler_type: T,
/,
*,
threadsafe: bool | None = ...,
) -> HandlerType[[I], O]: ...
) -> T: ...

@overload
def __call__(
Expand All @@ -122,11 +126,11 @@ def __call__(
/,
*,
threadsafe: bool | None = ...,
) -> Callable[[HandlerType[[I], O]], HandlerType[[I], O]]: ...
) -> _Decorator: ...

def __call__(
def __call__[T](
self,
input_or_handler_type: type[I] | HandlerType[[I], O] | None = None,
input_or_handler_type: type[I] | T | None = None,
/,
*,
threadsafe: bool | None = None,
Expand Down
6 changes: 3 additions & 3 deletions cq/middlewares/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ class InjectionScopeMiddleware:

async def __call__(self, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
async with AsyncExitStack() as stack:
cm = adefine_scope(self.scope_name, threadsafe=self.threadsafe)
try:
await stack.enter_async_context(cm)
await stack.enter_async_context(
adefine_scope(self.scope_name, threadsafe=self.threadsafe)
)

except ScopeAlreadyDefinedError:
if not self.exist_ok:
raise

del cm
yield
1 change: 1 addition & 0 deletions docs/CNAME
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
python-cq.remimd.dev
71 changes: 71 additions & 0 deletions docs/guides/configuring.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Configuring a Bus

Each bus can be customized with listeners and middlewares. To do so, create a factory function decorated with `@injectable` that returns the configured bus.
```python
from cq import CommandBus, MiddlewareResult, new_command_bus
from injection import injectable

async def listener(message: MessageType):
...

async def middleware(message: MessageType) -> MiddlewareResult[ReturnType]:
# do something before the handler is executed
return_value = yield
# do something after the handler is executed

@injectable
def command_bus_factory() -> CommandBus:
bus = new_command_bus()
bus.add_listeners(listener)
bus.add_middlewares(middleware)
return bus
```

The same pattern applies to `QueryBus` and `EventBus` using `new_query_bus()` and `new_event_bus()`.

## Listeners

Listeners are executed before the handler(s). They receive the message and can perform side effects such as logging or validation.
```python
async def log_listener(message: MessageType):
print(f"Received: {message}")
```

## Middlewares

Middlewares wrap around handler execution, allowing you to run logic before and after a handler processes a message.
```python
async def timing_middleware(message: Any) -> MiddlewareResult[Any]:
start = time.time()
yield
print(f"Execution time: {time.time() - start}s")
```

For commands and queries, middlewares run once around the single handler. For events, middlewares run around each handler individually.

!!! note
The generator was chosen to keep both the input message and the return value read-only.

## Class-based listeners and middlewares

For more flexibility, listeners and middlewares can be defined as classes with a `__call__` method. This allows you to inject dependencies and configure their behavior.
```python
from cq import MiddlewareResult
from dataclasses import dataclass

@dataclass
class LogListener:
logger: Logger

async def __call__(self, message: Any):
self.logger.info(f"Received: {message}")

@dataclass
class TimingMiddleware:
metrics: MetricsService

async def __call__(self, message: Any) -> MiddlewareResult[Any]:
start = time.time()
yield
self.metrics.record(time.time() - start)
```
52 changes: 52 additions & 0 deletions docs/guides/dispatching.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Dispatching messages

To dispatch messages to their handlers, **python-cq** provides three bus classes: `CommandBus`, `QueryBus`, and `EventBus`.

Each bus can take a generic parameter to specify the return type of the `dispatch` method.

## Retrieving a bus

Bus instances are available through [python-injection](https://github.com/100nm/python-injection)'s dependency injection:

```python
from cq import CommandBus
from injection import inject

@inject
async def create_user(bus: CommandBus[None]):
command = CreateUserCommand(name="John", email="[email protected]")
await bus.dispatch(command)
```

## CommandBus

Use the CommandBus to dispatch commands. It returns the value produced by the handler.
```python
from cq import CommandBus

bus: CommandBus[None]
command = CreateUserCommand(name="John", email="[email protected]")
await bus.dispatch(command)
```

## QueryBus

Use the QueryBus to dispatch queries. It returns the value produced by the handler.
```python
from cq import QueryBus

bus: QueryBus[User]
query = GetUserByIdQuery(user_id)
user = await bus.dispatch(query)
```

## EventBus

Use the EventBus to dispatch events. Since events can be handled by multiple handlers (or none), it does not return a value.
```python
from cq import EventBus

bus: EventBus
event = UserCreatedEvent(user_id)
await bus.dispatch(event)
```
Loading