Skip to content

Commit 6a0dd9b

Browse files
committed
Change decorator implementation to functions
1 parent 8109c87 commit 6a0dd9b

File tree

4 files changed

+215
-149
lines changed

4 files changed

+215
-149
lines changed

src/common/asyncapi.py

Lines changed: 172 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,115 +1,185 @@
1-
from asyncio import iscoroutinefunction
2-
from functools import wraps
3-
from typing import List, Dict, Literal, Type, Optional
1+
from typing import List, Literal, Optional, Type
42

5-
from pydantic import BaseModel
63
import pydantic_asyncapi.v3 as pa
4+
from pydantic import BaseModel
75

6+
_info: pa.Info = pa.Info(
7+
title="AsyncAPI",
8+
version="1.0.0",
9+
)
810

9-
_asyncapi_registry: Dict[str, Dict[Literal["receive", "send"], List]] = {
10-
11-
}
1211

12+
_servers = {} # type: ignore
13+
_channels = {} # type: ignore
14+
_operations = {} # type: ignore
15+
_components_schemas = {} # type: ignore
1316

14-
# asyncapi_registry: Dict[str, Dict[Literal["receive", "send"], List]] = {
15-
# "chat_channel": {
16-
# "receive": [BookCreatedV1],
17-
# "send": [BookUpdatedV1],
18-
# }
19-
# }
2017

18+
def get_schema() -> pa.AsyncAPI:
19+
"""
20+
Function `get_schema` provides the complete AsyncAPI schema for the application, complying with
21+
version 3.0.0 of the AsyncAPI specification. It includes detailed information about info metadata,
22+
components, servers, channels, and operations required to set up and describe the asynchronous
23+
communication layer.
2124
22-
def add_channel_to_asyncapi_schema(
23-
receive: Optional[List[Type[BaseModel]]] = None,
24-
send: Optional[List[Type[BaseModel]]] = None,
25-
channel_name: Optional[str] = None,
26-
):
27-
def decorator(func):
28-
_channel_name = channel_name or func.__name__
29-
if _asyncapi_registry.get(_channel_name) is not None:
30-
raise ValueError(f"The schema already contains a definition for function {_channel_name}. Please rename the function or provide a different channel name.")
31-
32-
_asyncapi_registry[_channel_name] = {}
33-
if receive:
34-
_asyncapi_registry[_channel_name]["receive"] = receive
35-
if send:
36-
_asyncapi_registry[_channel_name]["send"] = send
37-
38-
@wraps(func)
39-
def wrapper(*args, **kwargs):
40-
# You can optionally use decorator_args and decorator_kwargs here if needed
41-
return func(*args, **kwargs)
42-
43-
@wraps(func)
44-
async def a_wrapper(*args, **kwargs):
45-
# You can optionally use decorator_args and decorator_kwargs here if needed
46-
return await func(*args, **kwargs)
47-
48-
return a_wrapper if iscoroutinefunction(func) else wrapper
49-
50-
return decorator
51-
52-
53-
def get_asyncapi_schema():
54-
components_schemas = {}
55-
channels = {}
56-
operations = {}
57-
58-
for channel, channel_operations in _asyncapi_registry.items():
59-
_channel_messages = {}
60-
for operation, messages in channel_operations.items():
61-
_operation_message_refs = []
62-
for message in messages:
63-
# TODO: Check for overlapping model schemas, if they are different log a warning!
64-
components_schemas[message.__name__] = message.model_json_schema(
65-
mode="validation" if operation == "receive" else "serialization",
66-
ref_template="#/components/schemas/{model}"
67-
)
68-
components_schemas.update(
69-
message.model_json_schema(mode="serialization", ref_template="#/components/schemas/{model}")[
70-
"$defs"])
71-
_channel_messages[message.__name__] = pa.Message(
72-
payload=pa.Reference(ref=f"#/components/schemas/{message.__name__}")
73-
)
74-
# Cannot point to the /components path
75-
_operation_message_refs.append(
76-
pa.Reference(ref=f"#/channels/{channel}/messages/{message.__name__}"))
77-
78-
# TODO: Define operation names in decorator
79-
operations[f"{channel}-{operation}"] = pa.Operation(
80-
action=operation,
81-
channel=pa.Reference(ref=f"#/channels/{channel}"),
82-
messages=_operation_message_refs,
83-
)
84-
85-
# TODO: Define channel metadata in decorator
86-
channels[channel] = pa.Channel(
87-
address=channel,
88-
description=f"Description for channel {channel}",
89-
title=f"Title for channel {channel}",
90-
servers=[pa.Reference(ref="#/servers/chat")],
91-
messages=_channel_messages,
92-
)
93-
94-
# TODO: Implement function to initialize application and servers
95-
schema = pa.AsyncAPI(
25+
Returns:
26+
pa.AsyncAPI: A fully constructed AsyncAPI schema object based on predefined configurations.
27+
"""
28+
return pa.AsyncAPI(
9629
asyncapi="3.0.0",
97-
info=pa.Info(
98-
title="Bookstore API",
99-
version="1.0.0",
100-
description="A bookstore asyncapi specification",
101-
),
30+
info=_info,
10231
components=pa.Components(
103-
schemas=components_schemas,
32+
schemas=_components_schemas,
10433
),
105-
servers={
106-
"chat": pa.Server(
107-
host="localhost",
108-
protocol="ws",
109-
)
110-
},
111-
channels=channels,
112-
operations=operations,
34+
servers=_servers,
35+
channels=_channels,
36+
operations=_operations,
37+
)
38+
39+
40+
def init_asyncapi_info(
41+
title: str,
42+
version: str = "1.0.0",
43+
) -> None:
44+
"""
45+
Initializes the AsyncAPI information object with the specified title and version.
46+
47+
This function creates and initializes an AsyncAPI Info object, which includes
48+
mandatory fields such as title and version. The title represents the name of the
49+
AsyncAPI document, and the version represents the version of the API.
50+
51+
Parameters:
52+
title (str): The title of the AsyncAPI document.
53+
version (str): The version of the AsyncAPI document. Defaults to "1.0.0".
54+
55+
Returns:
56+
None
57+
"""
58+
# We can potentially add the other info supported by pa.Info
59+
global _info
60+
_info = pa.Info(
61+
title=title,
62+
version=version,
11363
)
11464

115-
return schema
65+
66+
def register_server(
67+
id: str,
68+
host: str,
69+
protocol: str,
70+
pathname: Optional[str] = None,
71+
) -> None:
72+
"""
73+
Registers a server with a unique identifier and its associated properties.
74+
This function accepts information about the server such as its host,
75+
protocol, and optionally its pathname, and stores it in the internal
76+
server registry identified by the unique ID. The parameters must be
77+
provided appropriately for proper registration. The server registry
78+
ensures that server configurations can be retrieved and managed based
79+
on the assigned identifier.
80+
81+
Args:
82+
id: str
83+
A unique identifier for the server being registered. It is used
84+
as the key in the internal server registry.
85+
host: str
86+
The host address of the server. This may be an IP address or
87+
a domain name.
88+
protocol: str
89+
Communication protocol used by the server, such as "http" or "https".
90+
pathname: Optional[str]
91+
The optional pathname of the server. If provided, it will be
92+
associated with the registered server.
93+
94+
Returns:
95+
None
96+
This function does not return a value. It modifies the internal
97+
server registry to include the provided server details.
98+
"""
99+
# TODO: Implement other server parameters
100+
_servers[id] = pa.Server(
101+
host=host,
102+
protocol=protocol,
103+
)
104+
if pathname is not None:
105+
_servers[id].pathname = pathname
106+
107+
108+
def register_channel(
109+
address: str,
110+
id: Optional[str] = None,
111+
description: Optional[str] = None,
112+
title: Optional[str] = None,
113+
server_id: Optional[str] = None,
114+
) -> None:
115+
"""
116+
Registers a communication channel with the specified parameters and updates the
117+
internal dictionary holding channel metadata. The function allows optional
118+
parameters to set additional properties such as description and title, and
119+
optionally associates the channel with a predefined server.
120+
121+
Args:
122+
address (str): The address of the channel.
123+
id (Optional[str]): Unique identifier for the channel. Defaults to None.
124+
description (Optional[str]): Description of the channel. Defaults to None.
125+
title (Optional[str]): Title to be associated with the channel. Defaults to None.
126+
server_id (Optional[str]): Server identifier to link this channel to.
127+
Must exist in the internal server registry. Defaults to None.
128+
129+
Returns:
130+
None
131+
"""
132+
# TODO: Define channel metadata in decorator
133+
_channels[id or address] = pa.Channel(
134+
address=address,
135+
servers=[],
136+
messages={},
137+
)
138+
if description is not None:
139+
_channels[id or address].description = description
140+
if title is not None:
141+
_channels[id or address].title = title
142+
if server_id is not None and server_id in _servers:
143+
_channels[id or address].servers.append(pa.Reference(ref=f"#/servers/{server_id}")) # type: ignore
144+
145+
146+
def register_channel_operation(
147+
channel_id: str,
148+
operation_type: Literal["receive", "send"],
149+
messages: List[Type[BaseModel]],
150+
operation_name: Optional[str] = None,
151+
):
152+
if not _channels.get(channel_id):
153+
raise ValueError(f"Channel {channel_id} does not exist.")
154+
155+
_operation_message_refs = []
156+
for message in messages:
157+
# TODO: Check for overlapping model schemas, if they are different log a warning!
158+
_components_schemas[message.__name__] = message.model_json_schema(
159+
mode="validation" if operation_type == "receive" else "serialization",
160+
ref_template="#/components/schemas/{model}",
161+
)
162+
_components_schemas.update(
163+
message.model_json_schema(mode="serialization", ref_template="#/components/schemas/{model}")["$defs"]
164+
)
165+
_channels[channel_id].messages[message.__name__] = pa.Message( # type: ignore
166+
payload=pa.Reference(ref=f"#/components/schemas/{message.__name__}")
167+
)
168+
# Cannot point to the /components path
169+
_operation_message_refs.append(pa.Reference(ref=f"#/channels/{channel_id}/messages/{message.__name__}"))
170+
171+
_operations[operation_name or f"{channel_id}-{operation_type}"] = pa.Operation(
172+
action=operation_type,
173+
channel=pa.Reference(ref=f"#/channels/{channel_id}"),
174+
messages=_operation_message_refs,
175+
traits=[],
176+
)
177+
# TODO: Define operation traits
178+
# if operation_name is not None:
179+
# _operations[operation_name or f"{channel_id}-{operation_type}"].traits.append(
180+
# pa.OperationTrait(
181+
# title=operation_name,
182+
# summary=f"{operation_name} operation summary",
183+
# description=f"{operation_name} operation description",
184+
# )
185+
# )

src/common/bootstrap.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dependency_injector.providers import Object
55
from pydantic import BaseModel, ConfigDict
66

7+
from .asyncapi import init_asyncapi_info
78
from .config import AppConfig
89
from .di_container import Container
910
from .dramatiq import init_dramatiq
@@ -27,6 +28,7 @@ def application_init(app_config: AppConfig) -> InitReference:
2728
init_logger(app_config)
2829
init_storage()
2930
init_dramatiq(app_config)
31+
init_asyncapi_info(app_config.APP_NAME)
3032

3133
return InitReference(
3234
di_container=container,

src/http_app/routes/docs_ws.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44
from fastapi import APIRouter
55
from starlette.responses import HTMLResponse
66

7-
from common.asyncapi import add_channel_to_asyncapi_schema, get_asyncapi_schema
8-
from domains.books.events import BookCreatedV1, BookUpdatedV1
9-
7+
from common.asyncapi import get_schema
108

119
router = APIRouter(prefix="/docs/ws")
1210

@@ -16,27 +14,23 @@
1614
response_model_exclude_unset=True,
1715
include_in_schema=False,
1816
)
19-
@add_channel_to_asyncapi_schema(send=[BookUpdatedV1])
2017
def asyncapi_raw() -> pa.v3.AsyncAPI:
21-
return get_asyncapi_schema()
18+
return get_schema()
2219

2320

2421
ASYNCAPI_COMPONENT_VERSION = "latest"
2522

2623
ASYNCAPI_JS_DEFAULT_URL = (
2724
f"https://unpkg.com/@asyncapi/react-component@{ASYNCAPI_COMPONENT_VERSION}/browser/standalone/index.js"
2825
)
29-
NORMALIZE_CSS_DEFAULT_URL = (
30-
"https://cdn.jsdelivr.net/npm/modern-normalize/modern-normalize.min.css"
31-
)
26+
NORMALIZE_CSS_DEFAULT_URL = "https://cdn.jsdelivr.net/npm/modern-normalize/modern-normalize.min.css"
3227
ASYNCAPI_CSS_DEFAULT_URL = (
3328
f"https://unpkg.com/@asyncapi/react-component@{ASYNCAPI_COMPONENT_VERSION}/styles/default.min.css"
3429
)
3530

3631

3732
# https://github.com/asyncapi/asyncapi-react/blob/v2.5.0/docs/usage/standalone-bundle.md
3833
@router.get("", include_in_schema=False)
39-
@add_channel_to_asyncapi_schema(receive=[BookCreatedV1], send=[BookUpdatedV1])
4034
async def get_asyncapi_html(
4135
sidebar: bool = True,
4236
info: bool = True,
@@ -48,7 +42,6 @@ async def get_asyncapi_html(
4842
expand_message_examples: bool = False,
4943
title: str = "Websocket",
5044
) -> HTMLResponse:
51-
5245
"""Generate HTML for displaying an AsyncAPI document."""
5346
config = {
5447
"schema": {

0 commit comments

Comments
 (0)