Skip to content

Implement AsyncAPI documentation #244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ dev-dependencies:

update-dependencies:
uv lock --upgrade
uv sync --all-groups --frozen

migrate:
uv run alembic upgrade heads
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ and [SOLID principles](https://en.wikipedia.org/wiki/SOLID).

This template provides out of the box some commonly used functionalities:

* API Documentation using [FastAPI](https://fastapi.tiangolo.com/)
* Sync and Async API Documentation using [FastAPI](https://fastapi.tiangolo.com/) and [AsyncAPI](https://www.asyncapi.com/en)
* Async tasks execution using [Dramatiq](https://dramatiq.io/index.html)
* Repository pattern for databases using [SQLAlchemy](https://www.sqlalchemy.org/) and [SQLAlchemy bind manager](https://febus982.github.io/sqlalchemy-bind-manager/stable/)
* Database migrations using [Alembic](https://alembic.sqlalchemy.org/en/latest/) (configured supporting both sync and async SQLAlchemy engines)
Expand Down
4 changes: 4 additions & 0 deletions docs/api-documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
API documentation is rendered by [FastAPI](https://fastapi.tiangolo.com/features/)
on `/docs` and `/redoc` paths using OpenAPI format.

AsyncAPI documentation is rendered using the
[AsyncAPI react components](https://github.com/asyncapi/asyncapi-react).
It is available on `/docs/ws` path.

## API versioning

Versioning an API at resource level provides a much more
Expand Down
6 changes: 4 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = [
{name = "Federico Busetti", email = "[email protected]"},
]
requires-python = "<3.14,>=3.9"
requires-python = "<3.14,>=3.10"
name = "bootstrap-fastapi-service"
version = "0.1.0"
description = ""
Expand All @@ -14,7 +14,7 @@ dependencies = [
"cloudevents-pydantic<1.0.0,>=0.0.3",
"dependency-injector[pydantic]<5.0.0,>=4.41.0",
"dramatiq[redis,watch]<2.0.0,>=1.17.1",
"hiredis<4.0.0,>=3.1.0", # Recommended by dramatiq
"hiredis<4.0.0,>=3.1.0", # Recommended by dramatiq
"httpx>=0.23.0",
"opentelemetry-distro[otlp]",
"opentelemetry-instrumentation",
Expand All @@ -28,6 +28,7 @@ dependencies = [
"SQLAlchemy[asyncio,mypy]<3.0.0,>=2.0.0",
"sqlalchemy-bind-manager",
"structlog<25.1.1,>=25.1.0",
"pydantic-asyncapi>=0.2.1",
]

[dependency-groups]
Expand Down Expand Up @@ -115,6 +116,7 @@ testpaths = [

[tool.ruff]
target-version = "py39"
line-length = 120
extend-exclude = [
"docs",
]
Expand Down
223 changes: 223 additions & 0 deletions src/common/asyncapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from typing import List, Literal, Optional, Type

import pydantic_asyncapi.v3 as pa
from pydantic import BaseModel

_info: pa.Info = pa.Info(
title="AsyncAPI",
version="1.0.0",
)


_servers = {} # type: ignore
_channels = {} # type: ignore
_operations = {} # type: ignore
_components_schemas = {} # type: ignore


def get_schema() -> pa.AsyncAPI:
"""
Function `get_schema` provides the complete AsyncAPI schema for the application, complying with
version 3.0.0 of the AsyncAPI specification. It includes detailed information about info metadata,
components, servers, channels, and operations required to set up and describe the asynchronous
communication layer.

Returns:
pa.AsyncAPI: A fully constructed AsyncAPI schema object based on predefined configurations.
"""
return pa.AsyncAPI(
asyncapi="3.0.0",
info=_info,
components=pa.Components(
schemas=_components_schemas,
),
servers=_servers,
channels=_channels,
operations=_operations,
)


def init_asyncapi_info(
title: str,
version: str = "1.0.0",
) -> None:
"""
Initializes the AsyncAPI information object with the specified title and version.

This function creates and initializes an AsyncAPI Info object, which includes
mandatory fields such as title and version. The title represents the name of the
AsyncAPI document, and the version represents the version of the API.

Parameters:
title (str): The title of the AsyncAPI document.
version (str): The version of the AsyncAPI document. Defaults to "1.0.0".

Returns:
None
"""
# We can potentially add the other info supported by pa.Info
global _info
_info = pa.Info(
title=title,
version=version,
)


def register_server(
id: str,
host: str,
protocol: str,
pathname: Optional[str] = None,
) -> None:
"""
Registers a server with a unique identifier and its associated properties.
This function accepts information about the server such as its host,
protocol, and optionally its pathname, and stores it in the internal
server registry identified by the unique ID. The parameters must be
provided appropriately for proper registration. The server registry
ensures that server configurations can be retrieved and managed based
on the assigned identifier.

Args:
id: str
A unique identifier for the server being registered. It is used
as the key in the internal server registry.
host: str
The host address of the server. This may be an IP address or
a domain name.
protocol: str
Communication protocol used by the server, such as "http" or "https".
pathname: Optional[str]
The optional pathname of the server. If provided, it will be
associated with the registered server.

Returns:
None
This function does not return a value. It modifies the internal
server registry to include the provided server details.
"""
# TODO: Implement other server parameters
_servers[id] = pa.Server(
host=host,
protocol=protocol,
)
if pathname is not None:
_servers[id].pathname = pathname


def _create_base_channel(address: str, channel_id: str) -> pa.Channel:
"""Create a basic channel with minimum required parameters."""
return pa.Channel(
address=address,
servers=[],
messages={},
)


def _add_channel_metadata(channel: pa.Channel, description: Optional[str], title: Optional[str]) -> None:
"""Add optional metadata to the channel."""
if description is not None:
channel.description = description
if title is not None:
channel.title = title


def _add_server_reference(channel: pa.Channel, server_id: Optional[str]) -> None:
"""Add server reference to the channel if server exists."""
if server_id is not None and server_id in _servers:
channel.servers.append(pa.Reference(ref=f"#/servers/{server_id}")) # type: ignore


def register_channel(
address: str,
id: Optional[str] = None,
description: Optional[str] = None,
title: Optional[str] = None,
server_id: Optional[str] = None,
) -> None:
"""
Registers a communication channel with the specified parameters.

Args:
address (str): The address of the channel.
id (Optional[str]): Unique identifier for the channel. Defaults to None.
description (Optional[str]): Description of the channel. Defaults to None.
title (Optional[str]): Title to be associated with the channel. Defaults to None.
server_id (Optional[str]): Server identifier to link this channel to. Defaults to None.

Returns:
None
"""
channel_id = id or address
channel = _create_base_channel(address, channel_id)
_add_channel_metadata(channel, description, title)
_add_server_reference(channel, server_id)
_channels[channel_id] = channel


def _register_message_schema(message: Type[BaseModel], operation_type: Literal["receive", "send"]) -> None:
"""Register message schema in components schemas."""
message_json_schema = message.model_json_schema(
mode="validation" if operation_type == "receive" else "serialization",
ref_template="#/components/schemas/{model}",
)

_components_schemas[message.__name__] = message_json_schema

if message_json_schema.get("$defs"):
_components_schemas.update(message_json_schema["$defs"])


def _create_channel_message(channel_id: str, message: Type[BaseModel]) -> pa.Reference:
"""Create channel message and return reference to it."""
_channels[channel_id].messages[message.__name__] = pa.Message( # type: ignore
payload=pa.Reference(ref=f"#/components/schemas/{message.__name__}")
)
return pa.Reference(ref=f"#/channels/{channel_id}/messages/{message.__name__}")


def register_channel_operation(
channel_id: str,
operation_type: Literal["receive", "send"],
messages: List[Type[BaseModel]],
operation_name: Optional[str] = None,
) -> None:
"""
Registerm a channel operation with associated messages.

Args:
channel_id: Channel identifier
operation_type: Type of operation ("receive" or "send")
messages: List of message models
operation_name: Optional operation name

Raises:
ValueError: If channel_id doesn't exist
"""
if not _channels.get(channel_id):
raise ValueError(f"Channel {channel_id} does not exist.")

operation_message_refs = []

for message in messages:
_register_message_schema(message, operation_type)
message_ref = _create_channel_message(channel_id, message)
operation_message_refs.append(message_ref)

operation_id = operation_name or f"{channel_id}-{operation_type}"
_operations[operation_id] = pa.Operation(
action=operation_type,
channel=pa.Reference(ref=f"#/channels/{channel_id}"),
messages=operation_message_refs,
traits=[],
)

# TODO: Define operation traits
# if operation_name is not None:
# _operations[operation_name or f"{channel_id}-{operation_type}"].traits.append(
# pa.OperationTrait(
# title=operation_name,
# summary=f"{operation_name} operation summary",
# description=f"{operation_name} operation description",
# )
# )
2 changes: 2 additions & 0 deletions src/common/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dependency_injector.providers import Object
from pydantic import BaseModel, ConfigDict

from .asyncapi import init_asyncapi_info
from .config import AppConfig
from .di_container import Container
from .dramatiq import init_dramatiq
Expand All @@ -27,6 +28,7 @@ def application_init(app_config: AppConfig) -> InitReference:
init_logger(app_config)
init_storage()
init_dramatiq(app_config)
init_asyncapi_info(app_config.APP_NAME)

return InitReference(
di_container=container,
Expand Down
4 changes: 1 addition & 3 deletions src/common/di_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,4 @@ def function(
bind=SQLAlchemyBindManager.provided.get_bind.call(),
model_class=BookModel,
)
BookEventGatewayInterface: Factory[BookEventGatewayInterface] = Factory(
NullEventGateway
)
BookEventGatewayInterface: Factory[BookEventGatewayInterface] = Factory(NullEventGateway)
4 changes: 1 addition & 3 deletions src/common/dramatiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ def decode(self, data: bytes) -> MessageData:
try:
return orjson.loads(data)
except orjson.JSONDecodeError as e:
raise DecodeError(
"failed to decode message %r" % (data,), data, e
) from None
raise DecodeError("failed to decode message %r" % (data,), data, e) from None


def init_dramatiq(config: AppConfig):
Expand Down
4 changes: 1 addition & 3 deletions src/common/logs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ def init_logger(config: AppConfig) -> None:

log_level = logging.DEBUG if config.DEBUG else logging.INFO
if config.ENVIRONMENT in ["local", "test"]:
shared_processors.append(
structlog.processors.TimeStamper(fmt="%d-%m-%Y %H:%M:%S", utc=True)
)
shared_processors.append(structlog.processors.TimeStamper(fmt="%d-%m-%Y %H:%M:%S", utc=True))
stdlib_processors.append(structlog.dev.ConsoleRenderer())
else:
shared_processors.append(structlog.processors.TimeStamper(fmt="iso", utc=True))
Expand Down
4 changes: 1 addition & 3 deletions src/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
def apply_decorator_to_methods(
decorator, protected_methods: bool = False, private_methods: bool = False
):
def apply_decorator_to_methods(decorator, protected_methods: bool = False, private_methods: bool = False):
"""
Class decorator to apply a given function or coroutine decorator
to all functions and coroutines within a class.
Expand Down
4 changes: 1 addition & 3 deletions src/domains/books/_gateway_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ async def save(self, book: BookModel) -> BookModel: ...
async def find(
self,
search_params: Union[None, Mapping[str, Any]] = None,
order_by: Union[
None, Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]]
] = None,
order_by: Union[None, Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]]] = None,
) -> List[BookModel]: ...


Expand Down
20 changes: 5 additions & 15 deletions src/domains/books/_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@ class BookService:
@inject
def __init__(
self,
book_repository: BookRepositoryInterface = Provide[
BookRepositoryInterface.__name__
],
event_gateway: BookEventGatewayInterface = Provide[
BookEventGatewayInterface.__name__
],
book_repository: BookRepositoryInterface = Provide[BookRepositoryInterface.__name__],
event_gateway: BookEventGatewayInterface = Provide[BookEventGatewayInterface.__name__],
) -> None:
super().__init__()
self._book_repository = book_repository
Expand All @@ -37,14 +33,10 @@ async def create_book(self, book: BookData) -> Book:
# Example of CPU intensive task ran in a different thread
# Using processes could be better, but it would bring technical complexity
# https://anyio.readthedocs.io/en/3.x/subprocesses.html#running-functions-in-worker-processes
book_data_altered: dict = await to_thread.run_sync(
self._some_cpu_intensive_blocking_task, book.model_dump()
)
book_data_altered: dict = await to_thread.run_sync(self._some_cpu_intensive_blocking_task, book.model_dump())

book_model = BookModel(**book_data_altered)
book = Book.model_validate(
await self._book_repository.save(book_model), from_attributes=True
)
book = Book.model_validate(await self._book_repository.save(book_model), from_attributes=True)

# Example of CPU intensive task ran in a dramatiq task. We should not rely on
# dramatiq if we need to wait the operation result.
Expand All @@ -53,9 +45,7 @@ async def create_book(self, book: BookData) -> Book:
book_cpu_intensive_task.send(book_id=str(book.book_id))

await self._event_gateway.emit(
BookCreatedV1.event_factory(
data=BookCreatedV1Data.model_validate(book_model, from_attributes=True)
)
BookCreatedV1.event_factory(data=BookCreatedV1Data.model_validate(book_model, from_attributes=True))
)
return book

Expand Down
Loading
Loading