Skip to content

Commit 0f35949

Browse files
committed
Make the ChannelRegistry type-aware
The channel registry is using `Any` as the message type, which is not type safe, as it completely *disables* type checking for the messages. This commit makes the channel registry type-aware, so channels are stored with their message type and the registry checks that the same channel is not used for different message types. This also makes the registry just a plain container for channels, the wrapper methods to create new senders and receivers, and to configure the `resend_latest` flag are removed. The `ReceiverFetcher` abstraction is also not needed if we just return the channel directly, as the channel itself is a `ReceiverFetcher`. Also the method to close and remove a channel is made public and the name more explicit, as it is used in normal code paths. The new registry only provide 2 main methods: * `get_or_create()`: Get or create a channel for the given key, doing the type checking to make sure the requested message type matches the existing channel message type if it already exists. * `close_and_remove()`: Close and remove the channel for the given key. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 7f4e1bd commit 0f35949

File tree

1 file changed

+150
-0
lines changed

1 file changed

+150
-0
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# License: MIT
2+
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A class that would dynamically create, own and provide access to channels."""
5+
6+
import dataclasses
7+
import logging
8+
import traceback
9+
from typing import TypeVar, cast
10+
11+
from frequenz.channels import Broadcast
12+
13+
_T = TypeVar("_T")
14+
_logger = logging.getLogger(__name__)
15+
16+
17+
class ChannelRegistry:
18+
"""Dynamically creates, own and provide access to broadcast channels.
19+
20+
It can be used by actors to dynamically establish a communication channel
21+
between each other.
22+
23+
The registry is responsible for creating channels when they are first requested via
24+
the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method.
25+
26+
The registry also stores type information to make sure that the same channel is not
27+
used for different message types.
28+
29+
Since the registry owns the channels, it is also responsible for closing them when
30+
they are no longer needed. There is no way to remove a channel without closing it.
31+
32+
Note:
33+
This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels.
34+
"""
35+
36+
def __init__(self, *, name: str) -> None:
37+
"""Initialize this registry.
38+
39+
Args:
40+
name: A name to identify the registry in the logs. This name is also used as
41+
a prefix for the channel names.
42+
"""
43+
self._name = name
44+
self._channels: dict[str, _Entry] = {}
45+
46+
@property
47+
def name(self) -> str:
48+
"""The name of this registry."""
49+
return self._name
50+
51+
def message_type(self, key: str) -> type:
52+
"""Get the message type of the channel for the given key.
53+
54+
Args:
55+
key: The key to identify the channel.
56+
57+
Returns:
58+
The message type of the channel.
59+
60+
Raises:
61+
KeyError: If the channel does not exist.
62+
"""
63+
entry = self._channels.get(key)
64+
if entry is None:
65+
raise KeyError(f"No channel for key {key!r} exists.")
66+
return entry.message_type
67+
68+
def __contains__(self, key: str) -> bool:
69+
"""Check whether the channel for the given `key` exists."""
70+
return key in self._channels
71+
72+
def get_or_create(self, message_type: type[_T], key: str) -> Broadcast[_T]:
73+
"""Get or create a channel for the given key.
74+
75+
If a channel for the given key already exists, the message type of the existing
76+
channel is checked against the requested message type. If they do not match,
77+
a `ValueError` is raised.
78+
79+
Note:
80+
The types have to match exactly, it doesn't do a subtype check due to
81+
technical limitations. In the future subtype checks might be supported.
82+
83+
Args:
84+
message_type: The type of the message that is sent through the channel.
85+
key: The key to identify the channel.
86+
87+
Returns:
88+
The channel for the given key.
89+
90+
Raises:
91+
ValueError: If the channel exists and the message type does not match.
92+
"""
93+
if key not in self._channels:
94+
if _logger.isEnabledFor(logging.DEBUG):
95+
_logger.debug(
96+
"Creating a new channel for key %r with type %s at:\n%s",
97+
key,
98+
message_type,
99+
"".join(traceback.format_stack(limit=10)[:9]),
100+
)
101+
self._channels[key] = _Entry(message_type, Broadcast(f"{self._name}-{key}"))
102+
103+
entry = self._channels[key]
104+
if entry.message_type is not message_type:
105+
exception = ValueError(
106+
f"Type mismatch, a channel for key {key!r} exists and the requested "
107+
f"message type {message_type} is not the same as the existing "
108+
f"message type {entry.message_type}."
109+
)
110+
if _logger.isEnabledFor(logging.DEBUG):
111+
_logger.debug(
112+
"%s at:\n%s",
113+
str(exception),
114+
# We skip the last frame because it's this method, and limit the
115+
# stack to 9 frames to avoid adding too much noise.
116+
"".join(traceback.format_stack(limit=10)[:9]),
117+
)
118+
raise exception
119+
120+
return cast(Broadcast[_T], entry.channel)
121+
122+
async def close_and_remove(self, key: str) -> None:
123+
"""Remove the channel for the given key.
124+
125+
Args:
126+
key: The key to identify the channel.
127+
128+
Raises:
129+
KeyError: If the channel does not exist.
130+
"""
131+
entry = self._channels.pop(key, None)
132+
if entry is None:
133+
raise KeyError(f"No channel for key {key!r} exists.")
134+
await entry.channel.close()
135+
136+
137+
@dataclasses.dataclass(frozen=True)
138+
class _Entry:
139+
"""An entry in a channel registry."""
140+
141+
message_type: type
142+
"""The type of the message that is sent through the channel in this entry."""
143+
144+
# We use object instead of Any to minimize the chances of hindering type checking.
145+
# If for some reason the channel is not casted to the proper underlaying type, when
146+
# using object at least accessing any member that's not part of the object base
147+
# class will yield a type error, while if we used Any, it would not and the issue
148+
# would be much harder to find.
149+
channel: Broadcast[object]
150+
"""The channel in this entry."""

0 commit comments

Comments
 (0)