
Implement AsyncAPI documentation #244


Merged: 19 commits, Feb 5, 2025
Changes from 16 commits
1 change: 1 addition & 0 deletions Makefile
@@ -37,6 +37,7 @@ dev-dependencies:

update-dependencies:
uv lock --upgrade
uv sync --all-groups --frozen

migrate:
uv run alembic upgrade heads
6 changes: 4 additions & 2 deletions pyproject.toml
@@ -2,7 +2,7 @@
authors = [
{name = "Federico Busetti", email = "[email protected]"},
]
requires-python = "<3.14,>=3.9"
requires-python = "<3.14,>=3.10"
name = "bootstrap-fastapi-service"
version = "0.1.0"
description = ""
@@ -14,7 +14,7 @@ dependencies = [
"cloudevents-pydantic<1.0.0,>=0.0.3",
"dependency-injector[pydantic]<5.0.0,>=4.41.0",
"dramatiq[redis,watch]<2.0.0,>=1.17.1",
"hiredis<4.0.0,>=3.1.0", # Recommended by dramatiq
"hiredis<4.0.0,>=3.1.0", # Recommended by dramatiq
"httpx>=0.23.0",
"opentelemetry-distro[otlp]",
"opentelemetry-instrumentation",
@@ -28,6 +28,7 @@ dependencies = [
"SQLAlchemy[asyncio,mypy]<3.0.0,>=2.0.0",
"sqlalchemy-bind-manager",
"structlog<25.1.1,>=25.1.0",
"pydantic-asyncapi>=0.2.1",
]

[dependency-groups]
@@ -115,6 +116,7 @@ testpaths = [

[tool.ruff]
target-version = "py39"
line-length = 120
extend-exclude = [
"docs",
]
188 changes: 188 additions & 0 deletions src/common/asyncapi.py
@@ -0,0 +1,188 @@
from typing import List, Literal, Optional, Type

import pydantic_asyncapi.v3 as pa
from pydantic import BaseModel

_info: pa.Info = pa.Info(
title="AsyncAPI",
version="1.0.0",
)


_servers = {} # type: ignore
_channels = {} # type: ignore
_operations = {} # type: ignore
_components_schemas = {} # type: ignore
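# Module-level registries, populated by the register_* helpers below and
# consumed by get_schema() when the AsyncAPI document is assembled.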


def get_schema() -> pa.AsyncAPI:
"""
    Builds the complete AsyncAPI schema for the application, compliant with version 3.0.0
    of the AsyncAPI specification. The schema includes the info metadata, components, servers,
    channels, and operations that describe the application's asynchronous communication layer.

Returns:
pa.AsyncAPI: A fully constructed AsyncAPI schema object based on predefined configurations.
"""
return pa.AsyncAPI(
asyncapi="3.0.0",
info=_info,
components=pa.Components(
schemas=_components_schemas,
),
servers=_servers,
channels=_channels,
operations=_operations,
)


def init_asyncapi_info(
title: str,
version: str = "1.0.0",
) -> None:
"""
Initializes the AsyncAPI information object with the specified title and version.

This function creates and initializes an AsyncAPI Info object, which includes
mandatory fields such as title and version. The title represents the name of the
AsyncAPI document, and the version represents the version of the API.

Parameters:
title (str): The title of the AsyncAPI document.
version (str): The version of the AsyncAPI document. Defaults to "1.0.0".

Returns:
None
"""
# We can potentially add the other info supported by pa.Info
global _info
_info = pa.Info(
title=title,
version=version,
)


def register_server(
id: str,
host: str,
protocol: str,
pathname: Optional[str] = None,
) -> None:
"""
    Registers a server with a unique identifier and its associated properties.
    The server's host, protocol, and optional pathname are stored in the internal
    server registry under the given ID, so the configuration can later be referenced
    by channels and included in the generated schema.

Args:
id: str
A unique identifier for the server being registered. It is used
as the key in the internal server registry.
host: str
The host address of the server. This may be an IP address or
a domain name.
protocol: str
Communication protocol used by the server, such as "http" or "https".
pathname: Optional[str]
The optional pathname of the server. If provided, it will be
associated with the registered server.

Returns:
None
This function does not return a value. It modifies the internal
server registry to include the provided server details.
"""
# TODO: Implement other server parameters
_servers[id] = pa.Server(
host=host,
protocol=protocol,
)
if pathname is not None:
_servers[id].pathname = pathname


def register_channel(
address: str,
id: Optional[str] = None,
description: Optional[str] = None,
title: Optional[str] = None,
server_id: Optional[str] = None,
) -> None:
"""
Registers a communication channel with the specified parameters and updates the
internal dictionary holding channel metadata. The function allows optional
parameters to set additional properties such as description and title, and
optionally associates the channel with a predefined server.

Args:
address (str): The address of the channel.
id (Optional[str]): Unique identifier for the channel. Defaults to None.
description (Optional[str]): Description of the channel. Defaults to None.
title (Optional[str]): Title to be associated with the channel. Defaults to None.
server_id (Optional[str]): Server identifier to link this channel to.
Must exist in the internal server registry. Defaults to None.

Returns:
None
"""
# TODO: Define channel metadata in decorator
_channels[id or address] = pa.Channel(
address=address,
servers=[],
messages={},
)
if description is not None:
_channels[id or address].description = description
if title is not None:
_channels[id or address].title = title
if server_id is not None and server_id in _servers:
_channels[id or address].servers.append(pa.Reference(ref=f"#/servers/{server_id}")) # type: ignore


def register_channel_operation(
channel_id: str,
operation_type: Literal["receive", "send"],
messages: List[Type[BaseModel]],
operation_name: Optional[str] = None,
):
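    """
    Registers a send or receive operation on an existing channel and adds the
    message payload schemas to the AsyncAPI components.

    Args:
        channel_id (str): Identifier of a channel previously registered with
            `register_channel`.
        operation_type (Literal["receive", "send"]): The AsyncAPI operation action.
        messages (List[Type[BaseModel]]): Pydantic models describing the message payloads.
        operation_name (Optional[str]): Name of the operation. Defaults to
            "{channel_id}-{operation_type}".

    Raises:
        ValueError: If the channel has not been registered.
    """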
if not _channels.get(channel_id):
raise ValueError(f"Channel {channel_id} does not exist.")

_operation_message_refs = []
for message in messages:
# TODO: Check for overlapping model schemas; if they differ, log a warning!
_message_json_schema = message.model_json_schema(
mode="validation" if operation_type == "receive" else "serialization",
ref_template="#/components/schemas/{model}",
)

_components_schemas[message.__name__] = _message_json_schema

if _message_json_schema.get("$defs"):
_components_schemas.update(_message_json_schema["$defs"])
_channels[channel_id].messages[message.__name__] = pa.Message( # type: ignore
payload=pa.Reference(ref=f"#/components/schemas/{message.__name__}")
)

# Operation message refs must point to the channel's messages, not to the /components path
_operation_message_refs.append(pa.Reference(ref=f"#/channels/{channel_id}/messages/{message.__name__}"))

_operations[operation_name or f"{channel_id}-{operation_type}"] = pa.Operation(
action=operation_type,
channel=pa.Reference(ref=f"#/channels/{channel_id}"),
messages=_operation_message_refs,
traits=[],
)
# TODO: Define operation traits
# if operation_name is not None:
# _operations[operation_name or f"{channel_id}-{operation_type}"].traits.append(
# pa.OperationTrait(
# title=operation_name,
# summary=f"{operation_name} operation summary",
# description=f"{operation_name} operation description",
# )
# )
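
To illustrate how the helpers in src/common/asyncapi.py fit together, here is a minimal usage sketch. It is not part of this diff: the OrderCreated payload model, the Kafka server details, and the /asyncapi.json FastAPI route are illustrative assumptions; only the common.asyncapi functions come from this PR.

from fastapi import FastAPI
from pydantic import BaseModel

from common.asyncapi import (
    get_schema,
    init_asyncapi_info,
    register_channel,
    register_channel_operation,
    register_server,
)


class OrderCreated(BaseModel):
    # Hypothetical message payload used only for this example
    order_id: str
    total: float


init_asyncapi_info(title="orders-service", version="1.0.0")
register_server(id="dev-kafka", host="localhost:9092", protocol="kafka")
register_channel(address="orders.created", id="orders-created", server_id="dev-kafka")
register_channel_operation(
    channel_id="orders-created",
    operation_type="send",
    messages=[OrderCreated],
)

app = FastAPI()


@app.get("/asyncapi.json")
def asyncapi_schema() -> dict:
    # get_schema() returns a pydantic model, so it can be dumped to a plain dict
    return get_schema().model_dump(exclude_none=True, by_alias=True)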
2 changes: 2 additions & 0 deletions src/common/bootstrap.py
@@ -4,6 +4,7 @@
from dependency_injector.providers import Object
from pydantic import BaseModel, ConfigDict

from .asyncapi import init_asyncapi_info
from .config import AppConfig
from .di_container import Container
from .dramatiq import init_dramatiq
@@ -27,6 +28,7 @@ def application_init(app_config: AppConfig) -> InitReference:
init_logger(app_config)
init_storage()
init_dramatiq(app_config)
init_asyncapi_info(app_config.APP_NAME)

return InitReference(
di_container=container,
4 changes: 1 addition & 3 deletions src/common/di_container.py
@@ -65,6 +65,4 @@ def function(
bind=SQLAlchemyBindManager.provided.get_bind.call(),
model_class=BookModel,
)
BookEventGatewayInterface: Factory[BookEventGatewayInterface] = Factory(
NullEventGateway
)
BookEventGatewayInterface: Factory[BookEventGatewayInterface] = Factory(NullEventGateway)
4 changes: 1 addition & 3 deletions src/common/dramatiq.py
@@ -22,9 +22,7 @@ def decode(self, data: bytes) -> MessageData:
try:
return orjson.loads(data)
except orjson.JSONDecodeError as e:
raise DecodeError(
"failed to decode message %r" % (data,), data, e
) from None
raise DecodeError("failed to decode message %r" % (data,), data, e) from None


def init_dramatiq(config: AppConfig):
4 changes: 1 addition & 3 deletions src/common/logs/__init__.py
@@ -44,9 +44,7 @@ def init_logger(config: AppConfig) -> None:

log_level = logging.DEBUG if config.DEBUG else logging.INFO
if config.ENVIRONMENT in ["local", "test"]:
shared_processors.append(
structlog.processors.TimeStamper(fmt="%d-%m-%Y %H:%M:%S", utc=True)
)
shared_processors.append(structlog.processors.TimeStamper(fmt="%d-%m-%Y %H:%M:%S", utc=True))
stdlib_processors.append(structlog.dev.ConsoleRenderer())
else:
shared_processors.append(structlog.processors.TimeStamper(fmt="iso", utc=True))
4 changes: 1 addition & 3 deletions src/common/utils.py
@@ -1,6 +1,4 @@
def apply_decorator_to_methods(
decorator, protected_methods: bool = False, private_methods: bool = False
):
def apply_decorator_to_methods(decorator, protected_methods: bool = False, private_methods: bool = False):
"""
Class decorator to apply a given function or coroutine decorator
to all functions and coroutines within a class.
4 changes: 1 addition & 3 deletions src/domains/books/_gateway_interfaces.py
@@ -12,9 +12,7 @@ async def save(self, book: BookModel) -> BookModel: ...
async def find(
self,
search_params: Union[None, Mapping[str, Any]] = None,
order_by: Union[
None, Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]]
] = None,
order_by: Union[None, Iterable[Union[str, Tuple[str, Literal["asc", "desc"]]]]] = None,
) -> List[BookModel]: ...


20 changes: 5 additions & 15 deletions src/domains/books/_service.py
@@ -22,12 +22,8 @@ class BookService:
@inject
def __init__(
self,
book_repository: BookRepositoryInterface = Provide[
BookRepositoryInterface.__name__
],
event_gateway: BookEventGatewayInterface = Provide[
BookEventGatewayInterface.__name__
],
book_repository: BookRepositoryInterface = Provide[BookRepositoryInterface.__name__],
event_gateway: BookEventGatewayInterface = Provide[BookEventGatewayInterface.__name__],
) -> None:
super().__init__()
self._book_repository = book_repository
@@ -37,14 +33,10 @@ async def create_book(self, book: BookData) -> Book:
# Example of a CPU-intensive task run in a different thread
# Using processes could be better, but it would add technical complexity
# https://anyio.readthedocs.io/en/3.x/subprocesses.html#running-functions-in-worker-processes
book_data_altered: dict = await to_thread.run_sync(
self._some_cpu_intensive_blocking_task, book.model_dump()
)
book_data_altered: dict = await to_thread.run_sync(self._some_cpu_intensive_blocking_task, book.model_dump())

book_model = BookModel(**book_data_altered)
book = Book.model_validate(
await self._book_repository.save(book_model), from_attributes=True
)
book = Book.model_validate(await self._book_repository.save(book_model), from_attributes=True)

# Example of a CPU-intensive task run in a dramatiq task. We should not rely on
# dramatiq if we need to wait for the operation result.
@@ -53,9 +45,7 @@ async def create_book(self, book: BookData) -> Book:
book_cpu_intensive_task.send(book_id=str(book.book_id))

await self._event_gateway.emit(
BookCreatedV1.event_factory(
data=BookCreatedV1Data.model_validate(book_model, from_attributes=True)
)
BookCreatedV1.event_factory(data=BookCreatedV1Data.model_validate(book_model, from_attributes=True))
)
return book

8 changes: 2 additions & 6 deletions src/domains/books/events.py
@@ -23,9 +23,7 @@ class BookCreatedV1(CloudEvent):
Field(default="/book_service", validate_default=True),
metadata.FieldSource,
]
type: Annotated[
Literal["book.created.v1"], Field(default="book.created.v1"), metadata.FieldType
]
type: Annotated[Literal["book.created.v1"], Field(default="book.created.v1"), metadata.FieldType]
dataschema: Annotated[
URI,
Field(default=_dataschema_url("book.created.v1"), validate_default=True),
@@ -61,9 +59,7 @@ class BookUpdatedV1(CloudEvent):
Field(default="/book_service", validate_default=True),
metadata.FieldSource,
]
type: Annotated[
Literal["book.updated.v1"], Field(default="book.updated.v1"), metadata.FieldType
]
type: Annotated[Literal["book.updated.v1"], Field(default="book.updated.v1"), metadata.FieldType]
dataschema: Annotated[
URI,
Field(default=_dataschema_url("book.updated.v1"), validate_default=True),
4 changes: 1 addition & 3 deletions src/gateways/event.py
@@ -3,9 +3,7 @@


class NullEventGateway:
async def emit(
self, event: CloudEvent
) -> None: # pragma: no cover # No need to test this
async def emit(self, event: CloudEvent) -> None: # pragma: no cover # No need to test this
logger = get_logger()
await logger.ainfo(
"Event emitted",
2 changes: 1 addition & 1 deletion src/http_app/dependencies.py
@@ -2,5 +2,5 @@
from http_app import context


def app_config() -> AppConfig:
def get_app_config() -> AppConfig:
return context.app_config.get()