diff --git a/.github/labeler.yml b/.github/labeler.yml index edfdd7a2..8923cb0e 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -6,7 +6,7 @@ # For more details on the configuration please see: # https://github.com/marketplace/actions/labeler -"part:docs": +"part:docs": - "**/*.md" - "docs/**" - "examples/**" @@ -31,14 +31,16 @@ - noxfile.py "part:channels": - - any: - - "src/frequenz/channels/**" - - "!src/frequenz/channels/util/**" + - "src/frequenz/channels/anycast.py" + - "src/frequenz/channels/bidirectional.py" + - "src/frequenz/channels/broadcast.py" "part:receivers": - - any: - - "src/frequenz/channels/util/**" - - "!src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/event.py" + - "src/frequenz/channels/file_watcher.py" + - "src/frequenz/channels/merge.py" + - "src/frequenz/channels/merge_named.py" + - "src/frequenz/channels/timer.py" "part:select": - - "src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/_select.py" diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7678e1d8..5220af66 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,26 +6,88 @@ The `Timer` now can be started with a delay. ## Upgrading -* Internal variable names in the `Anycast` and `Broadcast` implementations are now private. +* `Anycast` + + - `__init__`: The `maxsize` argument was renamed to `limit` and made keyword-only and a new optional, keyword-only `name` argument was added. If a `name` is not specified, it will be generated from the `id()` of the instance. + + You should instantiate using `Anycast(name=..., limit=...)` (or `Anycast()` if the defaults are enough) instead of `Anycast(...)` or `Anycast(maxsize=...)`. + +* `Bidirectional` + + - The `client_id` and `service_id` arguments were merged into an optional, keyword-only `name`. If a `name` is not specified, it will be generated from the `id()` of the instance. + + You should instantiate using `Bidirectional(name=...)` (or `Bidirectional()` if the default name is enough) instead of `Bidirectional(..., ...)` or `Bidirectional(client_id=..., service_id=...)`. + + - `new_receiver`: The `maxsize` argument was renamed to `limit` and made keyword-only; the `name` argument was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance instead of a random UUID. + + You should instantiate using `Broadcast(name=name, resend_latest=resend_latest)` (or `Broadcast()` if the defaults are enough) instead of `Broadcast(name)` or `Broadcast(name, resend_latest)`. + +* `Broadcast` + + - `__init__`: The `name` argument was made optional and keyword-only; `resend_latest` was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance. + + You should instantiate using `Broadcast(name=name, resend_latest=resend_latest)` (or `Broadcast()` if the defaults are enough) instead of `Broadcast(name)` or `Broadcast(name, resend_latest)`. + +* `Event` + + - `__init__`: The `name` argument was made keyword-only. The default was changed to a more readable version of `id(self)`. + + You should instantiate using `Event(name=...)` instead of `Event(...)`. + +* All exceptions that took `Any` as the `message` argument now take `str` instead. + + If you were passing a non-`str` value to an exception, you should convert it using `str(value)` before passing it to the exception. ## New Features -* `Timer()`, `Timer.timeout()`, `Timer.periodic()` and `Timer.reset()` now take an optional `start_delay` option to make the timer start after some delay. +* `Anycast` + + - The following new read-only properties were added: + + - `name`: The name of the channel. + - `limit`: The maximum number of messages that can be sent to the channel. + - `is_closed`: Whether the channel is closed. + + - A more useful implementation of `__str__ and `__repr__` were added for the channel and its senders and receivers. + +* `Bidirectional` + + - The following new read-only properties were added: + + - `name`: The name of the channel (read-only). + - `is_closed`: Whether the channel is closed (read-only). + + - A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles. + +* `Broadcast` + + - The following new read-only properties were added: + + - `name`: The name of the channel. + - `latest`: The latest message sent to the channel. + - `is_closed`: Whether the channel is closed. + + - A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles. + +* `FileWatcher` + + - A more useful implementation of `__str__ and `__repr__` were added. + +* `Merge` + + - A more useful implementation of `__str__ and `__repr__` were added. + +* `MergeNamed` - This can be useful, for example, if the timer needs to be *aligned* to a particular time. The alternative to this would be to `sleep()` for the time needed to align the timer, but if the `sleep()` call gets delayed because the event loop is busy, then a re-alignment is needed and this could go on for a while. The only way to guarantee a certain alignment (with a reasonable precision) is to delay the timer start. + - A more useful implementation of `__str__ and `__repr__` were added. -* `Broadcast.resend_latest` is now a public attribute, allowing it to be changed after the channel is created. +* `Peekable` -* The arm64 architecture is now officially supported. + - A more useful implementation of `__str__ and `__repr__` were added. -* The documentation was improved to: +* `Receiver` - - Show signatures with types. - - Show the inherited members. - - Documentation for pre-releases are now published. - - Show the full tag name as the documentation version. - - All development branches now have their documentation published (there is no `next` version anymore). - - Fix the order of the documentation versions. + - `map()`: The returned map object now has a more useful implementation of `__str__ and `__repr__`. ## Bug Fixes diff --git a/benchmarks/benchmark_anycast.py b/benchmarks/benchmark_anycast.py index 0e60aba9..462e1681 100644 --- a/benchmarks/benchmark_anycast.py +++ b/benchmarks/benchmark_anycast.py @@ -9,7 +9,8 @@ from collections.abc import Coroutine from typing import Any -from frequenz.channels import Anycast, Receiver, Sender +from frequenz.channels import Receiver, Sender +from frequenz.channels.anycast import Anycast async def send_msg(num_messages: int, chan: Sender[int]) -> None: @@ -41,7 +42,9 @@ async def benchmark_anycast( Returns: Total number of messages received by all channels. """ - channels: list[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)] + channels: list[Anycast[int]] = [ + Anycast(limit=buffer_size) for _ in range(num_channels) + ] senders = [ asyncio.create_task(send_msg(num_messages, bcast.new_sender())) for bcast in channels diff --git a/benchmarks/benchmark_broadcast.py b/benchmarks/benchmark_broadcast.py index dcc49b99..b088ace4 100644 --- a/benchmarks/benchmark_broadcast.py +++ b/benchmarks/benchmark_broadcast.py @@ -10,7 +10,8 @@ from functools import partial from typing import Any -from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.channels import Receiver, Sender +from frequenz.channels.broadcast import Broadcast async def component_sender(num_messages: int, chan: Sender[int]) -> None: @@ -61,7 +62,9 @@ async def benchmark_broadcast( Returns: Total number of messages received by all receivers. """ - channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] + channels: list[Broadcast[int]] = [ + Broadcast(name="meter") for _ in range(num_channels) + ] senders: list[asyncio.Task[Any]] = [ asyncio.create_task(send_msg(num_messages, bcast.new_sender())) for bcast in channels @@ -104,7 +107,9 @@ async def benchmark_single_task_broadcast( Returns: Total number of messages received by all receivers. """ - channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] + channels: list[Broadcast[int]] = [ + Broadcast(name="meter") for _ in range(num_channels) + ] senders = [b.new_sender() for b in channels] recv_tracker = 0 diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 3489452b..e351c30d 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -6,80 +6,50 @@ This package contains [channel](https://en.wikipedia.org/wiki/Channel_(programming)) implementations. -Channels: - -* [Anycast][frequenz.channels.Anycast]: A channel that supports multiple - senders and multiple receivers. A message sent through a sender will be - received by exactly one receiver. - -* [Bidirectional][frequenz.channels.Bidirectional]: A channel providing - a `client` and a `service` handle to send and receive bidirectionally. - -* [Broadcast][frequenz.channels.Broadcast]: A channel to broadcast messages - from multiple senders to multiple receivers. Each message sent through any of - the senders is received by all of the receivers. - -Other base classes: - -* [Peekable][frequenz.channels.Peekable]: An object to allow users to get - a peek at the latest value in the channel, without consuming anything. - -* [Receiver][frequenz.channels.Receiver]: An object that can wait for and - consume messages from a channel. +Main base classes and functions: * [Sender][frequenz.channels.Sender]: An object that can send messages to a channel. -Utilities: - -* [util][frequenz.channels.util]: A module with utilities, like special - receivers that implement timers, file watchers, merge receivers, or wait for - messages in multiple channels. - -Exception classes: - -* [Error][frequenz.channels.Error]: Base class for all errors in this - library. - -* [ChannelError][frequenz.channels.ChannelError]: Base class for all errors - related to channels. +* [Receiver][frequenz.channels.Receiver]: An object that can wait for and + consume messages from a channel. -* [ChannelClosedError][frequenz.channels.ChannelClosedError]: Error raised when - trying to operate (send, receive, etc.) through a closed channel. +* [selected()][frequenz.channels.select]: A function to wait on multiple + receivers at once. -* [SenderError][frequenz.channels.SenderError]: Base class for all errors - related to senders. +Channels: -* [ReceiverError][frequenz.channels.ReceiverError]: Base class for all errors - related to receivers. +* [Anycast][frequenz.channels.anycast.Anycast]: A channel that supports multiple + senders and multiple receivers. A message sent through a sender will be + received by exactly one receiver. -* [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver - stopped producing messages. +* [Bidirectional][frequenz.channels.bidirectional.Bidirectional]: A channel providing + a `client` and a `service` handle to send and receive bidirectionally. -* [ReceiverInvalidatedError][frequenz.channels.ReceiverInvalidatedError]: - A receiver is not longer valid (for example if it was converted into - a peekable. +* [Broadcast][frequenz.channels.broadcast.Broadcast]: A channel to broadcast messages + from multiple senders to multiple receivers. Each message sent through any of + the senders is received by all of the receivers. """ -from . import util -from ._anycast import Anycast -from ._base_classes import Peekable, Receiver, Sender -from ._bidirectional import Bidirectional -from ._broadcast import Broadcast -from ._exceptions import ( - ChannelClosedError, - ChannelError, - Error, +from ._exceptions import ChannelClosedError, ChannelError, Error +from ._receiver import ( + Peekable, + Receiver, ReceiverError, ReceiverInvalidatedError, ReceiverStoppedError, - SenderError, ) +from ._select import ( + Selected, + SelectError, + SelectErrorGroup, + UnhandledSelectedError, + select, + selected_from, +) +from ._sender import Sender, SenderError __all__ = [ - "Anycast", - "Bidirectional", - "Broadcast", "ChannelClosedError", "ChannelError", "Error", @@ -88,7 +58,12 @@ "ReceiverError", "ReceiverInvalidatedError", "ReceiverStoppedError", + "SelectError", + "SelectErrorGroup", + "Selected", "Sender", "SenderError", - "util", + "UnhandledSelectedError", + "select", + "selected_from", ] diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py deleted file mode 100644 index a1bfc94f..00000000 --- a/src/frequenz/channels/_bidirectional.py +++ /dev/null @@ -1,163 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""An abstraction to provide bi-directional communication between actors.""" - -from __future__ import annotations - -from typing import Generic, TypeVar - -from ._base_classes import Receiver, Sender, T, U -from ._broadcast import Broadcast -from ._exceptions import ChannelError, ReceiverError, SenderError - -V = TypeVar("V") -W = TypeVar("W") - - -class Bidirectional(Generic[T, U]): - """A wrapper class for simulating bidirectional channels.""" - - class Handle(Sender[V], Receiver[W]): - """A handle to a [Bidirectional][frequenz.channels.Bidirectional] instance. - - It can be used to send/receive values between the client and service. - """ - - def __init__( - self, - channel: Bidirectional[V, W] | Bidirectional[W, V], - sender: Sender[V], - receiver: Receiver[W], - ) -> None: - """Create a `Bidirectional.Handle` instance. - - Args: - channel: The underlying channel. - sender: A sender to send values with. - receiver: A receiver to receive values from. - """ - self._chan = channel - """The underlying channel.""" - - self._sender = sender - """The sender to send values with.""" - - self._receiver = receiver - """The receiver to receive values from.""" - - async def send(self, msg: V) -> None: - """Send a value to the other side. - - Args: - msg: The value to send. - - Raises: - SenderError: if the underlying channel was closed. - A [ChannelClosedError][frequenz.channels.ChannelClosedError] - is set as the cause. - """ - try: - await self._sender.send(msg) - except SenderError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - async def ready(self) -> bool: - """Wait until the receiver is ready with a value or an error. - - Once a call to `ready()` has finished, the value should be read with - a call to `consume()` (`receive()` or iterated over). The receiver will - remain ready (this method will return immediately) until it is - consumed. - - Returns: - Whether the receiver is still active. - """ - return await self._receiver.ready() # pylint: disable=protected-access - - def consume(self) -> W: - """Return the latest value once `_ready` is complete. - - Returns: - The next value that was received. - - Raises: - ReceiverStoppedError: if there is some problem with the receiver. - ReceiverError: if there is some problem with the receiver. - """ - try: - return self._receiver.consume() # pylint: disable=protected-access - except ReceiverError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - def __init__(self, client_id: str, service_id: str) -> None: - """Create a `Bidirectional` instance. - - Args: - client_id: A name for the client, used to name the channels. - service_id: A name for the service end of the channels. - """ - self._client_id = client_id - """The name for the client, used to name the channels.""" - - self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}") - """The channel to send requests.""" - - self._response_channel: Broadcast[U] = Broadcast( - f"resp_{service_id}_{client_id}" - ) - """The channel to send responses.""" - - self._client_handle = Bidirectional.Handle( - self, - self._request_channel.new_sender(), - self._response_channel.new_receiver(), - ) - """The handle for the client side to send/receive values.""" - - self._service_handle = Bidirectional.Handle( - self, - self._response_channel.new_sender(), - self._request_channel.new_receiver(), - ) - """The handle for the service side to send/receive values.""" - - @property - def client_handle(self) -> Bidirectional.Handle[T, U]: - """Get a `Handle` for the client side to use. - - Returns: - Object to send/receive messages with. - """ - return self._client_handle - - @property - def service_handle(self) -> Bidirectional.Handle[U, T]: - """Get a `Handle` for the service side to use. - - Returns: - Object to send/receive messages with. - """ - return self._service_handle diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 2042003b..dbfa032f 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -3,14 +3,7 @@ """Exception classes.""" -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Generic, TypeVar - -if TYPE_CHECKING: - from . import _base_classes - -T = TypeVar("T") +from typing import Any class Error(RuntimeError): @@ -19,7 +12,7 @@ class Error(RuntimeError): All exceptions generated by this library inherit from this exception. """ - def __init__(self, message: Any): + def __init__(self, message: str): """Create a ChannelError instance. Args: @@ -34,7 +27,7 @@ class ChannelError(Error): All exceptions generated by channels inherit from this exception. """ - def __init__(self, message: Any, channel: Any): + def __init__(self, message: str, channel: Any): """Create a ChannelError instance. Args: @@ -56,63 +49,3 @@ def __init__(self, channel: Any): channel: A reference to the channel that was closed. """ super().__init__(f"Channel {channel} was closed", channel) - - -class SenderError(Error, Generic[T]): - """An error produced in a [Sender][frequenz.channels.Sender]. - - All exceptions generated by senders inherit from this exception. - """ - - def __init__(self, message: Any, sender: _base_classes.Sender[T]): - """Create an instance. - - Args: - message: An error message. - sender: The [Sender][frequenz.channels.Sender] where the error - happened. - """ - super().__init__(message) - self.sender: _base_classes.Sender[T] = sender - """The sender where the error happened.""" - - -class ReceiverError(Error, Generic[T]): - """An error produced in a [Receiver][frequenz.channels.Receiver]. - - All exceptions generated by receivers inherit from this exception. - """ - - def __init__(self, message: Any, receiver: _base_classes.Receiver[T]): - """Create an instance. - - Args: - message: An error message. - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(message) - self.receiver: _base_classes.Receiver[T] = receiver - """The receiver where the error happened.""" - - -class ReceiverStoppedError(ReceiverError[T]): - """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" - - def __init__(self, receiver: _base_classes.Receiver[T]): - """Create an instance. - - Args: - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(f"Receiver {receiver} was stopped", receiver) - - -class ReceiverInvalidatedError(ReceiverError[T]): - """The [Receiver][frequenz.channels.Receiver] was invalidated. - - This happens when the Receiver is converted - [into][frequenz.channels.Receiver.into_peekable] - a [Peekable][frequenz.channels.Peekable]. - """ diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_receiver.py similarity index 67% rename from src/frequenz/channels/_base_classes.py rename to src/frequenz/channels/_receiver.py index 8745ce87..b25020db 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_receiver.py @@ -7,33 +7,36 @@ from abc import ABC, abstractmethod from collections.abc import Callable -from typing import Generic, TypeVar +from typing import Generic, Self, TypeVar -from ._exceptions import ReceiverStoppedError +from ._exceptions import Error -T = TypeVar("T") -U = TypeVar("U") +_T = TypeVar("_T") +_U = TypeVar("_U") -class Sender(ABC, Generic[T]): - """A channel Sender.""" +class Peekable(ABC, Generic[_T]): + """A channel peekable. - @abstractmethod - async def send(self, msg: T) -> None: - """Send a message to the channel. + A Peekable provides a [peek()][frequenz.channels.Peekable] method that + allows the user to get a peek at the latest value in the channel, without + consuming anything. + """ - Args: - msg: The message to be sent. + @abstractmethod + def peek(self) -> _T | None: + """Return the latest value that was sent to the channel. - Raises: - SenderError: if there was an error sending the message. + Returns: + The latest value received by the channel, and `None`, if nothing + has been sent to the channel yet. """ -class Receiver(ABC, Generic[T]): +class Receiver(ABC, Generic[_T]): """A channel Receiver.""" - async def __anext__(self) -> T: + async def __anext__(self) -> _T: """Await the next value in the async iteration over received values. Returns: @@ -63,7 +66,7 @@ async def ready(self) -> bool: """ @abstractmethod - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. `ready()` must be called before each call to `consume()`. @@ -76,7 +79,7 @@ def consume(self) -> T: ReceiverError: if there is some problem with the receiver. """ - def __aiter__(self) -> Receiver[T]: + def __aiter__(self) -> Self: """Initialize the async iterator over received values. Returns: @@ -84,7 +87,7 @@ def __aiter__(self) -> Receiver[T]: """ return self - async def receive(self) -> T: + async def receive(self) -> _T: """Receive a message from the channel. Returns: @@ -111,7 +114,7 @@ async def receive(self) -> T: raise ReceiverStoppedError(self) from exc return received - def map(self, call: Callable[[T], U]) -> Receiver[U]: + def map(self, call: Callable[[_T], _U]) -> Receiver[_U]: """Return a receiver with `call` applied on incoming messages. Args: @@ -122,7 +125,7 @@ def map(self, call: Callable[[T], U]) -> Receiver[U]: """ return _Map(self, call) - def into_peekable(self) -> Peekable[T]: + def into_peekable(self) -> Peekable[_T]: """Convert the `Receiver` implementation into a `Peekable`. Once this function has been called, the receiver will no longer be @@ -139,25 +142,48 @@ def into_peekable(self) -> Peekable[T]: raise NotImplementedError("This receiver does not implement `into_peekable`") -class Peekable(ABC, Generic[T]): - """A channel peekable. +class ReceiverError(Error, Generic[_T]): + """An error produced in a [Receiver][frequenz.channels.Receiver]. - A Peekable provides a [peek()][frequenz.channels.Peekable] method that - allows the user to get a peek at the latest value in the channel, without - consuming anything. + All exceptions generated by receivers inherit from this exception. """ - @abstractmethod - def peek(self) -> T | None: - """Return the latest value that was sent to the channel. + def __init__(self, message: str, receiver: Receiver[_T]): + """Create an instance. - Returns: - The latest value received by the channel, and `None`, if nothing - has been sent to the channel yet. + Args: + message: An error message. + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. """ + super().__init__(message) + self.receiver: Receiver[_T] = receiver + """The receiver where the error happened.""" -class _Map(Receiver[U], Generic[T, U]): +class ReceiverStoppedError(ReceiverError[_T]): + """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" + + def __init__(self, receiver: Receiver[_T]): + """Create an instance. + + Args: + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. + """ + super().__init__(f"Receiver {receiver} was stopped", receiver) + + +class ReceiverInvalidatedError(ReceiverError[_T]): + """The [Receiver][frequenz.channels.Receiver] was invalidated. + + This happens when the Receiver is converted + [into][frequenz.channels.Receiver.into_peekable] + a [Peekable][frequenz.channels.Peekable]. + """ + + +class _Map(Receiver[_U], Generic[_T, _U]): """Apply a transform function on a channel receiver. Has two generic types: @@ -166,17 +192,17 @@ class _Map(Receiver[U], Generic[T, U]): - The output type: return type of the transform method. """ - def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None: + def __init__(self, receiver: Receiver[_T], transform: Callable[[_T], _U]) -> None: """Create a `Transform` instance. Args: - recv: The input receiver. + receiver: The input receiver. transform: The function to run on the input data. """ - self._recv = recv + self._receiver: Receiver[_T] = receiver """The input receiver.""" - self._transform = transform + self._transform: Callable[[_T], _U] = transform """The function to run on the input data.""" async def ready(self) -> bool: @@ -190,11 +216,11 @@ async def ready(self) -> bool: Returns: Whether the receiver is still active. """ - return await self._recv.ready() # pylint: disable=protected-access + return await self._receiver.ready() # pylint: disable=protected-access # We need a noqa here because the docs have a Raises section but the code doesn't # explicitly raise anything. - def consume(self) -> U: # noqa: DOC502 + def consume(self) -> _U: # noqa: DOC502 """Return a transformed value once `ready()` is complete. Returns: @@ -203,4 +229,14 @@ def consume(self) -> U: # noqa: DOC502 Raises: ChannelClosedError: if the underlying channel is closed. """ - return self._transform(self._recv.consume()) # pylint: disable=protected-access + return self._transform( + self._receiver.consume() + ) # pylint: disable=protected-access + + def __str__(self) -> str: + """Return a string representation of the timer.""" + return f"{type(self).__name__}:{self._receiver}:{self._transform}" + + def __repr__(self) -> str: + """Return a string representation of the timer.""" + return f"{type(self).__name__}({self._receiver!r}, {self._transform!r})" diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/_select.py similarity index 90% rename from src/frequenz/channels/util/_select.py rename to src/frequenz/channels/_select.py index 43a0e357..00f594ce 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/_select.py @@ -12,24 +12,23 @@ from collections.abc import AsyncIterator from typing import Any, Generic, TypeGuard, TypeVar -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") class Selected(Generic[_T]): - """A result of a [`select()`][frequenz.channels.util.select] iteration. + """A result of a [`select()`][frequenz.channels.select] iteration. The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead. `Selected` instances should be used in conjunction with the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`selected_from()`][frequenz.channels.selected_from] function to determine which receiver was selected. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. """ class _EmptyResult: @@ -46,9 +45,9 @@ def __init__(self, receiver: Receiver[_T]) -> None: The receiver is consumed immediately when creating the instance and the received value is stored in the instance for later use as - [`value`][frequenz.channels.util.Selected.value]. If there was an exception + [`value`][frequenz.channels.Selected.value]. If there was an exception while receiving the value, then the exception is stored in the instance instead - (as [`exception`][frequenz.channels.util.Selected.exception]). + (as [`exception`][frequenz.channels.Selected.exception]). Args: receiver: The receiver that was selected. @@ -140,16 +139,16 @@ def __repr__(self) -> str: def selected_from( selected: Selected[Any], receiver: Receiver[_T] ) -> TypeGuard[Selected[_T]]: - """Check if the given receiver was selected by [`select()`][frequenz.channels.util.select]. + """Check if the given receiver was selected by [`select()`][frequenz.channels.select]. This function is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class to determine which receiver was + [`Selected`][frequenz.channels.Selected] class to determine which receiver was selected in `select()` iteration. It also works as a [type guard][typing.TypeGuard] to narrow the type of the `Selected` instance to the type of the receiver. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. Args: selected: The result of a `select()` iteration. @@ -164,21 +163,21 @@ def selected_from( class SelectError(BaseException): - """A base exception for [`select()`][frequenz.channels.util.select]. + """A base exception for [`select()`][frequenz.channels.select]. This exception is raised when a `select()` iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling `ready()` for example). It is raised as a group exception - ([`SelectErrorGroup`][frequenz.channels.util.SelectErrorGroup]) when a `select` loop + ([`SelectErrorGroup`][frequenz.channels.SelectErrorGroup]) when a `select` loop is cleaning up after it's done. """ class UnhandledSelectedError(SelectError, Generic[_T]): - """A receiver was not handled in a [`select()`][frequenz.channels.util.select] loop. + """A receiver was not handled in a [`select()`][frequenz.channels.select] loop. This exception is raised when a `select()` iteration finishes without a call to - [`selected_from()`][frequenz.channels.util.selected_from] for the selected receiver. + [`selected_from()`][frequenz.channels.selected_from] for the selected receiver. """ def __init__(self, selected: Selected[_T]) -> None: @@ -189,12 +188,12 @@ def __init__(self, selected: Selected[_T]) -> None: """ recv = selected._recv # pylint: disable=protected-access super().__init__(f"Selected receiver {recv} was not handled in the if-chain") - self.selected = selected + self.selected: Selected[_T] = selected """The selected receiver that was not handled.""" class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError): - """An exception group for [`select()`][frequenz.channels.util.select] operation. + """An exception group for [`select()`][frequenz.channels.select] operation. This exception group is raised when a `select()` loops fails while cleaning up running tests to check for ready receivers. @@ -243,8 +242,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: This function is used to iterate over the values of all receivers as they receive new values. It is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class and the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`Selected`][frequenz.channels.Selected] class and the + [`selected_from()`][frequenz.channels.selected_from] function to determine which function to determine which receiver was selected in a select operation. An exhaustiveness check is performed at runtime to make sure all selected receivers @@ -258,10 +257,10 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you: - * Use [`Merge`][frequenz.channels.util.Merge] or - [`MergeNamed`][frequenz.channels.util.MergeNamed]: this is useful when you - have and unknown number of receivers of the same type that can be handled as - a group. + * Use [`Merge`][frequenz.channels.merge.Merge] or + [`MergeNamed`][frequenz.channels.merge_named.MergeNamed]: this is useful when + you have and unknown number of receivers of the same type that can be handled + as a group. * Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers. * Break the `select()` loop and start a new one with the new set of receivers @@ -273,8 +272,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: import datetime from typing import assert_never - from frequenz.channels import ReceiverStoppedError - from frequenz.channels.util import select, selected_from, Timer + from frequenz.channels import ReceiverStoppedError, select, selected_from + from frequenz.channels.timer import Timer timer1 = Timer.periodic(datetime.timedelta(seconds=1)) timer2 = Timer.timeout(datetime.timedelta(seconds=0.5)) diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py new file mode 100644 index 00000000..a6d61453 --- /dev/null +++ b/src/frequenz/channels/_sender.py @@ -0,0 +1,45 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Base classes for Channel Sender and Receiver.""" + +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from ._exceptions import Error + +_T = TypeVar("_T") + + +class Sender(ABC, Generic[_T]): + """A channel Sender.""" + + @abstractmethod + async def send(self, msg: _T) -> None: + """Send a message to the channel. + + Args: + msg: The message to be sent. + + Raises: + SenderError: if there was an error sending the message. + """ + + +class SenderError(Error, Generic[_T]): + """An error produced in a [Sender][frequenz.channels.Sender]. + + All exceptions generated by senders inherit from this exception. + """ + + def __init__(self, message: str, sender: Sender[_T]): + """Create an instance. + + Args: + message: An error message. + sender: The [Sender][frequenz.channels.Sender] where the error + happened. + """ + super().__init__(message) + self.sender: Sender[_T] = sender + """The sender where the error happened.""" diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/anycast.py similarity index 65% rename from src/frequenz/channels/_anycast.py rename to src/frequenz/channels/anycast.py index 40cf69db..c69cae8b 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/anycast.py @@ -3,34 +3,39 @@ """A channel for sending data across async tasks.""" -from __future__ import annotations - from asyncio import Condition from collections import deque -from typing import Deque, Generic +from typing import Generic, TypeVar + +from ._exceptions import ChannelClosedError +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderError -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._base_classes import T -from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError +_T = TypeVar("_T") -class Anycast(Generic[T]): +class Anycast(Generic[_T]): """A channel for sending data across async tasks. Anycast channels support multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver. + This channel is buffered, and if the senders are faster than the receivers, then the + channel's buffer will fill up. In that case, the senders will block at the + [send()][frequenz.channels.Sender.send] method until the receivers consume the + messages in the channel's buffer. The channel's buffer size can be configured at + creation time via the `limit` argument. + In cases where each message need to be received by every receiver, a - [Broadcast][frequenz.channels.Broadcast] channel may be used. + [Broadcast][frequenz.channels.broadcast.Broadcast] channel may be used. Uses an [deque][collections.deque] internally, so Anycast channels are not thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.merge.Merge] or + [MergeNamed][frequenz.channels.merge_named.MergeNamed]. Example: ``` python @@ -61,21 +66,25 @@ async def recv(id: int, receiver: channel.Receiver) -> None: Check the `tests` and `benchmarks` directories for more examples. """ - def __init__(self, maxsize: int = 10) -> None: + def __init__(self, *, name: str | None = None, limit: int = 10) -> None: """Create an Anycast channel. Args: - maxsize: Size of the channel's buffer. + name: The name of the channel. If `None`, an `id(self)`-based name will be + used. This is only for debugging purposes, it will be shown in the + string representation of the channel. + limit: The size of the internal buffer in number of messages. If the buffer + is full, then the senders will block until the receivers consume the + messages in the buffer. """ - self._limit: int = maxsize - """The maximum number of values that can be stored in the channel's buffer. + self._name: str = f"{id(self):x}" if name is None else name + """The name of the channel. - If the length of channel's buffer reaches the limit, then the sender - blocks at the [send()][frequenz.channels.Sender.send] method until - a value is consumed. + This is for debugging purposes, it will be shown in the string representation + of the channel. """ - self._deque: Deque[T] = deque(maxlen=maxsize) + self._deque: deque[_T] = deque(maxlen=limit) """The channel's buffer.""" self._send_cv: Condition = Condition() @@ -97,6 +106,36 @@ def __init__(self, maxsize: int = 10) -> None: self._closed: bool = False """Whether the channel is closed.""" + @property + def name(self) -> str: + """The name of this channel. + + This is for debugging purposes, it will be shown in the string representation + of this channel. + """ + return self._name + + @property + def is_closed(self) -> bool: + """Whether this channel is closed. + + Any further attempts to use this channel after it is closed will result in an + exception. + """ + return self._closed + + @property + def limit(self) -> int: + """The maximum number of values that can be stored in the channel's buffer. + + If the length of channel's buffer reaches the limit, then the sender + blocks at the [send()][frequenz.channels.Sender.send] method until + a value is consumed. + """ + maxlen = self._deque.maxlen + assert maxlen is not None + return maxlen + async def close(self) -> None: """Close the channel. @@ -115,40 +154,51 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new sender. Returns: A Sender instance attached to the Anycast channel. """ - return Sender(self) + return _Sender(self) - def new_receiver(self) -> Receiver[T]: + def new_receiver(self) -> Receiver[_T]: """Create a new receiver. Returns: A Receiver instance attached to the Anycast channel. """ - return Receiver(self) + return _Receiver(self) + def __str__(self) -> str: + """Return a string representation of this channel.""" + return f"{type(self).__name__}:{self._name}" -class Sender(BaseSender[T]): + def __repr__(self) -> str: + """Return a string representation of this channel.""" + return ( + f"{type(self).__name__}(name={self._name!r}, limit={self.limit!r}):<" + f"current={len(self._deque)!r}, closed={self._closed!r}>" + ) + + +class _Sender(Sender[_T]): """A sender to send messages to an Anycast channel. Should not be created directly, but through the `Anycast.new_sender()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel sender. Args: chan: A reference to the channel that this sender belongs to. """ - self._chan = chan + self._chan: Anycast[_T] = chan """The channel that this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message across the channel. To send, this method inserts the message into the Anycast channel's @@ -177,28 +227,36 @@ async def send(self, msg: T) -> None: self._chan._recv_cv.notify(1) # pylint: enable=protected-access + def __str__(self) -> str: + """Return a string representation of this sender.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this sender.""" + return f"{type(self).__name__}({self._chan!r})" + class _Empty: """A sentinel value to indicate that a value has not been set.""" -class Receiver(BaseReceiver[T]): +class _Receiver(Receiver[_T]): """A receiver to receive messages from an Anycast channel. Should not be created directly, but through the `Anycast.new_receiver()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel receiver. Args: chan: A reference to the channel that this receiver belongs to. """ - self._chan = chan + self._chan: Anycast[_T] = chan """The channel that this receiver belongs to.""" - self._next: T | type[_Empty] = _Empty + self._next: _T | type[_Empty] = _Empty async def ready(self) -> bool: """Wait until the receiver is ready with a value or an error. @@ -227,7 +285,7 @@ async def ready(self) -> bool: # pylint: enable=protected-access return True - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. Returns: @@ -247,7 +305,15 @@ def consume(self) -> T: ), "`consume()` must be preceded by a call to `ready()`" # mypy doesn't understand that the assert above ensures that self._next is not # _Sentinel. So we have to use a type ignore here. - next_val: T = self._next # type: ignore[assignment] + next_val: _T = self._next # type: ignore[assignment] self._next = _Empty return next_val + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}({self._chan!r})" diff --git a/src/frequenz/channels/bidirectional.py b/src/frequenz/channels/bidirectional.py new file mode 100644 index 00000000..154b3cc6 --- /dev/null +++ b/src/frequenz/channels/bidirectional.py @@ -0,0 +1,209 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""An abstraction to provide bi-directional communication between actors.""" + +from __future__ import annotations + +from typing import Generic, TypeVar + +from ._exceptions import ChannelError +from ._receiver import Receiver, ReceiverError +from ._sender import Sender, SenderError +from .broadcast import Broadcast + +_T = TypeVar("_T") +_U = TypeVar("_U") +_V = TypeVar("_V") +_W = TypeVar("_W") + + +class Bidirectional(Generic[_T, _U]): + """A wrapper class for simulating bidirectional channels.""" + + def __init__(self, *, name: str | None = None) -> None: + """Create a `Bidirectional` instance. + + Args: + name: A name for the client, used to name the channels. + """ + self._name: str = f"{id(self):_}" if name is None else name + """The name for the client, used to name the channels.""" + + self._request_channel: Broadcast[_T] = Broadcast(name=f"{self._name}:request") + """The channel to send requests.""" + + self._response_channel: Broadcast[_U] = Broadcast(name=f"{self._name}:response") + """The channel to send responses.""" + + self._client_handle: Handle[_T, _U] = Handle( + self, + self._request_channel.new_sender(), + self._response_channel.new_receiver(), + ) + """The handle for the client side to send/receive values.""" + + self._service_handle: Handle[_U, _T] = Handle( + self, + self._response_channel.new_sender(), + self._request_channel.new_receiver(), + ) + """The handle for the service side to send/receive values.""" + + @property + def name(self) -> str: + """The name of this channel. + + This is for debugging purposes, it will be shown in the string representation + of this channel. + """ + return self._name + + @property + def is_closed(self) -> bool: + """Whether this channel is closed. + + Any further attempts to use this channel after it is closed will result in an + exception. + + As long as there is a way to send or receive data, the channel is considered + open, even if the other side is closed, so this returns `False` if only both + underlying channels are closed. + """ + return self._request_channel.is_closed and self._response_channel.is_closed + + @property + def client_handle(self) -> Handle[_T, _U]: + """Get a `Handle` for the client side to use. + + Returns: + Object to send/receive messages with. + """ + return self._client_handle + + @property + def service_handle(self) -> Handle[_U, _T]: + """Get a `Handle` for the service side to use. + + Returns: + Object to send/receive messages with. + """ + return self._service_handle + + def __str__(self) -> str: + """Return a string representation of this channel.""" + return f"{type(self).__name__}:{self._name}" + + def __repr__(self) -> str: + """Return a string representation of this channel.""" + return ( + f"{type(self).__name__}(name={self._name!r}):<" + f"request_channel={self._request_channel!r}, " + f"response_channel={self._response_channel!r}>" + ) + + +class Handle(Sender[_V], Receiver[_W]): + """A handle to a [Bidirectional][frequenz.channels.bidirectional.Bidirectional] instance. + + It can be used to send/receive values between the client and service. + """ + + def __init__( + self, + channel: Bidirectional[_V, _W] | Bidirectional[_W, _V], + sender: Sender[_V], + receiver: Receiver[_W], + ) -> None: + """Create a `Handle` instance. + + Args: + channel: The underlying channel. + sender: A sender to send values with. + receiver: A receiver to receive values from. + """ + self._chan: Bidirectional[_V, _W] | Bidirectional[_W, _V] = channel + """The underlying channel.""" + + self._sender: Sender[_V] = sender + """The sender to send values with.""" + + self._receiver: Receiver[_W] = receiver + """The receiver to receive values from.""" + + async def send(self, msg: _V) -> None: + """Send a value to the other side. + + Args: + msg: The value to send. + + Raises: + SenderError: if the underlying channel was closed. + A [ChannelClosedError][frequenz.channels.ChannelClosedError] + is set as the cause. + """ + try: + await self._sender.send(msg) + except SenderError as err: + # If this comes from a channel error, then we inject another + # ChannelError having the information about the Bidirectional + # channel to hide (at least partially) the underlying + # Broadcast channels we use. + if isinstance(err.__cause__, ChannelError): + this_chan_error = ChannelError( + f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", + self._chan, # pylint: disable=protected-access + ) + this_chan_error.__cause__ = err.__cause__ + err.__cause__ = this_chan_error + raise err + + async def ready(self) -> bool: + """Wait until the receiver is ready with a value or an error. + + Once a call to `ready()` has finished, the value should be read with + a call to `consume()` (`receive()` or iterated over). The receiver will + remain ready (this method will return immediately) until it is + consumed. + + Returns: + Whether the receiver is still active. + """ + return await self._receiver.ready() # pylint: disable=protected-access + + def consume(self) -> _W: + """Return the latest value once `_ready` is complete. + + Returns: + The next value that was received. + + Raises: + ReceiverStoppedError: if there is some problem with the receiver. + ReceiverError: if there is some problem with the receiver. + """ + try: + return self._receiver.consume() # pylint: disable=protected-access + except ReceiverError as err: + # If this comes from a channel error, then we inject another + # ChannelError having the information about the Bidirectional + # channel to hide (at least partially) the underlying + # Broadcast channels we use. + if isinstance(err.__cause__, ChannelError): + this_chan_error = ChannelError( + f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", + self._chan, # pylint: disable=protected-access + ) + this_chan_error.__cause__ = err.__cause__ + err.__cause__ = this_chan_error + raise err + + def __str__(self) -> str: + """Return a string representation of this handle.""" + return f"{type(self).__name__}:{self._chan}" + + def __repr__(self) -> str: + """Return a string representation of this handle.""" + return ( + f"{type(self).__name__}(channel={self._chan!r}, " + f"sender={self._sender!r}, receiver={self._receiver!r})" + ) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/broadcast.py similarity index 68% rename from src/frequenz/channels/_broadcast.py rename to src/frequenz/channels/broadcast.py index 5fe94652..c37157fd 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/broadcast.py @@ -3,30 +3,28 @@ """A channel to broadcast messages to all receivers.""" -from __future__ import annotations - import logging import weakref from asyncio import Condition from collections import deque -from typing import Deque, Generic -from uuid import UUID, uuid4 - -from ._base_classes import Peekable as BasePeekable -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._base_classes import T -from ._exceptions import ( - ChannelClosedError, +from typing import Generic, TypeVar + +from ._exceptions import ChannelClosedError +from ._receiver import ( + Peekable, + Receiver, ReceiverInvalidatedError, ReceiverStoppedError, - SenderError, ) +from ._sender import Sender, SenderError logger = logging.Logger(__name__) -class Broadcast(Generic[T]): +_T = TypeVar("_T") + + +class Broadcast(Generic[_T]): """A channel to broadcast messages to multiple receivers. `Broadcast` channels can have multiple senders and multiple receivers. Each @@ -38,9 +36,9 @@ class Broadcast(Generic[T]): are thread-safe. Because of this, `Broadcast` channels are thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.merge.Merge] or + [MergeNamed][frequenz.channels.merge_named.MergeNamed]. Example: ``` python @@ -71,12 +69,13 @@ async def recv(id: int, receiver: channel.Receiver) -> None: Check the `tests` and `benchmarks` directories for more examples. """ - def __init__(self, name: str, resend_latest: bool = False) -> None: + def __init__(self, *, name: str | None, resend_latest: bool = False) -> None: """Create a Broadcast channel. Args: - name: A name for the broadcast channel, typically based on the type of data - sent through it. Used to identify the channel in the logs. + name: The name of the channel. If `None`, an `id(self)`-based name will be + used. This is only for debugging purposes, it will be shown in the + string representation of the channel. resend_latest: When True, every time a new receiver is created with `new_receiver`, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest @@ -85,17 +84,27 @@ def __init__(self, name: str, resend_latest: bool = False) -> None: data/reporting channels, but is not recommended for use in channels that stream control instructions. """ - self._name: str = name + self._name: str = f"{id(self):_}" if name is None else name """The name of the broadcast channel. Only used for debugging purposes. """ + self._recv_cv: Condition = Condition() + """The condition to wait for data in the channel's buffer.""" + + self._receivers: dict[int, weakref.ReferenceType[_Receiver[_T]]] = {} + """The receivers attached to the channel, indexed by their hash().""" + + self._closed: bool = False + """Whether the channel is closed.""" + + self._latest: _T | None = None + """The latest value sent to the channel.""" + self.resend_latest: bool = resend_latest """Whether to resend the latest value to new receivers. - It is `False` by default. - When `True`, every time a new receiver is created with `new_receiver`, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest value as soon as they are created, @@ -105,17 +114,32 @@ def __init__(self, name: str, resend_latest: bool = False) -> None: in channels that stream control instructions. """ - self._recv_cv: Condition = Condition() - """The condition to wait for data in the channel's buffer.""" + @property + def name(self) -> str: + """The name of this channel. - self._receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {} - """The receivers attached to the channel, indexed by their UUID.""" + This is for debugging purposes, it will be shown in the string representation + of this channel. + """ + return self._name - self._closed: bool = False - """Whether the channel is closed.""" + @property + def latest(self) -> _T | None: + """The latest value sent to the channel. - self._latest: T | None = None - """The latest value sent to the channel.""" + Returns: + The latest value sent to the channel, or `None` if no value was sent yet. + """ + return self._latest + + @property + def is_closed(self) -> bool: + """Whether this channel is closed. + + Any further attempts to use this channel after it is closed will result in an + exception. + """ + return self._closed async def close(self) -> None: """Close the Broadcast channel. @@ -133,15 +157,15 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new broadcast sender. Returns: A Sender instance attached to the broadcast channel. """ - return Sender(self) + return _Sender(self) - def new_receiver(self, name: str | None = None, maxsize: int = 50) -> Receiver[T]: + def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -150,21 +174,18 @@ def new_receiver(self, name: str | None = None, maxsize: int = 50) -> Receiver[T Args: name: A name to identify the receiver in the logs. - maxsize: Size of the receiver's buffer. + limit: Size of the receiver's buffer in number of messages. Returns: A Receiver instance attached to the broadcast channel. """ - uuid = uuid4() - if name is None: - name = str(uuid) - recv: Receiver[T] = Receiver(uuid, name, maxsize, self) - self._receivers[uuid] = weakref.ref(recv) + recv: _Receiver[_T] = _Receiver(name, limit, self) + self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) return recv - def new_peekable(self) -> Peekable[T]: + def new_peekable(self) -> Peekable[_T]: """Create a new Peekable for the broadcast channel. A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method @@ -174,10 +195,24 @@ def new_peekable(self) -> Peekable[T]: Returns: A Peekable to peek into the broadcast channel with. """ - return Peekable(self) + return _Peekable(self) + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}:{self._name}" + + def __repr__(self) -> str: + """Return a string representation of this channel.""" + return ( + f"{type(self).__name__}(name={self._name!r}, " + f"resend_latest={self.resend_latest!r}):<" + f"latest={self._latest!r}, " + f"receivers={len(self._receivers)!r}, " + f"closed={self._closed!r}>" + ) -class Sender(BaseSender[T]): +class _Sender(Sender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -185,16 +220,16 @@ class Sender(BaseSender[T]): method. """ - def __init__(self, chan: Broadcast[T]) -> None: + def __init__(self, chan: Broadcast[_T]) -> None: """Create a Broadcast sender. Args: chan: A reference to the broadcast channel this sender belongs to. """ - self._chan = chan + self._chan: Broadcast[_T] = chan """The broadcast channel this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message to all broadcast receivers. Args: @@ -212,20 +247,28 @@ async def send(self, msg: T) -> None: ) self._chan._latest = msg stale_refs = [] - for name, recv_ref in self._chan._receivers.items(): + for _hash, recv_ref in self._chan._receivers.items(): recv = recv_ref() if recv is None: - stale_refs.append(name) + stale_refs.append(_hash) continue recv.enqueue(msg) - for name in stale_refs: - del self._chan._receivers[name] + for _hash in stale_refs: + del self._chan._receivers[_hash] async with self._chan._recv_cv: self._chan._recv_cv.notify_all() # pylint: enable=protected-access + def __str__(self) -> str: + """Return a string representation of this sender.""" + return f"{self._chan}:{type(self).__name__}" -class Receiver(BaseReceiver[T]): + def __repr__(self) -> str: + """Return a string representation of this sender.""" + return f"{type(self).__name__}({self._chan!r})" + + +class _Receiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the @@ -233,7 +276,7 @@ class Receiver(BaseReceiver[T]): method. """ - def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> None: + def __init__(self, name: str | None, limit: int, chan: Broadcast[_T]) -> None: """Create a broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -241,36 +284,34 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N get dropped just in this receiver. Args: - uuid: A uuid to identify the receiver in the broadcast channel's - list of receivers. - name: A name to identify the receiver in the logs. - maxsize: Size of the receiver's buffer. + name: A name to identify the receiver in the logs. If `None` an + `id(self)`-based name will be used. This is only for debugging + purposes, it will be shown in the string representation of the + receiver. + limit: Size of the receiver's buffer in number of messages. chan: a reference to the Broadcast channel that this receiver belongs to. """ - self._uuid = uuid - """The UUID to identify the receiver in the broadcast channel's list of receivers.""" - - self._name = name + self._name: str = name if name is not None else f"{id(self):_}" """The name to identify the receiver. Only used for debugging purposes. """ - self._chan = chan + self._chan: Broadcast[_T] = chan """The broadcast channel that this receiver belongs to.""" - self._q: Deque[T] = deque(maxlen=maxsize) + self._q: deque[_T] = deque(maxlen=limit) """The receiver's internal message queue.""" - self._active = True + self._active: bool = True """Whether the receiver is still active. If this receiver is converted into a Peekable, it will neither be considered valid nor active. """ - def enqueue(self, msg: T) -> None: + def enqueue(self, msg: _T) -> None: """Put a message into this receiver's queue. To be called by broadcast senders. If the receiver's queue is already @@ -283,9 +324,8 @@ def enqueue(self, msg: T) -> None: if len(self._q) == self._q.maxlen: self._q.popleft() logger.warning( - "Broadcast receiver [%s:%s] is full. Oldest message was dropped.", - self._chan._name, # pylint: disable=protected-access - self._name, + "Broadcast receiver [%s] is full. Oldest message was dropped.", + self, ) self._q.append(msg) @@ -333,11 +373,12 @@ def _deactivate(self) -> None: """Set the receiver as inactive and remove it from the channel.""" self._active = False # pylint: disable=protected-access - if self._uuid in self._chan._receivers: - del self._chan._receivers[self._uuid] + _hash = hash(self) + if _hash in self._chan._receivers: + del self._chan._receivers[_hash] # pylint: enable=protected-access - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: @@ -360,7 +401,7 @@ def consume(self) -> T: assert self._q, "`consume()` must be preceded by a call to `ready()`" return self._q.popleft() - def into_peekable(self) -> Peekable[T]: + def into_peekable(self) -> Peekable[_T]: """Convert the `Receiver` implementation into a `Peekable`. Once this function has been called, the receiver will no longer be @@ -371,10 +412,24 @@ def into_peekable(self) -> Peekable[T]: A `Peekable` instance. """ self._deactivate() - return Peekable(self._chan) + return _Peekable(self._chan) + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{self._chan}:{type(self).__name__}" -class Peekable(BasePeekable[T]): + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + limit = self._q.maxlen + assert limit is not None + return ( + f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, " + f"{self._chan!r}):" + ) + + +class _Peekable(Peekable[_T]): """A Peekable to peek into broadcast channels. A Peekable provides a [peek()][frequenz.channels.Peekable] method that @@ -382,16 +437,16 @@ class Peekable(BasePeekable[T]): consuming anything. """ - def __init__(self, chan: Broadcast[T]) -> None: + def __init__(self, chan: Broadcast[_T]) -> None: """Create a `Peekable` instance. Args: chan: The broadcast channel this Peekable will try to peek into. """ - self._chan = chan + self._chan: Broadcast[_T] = chan """The broadcast channel this Peekable will try to peek into.""" - def peek(self) -> T | None: + def peek(self) -> _T | None: """Return the latest value that was sent to the channel. Returns: @@ -399,3 +454,11 @@ def peek(self) -> T | None: has been sent to the channel yet, or if the channel is closed. """ return self._chan._latest # pylint: disable=protected-access + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}({self._chan!r}):" diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/event.py similarity index 82% rename from src/frequenz/channels/util/_event.py rename to src/frequenz/channels/event.py index c227663a..27e20054 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/event.py @@ -4,27 +4,27 @@ """A receiver that can be made ready through an event.""" -import asyncio as _asyncio +import asyncio -from frequenz.channels import _base_classes, _exceptions +from ._receiver import Receiver, ReceiverStoppedError -class Event(_base_classes.Receiver[None]): +class Event(Receiver[None]): """A receiver that can be made ready through an event. - The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait - until [`set()`][frequenz.channels.util.Event.set] is called. At that point the + The receiver (the [`ready()`][frequenz.channels.event.Event.ready] method) will wait + until [`set()`][frequenz.channels.event.Event.set] is called. At that point the receiver will wait again after the event is [`consume()`][frequenz.channels.Receiver.consume]d. The receiver can be completely stopped by calling - [`stop()`][frequenz.channels.util.Event.stop]. + [`stop()`][frequenz.channels.event.Event.stop]. Example: ```python import asyncio - from frequenz.channels import Receiver - from frequenz.channels.util import Event, select, selected_from + from frequenz.channels import Receiver, select, selected_from + from frequenz.channels.event import Event other_receiver: Receiver[int] = ... exit_event = Event() @@ -45,18 +45,18 @@ async def exit_after_10_seconds() -> None: ``` """ - def __init__(self, name: str | None = None) -> None: + def __init__(self, *, name: str | None = None) -> None: """Create a new instance. Args: - name: The name of the receiver. If `None` the `id(self)` will be used as - the name. This is only for debugging purposes, it will be shown in the + name: The name of the receiver. If `None` an `id(self)`-based name will be + used. This is only for debugging purposes, it will be shown in the string representation of the receiver. """ - self._event: _asyncio.Event = _asyncio.Event() + self._event: asyncio.Event = asyncio.Event() """The event that is set when the receiver is ready.""" - self._name: str = name or str(id(self)) + self._name: str = f"{id(self):_}" if name is None else name """The name of the receiver. This is for debugging purposes, it will be shown in the string representation @@ -134,7 +134,7 @@ def consume(self) -> None: ReceiverStoppedError: If this receiver is stopped. """ if not self._is_set and self._is_stopped: - raise _exceptions.ReceiverStoppedError(self) + raise ReceiverStoppedError(self) assert self._is_set, "calls to `consume()` must be follow a call to `ready()`" diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/file_watcher.py similarity index 68% rename from src/frequenz/channels/util/_file_watcher.py rename to src/frequenz/channels/file_watcher.py index 1c87742a..64bc62e1 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -3,8 +3,6 @@ """A Channel receiver for watching for new, modified or deleted files.""" -from __future__ import annotations - import asyncio import pathlib from collections import abc @@ -14,33 +12,34 @@ from watchfiles import Change, awatch from watchfiles.main import FileChange -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError -class FileWatcher(Receiver["FileWatcher.Event"]): - """A channel receiver that watches for file events.""" +class EventType(Enum): + """Available types of changes to watch for.""" + + CREATE = Change.added + """A new file was created.""" - class EventType(Enum): - """Available types of changes to watch for.""" + MODIFY = Change.modified + """An existing file was modified.""" - CREATE = Change.added - """A new file was created.""" + DELETE = Change.deleted + """An existing file was deleted.""" - MODIFY = Change.modified - """An existing file was modified.""" - DELETE = Change.deleted - """An existing file was deleted.""" +@dataclass(frozen=True) +class Event: + """A file change event.""" - @dataclass(frozen=True) - class Event: - """A file change event.""" + type: EventType + """The type of change that was observed.""" + path: pathlib.Path + """The path where the change was observed.""" - type: FileWatcher.EventType - """The type of change that was observed.""" - path: pathlib.Path - """The path where the change was observed.""" + +class FileWatcher(Receiver[Event]): + """A channel receiver that watches for file events.""" def __init__( self, @@ -54,15 +53,15 @@ def __init__( event_types: Types of events to watch for. Defaults to watch for all event types. """ - self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types) + self.event_types: frozenset[EventType] = frozenset(event_types) """The types of events to watch for.""" - self._stop_event = asyncio.Event() - self._paths = [ + self._stop_event: asyncio.Event = asyncio.Event() + self._paths: list[pathlib.Path] = [ path if isinstance(path, pathlib.Path) else pathlib.Path(path) for path in paths ] - self._awatch = awatch( + self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch( *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events ) self._awatch_stopped_exc: Exception | None = None @@ -113,7 +112,7 @@ async def ready(self) -> bool: return False try: - self._changes = await self._awatch.__anext__() + self._changes = await anext(self._awatch) except StopAsyncIteration as err: self._awatch_stopped_exc = err @@ -134,6 +133,18 @@ def consume(self) -> Event: assert self._changes, "`consume()` must be preceded by a call to `ready()`" # Tuple of (Change, path) returned by watchfiles change, path_str = self._changes.pop() - return FileWatcher.Event( - type=FileWatcher.EventType(change), path=pathlib.Path(path_str) - ) + return Event(type=EventType(change), path=pathlib.Path(path_str)) + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + if len(self._paths) > 3: + paths = [str(p) for p in self._paths[:3]] + paths.append("…") + else: + paths = [str(p) for p in self._paths] + event_types = [event_type.name for event_type in self.event_types] + return f"{type(self).__name__}:{','.join(event_types)}:{','.join(paths)}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})" diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/merge.py similarity index 73% rename from src/frequenz/channels/util/_merge.py rename to src/frequenz/channels/merge.py index f026c9f1..00461ed9 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/merge.py @@ -4,14 +4,16 @@ """Merge messages coming from channels into a single stream.""" import asyncio +import itertools from collections import deque -from typing import Any, Deque +from typing import Any, TypeVar -from .._base_classes import Receiver, T -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError +_T = TypeVar("_T") -class Merge(Receiver[T]): + +class Merge(Receiver[_T]): """Merge messages coming from multiple channels into a single stream. Example: @@ -22,8 +24,8 @@ class Merge(Receiver[T]): ```python from frequenz.channels import Broadcast - channel1 = Broadcast[int]("input-chan-1") - channel2 = Broadcast[int]("input-chan-2") + channel1 = Broadcast[int](name="input-chan-1") + channel2 = Broadcast[int](name="input-chan-2") receiver1 = channel1.new_receiver() receiver2 = channel2.new_receiver() @@ -37,18 +39,20 @@ class Merge(Receiver[T]): `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, *args: Receiver[T]) -> None: + def __init__(self, *args: Receiver[_T]) -> None: """Create a `Merge` instance. Args: *args: sequence of channel receivers. """ - self._receivers = {str(id): recv for id, recv in enumerate(args)} + self._receivers: dict[str, Receiver[_T]] = { + str(id): recv for id, recv in enumerate(args) + } self._pending: set[asyncio.Task[Any]] = { - asyncio.create_task(recv.__anext__(), name=name) + asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() } - self._results: Deque[T] = deque(maxlen=len(self._receivers)) + self._results: deque[_T] = deque(maxlen=len(self._receivers)) def __del__(self) -> None: """Cleanup any pending tasks.""" @@ -96,11 +100,10 @@ async def ready(self) -> bool: result = item.result() self._results.append(result) self._pending.add( - # pylint: disable=unnecessary-dunder-call - asyncio.create_task(self._receivers[name].__anext__(), name=name) + asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: @@ -116,3 +119,19 @@ def consume(self) -> T: assert self._results, "`consume()` must be preceded by a call to `ready()`" return self._results.popleft() + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + if len(self._receivers) > 3: + receivers = [str(p) for p in itertools.islice(self._receivers.values(), 3)] + receivers.append("…") + else: + receivers = [str(p) for p in self._receivers.values()] + return f"{type(self).__name__}:{','.join(receivers)}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return ( + f"{type(self).__name__}(" + f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})" + ) diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/merge_named.py similarity index 74% rename from src/frequenz/channels/util/_merge_named.py rename to src/frequenz/channels/merge_named.py index d8ab9839..3060bcc6 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/merge_named.py @@ -4,36 +4,38 @@ """Merge messages coming from channels into a single stream containing name of message.""" import asyncio +import itertools from collections import deque -from typing import Any, Deque +from typing import Any, TypeVar -from .._base_classes import Receiver, T -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError +_T = TypeVar("_T") -class MergeNamed(Receiver[tuple[str, T]]): + +class MergeNamed(Receiver[tuple[str, _T]]): """Merge messages coming from multiple named channels into a single stream. When `MergeNamed` is no longer needed, then it should be stopped using `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, **kwargs: Receiver[T]) -> None: + def __init__(self, **kwargs: Receiver[_T]) -> None: """Create a `MergeNamed` instance. Args: **kwargs: sequence of channel receivers. """ - self._receivers = kwargs + self._receivers: dict[str, Receiver[_T]] = kwargs """The sequence of channel receivers to get the messages to merge.""" self._pending: set[asyncio.Task[Any]] = { - asyncio.create_task(recv.__anext__(), name=name) + asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() } """The set of pending tasks to merge messages.""" - self._results: Deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) + self._results: deque[tuple[str, _T]] = deque(maxlen=len(self._receivers)) """The internal buffer of merged messages.""" def __del__(self) -> None: @@ -83,10 +85,10 @@ async def ready(self) -> bool: self._results.append((name, result)) self._pending.add( # pylint: disable=unnecessary-dunder-call - asyncio.create_task(self._receivers[name].__anext__(), name=name) + asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> tuple[str, T]: + def consume(self) -> tuple[str, _T]: """Return the latest value once `ready` is complete. Returns: @@ -102,3 +104,19 @@ def consume(self) -> tuple[str, T]: assert self._results, "`consume()` must be preceded by a call to `ready()`" return self._results.popleft() + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + if len(self._receivers) > 3: + receivers = [str(p) for p in itertools.islice(self._receivers, 3)] + receivers.append("…") + else: + receivers = [str(p) for p in self._receivers] + return f"{type(self).__name__}:{','.join(receivers)}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return ( + f"{type(self).__name__}(" + f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})" + ) diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/timer.py similarity index 94% rename from src/frequenz/channels/util/_timer.py rename to src/frequenz/channels/timer.py index ef577897..69dd4e30 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/timer.py @@ -11,14 +11,12 @@ time, which can lead to very hard to reproduce, and debug, issues. """ -from __future__ import annotations - import abc import asyncio from datetime import timedelta +from typing import Self -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError def _to_microseconds(time: float | timedelta) -> int: @@ -271,7 +269,7 @@ class Timer(Receiver[timedelta]): """A timer receiver that triggers every `interval` time. The timer has microseconds resolution, so the - [`interval`][frequenz.channels.util.Timer.interval] must be at least + [`interval`][frequenz.channels.timer.Timer.interval] must be at least 1 microsecond. The message it produces is a [`timedelta`][datetime.timedelta] containing the drift @@ -284,34 +282,34 @@ class Timer(Receiver[timedelta]): as the timer uses [`asyncio`][asyncio]s loop monotonic clock. If the timer is delayed too much, then it will behave according to the - [`missed_tick_policy`][frequenz.channels.util.Timer.missed_tick_policy]. Missing + [`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy. These are the currently built-in available policies: - * [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] - * [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] - * [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] + * [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] + * [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] + * [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] For the most common cases, a specialized constructor is provided: - * [`periodic()`][frequenz.channels.util.Timer.periodic] (uses the - [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] or - [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] policy) - * [`timeout()`][frequenz.channels.util.Timer.timeout] (uses the - [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] policy) + * [`periodic()`][frequenz.channels.timer.Timer.periodic] (uses the + [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] or + [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy) + * [`timeout()`][frequenz.channels.timer.Timer.timeout] (uses the + [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy) - The timer accepts an optional [`loop`][frequenz.channels.util.Timer.loop], which + The timer accepts an optional [`loop`][frequenz.channels.timer.Timer.loop], which will be used to track the time. If `loop` is `None`, then the running loop will be used (if there is no running loop most calls will raise a [`RuntimeError`][RuntimeError]). Starting the timer can be delayed if necessary by using `auto_start=False` (for example until we have a running loop). A call to - [`reset()`][frequenz.channels.util.Timer.reset], - [`ready()`][frequenz.channels.util.Timer.ready], - [`receive()`][frequenz.channels.util.Timer.receive] or the async iterator interface + [`reset()`][frequenz.channels.timer.Timer.reset], + [`ready()`][frequenz.channels.timer.Timer.ready], + [`receive()`][frequenz.channels.timer.Timer.receive] or the async iterator interface to await for a new message will start the timer. Example: Periodic timer example @@ -320,16 +318,16 @@ class Timer(Receiver[timedelta]): print(f"The timer has triggered {drift=}") ``` - But you can also use a [`select`][frequenz.channels.util.select] to combine + But you can also use a [`select`][frequenz.channels.select] to combine it with other receivers, and even start it (semi) manually: ```python import logging - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import select, selected_from + from frequenz.channels.broadcast import Broadcast timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) - chan = Broadcast[int]("input-chan") + chan = Broadcast[int](name="input-chan") battery_data = chan.new_receiver() timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) @@ -349,8 +347,8 @@ class Timer(Receiver[timedelta]): Example: Timeout example ```python import logging - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import select, selected_from + from frequenz.channels.broadcast import Broadcast def process_data(data: int): logging.info("Processing data: %d", data) @@ -359,8 +357,8 @@ def do_heavy_processing(data: int): logging.info("Heavy processing data: %d", data) timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) - chan1 = Broadcast[int]("input-chan-1") - chan2 = Broadcast[int]("input-chan-2") + chan1 = Broadcast[int](name="input-chan-1") + chan2 = Broadcast[int](name="input-chan-2") battery_data = chan1.new_receiver() heavy_process = chan2.new_receiver() async for selected in select(battery_data, heavy_process, timer): @@ -493,7 +491,7 @@ def timeout( # noqa: DOC502 auto_start: bool = True, start_delay: timedelta = timedelta(0), loop: asyncio.AbstractEventLoop | None = None, - ) -> Timer: + ) -> Self: """Create a timer useful for tracking timeouts. This is basically a shortcut to create a timer with @@ -524,7 +522,7 @@ def timeout( # noqa: DOC502 microsecond; if `start_delay` is negative or `start_delay` was specified but `auto_start` is `False`. """ - return Timer( + return cls( delay, SkipMissedAndDrift(delay_tolerance=timedelta(0)), auto_start=auto_start, @@ -544,7 +542,7 @@ def periodic( # noqa: DOC502 auto_start: bool = True, start_delay: timedelta = timedelta(0), loop: asyncio.AbstractEventLoop | None = None, - ) -> Timer: + ) -> Self: """Create a periodic timer. This is basically a shortcut to create a timer with either @@ -581,7 +579,7 @@ def periodic( # noqa: DOC502 missed_tick_policy = ( SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed() ) - return Timer( + return cls( period, missed_tick_policy, auto_start=auto_start, diff --git a/src/frequenz/channels/util/__init__.py b/src/frequenz/channels/util/__init__.py deleted file mode 100644 index 515e1ac2..00000000 --- a/src/frequenz/channels/util/__init__.py +++ /dev/null @@ -1,66 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Channel utilities. - -A module with several utilities to work with channels: - -* [Event][frequenz.channels.util.Event]: - A [receiver][frequenz.channels.Receiver] that can be made ready through an event. - -* [FileWatcher][frequenz.channels.util.FileWatcher]: - A [receiver][frequenz.channels.Receiver] that watches for file events. - -* [Merge][frequenz.channels.util.Merge]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single stream. - -* [MergeNamed][frequenz.channels.util.MergeNamed]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single named stream, allowing to identify the - origin of each message. - -* [Timer][frequenz.channels.util.Timer]: - A [receiver][frequenz.channels.Receiver] that ticks at certain intervals. - -* [select][frequenz.channels.util.select]: Iterate over the values of all - [receivers][frequenz.channels.Receiver] as new values become available. -""" - -from ._event import Event -from ._file_watcher import FileWatcher -from ._merge import Merge -from ._merge_named import MergeNamed -from ._select import ( - Selected, - SelectError, - SelectErrorGroup, - UnhandledSelectedError, - select, - selected_from, -) -from ._timer import ( - MissedTickPolicy, - SkipMissedAndDrift, - SkipMissedAndResync, - Timer, - TriggerAllMissed, -) - -__all__ = [ - "Event", - "FileWatcher", - "Merge", - "MergeNamed", - "MissedTickPolicy", - "SelectError", - "SelectErrorGroup", - "Selected", - "SkipMissedAndDrift", - "SkipMissedAndResync", - "Timer", - "TriggerAllMissed", - "UnhandledSelectedError", - "select", - "selected_from", -] diff --git a/tests/test_anycast.py b/tests/test_anycast.py index b571aead..70ac5946 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -9,13 +9,13 @@ import pytest from frequenz.channels import ( - Anycast, ChannelClosedError, Receiver, ReceiverStoppedError, Sender, SenderError, ) +from frequenz.channels.anycast import Anycast async def test_anycast() -> None: @@ -106,7 +106,7 @@ async def test_anycast_full() -> None: """Ensure send calls to a full channel are blocked.""" buffer_size = 10 timeout = 0.2 - acast: Anycast[int] = Anycast(buffer_size) + acast: Anycast[int] = Anycast(limit=buffer_size) receiver = acast.new_receiver() sender = acast.new_sender() diff --git a/tests/test_bidirectional.py b/tests/test_bidirectional.py index 0d954263..8e77aad9 100644 --- a/tests/test_bidirectional.py +++ b/tests/test_bidirectional.py @@ -8,19 +8,19 @@ import pytest from frequenz.channels import ( - Bidirectional, ChannelClosedError, ChannelError, ReceiverError, SenderError, ) +from frequenz.channels.bidirectional import Bidirectional, Handle async def test_request_response() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service") + req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - async def service(handle: Bidirectional.Handle[str, int]) -> None: + async def service(handle: Handle[str, int]) -> None: while True: num = await handle.receive() if num is None: @@ -36,7 +36,7 @@ async def service(handle: Bidirectional.Handle[str, int]) -> None: service(req_resp.service_handle), ) - client_handle: Bidirectional.Handle[int, str] = req_resp.client_handle + client_handle: Handle[int, str] = req_resp.client_handle for ctr in range(-5, 5): await client_handle.send(ctr) @@ -52,7 +52,7 @@ async def service(handle: Bidirectional.Handle[str, int]) -> None: async def test_sender_error_chaining() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service") + req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") await req_resp._response_channel.close() # pylint: disable=protected-access @@ -68,7 +68,7 @@ async def test_sender_error_chaining() -> None: async def test_consume_error_chaining() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service") + req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") await req_resp._request_channel.close() # pylint: disable=protected-access diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index d8a7c49a..dc02378e 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -9,7 +9,6 @@ import pytest from frequenz.channels import ( - Broadcast, ChannelClosedError, Receiver, ReceiverInvalidatedError, @@ -17,11 +16,13 @@ Sender, SenderError, ) +from frequenz.channels.broadcast import Broadcast +from frequenz.channels.broadcast import _Receiver as BroadcastReceiver async def test_broadcast() -> None: """Ensure sent messages are received by all receivers.""" - bcast: Broadcast[int] = Broadcast("meter_5") + bcast: Broadcast[int] = Broadcast(name="meter_5") num_receivers = 5 num_senders = 5 @@ -70,7 +71,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No async def test_broadcast_none_values() -> None: """Ensure None values can be sent and received.""" - bcast: Broadcast[int | None] = Broadcast("any_channel") + bcast: Broadcast[int | None] = Broadcast(name="any_channel") sender = bcast.new_sender() receiver = bcast.new_receiver() @@ -87,7 +88,7 @@ async def test_broadcast_none_values() -> None: async def test_broadcast_after_close() -> None: """Ensure closed channels can't get new messages.""" - bcast: Broadcast[int] = Broadcast("meter_5") + bcast: Broadcast[int] = Broadcast(name="meter_5") receiver = bcast.new_receiver() sender = bcast.new_sender() @@ -105,14 +106,16 @@ async def test_broadcast_after_close() -> None: async def test_broadcast_overflow() -> None: """Ensure messages sent to full broadcast receivers get dropped.""" - bcast: Broadcast[int] = Broadcast("meter_5") + bcast: Broadcast[int] = Broadcast(name="meter_5") big_recv_size = 10 small_recv_size = int(big_recv_size / 2) sender = bcast.new_sender() - big_receiver = bcast.new_receiver("named-recv", big_recv_size) - small_receiver = bcast.new_receiver(None, small_recv_size) + big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) + assert isinstance(big_receiver, BroadcastReceiver) + small_receiver = bcast.new_receiver(limit=small_recv_size) + assert isinstance(small_receiver, BroadcastReceiver) async def drain_receivers() -> tuple[int, int]: big_sum = 0 @@ -156,7 +159,7 @@ async def drain_receivers() -> tuple[int, int]: async def test_broadcast_resend_latest() -> None: """Check if new receivers get the latest value when resend_latest is set.""" - bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=True) + bcast: Broadcast[int] = Broadcast(name="new_recv_test", resend_latest=True) sender = bcast.new_sender() old_recv = bcast.new_receiver() @@ -173,7 +176,7 @@ async def test_broadcast_resend_latest() -> None: async def test_broadcast_no_resend_latest() -> None: """Ensure new receivers don't get the latest value when resend_latest isn't set.""" - bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=False) + bcast: Broadcast[int] = Broadcast(name="new_recv_test", resend_latest=False) sender = bcast.new_sender() old_recv = bcast.new_receiver() @@ -189,7 +192,7 @@ async def test_broadcast_no_resend_latest() -> None: async def test_broadcast_peek() -> None: """Ensure we are able to peek into broadcast channels.""" - bcast: Broadcast[int] = Broadcast("peek-test") + bcast: Broadcast[int] = Broadcast(name="peek-test") receiver = bcast.new_receiver() peekable = receiver.into_peekable() sender = bcast.new_sender() @@ -215,7 +218,7 @@ async def test_broadcast_peek() -> None: async def test_broadcast_async_iterator() -> None: """Check that the broadcast receiver works as an async iterator.""" - bcast: Broadcast[int] = Broadcast("iter_test") + bcast: Broadcast[int] = Broadcast(name="iter_test") sender = bcast.new_sender() receiver = bcast.new_receiver() @@ -238,7 +241,7 @@ async def send_values() -> None: async def test_broadcast_map() -> None: """Ensure map runs on all incoming messages.""" - chan = Broadcast[int]("input-chan") + chan = Broadcast[int](name="input-chan") sender = chan.new_sender() # transform int receiver into bool receiver. @@ -253,7 +256,7 @@ async def test_broadcast_map() -> None: async def test_broadcast_receiver_drop() -> None: """Ensure deleted receivers get cleaned up.""" - chan = Broadcast[int]("input-chan") + chan = Broadcast[int](name="input-chan") sender = chan.new_sender() receiver1 = chan.new_receiver() diff --git a/tests/utils/test_event.py b/tests/test_event.py similarity index 96% rename from tests/utils/test_event.py rename to tests/test_event.py index 0cda9d23..950720d0 100644 --- a/tests/utils/test_event.py +++ b/tests/test_event.py @@ -8,7 +8,7 @@ import pytest as _pytest from frequenz.channels import ReceiverStoppedError -from frequenz.channels.util import Event +from frequenz.channels.event import Event async def test_event() -> None: diff --git a/tests/utils/test_file_watcher.py b/tests/test_file_watcher.py similarity index 85% rename from tests/utils/test_file_watcher.py rename to tests/test_file_watcher.py index bed75bcb..c1a65838 100644 --- a/tests/utils/test_file_watcher.py +++ b/tests/test_file_watcher.py @@ -15,7 +15,7 @@ from watchfiles import Change from watchfiles.main import FileChange -from frequenz.channels.util import FileWatcher +from frequenz.channels.file_watcher import Event, EventType, FileWatcher class _FakeAwatch: @@ -52,7 +52,7 @@ def fake_awatch() -> Iterator[_FakeAwatch]: """Fixture to mock the awatch function.""" fake = _FakeAwatch() with mock.patch( - "frequenz.channels.util._file_watcher.awatch", + "frequenz.channels.file_watcher.awatch", autospec=True, side_effect=fake.fake_awatch, ): @@ -74,14 +74,14 @@ async def test_file_watcher_receive_updates( for change in changes: recv_changes = await file_watcher.receive() - event_type = FileWatcher.EventType(change[0]) + event_type = EventType(change[0]) path = pathlib.Path(change[1]) - assert recv_changes == FileWatcher.Event(type=event_type, path=path) + assert recv_changes == Event(type=event_type, path=path) -@hypothesis.given(event_types=st.sets(st.sampled_from(FileWatcher.EventType))) +@hypothesis.given(event_types=st.sets(st.sampled_from(EventType))) async def test_file_watcher_filter_events( - event_types: set[FileWatcher.EventType], + event_types: set[EventType], ) -> None: """Test the file watcher events filtering.""" good_path = "good-file" @@ -89,7 +89,7 @@ async def test_file_watcher_filter_events( # We need to reset the mock explicitly because hypothesis runs all the produced # inputs in the same context. with mock.patch( - "frequenz.channels.util._file_watcher.awatch", autospec=True + "frequenz.channels.file_watcher.awatch", autospec=True ) as awatch_mock: file_watcher = FileWatcher(paths=[good_path], event_types=event_types) @@ -100,7 +100,7 @@ async def test_file_watcher_filter_events( pathlib.Path(good_path), stop_event=mock.ANY, watch_filter=filter_events ) ] - for event_type in FileWatcher.EventType: + for event_type in EventType: assert filter_events(event_type.value, good_path) == ( event_type in event_types ) diff --git a/tests/utils/test_integration.py b/tests/test_integration.py similarity index 87% rename from tests/utils/test_integration.py rename to tests/test_integration.py index e61cb620..754aca5f 100644 --- a/tests/utils/test_integration.py +++ b/tests/test_integration.py @@ -9,7 +9,9 @@ import pytest -from frequenz.channels.util import FileWatcher, Timer, select, selected_from +from frequenz.channels import select, selected_from +from frequenz.channels.file_watcher import Event, EventType, FileWatcher +from frequenz.channels.timer import Timer @pytest.mark.integration @@ -31,12 +33,8 @@ async def test_file_watcher(tmp_path: pathlib.Path) -> None: if selected_from(selected, timer): filename.write_text(f"{selected.value}") elif selected_from(selected, file_watcher): - event_type = ( - FileWatcher.EventType.CREATE - if number_of_writes == 0 - else FileWatcher.EventType.MODIFY - ) - assert selected.value == FileWatcher.Event(type=event_type, path=filename) + event_type = EventType.CREATE if number_of_writes == 0 else EventType.MODIFY + assert selected.value == Event(type=event_type, path=filename) number_of_writes += 1 # After receiving a write 3 times, unsubscribe from the writes channel if number_of_writes == expected_number_of_writes: @@ -56,9 +54,7 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None: tmp_path: A tmp directory to run the file watcher on. Created by pytest. """ filename = tmp_path / "test-file" - file_watcher = FileWatcher( - paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE} - ) + file_watcher = FileWatcher(paths=[str(tmp_path)], event_types={EventType.DELETE}) write_timer = Timer.timeout(timedelta(seconds=0.1)) deletion_timer = Timer.timeout(timedelta(seconds=0.25)) diff --git a/tests/test_merge.py b/tests/test_merge.py index 9cc920f2..3d3cf3ab 100644 --- a/tests/test_merge.py +++ b/tests/test_merge.py @@ -5,8 +5,9 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import Merge +from frequenz.channels import Sender +from frequenz.channels.anycast import Anycast +from frequenz.channels.merge import Merge async def test_merge() -> None: diff --git a/tests/test_mergenamed.py b/tests/test_mergenamed.py index 1c06bbe9..1ddcb510 100644 --- a/tests/test_mergenamed.py +++ b/tests/test_mergenamed.py @@ -5,8 +5,9 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import MergeNamed +from frequenz.channels import Sender +from frequenz.channels.anycast import Anycast +from frequenz.channels.merge_named import MergeNamed async def test_mergenamed() -> None: diff --git a/tests/utils/test_select.py b/tests/test_select.py similarity index 93% rename from tests/utils/test_select.py rename to tests/test_select.py index a9a46921..9eb001c5 100644 --- a/tests/utils/test_select.py +++ b/tests/test_select.py @@ -7,8 +7,7 @@ import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import Selected, selected_from +from frequenz.channels import Receiver, ReceiverStoppedError, Selected, selected_from class TestSelected: diff --git a/tests/utils/test_select_integration.py b/tests/test_select_integration.py similarity index 98% rename from tests/utils/test_select_integration.py rename to tests/test_select_integration.py index e5f405d7..6d676528 100644 --- a/tests/utils/test_select_integration.py +++ b/tests/test_select_integration.py @@ -15,14 +15,15 @@ class at a time. import async_solipsism import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import ( - Event, +from frequenz.channels import ( + Receiver, + ReceiverStoppedError, Selected, UnhandledSelectedError, select, selected_from, ) +from frequenz.channels.event import Event @pytest.mark.integration @@ -57,9 +58,9 @@ async def start_run_ordered_sequence(self) -> AsyncIterator[asyncio.Task[None]]: def setup_method(self) -> None: """Set up the test.""" - self.recv1 = Event("recv1") - self.recv2 = Event("recv2") - self.recv3 = Event("recv3") + self.recv1 = Event(name="recv1") + self.recv2 = Event(name="recv2") + self.recv3 = Event(name="recv3") def assert_received_from( self, diff --git a/tests/utils/test_timer.py b/tests/test_timer.py similarity index 99% rename from tests/utils/test_timer.py rename to tests/test_timer.py index dd5e5109..73fe28f6 100644 --- a/tests/utils/test_timer.py +++ b/tests/test_timer.py @@ -14,7 +14,7 @@ import pytest from hypothesis import strategies as st -from frequenz.channels.util import ( +from frequenz.channels.timer import ( SkipMissedAndDrift, SkipMissedAndResync, Timer, diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py deleted file mode 100644 index 25e1e6d9..00000000 --- a/tests/utils/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for channel utils."""