Skip to content

Commit 4a7f1e6

Browse files
authored
Clean up and improve the code and public interface (#231)
This PR addresses many small cleanups and improvements to the code and the public interface, including some minor breaking changes. Cleanups: - Use `deque` for typing - Replace `__anext__()` with `anext()` - Rename `recv` argument as `receiver` - Add missing types to members in `__init__()` - Anycast: Don't store the `_limit` - Use the string representation for logging - Broadcast: Move the `resend_latest` attribute to the end - Remove `logging` from `Timer` examples - Make `Broadcast` logger private Improvements: - Add `is_closed` property to channels - Anycast: Clarify what happens when the buffer is full - Anycast: Add `name` argument - Add `name` property to bidirectional - Broadcast: Use `hash()` instead of a UUID - Event: Make the default name more readable - Add descriptive `str` and `repr` implementations - Bump `repo-config` to v0.7.5 - Bump `pylint` to 3.0.2 - Bump `mike` to 2.0.0 - Anycast: Log a warning when a sender is blocked Breaking changes: - Make some arguments keyword-only - Make exception messages `str` - Rename `maxsize` to `limit` - Broadcast: Make `name` keyword-only - Bidirectional: unify `client_id` and `service_id` (fixes #187).
2 parents 91e39b4 + d7f751b commit 4a7f1e6

22 files changed

+471
-169
lines changed

RELEASE_NOTES.md

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,89 @@ The `Timer` now can be started with a delay.
66

77
## Upgrading
88

9-
* Internal variable names in the `Anycast` and `Broadcast` implementations are now private.
9+
* `Anycast`
10+
11+
- `__init__`: The `maxsize` argument was renamed to `limit` and made keyword-only and a new keyword-only `name` argument was added.
12+
13+
You should instantiate using `Anycast(name=..., limit=...)` (or `Anycast(name=...)` if the default `limit` is enough) instead of `Anycast(...)` or `Anycast(maxsize=...)`.
14+
15+
* `Bidirectional`
16+
17+
- The `client_id` and `service_id` arguments were merged into a keyword-only `name`.
18+
19+
You should instantiate using `Bidirectional(name=...)` instead of `Bidirectional(..., ...)` or `Bidirectional(client_id=..., service_id=...)`.
20+
21+
* `Broadcast`
22+
23+
- `__init__`: The `name` argument was made optional and keyword-only; `resend_latest` was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance.
24+
25+
You should instantiate using `Broadcast(name=name, resend_latest=resend_latest)` (or `Broadcast()` if the defaults are enough) instead of `Broadcast(name)` or `Broadcast(name, resend_latest)`.
26+
27+
- `new_receiver`: The `maxsize` argument was renamed to `limit` and made keyword-only; the `name` argument was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance instead of a random UUID.
28+
29+
You should use `.new_receiver(name=name, limit=limit)` (or `.new_receiver()` if the defaults are enough) instead of `.new_receiver(name)` or `.new_receiver(name, maxsize)`.
30+
31+
* `Event`
32+
33+
- `__init__`: The `name` argument was made keyword-only. The default was changed to a more readable version of `id(self)`.
34+
35+
You should instantiate using `Event(name=...)` instead of `Event(...)`.
36+
37+
* All exceptions that took `Any` as the `message` argument now take `str` instead.
38+
39+
If you were passing a non-`str` value to an exception, you should convert it using `str(value)` before passing it to the exception.
1040

1141
## New Features
1242

13-
* `Timer()`, `Timer.timeout()`, `Timer.periodic()` and `Timer.reset()` now take an optional `start_delay` option to make the timer start after some delay.
43+
* `Anycast`
44+
45+
- The following new read-only properties were added:
46+
47+
- `name`: The name of the channel.
48+
- `limit`: The maximum number of messages that can be sent to the channel.
49+
- `is_closed`: Whether the channel is closed.
50+
51+
- A more useful implementation of `__str__ and `__repr__` were added for the channel and its senders and receivers.
52+
53+
- A warning will be logged if senders are blocked because the channel buffer is full.
54+
55+
* `Bidirectional`
56+
57+
- The following new read-only properties were added:
58+
59+
- `name`: The name of the channel (read-only).
60+
- `is_closed`: Whether the channel is closed (read-only).
61+
62+
- A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles.
63+
64+
* `Broadcast`
65+
66+
- The following new read-only properties were added:
67+
68+
- `name`: The name of the channel.
69+
- `is_closed`: Whether the channel is closed.
70+
71+
- A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles.
72+
73+
* `FileWatcher`
74+
75+
- A more useful implementation of `__str__ and `__repr__` were added.
76+
77+
* `Merge`
78+
79+
- A more useful implementation of `__str__ and `__repr__` were added.
80+
81+
* `MergeNamed`
1482

15-
This can be useful, for example, if the timer needs to be *aligned* to a particular time. The alternative to this would be to `sleep()` for the time needed to align the timer, but if the `sleep()` call gets delayed because the event loop is busy, then a re-alignment is needed and this could go on for a while. The only way to guarantee a certain alignment (with a reasonable precision) is to delay the timer start.
83+
- A more useful implementation of `__str__ and `__repr__` were added.
1684

17-
* `Broadcast.resend_latest` is now a public attribute, allowing it to be changed after the channel is created.
85+
* `Peekable`
1886

19-
* The arm64 architecture is now officially supported.
87+
- A more useful implementation of `__str__ and `__repr__` were added.
2088

21-
* The documentation was improved to:
89+
* `Receiver`
2290

23-
- Show signatures with types.
24-
- Show the inherited members.
25-
- Documentation for pre-releases are now published.
26-
- Show the full tag name as the documentation version.
27-
- All development branches now have their documentation published (there is no `next` version anymore).
28-
- Fix the order of the documentation versions.
91+
- `map()`: The returned map object now has a more useful implementation of `__str__ and `__repr__`.
2992

3093
## Bug Fixes
3194

benchmarks/benchmark_anycast.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ async def benchmark_anycast(
4141
Returns:
4242
Total number of messages received by all channels.
4343
"""
44-
channels: list[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
44+
channels: list[Anycast[int]] = [
45+
Anycast(name="test", limit=buffer_size) for _ in range(num_channels)
46+
]
4547
senders = [
4648
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
4749
for bcast in channels

benchmarks/benchmark_broadcast.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ async def benchmark_broadcast(
6161
Returns:
6262
Total number of messages received by all receivers.
6363
"""
64-
channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
64+
channels: list[Broadcast[int]] = [
65+
Broadcast(name="meter") for _ in range(num_channels)
66+
]
6567
senders: list[asyncio.Task[Any]] = [
6668
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
6769
for bcast in channels
@@ -104,7 +106,9 @@ async def benchmark_single_task_broadcast(
104106
Returns:
105107
Total number of messages received by all receivers.
106108
"""
107-
channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
109+
channels: list[Broadcast[int]] = [
110+
Broadcast(name="meter") for _ in range(num_channels)
111+
]
108112
senders = [b.new_sender() for b in channels]
109113
recv_tracker = 0
110114

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ plugins:
9393
- literate-nav:
9494
nav_file: SUMMARY.md
9595
- mike:
96+
alias_type: redirect
9697
canonical_version: latest
9798
- mkdocstrings:
9899
custom_templates: templates

pyproject.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
requires = [
66
"setuptools == 68.1.0",
77
"setuptools_scm[toml] == 7.1.0",
8-
"frequenz-repo-config[lib] == 0.7.2",
8+
"frequenz-repo-config[lib] == 0.7.5",
99
]
1010
build-backend = "setuptools.build_meta"
1111

@@ -47,31 +47,31 @@ dev-formatting = ["black == 23.10.1", "isort == 5.12.0"]
4747
dev-mkdocs = [
4848
"black == 23.10.1",
4949
"Markdown==3.5.1",
50-
"mike == 1.1.2",
50+
"mike == 2.0.0",
5151
"mkdocs-gen-files == 0.5.0",
5252
"mkdocs-literate-nav == 0.6.1",
5353
"mkdocs-material == 9.4.7",
5454
"mkdocs-macros-plugin == 1.0.5",
5555
"mkdocstrings[python] == 0.23.0",
56-
"frequenz-repo-config[lib] == 0.7.2",
56+
"frequenz-repo-config[lib] == 0.7.5",
5757
]
5858
dev-mypy = [
5959
"mypy == 1.6.1",
6060
"types-Markdown == 3.5.0.0",
6161
# For checking the noxfile, docs/ script, and tests
6262
"frequenz-channels[dev-mkdocs,dev-noxfile,dev-pytest]",
6363
]
64-
dev-noxfile = ["nox == 2023.4.22", "frequenz-repo-config[lib] == 0.7.2"]
64+
dev-noxfile = ["nox == 2023.4.22", "frequenz-repo-config[lib] == 0.7.5"]
6565
dev-pylint = [
66-
"pylint == 2.17.7",
66+
"pylint == 3.0.2",
6767
# For checking the noxfile, docs/ script, and tests
6868
"frequenz-channels[dev-mkdocs,dev-noxfile,dev-pytest]",
6969
]
7070
dev-pytest = [
7171
"pytest == 7.4.3",
7272
"async-solipsism == 0.5",
7373
"hypothesis == 6.88.1",
74-
"frequenz-repo-config[extra-lint-examples] == 0.7.2",
74+
"frequenz-repo-config[extra-lint-examples] == 0.7.5",
7575
"pytest-asyncio == 0.21.1",
7676
"pytest-mock == 3.12.0",
7777
]

src/frequenz/channels/_anycast.py

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,31 @@
55

66
from __future__ import annotations
77

8+
import logging
89
from asyncio import Condition
910
from collections import deque
10-
from typing import Deque, Generic
11+
from typing import Generic
1112

1213
from ._base_classes import Receiver as BaseReceiver
1314
from ._base_classes import Sender as BaseSender
1415
from ._base_classes import T
1516
from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError
1617

18+
_logger = logging.getLogger(__name__)
19+
1720

1821
class Anycast(Generic[T]):
1922
"""A channel for sending data across async tasks.
2023
2124
Anycast channels support multiple senders and multiple receivers. A message sent
2225
through a sender will be received by exactly one receiver.
2326
27+
This channel is buffered, and if the senders are faster than the receivers, then the
28+
channel's buffer will fill up. In that case, the senders will block at the
29+
[send()][frequenz.channels.Sender.send] method until the receivers consume the
30+
messages in the channel's buffer. The channel's buffer size can be configured at
31+
creation time via the `limit` argument.
32+
2433
In cases where each message need to be received by every receiver, a
2534
[Broadcast][frequenz.channels.Broadcast] channel may be used.
2635
@@ -61,21 +70,24 @@ async def recv(id: int, receiver: channel.Receiver) -> None:
6170
Check the `tests` and `benchmarks` directories for more examples.
6271
"""
6372

64-
def __init__(self, maxsize: int = 10) -> None:
73+
def __init__(self, *, name: str, limit: int = 10) -> None:
6574
"""Create an Anycast channel.
6675
6776
Args:
68-
maxsize: Size of the channel's buffer.
77+
name: The name of the channel. This is for logging purposes, and it will be
78+
shown in the string representation of the channel.
79+
limit: The size of the internal buffer in number of messages. If the buffer
80+
is full, then the senders will block until the receivers consume the
81+
messages in the buffer.
6982
"""
70-
self._limit: int = maxsize
71-
"""The maximum number of values that can be stored in the channel's buffer.
83+
self._name: str = name
84+
"""The name of the channel.
7285
73-
If the length of channel's buffer reaches the limit, then the sender
74-
blocks at the [send()][frequenz.channels.Sender.send] method until
75-
a value is consumed.
86+
This is for logging purposes, and it will be shown in the string representation
87+
of the channel.
7688
"""
7789

78-
self._deque: Deque[T] = deque(maxlen=maxsize)
90+
self._deque: deque[T] = deque(maxlen=limit)
7991
"""The channel's buffer."""
8092

8193
self._send_cv: Condition = Condition()
@@ -97,6 +109,36 @@ def __init__(self, maxsize: int = 10) -> None:
97109
self._closed: bool = False
98110
"""Whether the channel is closed."""
99111

112+
@property
113+
def name(self) -> str:
114+
"""The name of this channel.
115+
116+
This is for debugging purposes, it will be shown in the string representation
117+
of this channel.
118+
"""
119+
return self._name
120+
121+
@property
122+
def is_closed(self) -> bool:
123+
"""Whether this channel is closed.
124+
125+
Any further attempts to use this channel after it is closed will result in an
126+
exception.
127+
"""
128+
return self._closed
129+
130+
@property
131+
def limit(self) -> int:
132+
"""The maximum number of values that can be stored in the channel's buffer.
133+
134+
If the length of channel's buffer reaches the limit, then the sender
135+
blocks at the [send()][frequenz.channels.Sender.send] method until
136+
a value is consumed.
137+
"""
138+
maxlen = self._deque.maxlen
139+
assert maxlen is not None
140+
return maxlen
141+
100142
async def close(self) -> None:
101143
"""Close the channel.
102144
@@ -131,6 +173,17 @@ def new_receiver(self) -> Receiver[T]:
131173
"""
132174
return Receiver(self)
133175

176+
def __str__(self) -> str:
177+
"""Return a string representation of this channel."""
178+
return f"{type(self).__name__}:{self._name}"
179+
180+
def __repr__(self) -> str:
181+
"""Return a string representation of this channel."""
182+
return (
183+
f"{type(self).__name__}(name={self._name!r}, limit={self.limit!r}):<"
184+
f"current={len(self._deque)!r}, closed={self._closed!r}>"
185+
)
186+
134187

135188
class Sender(BaseSender[T]):
136189
"""A sender to send messages to an Anycast channel.
@@ -145,7 +198,7 @@ def __init__(self, chan: Anycast[T]) -> None:
145198
Args:
146199
chan: A reference to the channel that this sender belongs to.
147200
"""
148-
self._chan = chan
201+
self._chan: Anycast[T] = chan
149202
"""The channel that this sender belongs to."""
150203

151204
async def send(self, msg: T) -> None:
@@ -169,14 +222,32 @@ async def send(self, msg: T) -> None:
169222
raise SenderError("The channel was closed", self) from ChannelClosedError(
170223
self._chan
171224
)
172-
while len(self._chan._deque) == self._chan._deque.maxlen:
173-
async with self._chan._send_cv:
174-
await self._chan._send_cv.wait()
225+
if len(self._chan._deque) == self._chan._deque.maxlen:
226+
_logger.warning(
227+
"Anycast channel [%s] is full, blocking sender until a receiver "
228+
"consumes a value",
229+
self,
230+
)
231+
while len(self._chan._deque) == self._chan._deque.maxlen:
232+
async with self._chan._send_cv:
233+
await self._chan._send_cv.wait()
234+
_logger.info(
235+
"Anycast channel [%s] has space again, resuming the blocked sender",
236+
self,
237+
)
175238
self._chan._deque.append(msg)
176239
async with self._chan._recv_cv:
177240
self._chan._recv_cv.notify(1)
178241
# pylint: enable=protected-access
179242

243+
def __str__(self) -> str:
244+
"""Return a string representation of this sender."""
245+
return f"{self._chan}:{type(self).__name__}"
246+
247+
def __repr__(self) -> str:
248+
"""Return a string representation of this sender."""
249+
return f"{type(self).__name__}({self._chan!r})"
250+
180251

181252
class _Empty:
182253
"""A sentinel value to indicate that a value has not been set."""
@@ -195,7 +266,7 @@ def __init__(self, chan: Anycast[T]) -> None:
195266
Args:
196267
chan: A reference to the channel that this receiver belongs to.
197268
"""
198-
self._chan = chan
269+
self._chan: Anycast[T] = chan
199270
"""The channel that this receiver belongs to."""
200271

201272
self._next: T | type[_Empty] = _Empty
@@ -251,3 +322,11 @@ def consume(self) -> T:
251322
self._next = _Empty
252323

253324
return next_val
325+
326+
def __str__(self) -> str:
327+
"""Return a string representation of this receiver."""
328+
return f"{self._chan}:{type(self).__name__}"
329+
330+
def __repr__(self) -> str:
331+
"""Return a string representation of this receiver."""
332+
return f"{type(self).__name__}({self._chan!r})"

0 commit comments

Comments
 (0)