Skip to content

Commit 271ac7e

Browse files
committed
Create global registry with a decorator to register channels and function to generate the schema
1 parent b7d99e7 commit 271ac7e

File tree

2 files changed

+130
-74
lines changed

2 files changed

+130
-74
lines changed

src/common/asyncapi.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
from asyncio import iscoroutinefunction
2+
from functools import wraps
3+
from typing import List, Dict, Literal, Type, Optional
4+
5+
from pydantic import BaseModel
6+
import pydantic_asyncapi.v3 as pa
7+
8+
9+
_asyncapi_registry: Dict[str, Dict[Literal["receive", "send"], List]] = {
10+
11+
}
12+
13+
14+
# asyncapi_registry: Dict[str, Dict[Literal["receive", "send"], List]] = {
15+
# "chat_channel": {
16+
# "receive": [BookCreatedV1],
17+
# "send": [BookUpdatedV1],
18+
# }
19+
# }
20+
21+
22+
def add_channel_to_asyncapi_schema(
23+
receive: Optional[List[Type[BaseModel]]] = None,
24+
send: Optional[List[Type[BaseModel]]] = None,
25+
channel_name: Optional[str] = None,
26+
):
27+
def decorator(func):
28+
_channel_name = channel_name or func.__name__
29+
if _asyncapi_registry.get(_channel_name) is not None:
30+
raise ValueError(f"The schema already contains a definition for function {_channel_name}. Please rename the function or provide a different channel name.")
31+
32+
_asyncapi_registry[_channel_name] = {}
33+
if receive:
34+
_asyncapi_registry[_channel_name]["receive"] = receive
35+
if send:
36+
_asyncapi_registry[_channel_name]["send"] = send
37+
38+
@wraps(func)
39+
def wrapper(*args, **kwargs):
40+
# You can optionally use decorator_args and decorator_kwargs here if needed
41+
return func(*args, **kwargs)
42+
43+
@wraps(func)
44+
async def a_wrapper(*args, **kwargs):
45+
# You can optionally use decorator_args and decorator_kwargs here if needed
46+
return await func(*args, **kwargs)
47+
48+
return a_wrapper if iscoroutinefunction(func) else wrapper
49+
50+
return decorator
51+
52+
53+
def get_asyncapi_schema():
54+
components_schemas = {}
55+
channels = {}
56+
operations = {}
57+
58+
for channel, channel_operations in _asyncapi_registry.items():
59+
_channel_messages = {}
60+
for operation, messages in channel_operations.items():
61+
_operation_message_refs = []
62+
for message in messages:
63+
# TODO: Check for overlapping model schemas, if they are different log a warning!
64+
components_schemas[message.__name__] = message.model_json_schema(
65+
mode="validation" if operation == "receive" else "serialization",
66+
ref_template="#/components/schemas/{model}"
67+
)
68+
components_schemas.update(
69+
message.model_json_schema(mode="serialization", ref_template="#/components/schemas/{model}")[
70+
"$defs"])
71+
_channel_messages[message.__name__] = pa.Message(
72+
payload=pa.Reference(ref=f"#/components/schemas/{message.__name__}")
73+
)
74+
# Cannot point to the /components path
75+
_operation_message_refs.append(
76+
pa.Reference(ref=f"#/channels/{channel}/messages/{message.__name__}"))
77+
78+
# TODO: Define operation names in decorator
79+
operations[f"{channel}-{operation}"] = pa.Operation(
80+
action=operation,
81+
channel=pa.Reference(ref=f"#/channels/{channel}"),
82+
messages=_operation_message_refs,
83+
)
84+
85+
# TODO: Define channel metadata in decorator
86+
channels[channel] = pa.Channel(
87+
address=channel,
88+
description=f"Description for channel {channel}",
89+
title=f"Title for channel {channel}",
90+
servers=[pa.Reference(ref="#/servers/chat")],
91+
messages=_channel_messages,
92+
)
93+
94+
# TODO: Implement function to initialize application and servers
95+
schema = pa.AsyncAPI(
96+
asyncapi="3.0.0",
97+
info=pa.Info(
98+
title="Bookstore API",
99+
version="1.0.0",
100+
description="A bookstore asyncapi specification",
101+
),
102+
components=pa.Components(
103+
schemas=components_schemas,
104+
),
105+
servers={
106+
"chat": pa.Server(
107+
host="localhost",
108+
protocol="ws",
109+
)
110+
},
111+
channels=channels,
112+
operations=operations,
113+
)
114+
115+
return schema

src/http_app/routes/docs_ws.py

Lines changed: 15 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,12 @@
11
import json
2-
from typing import Dict, Literal, List
32

43
import pydantic_asyncapi as pa
54
from fastapi import APIRouter
65
from starlette.responses import HTMLResponse
76

7+
from common.asyncapi import add_channel_to_asyncapi_schema, get_asyncapi_schema
88
from domains.books.events import BookCreatedV1, BookUpdatedV1
99

10-
asyncapi_registry: Dict[str, Dict[Literal["receive", "send"], List]] = {
11-
"chat_channel": {
12-
"receive": [BookCreatedV1],
13-
"send": [BookUpdatedV1],
14-
}
15-
}
16-
17-
components_schemas = {}
18-
19-
channels = {}
20-
operations = {}
21-
22-
for channel, channel_operations in asyncapi_registry.items():
23-
_channel_messages = {}
24-
for operation, messages in channel_operations.items():
25-
_operation_message_refs = []
26-
for message in messages:
27-
# TODO: Check for overlapping model schemas, if they are different log a warning!
28-
components_schemas[message.__name__] = message.model_json_schema(
29-
mode="validation" if operation == "receive" else "serialization",
30-
ref_template="#/components/schemas/{model}"
31-
)
32-
components_schemas.update(message.model_json_schema(mode="serialization", ref_template="#/components/schemas/{model}")["$defs"])
33-
_channel_messages[message.__name__] = pa.v3.Message(
34-
payload=pa.v3.Reference(ref=f"#/components/schemas/{message.__name__}")
35-
)
36-
# Cannot point to the /components path
37-
_operation_message_refs.append(pa.v3.Reference(ref=f"#/channels/chat_channel/messages/{message.__name__}"))
38-
operations[operation] = pa.v3.Operation(
39-
action=operation,
40-
channel=pa.v3.Reference(ref=f"#/channels/{channel}"),
41-
messages=_operation_message_refs,
42-
)
43-
channels[channel] = pa.v3.Channel(
44-
title=channel,
45-
servers=[pa.v3.Reference(ref="#/servers/chat")],
46-
messages=_channel_messages,
47-
)
48-
49-
50-
51-
schema = pa.AsyncAPIV3(
52-
asyncapi="3.0.0",
53-
info=pa.v3.Info(
54-
title="Bookstore API",
55-
version="1.0.0",
56-
description="A bookstore aysncapi specification",
57-
),
58-
components=pa.v3.Components(
59-
schemas=components_schemas,
60-
),
61-
servers={
62-
"chat": pa.v3.Server(
63-
host="localhost",
64-
protocol="websocket",
65-
)
66-
},
67-
channels=channels,
68-
operations=operations,
69-
)
7010

7111
router = APIRouter(prefix="/docs/ws")
7212

@@ -76,8 +16,9 @@
7616
response_model_exclude_unset=True,
7717
include_in_schema=False,
7818
)
79-
def asyncapi_raw() -> pa.AsyncAPIV3:
80-
return schema
19+
@add_channel_to_asyncapi_schema(send=[BookUpdatedV1])
20+
def asyncapi_raw() -> pa.v3.AsyncAPI:
21+
return get_asyncapi_schema()
8122

8223

8324
ASYNCAPI_COMPONENT_VERSION = "latest"
@@ -95,21 +36,21 @@ def asyncapi_raw() -> pa.AsyncAPIV3:
9536

9637
# https://github.com/asyncapi/asyncapi-react/blob/v2.5.0/docs/usage/standalone-bundle.md
9738
@router.get("", include_in_schema=False)
98-
def get_asyncapi_html(
99-
sidebar: bool = True,
100-
info: bool = True,
101-
servers: bool = True,
102-
operations: bool = True,
103-
messages: bool = True,
104-
schemas: bool = True,
105-
errors: bool = True,
106-
expand_message_examples: bool = False,
107-
title: str = "Websocket",
39+
@add_channel_to_asyncapi_schema(receive=[BookCreatedV1], send=[BookUpdatedV1])
40+
async def get_asyncapi_html(
41+
sidebar: bool = True,
42+
info: bool = True,
43+
servers: bool = True,
44+
operations: bool = True,
45+
messages: bool = True,
46+
schemas: bool = True,
47+
errors: bool = True,
48+
expand_message_examples: bool = False,
49+
title: str = "Websocket",
10850
) -> HTMLResponse:
10951

11052
"""Generate HTML for displaying an AsyncAPI document."""
11153
config = {
112-
# "schema": schema_json,
11354
"schema": {
11455
"url": "/docs/ws/asyncapi.json",
11556
},

0 commit comments

Comments
 (0)