Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# For more details on the configuration please see:
# https://github.com/marketplace/actions/labeler

"part:docs":
"part:docs":
- "**/*.md"
- "docs/**"
- "examples/**"
Expand All @@ -31,14 +31,16 @@
- noxfile.py

"part:channels":
- any:
- "src/frequenz/channels/**"
- "!src/frequenz/channels/util/**"
- "src/frequenz/channels/anycast.py"
- "src/frequenz/channels/bidirectional.py"
- "src/frequenz/channels/broadcast.py"

"part:receivers":
- any:
- "src/frequenz/channels/util/**"
- "!src/frequenz/channels/util/_select.py"
- "src/frequenz/channels/event.py"
- "src/frequenz/channels/file_watcher.py"
- "src/frequenz/channels/merge.py"
- "src/frequenz/channels/merge_named.py"
- "src/frequenz/channels/timer.py"

"part:select":
- "src/frequenz/channels/util/_select.py"
- "src/frequenz/channels/_select.py"
86 changes: 74 additions & 12 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,88 @@ The `Timer` now can be started with a delay.

## Upgrading

* Internal variable names in the `Anycast` and `Broadcast` implementations are now private.
* `Anycast`

- `__init__`: The `maxsize` argument was renamed to `limit` and made keyword-only and a new optional, keyword-only `name` argument was added. If a `name` is not specified, it will be generated from the `id()` of the instance.

You should instantiate using `Anycast(name=..., limit=...)` (or `Anycast()` if the defaults are enough) instead of `Anycast(...)` or `Anycast(maxsize=...)`.

* `Bidirectional`

- The `client_id` and `service_id` arguments were merged into an optional, keyword-only `name`. If a `name` is not specified, it will be generated from the `id()` of the instance.

You should instantiate using `Bidirectional(name=...)` (or `Bidirectional()` if the default name is enough) instead of `Bidirectional(..., ...)` or `Bidirectional(client_id=..., service_id=...)`.

- `new_receiver`: The `maxsize` argument was renamed to `limit` and made keyword-only; the `name` argument was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance instead of a random UUID.

You should instantiate using `Broadcast(name=name, resend_latest=resend_latest)` (or `Broadcast()` if the defaults are enough) instead of `Broadcast(name)` or `Broadcast(name, resend_latest)`.

* `Broadcast`

- `__init__`: The `name` argument was made optional and keyword-only; `resend_latest` was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance.

You should instantiate using `Broadcast(name=name, resend_latest=resend_latest)` (or `Broadcast()` if the defaults are enough) instead of `Broadcast(name)` or `Broadcast(name, resend_latest)`.

* `Event`

- `__init__`: The `name` argument was made keyword-only. The default was changed to a more readable version of `id(self)`.

You should instantiate using `Event(name=...)` instead of `Event(...)`.

* All exceptions that took `Any` as the `message` argument now take `str` instead.

If you were passing a non-`str` value to an exception, you should convert it using `str(value)` before passing it to the exception.

## New Features

* `Timer()`, `Timer.timeout()`, `Timer.periodic()` and `Timer.reset()` now take an optional `start_delay` option to make the timer start after some delay.
* `Anycast`

- The following new read-only properties were added:

- `name`: The name of the channel.
- `limit`: The maximum number of messages that can be sent to the channel.
- `is_closed`: Whether the channel is closed.

- A more useful implementation of `__str__ and `__repr__` were added for the channel and its senders and receivers.

* `Bidirectional`

- The following new read-only properties were added:

- `name`: The name of the channel (read-only).
- `is_closed`: Whether the channel is closed (read-only).

- A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles.

* `Broadcast`

- The following new read-only properties were added:

- `name`: The name of the channel.
- `latest`: The latest message sent to the channel.
- `is_closed`: Whether the channel is closed.

- A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles.

* `FileWatcher`

- A more useful implementation of `__str__ and `__repr__` were added.

* `Merge`

- A more useful implementation of `__str__ and `__repr__` were added.

* `MergeNamed`

This can be useful, for example, if the timer needs to be *aligned* to a particular time. The alternative to this would be to `sleep()` for the time needed to align the timer, but if the `sleep()` call gets delayed because the event loop is busy, then a re-alignment is needed and this could go on for a while. The only way to guarantee a certain alignment (with a reasonable precision) is to delay the timer start.
- A more useful implementation of `__str__ and `__repr__` were added.

* `Broadcast.resend_latest` is now a public attribute, allowing it to be changed after the channel is created.
* `Peekable`

* The arm64 architecture is now officially supported.
- A more useful implementation of `__str__ and `__repr__` were added.

* The documentation was improved to:
* `Receiver`

- Show signatures with types.
- Show the inherited members.
- Documentation for pre-releases are now published.
- Show the full tag name as the documentation version.
- All development branches now have their documentation published (there is no `next` version anymore).
- Fix the order of the documentation versions.
- `map()`: The returned map object now has a more useful implementation of `__str__ and `__repr__`.

## Bug Fixes

Expand Down
7 changes: 5 additions & 2 deletions benchmarks/benchmark_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from collections.abc import Coroutine
from typing import Any

from frequenz.channels import Anycast, Receiver, Sender
from frequenz.channels import Receiver, Sender
from frequenz.channels.anycast import Anycast


async def send_msg(num_messages: int, chan: Sender[int]) -> None:
Expand Down Expand Up @@ -41,7 +42,9 @@ async def benchmark_anycast(
Returns:
Total number of messages received by all channels.
"""
channels: list[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
channels: list[Anycast[int]] = [
Anycast(limit=buffer_size) for _ in range(num_channels)
]
senders = [
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
Expand Down
11 changes: 8 additions & 3 deletions benchmarks/benchmark_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from functools import partial
from typing import Any

from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels import Receiver, Sender
from frequenz.channels.broadcast import Broadcast


async def component_sender(num_messages: int, chan: Sender[int]) -> None:
Expand Down Expand Up @@ -61,7 +62,9 @@ async def benchmark_broadcast(
Returns:
Total number of messages received by all receivers.
"""
channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
channels: list[Broadcast[int]] = [
Broadcast(name="meter") for _ in range(num_channels)
]
senders: list[asyncio.Task[Any]] = [
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
Expand Down Expand Up @@ -104,7 +107,9 @@ async def benchmark_single_task_broadcast(
Returns:
Total number of messages received by all receivers.
"""
channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
channels: list[Broadcast[int]] = [
Broadcast(name="meter") for _ in range(num_channels)
]
senders = [b.new_sender() for b in channels]
recv_tracker = 0

Expand Down
91 changes: 33 additions & 58 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,80 +6,50 @@
This package contains
[channel](https://en.wikipedia.org/wiki/Channel_(programming)) implementations.

Channels:

* [Anycast][frequenz.channels.Anycast]: A channel that supports multiple
senders and multiple receivers. A message sent through a sender will be
received by exactly one receiver.

* [Bidirectional][frequenz.channels.Bidirectional]: A channel providing
a `client` and a `service` handle to send and receive bidirectionally.

* [Broadcast][frequenz.channels.Broadcast]: A channel to broadcast messages
from multiple senders to multiple receivers. Each message sent through any of
the senders is received by all of the receivers.

Other base classes:

* [Peekable][frequenz.channels.Peekable]: An object to allow users to get
a peek at the latest value in the channel, without consuming anything.

* [Receiver][frequenz.channels.Receiver]: An object that can wait for and
consume messages from a channel.
Main base classes and functions:

* [Sender][frequenz.channels.Sender]: An object that can send messages to
a channel.

Utilities:

* [util][frequenz.channels.util]: A module with utilities, like special
receivers that implement timers, file watchers, merge receivers, or wait for
messages in multiple channels.

Exception classes:

* [Error][frequenz.channels.Error]: Base class for all errors in this
library.

* [ChannelError][frequenz.channels.ChannelError]: Base class for all errors
related to channels.
* [Receiver][frequenz.channels.Receiver]: An object that can wait for and
consume messages from a channel.

* [ChannelClosedError][frequenz.channels.ChannelClosedError]: Error raised when
trying to operate (send, receive, etc.) through a closed channel.
* [selected()][frequenz.channels.select]: A function to wait on multiple
receivers at once.

* [SenderError][frequenz.channels.SenderError]: Base class for all errors
related to senders.
Channels:

* [ReceiverError][frequenz.channels.ReceiverError]: Base class for all errors
related to receivers.
* [Anycast][frequenz.channels.anycast.Anycast]: A channel that supports multiple
senders and multiple receivers. A message sent through a sender will be
received by exactly one receiver.

* [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver
stopped producing messages.
* [Bidirectional][frequenz.channels.bidirectional.Bidirectional]: A channel providing
a `client` and a `service` handle to send and receive bidirectionally.

* [ReceiverInvalidatedError][frequenz.channels.ReceiverInvalidatedError]:
A receiver is not longer valid (for example if it was converted into
a peekable.
* [Broadcast][frequenz.channels.broadcast.Broadcast]: A channel to broadcast messages
from multiple senders to multiple receivers. Each message sent through any of
the senders is received by all of the receivers.
"""

from . import util
from ._anycast import Anycast
from ._base_classes import Peekable, Receiver, Sender
from ._bidirectional import Bidirectional
from ._broadcast import Broadcast
from ._exceptions import (
ChannelClosedError,
ChannelError,
Error,
from ._exceptions import ChannelClosedError, ChannelError, Error
from ._receiver import (
Peekable,
Receiver,
ReceiverError,
ReceiverInvalidatedError,
ReceiverStoppedError,
SenderError,
)
from ._select import (
Selected,
SelectError,
SelectErrorGroup,
UnhandledSelectedError,
select,
selected_from,
)
from ._sender import Sender, SenderError

__all__ = [
"Anycast",
"Bidirectional",
"Broadcast",
"ChannelClosedError",
"ChannelError",
"Error",
Expand All @@ -88,7 +58,12 @@
"ReceiverError",
"ReceiverInvalidatedError",
"ReceiverStoppedError",
"SelectError",
"SelectErrorGroup",
"Selected",
"Sender",
"SenderError",
"util",
"UnhandledSelectedError",
"select",
"selected_from",
]
Loading