Skip to content

Commit 72ed197

Browse files
committed
Reduce cognitive complexity
1 parent fdcc28f commit 72ed197

File tree

1 file changed

+72
-37
lines changed

1 file changed

+72
-37
lines changed

src/common/asyncapi.py

Lines changed: 72 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,29 @@ def register_server(
105105
_servers[id].pathname = pathname
106106

107107

108+
def _create_base_channel(address: str, channel_id: str) -> pa.Channel:
109+
"""Create a basic channel with minimum required parameters."""
110+
return pa.Channel(
111+
address=address,
112+
servers=[],
113+
messages={},
114+
)
115+
116+
117+
def _add_channel_metadata(channel: pa.Channel, description: Optional[str], title: Optional[str]) -> None:
118+
"""Add optional metadata to the channel."""
119+
if description is not None:
120+
channel.description = description
121+
if title is not None:
122+
channel.title = title
123+
124+
125+
def _add_server_reference(channel: pa.Channel, server_id: Optional[str]) -> None:
126+
"""Add server reference to the channel if server exists."""
127+
if server_id is not None and server_id in _servers:
128+
channel.servers.append(pa.Reference(ref=f"#/servers/{server_id}")) # type: ignore
129+
130+
108131
def register_channel(
109132
address: str,
110133
id: Optional[str] = None,
@@ -113,70 +136,82 @@ def register_channel(
113136
server_id: Optional[str] = None,
114137
) -> None:
115138
"""
116-
Registers a communication channel with the specified parameters and updates the
117-
internal dictionary holding channel metadata. The function allows optional
118-
parameters to set additional properties such as description and title, and
119-
optionally associates the channel with a predefined server.
139+
Registers a communication channel with the specified parameters.
120140
121141
Args:
122142
address (str): The address of the channel.
123143
id (Optional[str]): Unique identifier for the channel. Defaults to None.
124144
description (Optional[str]): Description of the channel. Defaults to None.
125145
title (Optional[str]): Title to be associated with the channel. Defaults to None.
126-
server_id (Optional[str]): Server identifier to link this channel to.
127-
Must exist in the internal server registry. Defaults to None.
146+
server_id (Optional[str]): Server identifier to link this channel to. Defaults to None.
128147
129148
Returns:
130149
None
131150
"""
132-
# TODO: Define channel metadata in decorator
133-
_channels[id or address] = pa.Channel(
134-
address=address,
135-
servers=[],
136-
messages={},
151+
channel_id = id or address
152+
channel = _create_base_channel(address, channel_id)
153+
_add_channel_metadata(channel, description, title)
154+
_add_server_reference(channel, server_id)
155+
_channels[channel_id] = channel
156+
157+
158+
def _register_message_schema(message: Type[BaseModel], operation_type: Literal["receive", "send"]) -> None:
159+
"""Register message schema in components schemas."""
160+
message_json_schema = message.model_json_schema(
161+
mode="validation" if operation_type == "receive" else "serialization",
162+
ref_template="#/components/schemas/{model}",
137163
)
138-
if description is not None:
139-
_channels[id or address].description = description
140-
if title is not None:
141-
_channels[id or address].title = title
142-
if server_id is not None and server_id in _servers:
143-
_channels[id or address].servers.append(pa.Reference(ref=f"#/servers/{server_id}")) # type: ignore
164+
165+
_components_schemas[message.__name__] = message_json_schema
166+
167+
if message_json_schema.get("$defs"):
168+
_components_schemas.update(message_json_schema["$defs"])
169+
170+
171+
def _create_channel_message(channel_id: str, message: Type[BaseModel]) -> pa.Reference:
172+
"""Create channel message and return reference to it."""
173+
_channels[channel_id].messages[message.__name__] = pa.Message( # type: ignore
174+
payload=pa.Reference(ref=f"#/components/schemas/{message.__name__}")
175+
)
176+
return pa.Reference(ref=f"#/channels/{channel_id}/messages/{message.__name__}")
144177

145178

146179
def register_channel_operation(
147180
channel_id: str,
148181
operation_type: Literal["receive", "send"],
149182
messages: List[Type[BaseModel]],
150183
operation_name: Optional[str] = None,
151-
):
152-
if not _channels.get(channel_id):
153-
raise ValueError(f"Channel {channel_id} does not exist.")
184+
) -> None:
185+
"""
186+
Registerm a channel operation with associated messages.
154187
155-
_operation_message_refs = []
156-
for message in messages:
157-
# TODO: Check for overlapping model schemas, if they are different log a warning!
158-
_message_json_schema = message.model_json_schema(
159-
mode="validation" if operation_type == "receive" else "serialization",
160-
ref_template="#/components/schemas/{model}",
161-
)
188+
Args:
189+
channel_id: Channel identifier
190+
operation_type: Type of operation ("receive" or "send")
191+
messages: List of message models
192+
operation_name: Optional operation name
162193
163-
_components_schemas[message.__name__] = _message_json_schema
194+
Raises:
195+
ValueError: If channel_id doesn't exist
196+
"""
197+
if not _channels.get(channel_id):
198+
raise ValueError(f"Channel {channel_id} does not exist.")
164199

165-
if _message_json_schema.get("$defs"):
166-
_components_schemas.update(_message_json_schema["$defs"])
167-
_channels[channel_id].messages[message.__name__] = pa.Message( # type: ignore
168-
payload=pa.Reference(ref=f"#/components/schemas/{message.__name__}")
169-
)
200+
operation_message_refs = []
170201

171-
# Cannot point to the /components path
172-
_operation_message_refs.append(pa.Reference(ref=f"#/channels/{channel_id}/messages/{message.__name__}"))
202+
for message in messages:
203+
_register_message_schema(message, operation_type)
204+
message_ref = _create_channel_message(channel_id, message)
205+
operation_message_refs.append(message_ref)
173206

174-
_operations[operation_name or f"{channel_id}-{operation_type}"] = pa.Operation(
207+
operation_id = operation_name or f"{channel_id}-{operation_type}"
208+
_operations[operation_id] = pa.Operation(
175209
action=operation_type,
176210
channel=pa.Reference(ref=f"#/channels/{channel_id}"),
177-
messages=_operation_message_refs,
211+
messages=operation_message_refs,
178212
traits=[],
179213
)
214+
180215
# TODO: Define operation traits
181216
# if operation_name is not None:
182217
# _operations[operation_name or f"{channel_id}-{operation_type}"].traits.append(

0 commit comments

Comments
 (0)