From f355fd33682f905cd3d294df89960e34750e2084 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 1 Nov 2023 15:24:09 +0100 Subject: [PATCH 01/22] Use `deque` for typing There is no need to import `typing.Deque` in newer Python versions. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 4 ++-- src/frequenz/channels/_broadcast.py | 4 ++-- src/frequenz/channels/util/_merge.py | 4 ++-- src/frequenz/channels/util/_merge_named.py | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 40cf69db..8217588e 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -7,7 +7,7 @@ from asyncio import Condition from collections import deque -from typing import Deque, Generic +from typing import Generic from ._base_classes import Receiver as BaseReceiver from ._base_classes import Sender as BaseSender @@ -75,7 +75,7 @@ def __init__(self, maxsize: int = 10) -> None: a value is consumed. """ - self._deque: Deque[T] = deque(maxlen=maxsize) + self._deque: deque[T] = deque(maxlen=maxsize) """The channel's buffer.""" self._send_cv: Condition = Condition() diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 5fe94652..4c163332 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -9,7 +9,7 @@ import weakref from asyncio import Condition from collections import deque -from typing import Deque, Generic +from typing import Generic from uuid import UUID, uuid4 from ._base_classes import Peekable as BasePeekable @@ -260,7 +260,7 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N self._chan = chan """The broadcast channel that this receiver belongs to.""" - self._q: Deque[T] = deque(maxlen=maxsize) + self._q: deque[T] = deque(maxlen=maxsize) """The receiver's internal message queue.""" self._active = True diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index f026c9f1..0899bb22 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -5,7 +5,7 @@ import asyncio from collections import deque -from typing import Any, Deque +from typing import Any from .._base_classes import Receiver, T from .._exceptions import ReceiverStoppedError @@ -48,7 +48,7 @@ def __init__(self, *args: Receiver[T]) -> None: asyncio.create_task(recv.__anext__(), name=name) for name, recv in self._receivers.items() } - self._results: Deque[T] = deque(maxlen=len(self._receivers)) + self._results: deque[T] = deque(maxlen=len(self._receivers)) def __del__(self) -> None: """Cleanup any pending tasks.""" diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index d8ab9839..db6a450a 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -5,7 +5,7 @@ import asyncio from collections import deque -from typing import Any, Deque +from typing import Any from .._base_classes import Receiver, T from .._exceptions import ReceiverStoppedError @@ -33,7 +33,7 @@ def __init__(self, **kwargs: Receiver[T]) -> None: } """The set of pending tasks to merge messages.""" - self._results: Deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) + self._results: deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) """The internal buffer of merged messages.""" def __del__(self) -> None: From f0547891d530d24b2cfc394c7b29a980c2ff2b97 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 27 Oct 2023 22:22:04 +0200 Subject: [PATCH 02/22] Make some arguments keyword-only We do this when the arguments could be easily mistake or hard to know what they are for in the call site, for example when the type is a `str` or `int`, and for most optional arguments. Signed-off-by: Leandro Lucarella --- benchmarks/benchmark_anycast.py | 4 +++- src/frequenz/channels/_anycast.py | 2 +- src/frequenz/channels/_bidirectional.py | 2 +- src/frequenz/channels/_broadcast.py | 6 ++++-- src/frequenz/channels/util/_event.py | 2 +- tests/test_anycast.py | 2 +- tests/test_bidirectional.py | 12 +++++++++--- tests/test_broadcast.py | 4 ++-- tests/utils/test_select_integration.py | 6 +++--- 9 files changed, 25 insertions(+), 15 deletions(-) diff --git a/benchmarks/benchmark_anycast.py b/benchmarks/benchmark_anycast.py index 0e60aba9..c9e1ffac 100644 --- a/benchmarks/benchmark_anycast.py +++ b/benchmarks/benchmark_anycast.py @@ -41,7 +41,9 @@ async def benchmark_anycast( Returns: Total number of messages received by all channels. """ - channels: list[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)] + channels: list[Anycast[int]] = [ + Anycast(maxsize=buffer_size) for _ in range(num_channels) + ] senders = [ asyncio.create_task(send_msg(num_messages, bcast.new_sender())) for bcast in channels diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 8217588e..1e7f5633 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -61,7 +61,7 @@ async def recv(id: int, receiver: channel.Receiver) -> None: Check the `tests` and `benchmarks` directories for more examples. """ - def __init__(self, maxsize: int = 10) -> None: + def __init__(self, *, maxsize: int = 10) -> None: """Create an Anycast channel. Args: diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index a1bfc94f..17357c6e 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -112,7 +112,7 @@ def consume(self) -> W: err.__cause__ = this_chan_error raise err - def __init__(self, client_id: str, service_id: str) -> None: + def __init__(self, *, client_id: str, service_id: str) -> None: """Create a `Bidirectional` instance. Args: diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 4c163332..6043db00 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -71,7 +71,7 @@ async def recv(id: int, receiver: channel.Receiver) -> None: Check the `tests` and `benchmarks` directories for more examples. """ - def __init__(self, name: str, resend_latest: bool = False) -> None: + def __init__(self, name: str, *, resend_latest: bool = False) -> None: """Create a Broadcast channel. Args: @@ -141,7 +141,9 @@ def new_sender(self) -> Sender[T]: """ return Sender(self) - def new_receiver(self, name: str | None = None, maxsize: int = 50) -> Receiver[T]: + def new_receiver( + self, *, name: str | None = None, maxsize: int = 50 + ) -> Receiver[T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/util/_event.py index c227663a..421b2965 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/util/_event.py @@ -45,7 +45,7 @@ async def exit_after_10_seconds() -> None: ``` """ - def __init__(self, name: str | None = None) -> None: + def __init__(self, *, name: str | None = None) -> None: """Create a new instance. Args: diff --git a/tests/test_anycast.py b/tests/test_anycast.py index b571aead..bddfa5b4 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -106,7 +106,7 @@ async def test_anycast_full() -> None: """Ensure send calls to a full channel are blocked.""" buffer_size = 10 timeout = 0.2 - acast: Anycast[int] = Anycast(buffer_size) + acast: Anycast[int] = Anycast(maxsize=buffer_size) receiver = acast.new_receiver() sender = acast.new_sender() diff --git a/tests/test_bidirectional.py b/tests/test_bidirectional.py index 0d954263..11165ff2 100644 --- a/tests/test_bidirectional.py +++ b/tests/test_bidirectional.py @@ -18,7 +18,9 @@ async def test_request_response() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service") + req_resp: Bidirectional[int, str] = Bidirectional( + client_id="test_client", service_id="test_service" + ) async def service(handle: Bidirectional.Handle[str, int]) -> None: while True: @@ -52,7 +54,9 @@ async def service(handle: Bidirectional.Handle[str, int]) -> None: async def test_sender_error_chaining() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service") + req_resp: Bidirectional[int, str] = Bidirectional( + client_id="test_client", service_id="test_service" + ) await req_resp._response_channel.close() # pylint: disable=protected-access @@ -68,7 +72,9 @@ async def test_sender_error_chaining() -> None: async def test_consume_error_chaining() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service") + req_resp: Bidirectional[int, str] = Bidirectional( + client_id="test_client", service_id="test_service" + ) await req_resp._request_channel.close() # pylint: disable=protected-access diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index d8a7c49a..7db4b290 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -111,8 +111,8 @@ async def test_broadcast_overflow() -> None: small_recv_size = int(big_recv_size / 2) sender = bcast.new_sender() - big_receiver = bcast.new_receiver("named-recv", big_recv_size) - small_receiver = bcast.new_receiver(None, small_recv_size) + big_receiver = bcast.new_receiver(name="named-recv", maxsize=big_recv_size) + small_receiver = bcast.new_receiver(maxsize=small_recv_size) async def drain_receivers() -> tuple[int, int]: big_sum = 0 diff --git a/tests/utils/test_select_integration.py b/tests/utils/test_select_integration.py index e5f405d7..a4984f62 100644 --- a/tests/utils/test_select_integration.py +++ b/tests/utils/test_select_integration.py @@ -57,9 +57,9 @@ async def start_run_ordered_sequence(self) -> AsyncIterator[asyncio.Task[None]]: def setup_method(self) -> None: """Set up the test.""" - self.recv1 = Event("recv1") - self.recv2 = Event("recv2") - self.recv3 = Event("recv3") + self.recv1 = Event(name="recv1") + self.recv2 = Event(name="recv2") + self.recv3 = Event(name="recv3") def assert_received_from( self, From e537ea8ddd1b6bc85742d34f1f01b1440c048f49 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 27 Oct 2023 22:31:09 +0200 Subject: [PATCH 03/22] Make exception messages `str` There is no reason to have them of `Any` type, anybody can explicitly convert any object to `str` using `str(obj)` if they want to do so. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_exceptions.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 2042003b..921c6237 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -19,7 +19,7 @@ class Error(RuntimeError): All exceptions generated by this library inherit from this exception. """ - def __init__(self, message: Any): + def __init__(self, message: str): """Create a ChannelError instance. Args: @@ -34,7 +34,7 @@ class ChannelError(Error): All exceptions generated by channels inherit from this exception. """ - def __init__(self, message: Any, channel: Any): + def __init__(self, message: str, channel: Any): """Create a ChannelError instance. Args: @@ -64,7 +64,7 @@ class SenderError(Error, Generic[T]): All exceptions generated by senders inherit from this exception. """ - def __init__(self, message: Any, sender: _base_classes.Sender[T]): + def __init__(self, message: str, sender: _base_classes.Sender[T]): """Create an instance. Args: @@ -83,7 +83,7 @@ class ReceiverError(Error, Generic[T]): All exceptions generated by receivers inherit from this exception. """ - def __init__(self, message: Any, receiver: _base_classes.Receiver[T]): + def __init__(self, message: str, receiver: _base_classes.Receiver[T]): """Create an instance. Args: From 6e680cf8bb31a04a2008e1b1dc3d1d1aaba245e9 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Fri, 27 Oct 2023 22:44:28 +0200 Subject: [PATCH 04/22] Replace `__anext__()` with `anext()` `anext()` wasn't available in older Python versions. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/util/_file_watcher.py | 2 +- src/frequenz/channels/util/_merge.py | 5 ++--- src/frequenz/channels/util/_merge_named.py | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/util/_file_watcher.py index 1c87742a..1f915f83 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/util/_file_watcher.py @@ -113,7 +113,7 @@ async def ready(self) -> bool: return False try: - self._changes = await self._awatch.__anext__() + self._changes = await anext(self._awatch) except StopAsyncIteration as err: self._awatch_stopped_exc = err diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 0899bb22..70022d08 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -45,7 +45,7 @@ def __init__(self, *args: Receiver[T]) -> None: """ self._receivers = {str(id): recv for id, recv in enumerate(args)} self._pending: set[asyncio.Task[Any]] = { - asyncio.create_task(recv.__anext__(), name=name) + asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() } self._results: deque[T] = deque(maxlen=len(self._receivers)) @@ -96,8 +96,7 @@ async def ready(self) -> bool: result = item.result() self._results.append(result) self._pending.add( - # pylint: disable=unnecessary-dunder-call - asyncio.create_task(self._receivers[name].__anext__(), name=name) + asyncio.create_task(anext(self._receivers[name]), name=name) ) def consume(self) -> T: diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index db6a450a..78758345 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -28,7 +28,7 @@ def __init__(self, **kwargs: Receiver[T]) -> None: """The sequence of channel receivers to get the messages to merge.""" self._pending: set[asyncio.Task[Any]] = { - asyncio.create_task(recv.__anext__(), name=name) + asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() } """The set of pending tasks to merge messages.""" @@ -83,7 +83,7 @@ async def ready(self) -> bool: self._results.append((name, result)) self._pending.add( # pylint: disable=unnecessary-dunder-call - asyncio.create_task(self._receivers[name].__anext__(), name=name) + asyncio.create_task(anext(self._receivers[name]), name=name) ) def consume(self) -> tuple[str, T]: From c654ab79cd93a66684c128a66181c79ffd43ea66 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Mon, 30 Oct 2023 09:48:16 +0100 Subject: [PATCH 05/22] Rename `recv` argument as `receiver` For the rest of the API we use full names instead of abbreviated names, so better to do the same here for consistency and clarity. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_base_classes.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_base_classes.py index 8745ce87..723ba408 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_base_classes.py @@ -166,14 +166,14 @@ class _Map(Receiver[U], Generic[T, U]): - The output type: return type of the transform method. """ - def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None: + def __init__(self, receiver: Receiver[T], transform: Callable[[T], U]) -> None: """Create a `Transform` instance. Args: - recv: The input receiver. + receiver: The input receiver. transform: The function to run on the input data. """ - self._recv = recv + self._receiver = receiver """The input receiver.""" self._transform = transform @@ -190,7 +190,7 @@ async def ready(self) -> bool: Returns: Whether the receiver is still active. """ - return await self._recv.ready() # pylint: disable=protected-access + return await self._receiver.ready() # pylint: disable=protected-access # We need a noqa here because the docs have a Raises section but the code doesn't # explicitly raise anything. @@ -203,4 +203,6 @@ def consume(self) -> U: # noqa: DOC502 Raises: ChannelClosedError: if the underlying channel is closed. """ - return self._transform(self._recv.consume()) # pylint: disable=protected-access + return self._transform( + self._receiver.consume() + ) # pylint: disable=protected-access From 6c9b84fa3b82198fba188492d3594e6fe057b296 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:08:19 +0200 Subject: [PATCH 06/22] Add `is_closed` property to channels It might be useful to be able to test if a channel is closed or not. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 9 +++++++++ src/frequenz/channels/_bidirectional.py | 13 +++++++++++++ src/frequenz/channels/_broadcast.py | 9 +++++++++ 3 files changed, 31 insertions(+) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 1e7f5633..f671948b 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -97,6 +97,15 @@ def __init__(self, *, maxsize: int = 10) -> None: self._closed: bool = False """Whether the channel is closed.""" + @property + def is_closed(self) -> bool: + """Whether this channel is closed. + + Any further attempts to use this channel after it is closed will result in an + exception. + """ + return self._closed + async def close(self) -> None: """Close the channel. diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index 17357c6e..6aadac05 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -144,6 +144,19 @@ def __init__(self, *, client_id: str, service_id: str) -> None: ) """The handle for the service side to send/receive values.""" + @property + def is_closed(self) -> bool: + """Whether this channel is closed. + + Any further attempts to use this channel after it is closed will result in an + exception. + + As long as there is a way to send or receive data, the channel is considered + open, even if the other side is closed, so this returns `False` if only both + underlying channels are closed. + """ + return self._request_channel.is_closed and self._response_channel.is_closed + @property def client_handle(self) -> Bidirectional.Handle[T, U]: """Get a `Handle` for the client side to use. diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 6043db00..7f45c9a1 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -117,6 +117,15 @@ def __init__(self, name: str, *, resend_latest: bool = False) -> None: self._latest: T | None = None """The latest value sent to the channel.""" + @property + def is_closed(self) -> bool: + """Whether this channel is closed. + + Any further attempts to use this channel after it is closed will result in an + exception. + """ + return self._closed + async def close(self) -> None: """Close the Broadcast channel. From be89c5cee29a6dc0e29d1cb799c1863aee3c9c02 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:41:59 +0200 Subject: [PATCH 07/22] Add missing types to members in `__init__()` Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 4 ++-- src/frequenz/channels/_base_classes.py | 4 ++-- src/frequenz/channels/_bidirectional.py | 12 ++++++------ src/frequenz/channels/_broadcast.py | 12 ++++++------ src/frequenz/channels/util/_file_watcher.py | 6 +++--- src/frequenz/channels/util/_merge.py | 4 +++- src/frequenz/channels/util/_merge_named.py | 2 +- src/frequenz/channels/util/_select.py | 2 +- 8 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index f671948b..8fe9b3f9 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -154,7 +154,7 @@ def __init__(self, chan: Anycast[T]) -> None: Args: chan: A reference to the channel that this sender belongs to. """ - self._chan = chan + self._chan: Anycast[T] = chan """The channel that this sender belongs to.""" async def send(self, msg: T) -> None: @@ -204,7 +204,7 @@ def __init__(self, chan: Anycast[T]) -> None: Args: chan: A reference to the channel that this receiver belongs to. """ - self._chan = chan + self._chan: Anycast[T] = chan """The channel that this receiver belongs to.""" self._next: T | type[_Empty] = _Empty diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_base_classes.py index 723ba408..38aa1649 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_base_classes.py @@ -173,10 +173,10 @@ def __init__(self, receiver: Receiver[T], transform: Callable[[T], U]) -> None: receiver: The input receiver. transform: The function to run on the input data. """ - self._receiver = receiver + self._receiver: Receiver[T] = receiver """The input receiver.""" - self._transform = transform + self._transform: Callable[[T], U] = transform """The function to run on the input data.""" async def ready(self) -> bool: diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index 6aadac05..779ae5c3 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -37,13 +37,13 @@ def __init__( sender: A sender to send values with. receiver: A receiver to receive values from. """ - self._chan = channel + self._chan: Bidirectional[V, W] | Bidirectional[W, V] = channel """The underlying channel.""" - self._sender = sender + self._sender: Sender[V] = sender """The sender to send values with.""" - self._receiver = receiver + self._receiver: Receiver[W] = receiver """The receiver to receive values from.""" async def send(self, msg: V) -> None: @@ -119,7 +119,7 @@ def __init__(self, *, client_id: str, service_id: str) -> None: client_id: A name for the client, used to name the channels. service_id: A name for the service end of the channels. """ - self._client_id = client_id + self._client_id: str = client_id """The name for the client, used to name the channels.""" self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}") @@ -130,14 +130,14 @@ def __init__(self, *, client_id: str, service_id: str) -> None: ) """The channel to send responses.""" - self._client_handle = Bidirectional.Handle( + self._client_handle: Bidirectional.Handle[T, U] = Bidirectional.Handle( self, self._request_channel.new_sender(), self._response_channel.new_receiver(), ) """The handle for the client side to send/receive values.""" - self._service_handle = Bidirectional.Handle( + self._service_handle: Bidirectional.Handle[U, T] = Bidirectional.Handle( self, self._response_channel.new_sender(), self._request_channel.new_receiver(), diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 7f45c9a1..2d36132f 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -202,7 +202,7 @@ def __init__(self, chan: Broadcast[T]) -> None: Args: chan: A reference to the broadcast channel this sender belongs to. """ - self._chan = chan + self._chan: Broadcast[T] = chan """The broadcast channel this sender belongs to.""" async def send(self, msg: T) -> None: @@ -259,22 +259,22 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N chan: a reference to the Broadcast channel that this receiver belongs to. """ - self._uuid = uuid + self._uuid: UUID = uuid """The UUID to identify the receiver in the broadcast channel's list of receivers.""" - self._name = name + self._name: str = name """The name to identify the receiver. Only used for debugging purposes. """ - self._chan = chan + self._chan: Broadcast[T] = chan """The broadcast channel that this receiver belongs to.""" self._q: deque[T] = deque(maxlen=maxsize) """The receiver's internal message queue.""" - self._active = True + self._active: bool = True """Whether the receiver is still active. If this receiver is converted into a Peekable, it will neither be @@ -399,7 +399,7 @@ def __init__(self, chan: Broadcast[T]) -> None: Args: chan: The broadcast channel this Peekable will try to peek into. """ - self._chan = chan + self._chan: Broadcast[T] = chan """The broadcast channel this Peekable will try to peek into.""" def peek(self) -> T | None: diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/util/_file_watcher.py index 1f915f83..e3d61808 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/util/_file_watcher.py @@ -57,12 +57,12 @@ def __init__( self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types) """The types of events to watch for.""" - self._stop_event = asyncio.Event() - self._paths = [ + self._stop_event: asyncio.Event = asyncio.Event() + self._paths: list[pathlib.Path] = [ path if isinstance(path, pathlib.Path) else pathlib.Path(path) for path in paths ] - self._awatch = awatch( + self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch( *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events ) self._awatch_stopped_exc: Exception | None = None diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 70022d08..013777d5 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -43,7 +43,9 @@ def __init__(self, *args: Receiver[T]) -> None: Args: *args: sequence of channel receivers. """ - self._receivers = {str(id): recv for id, recv in enumerate(args)} + self._receivers: dict[str, Receiver[T]] = { + str(id): recv for id, recv in enumerate(args) + } self._pending: set[asyncio.Task[Any]] = { asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index 78758345..dcbec396 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -24,7 +24,7 @@ def __init__(self, **kwargs: Receiver[T]) -> None: Args: **kwargs: sequence of channel receivers. """ - self._receivers = kwargs + self._receivers: dict[str, Receiver[T]] = kwargs """The sequence of channel receivers to get the messages to merge.""" self._pending: set[asyncio.Task[Any]] = { diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/util/_select.py index 43a0e357..ac6e59ad 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/util/_select.py @@ -189,7 +189,7 @@ def __init__(self, selected: Selected[_T]) -> None: """ recv = selected._recv # pylint: disable=protected-access super().__init__(f"Selected receiver {recv} was not handled in the if-chain") - self.selected = selected + self.selected: Selected[_T] = selected """The selected receiver that was not handled.""" From 3a9952be16cc550aee3e48861a7eb612bb952e50 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:07:09 +0200 Subject: [PATCH 08/22] Anycast: Clarify what happens when the buffer is full Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 8fe9b3f9..c4d23cfa 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -21,6 +21,12 @@ class Anycast(Generic[T]): Anycast channels support multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver. + This channel is buffered, and if the senders are faster than the receivers, then the + channel's buffer will fill up. In that case, the senders will block at the + [send()][frequenz.channels.Sender.send] method until the receivers consume the + messages in the channel's buffer. The channel's buffer size can be configured at + creation time via the `limit` argument. + In cases where each message need to be received by every receiver, a [Broadcast][frequenz.channels.Broadcast] channel may be used. From 5aded0ed7a91f3b5f1b3d17551959ca1e75349ee Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:03:07 +0200 Subject: [PATCH 09/22] Anycast: Don't store the `_limit` The `_limit` is already accesible via the `_deque`'s `maxsize`. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index c4d23cfa..8b737ddb 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -73,14 +73,6 @@ def __init__(self, *, maxsize: int = 10) -> None: Args: maxsize: Size of the channel's buffer. """ - self._limit: int = maxsize - """The maximum number of values that can be stored in the channel's buffer. - - If the length of channel's buffer reaches the limit, then the sender - blocks at the [send()][frequenz.channels.Sender.send] method until - a value is consumed. - """ - self._deque: deque[T] = deque(maxlen=maxsize) """The channel's buffer.""" From b525072c2a3f306e0aaec7268f1c5684987ce006 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:06:15 +0200 Subject: [PATCH 10/22] Rename `maxsize` to `limit` `maxsize` comes from `deque` and it is not very clear, and doesn't follow the snake_case convention. Also exposes the `limit` for `Anycast` as a (read-only) property. Signed-off-by: Leandro Lucarella --- benchmarks/benchmark_anycast.py | 2 +- src/frequenz/channels/_anycast.py | 20 +++++++++++++++++--- src/frequenz/channels/_broadcast.py | 14 ++++++-------- tests/test_anycast.py | 2 +- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/benchmarks/benchmark_anycast.py b/benchmarks/benchmark_anycast.py index c9e1ffac..e468f1b8 100644 --- a/benchmarks/benchmark_anycast.py +++ b/benchmarks/benchmark_anycast.py @@ -42,7 +42,7 @@ async def benchmark_anycast( Total number of messages received by all channels. """ channels: list[Anycast[int]] = [ - Anycast(maxsize=buffer_size) for _ in range(num_channels) + Anycast(limit=buffer_size) for _ in range(num_channels) ] senders = [ asyncio.create_task(send_msg(num_messages, bcast.new_sender())) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 8b737ddb..29374000 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -67,13 +67,15 @@ async def recv(id: int, receiver: channel.Receiver) -> None: Check the `tests` and `benchmarks` directories for more examples. """ - def __init__(self, *, maxsize: int = 10) -> None: + def __init__(self, *, limit: int = 10) -> None: """Create an Anycast channel. Args: - maxsize: Size of the channel's buffer. + limit: The size of the internal buffer in number of messages. If the buffer + is full, then the senders will block until the receivers consume the + messages in the buffer. """ - self._deque: deque[T] = deque(maxlen=maxsize) + self._deque: deque[T] = deque(maxlen=limit) """The channel's buffer.""" self._send_cv: Condition = Condition() @@ -104,6 +106,18 @@ def is_closed(self) -> bool: """ return self._closed + @property + def limit(self) -> int: + """The maximum number of values that can be stored in the channel's buffer. + + If the length of channel's buffer reaches the limit, then the sender + blocks at the [send()][frequenz.channels.Sender.send] method until + a value is consumed. + """ + maxlen = self._deque.maxlen + assert maxlen is not None + return maxlen + async def close(self) -> None: """Close the channel. diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 2d36132f..72754e5d 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -150,9 +150,7 @@ def new_sender(self) -> Sender[T]: """ return Sender(self) - def new_receiver( - self, *, name: str | None = None, maxsize: int = 50 - ) -> Receiver[T]: + def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -161,7 +159,7 @@ def new_receiver( Args: name: A name to identify the receiver in the logs. - maxsize: Size of the receiver's buffer. + limit: Size of the receiver's buffer in number of messages. Returns: A Receiver instance attached to the broadcast channel. @@ -169,7 +167,7 @@ def new_receiver( uuid = uuid4() if name is None: name = str(uuid) - recv: Receiver[T] = Receiver(uuid, name, maxsize, self) + recv: Receiver[T] = Receiver(uuid, name, limit, self) self._receivers[uuid] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) @@ -244,7 +242,7 @@ class Receiver(BaseReceiver[T]): method. """ - def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> None: + def __init__(self, uuid: UUID, name: str, limit: int, chan: Broadcast[T]) -> None: """Create a broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -255,7 +253,7 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N uuid: A uuid to identify the receiver in the broadcast channel's list of receivers. name: A name to identify the receiver in the logs. - maxsize: Size of the receiver's buffer. + limit: Size of the receiver's buffer in number of messages. chan: a reference to the Broadcast channel that this receiver belongs to. """ @@ -271,7 +269,7 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N self._chan: Broadcast[T] = chan """The broadcast channel that this receiver belongs to.""" - self._q: deque[T] = deque(maxlen=maxsize) + self._q: deque[T] = deque(maxlen=limit) """The receiver's internal message queue.""" self._active: bool = True diff --git a/tests/test_anycast.py b/tests/test_anycast.py index bddfa5b4..488877f0 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -106,7 +106,7 @@ async def test_anycast_full() -> None: """Ensure send calls to a full channel are blocked.""" buffer_size = 10 timeout = 0.2 - acast: Anycast[int] = Anycast(maxsize=buffer_size) + acast: Anycast[int] = Anycast(limit=buffer_size) receiver = acast.new_receiver() sender = acast.new_sender() From 92122df5fd0743a0d9bec116d63d283175bdb312 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:11:40 +0200 Subject: [PATCH 11/22] Anycast: Add optional `name` argument This is only for debugging purposes, to show in the string representation and logs, and it is added to match other channels. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 29374000..e76edc9e 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -67,14 +67,24 @@ async def recv(id: int, receiver: channel.Receiver) -> None: Check the `tests` and `benchmarks` directories for more examples. """ - def __init__(self, *, limit: int = 10) -> None: + def __init__(self, *, name: str | None = None, limit: int = 10) -> None: """Create an Anycast channel. Args: + name: The name of the channel. If `None`, an `id(self)`-based name will be + used. This is only for debugging purposes, it will be shown in the + string representation of the channel. limit: The size of the internal buffer in number of messages. If the buffer is full, then the senders will block until the receivers consume the messages in the buffer. """ + self._name: str = f"{id(self):x}" if name is None else name + """The name of the channel. + + This is for debugging purposes, it will be shown in the string representation + of the channel. + """ + self._deque: deque[T] = deque(maxlen=limit) """The channel's buffer.""" @@ -97,6 +107,15 @@ def __init__(self, *, limit: int = 10) -> None: self._closed: bool = False """Whether the channel is closed.""" + @property + def name(self) -> str: + """The name of this channel. + + This is for debugging purposes, it will be shown in the string representation + of this channel. + """ + return self._name + @property def is_closed(self) -> bool: """Whether this channel is closed. From 67c158f1ca9f9c3000ca40471fa1f4f4bfbae4c2 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:18:11 +0200 Subject: [PATCH 12/22] Broadcast: Make `name` optional and keyword-only Since the `name` is only for debugging purposes, it shouldn't be required. If no `name` is specified, a name will be created based on the `id()` of the channel, so channels can be easily uniquely identified. Also makes the `name` accessible via a read-only property. Signed-off-by: Leandro Lucarella --- benchmarks/benchmark_broadcast.py | 8 ++++++-- src/frequenz/channels/_bidirectional.py | 6 ++++-- src/frequenz/channels/_broadcast.py | 18 ++++++++++++++---- src/frequenz/channels/util/_merge.py | 4 ++-- src/frequenz/channels/util/_timer.py | 6 +++--- tests/test_broadcast.py | 24 ++++++++++++------------ 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/benchmarks/benchmark_broadcast.py b/benchmarks/benchmark_broadcast.py index dcc49b99..08181dfc 100644 --- a/benchmarks/benchmark_broadcast.py +++ b/benchmarks/benchmark_broadcast.py @@ -61,7 +61,9 @@ async def benchmark_broadcast( Returns: Total number of messages received by all receivers. """ - channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] + channels: list[Broadcast[int]] = [ + Broadcast(name="meter") for _ in range(num_channels) + ] senders: list[asyncio.Task[Any]] = [ asyncio.create_task(send_msg(num_messages, bcast.new_sender())) for bcast in channels @@ -104,7 +106,9 @@ async def benchmark_single_task_broadcast( Returns: Total number of messages received by all receivers. """ - channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] + channels: list[Broadcast[int]] = [ + Broadcast(name="meter") for _ in range(num_channels) + ] senders = [b.new_sender() for b in channels] recv_tracker = 0 diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index 779ae5c3..197d5648 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -122,11 +122,13 @@ def __init__(self, *, client_id: str, service_id: str) -> None: self._client_id: str = client_id """The name for the client, used to name the channels.""" - self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}") + self._request_channel: Broadcast[T] = Broadcast( + name=f"req_{service_id}_{client_id}" + ) """The channel to send requests.""" self._response_channel: Broadcast[U] = Broadcast( - f"resp_{service_id}_{client_id}" + name=f"resp_{service_id}_{client_id}" ) """The channel to send responses.""" diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 72754e5d..d3a3750f 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -71,12 +71,13 @@ async def recv(id: int, receiver: channel.Receiver) -> None: Check the `tests` and `benchmarks` directories for more examples. """ - def __init__(self, name: str, *, resend_latest: bool = False) -> None: + def __init__(self, *, name: str | None, resend_latest: bool = False) -> None: """Create a Broadcast channel. Args: - name: A name for the broadcast channel, typically based on the type of data - sent through it. Used to identify the channel in the logs. + name: The name of the channel. If `None`, an `id(self)`-based name will be + used. This is only for debugging purposes, it will be shown in the + string representation of the channel. resend_latest: When True, every time a new receiver is created with `new_receiver`, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest @@ -85,7 +86,7 @@ def __init__(self, name: str, *, resend_latest: bool = False) -> None: data/reporting channels, but is not recommended for use in channels that stream control instructions. """ - self._name: str = name + self._name: str = f"{id(self):_}" if name is None else name """The name of the broadcast channel. Only used for debugging purposes. @@ -117,6 +118,15 @@ def __init__(self, name: str, *, resend_latest: bool = False) -> None: self._latest: T | None = None """The latest value sent to the channel.""" + @property + def name(self) -> str: + """The name of this channel. + + This is for debugging purposes, it will be shown in the string representation + of this channel. + """ + return self._name + @property def is_closed(self) -> bool: """Whether this channel is closed. diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 013777d5..8676e72d 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -22,8 +22,8 @@ class Merge(Receiver[T]): ```python from frequenz.channels import Broadcast - channel1 = Broadcast[int]("input-chan-1") - channel2 = Broadcast[int]("input-chan-2") + channel1 = Broadcast[int](name="input-chan-1") + channel2 = Broadcast[int](name="input-chan-2") receiver1 = channel1.new_receiver() receiver2 = channel2.new_receiver() diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/util/_timer.py index ef577897..eeed4410 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/util/_timer.py @@ -329,7 +329,7 @@ class Timer(Receiver[timedelta]): from frequenz.channels import Broadcast timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) - chan = Broadcast[int]("input-chan") + chan = Broadcast[int](name="input-chan") battery_data = chan.new_receiver() timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) @@ -359,8 +359,8 @@ def do_heavy_processing(data: int): logging.info("Heavy processing data: %d", data) timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) - chan1 = Broadcast[int]("input-chan-1") - chan2 = Broadcast[int]("input-chan-2") + chan1 = Broadcast[int](name="input-chan-1") + chan2 = Broadcast[int](name="input-chan-2") battery_data = chan1.new_receiver() heavy_process = chan2.new_receiver() async for selected in select(battery_data, heavy_process, timer): diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 7db4b290..8d1f4aec 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -21,7 +21,7 @@ async def test_broadcast() -> None: """Ensure sent messages are received by all receivers.""" - bcast: Broadcast[int] = Broadcast("meter_5") + bcast: Broadcast[int] = Broadcast(name="meter_5") num_receivers = 5 num_senders = 5 @@ -70,7 +70,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No async def test_broadcast_none_values() -> None: """Ensure None values can be sent and received.""" - bcast: Broadcast[int | None] = Broadcast("any_channel") + bcast: Broadcast[int | None] = Broadcast(name="any_channel") sender = bcast.new_sender() receiver = bcast.new_receiver() @@ -87,7 +87,7 @@ async def test_broadcast_none_values() -> None: async def test_broadcast_after_close() -> None: """Ensure closed channels can't get new messages.""" - bcast: Broadcast[int] = Broadcast("meter_5") + bcast: Broadcast[int] = Broadcast(name="meter_5") receiver = bcast.new_receiver() sender = bcast.new_sender() @@ -105,14 +105,14 @@ async def test_broadcast_after_close() -> None: async def test_broadcast_overflow() -> None: """Ensure messages sent to full broadcast receivers get dropped.""" - bcast: Broadcast[int] = Broadcast("meter_5") + bcast: Broadcast[int] = Broadcast(name="meter_5") big_recv_size = 10 small_recv_size = int(big_recv_size / 2) sender = bcast.new_sender() - big_receiver = bcast.new_receiver(name="named-recv", maxsize=big_recv_size) - small_receiver = bcast.new_receiver(maxsize=small_recv_size) + big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) + small_receiver = bcast.new_receiver(limit=small_recv_size) async def drain_receivers() -> tuple[int, int]: big_sum = 0 @@ -156,7 +156,7 @@ async def drain_receivers() -> tuple[int, int]: async def test_broadcast_resend_latest() -> None: """Check if new receivers get the latest value when resend_latest is set.""" - bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=True) + bcast: Broadcast[int] = Broadcast(name="new_recv_test", resend_latest=True) sender = bcast.new_sender() old_recv = bcast.new_receiver() @@ -173,7 +173,7 @@ async def test_broadcast_resend_latest() -> None: async def test_broadcast_no_resend_latest() -> None: """Ensure new receivers don't get the latest value when resend_latest isn't set.""" - bcast: Broadcast[int] = Broadcast("new_recv_test", resend_latest=False) + bcast: Broadcast[int] = Broadcast(name="new_recv_test", resend_latest=False) sender = bcast.new_sender() old_recv = bcast.new_receiver() @@ -189,7 +189,7 @@ async def test_broadcast_no_resend_latest() -> None: async def test_broadcast_peek() -> None: """Ensure we are able to peek into broadcast channels.""" - bcast: Broadcast[int] = Broadcast("peek-test") + bcast: Broadcast[int] = Broadcast(name="peek-test") receiver = bcast.new_receiver() peekable = receiver.into_peekable() sender = bcast.new_sender() @@ -215,7 +215,7 @@ async def test_broadcast_peek() -> None: async def test_broadcast_async_iterator() -> None: """Check that the broadcast receiver works as an async iterator.""" - bcast: Broadcast[int] = Broadcast("iter_test") + bcast: Broadcast[int] = Broadcast(name="iter_test") sender = bcast.new_sender() receiver = bcast.new_receiver() @@ -238,7 +238,7 @@ async def send_values() -> None: async def test_broadcast_map() -> None: """Ensure map runs on all incoming messages.""" - chan = Broadcast[int]("input-chan") + chan = Broadcast[int](name="input-chan") sender = chan.new_sender() # transform int receiver into bool receiver. @@ -253,7 +253,7 @@ async def test_broadcast_map() -> None: async def test_broadcast_receiver_drop() -> None: """Ensure deleted receivers get cleaned up.""" - chan = Broadcast[int]("input-chan") + chan = Broadcast[int](name="input-chan") sender = chan.new_sender() receiver1 = chan.new_receiver() From 5380ebc97014772ce689acf5eff2196f3055b99c Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 31 Oct 2023 12:52:25 +0100 Subject: [PATCH 13/22] Add name property to bidirectional Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_bidirectional.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index 197d5648..393d0300 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -146,6 +146,15 @@ def __init__(self, *, client_id: str, service_id: str) -> None: ) """The handle for the service side to send/receive values.""" + @property + def name(self) -> str: + """The name of this channel. + + This is for debugging purposes, it will be shown in the string representation + of this channel. + """ + return self._name + @property def is_closed(self) -> bool: """Whether this channel is closed. From 2a03e512deed02eb0cd377e20ce624f27cf64db0 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:20:56 +0200 Subject: [PATCH 14/22] Broadcast: Add the `latest` message as a property Make the `latest` message accessible through a read-only property. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_broadcast.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index d3a3750f..32019974 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -127,6 +127,15 @@ def name(self) -> str: """ return self._name + @property + def latest(self) -> T | None: + """The latest value sent to the channel. + + Returns: + The latest value sent to the channel, or `None` if no value was sent yet. + """ + return self._latest + @property def is_closed(self) -> bool: """Whether this channel is closed. From 3d140125d67be372b5041af437b5f450f5cf4619 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:41:16 +0200 Subject: [PATCH 15/22] Broadcast: Use `hash()` instead of a UUID UUIDs are expensive to create and not really necessary in this case as we are just keeping a local list of unique objects, so using the object's hash is enough. We also use a `id(self)`-based name by default if a `name` was not provided when creating a `new_receiver()`. A simple test using `timeit` shows a 2 orders of magnitude improvement in Python 3.11: ```console $ python -m timeit -s "import uuid" "uuid.uuid4()" 100000 loops, best of 5: 2.67 usec per loop $ python -m timeit "hash(__name__)" 10000000 loops, best of 5: 19.6 nsec per loop $ python -m timeit "id(__name__)" 10000000 loops, best of 5: 20.1 nsec per loop ``` Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_broadcast.py | 39 +++++++++++++---------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 32019974..536595ce 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -10,7 +10,6 @@ from asyncio import Condition from collections import deque from typing import Generic -from uuid import UUID, uuid4 from ._base_classes import Peekable as BasePeekable from ._base_classes import Receiver as BaseReceiver @@ -109,8 +108,8 @@ def __init__(self, *, name: str | None, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" - self._receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {} - """The receivers attached to the channel, indexed by their UUID.""" + self._receivers: dict[int, weakref.ReferenceType[Receiver[T]]] = {} + """The receivers attached to the channel, indexed by their hash().""" self._closed: bool = False """Whether the channel is closed.""" @@ -183,11 +182,8 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[ Returns: A Receiver instance attached to the broadcast channel. """ - uuid = uuid4() - if name is None: - name = str(uuid) - recv: Receiver[T] = Receiver(uuid, name, limit, self) - self._receivers[uuid] = weakref.ref(recv) + recv: Receiver[T] = Receiver(name, limit, self) + self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) return recv @@ -240,14 +236,14 @@ async def send(self, msg: T) -> None: ) self._chan._latest = msg stale_refs = [] - for name, recv_ref in self._chan._receivers.items(): + for _hash, recv_ref in self._chan._receivers.items(): recv = recv_ref() if recv is None: - stale_refs.append(name) + stale_refs.append(_hash) continue recv.enqueue(msg) - for name in stale_refs: - del self._chan._receivers[name] + for _hash in stale_refs: + del self._chan._receivers[_hash] async with self._chan._recv_cv: self._chan._recv_cv.notify_all() # pylint: enable=protected-access @@ -261,7 +257,7 @@ class Receiver(BaseReceiver[T]): method. """ - def __init__(self, uuid: UUID, name: str, limit: int, chan: Broadcast[T]) -> None: + def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: """Create a broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -269,17 +265,15 @@ def __init__(self, uuid: UUID, name: str, limit: int, chan: Broadcast[T]) -> Non get dropped just in this receiver. Args: - uuid: A uuid to identify the receiver in the broadcast channel's - list of receivers. - name: A name to identify the receiver in the logs. + name: A name to identify the receiver in the logs. If `None` an + `id(self)`-based name will be used. This is only for debugging + purposes, it will be shown in the string representation of the + receiver. limit: Size of the receiver's buffer in number of messages. chan: a reference to the Broadcast channel that this receiver belongs to. """ - self._uuid: UUID = uuid - """The UUID to identify the receiver in the broadcast channel's list of receivers.""" - - self._name: str = name + self._name: str = name if name is not None else f"{id(self):_}" """The name to identify the receiver. Only used for debugging purposes. @@ -361,8 +355,9 @@ def _deactivate(self) -> None: """Set the receiver as inactive and remove it from the channel.""" self._active = False # pylint: disable=protected-access - if self._uuid in self._chan._receivers: - del self._chan._receivers[self._uuid] + _hash = hash(self) + if _hash in self._chan._receivers: + del self._chan._receivers[_hash] # pylint: enable=protected-access def consume(self) -> T: From 19f503ca79dde779cd7ca32f5a428503f1d029f9 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sun, 29 Oct 2023 17:04:19 +0100 Subject: [PATCH 16/22] Bidirectional: unify `client_id` and `service_id` This is only used to create the underlying `Broadcast` channel's names, so instead of using 2 separate strings, just use a plain `name` as with other channels. Also like with other channels, make the `name` optional and default to a more readable `id(self)` representation (using `_` separators). Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_bidirectional.py | 15 +++++---------- tests/test_bidirectional.py | 12 +++--------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index 393d0300..18d1cff3 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -112,24 +112,19 @@ def consume(self) -> W: err.__cause__ = this_chan_error raise err - def __init__(self, *, client_id: str, service_id: str) -> None: + def __init__(self, *, name: str | None = None) -> None: """Create a `Bidirectional` instance. Args: - client_id: A name for the client, used to name the channels. - service_id: A name for the service end of the channels. + name: A name for the client, used to name the channels. """ - self._client_id: str = client_id + self._name: str = f"{id(self):_}" if name is None else name """The name for the client, used to name the channels.""" - self._request_channel: Broadcast[T] = Broadcast( - name=f"req_{service_id}_{client_id}" - ) + self._request_channel: Broadcast[T] = Broadcast(name=f"{self._name}:request") """The channel to send requests.""" - self._response_channel: Broadcast[U] = Broadcast( - name=f"resp_{service_id}_{client_id}" - ) + self._response_channel: Broadcast[U] = Broadcast(name=f"{self._name}:response") """The channel to send responses.""" self._client_handle: Bidirectional.Handle[T, U] = Bidirectional.Handle( diff --git a/tests/test_bidirectional.py b/tests/test_bidirectional.py index 11165ff2..30d59335 100644 --- a/tests/test_bidirectional.py +++ b/tests/test_bidirectional.py @@ -18,9 +18,7 @@ async def test_request_response() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional( - client_id="test_client", service_id="test_service" - ) + req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") async def service(handle: Bidirectional.Handle[str, int]) -> None: while True: @@ -54,9 +52,7 @@ async def service(handle: Bidirectional.Handle[str, int]) -> None: async def test_sender_error_chaining() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional( - client_id="test_client", service_id="test_service" - ) + req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") await req_resp._response_channel.close() # pylint: disable=protected-access @@ -72,9 +68,7 @@ async def test_sender_error_chaining() -> None: async def test_consume_error_chaining() -> None: """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional( - client_id="test_client", service_id="test_service" - ) + req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") await req_resp._request_channel.close() # pylint: disable=protected-access From 9205e871ec56e55eb4aa3afb5fd19abc3f5257eb Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:45:13 +0200 Subject: [PATCH 17/22] Event: Make the default name more readable Use underscore separators to format the `id(self)`. Also only use the default if `name` is `None`. Before an empty string would also be changed to the default, but if an user passed an empty string, it is better to leave it untouched. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/util/_event.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/util/_event.py index 421b2965..ca43e262 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/util/_event.py @@ -49,14 +49,14 @@ def __init__(self, *, name: str | None = None) -> None: """Create a new instance. Args: - name: The name of the receiver. If `None` the `id(self)` will be used as - the name. This is only for debugging purposes, it will be shown in the + name: The name of the receiver. If `None` an `id(self)`-based name will be + used. This is only for debugging purposes, it will be shown in the string representation of the receiver. """ self._event: _asyncio.Event = _asyncio.Event() """The event that is set when the receiver is ready.""" - self._name: str = name or str(id(self)) + self._name: str = f"{id(self):_}" if name is None else name """The name of the receiver. This is for debugging purposes, it will be shown in the string representation From 25a9099f6ebd79046a215281bf6c3c70a5d6311d Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 1 Nov 2023 15:12:08 +0100 Subject: [PATCH 18/22] Add descriptive `str` and `repr` implementations When debugging and logging objects it is very useful to get a descriptive and clear string representation. The new representation uses the class name and the user defined name (if any) for `str` and a `repr` that tries to show how the class was created but also important internal state. Signed-off-by: Leandro Lucarella # ------------------------ >8 ------------------------ # Do not modify or remove the line above. # Everything below it will be ignored. # # Conflicts: # src/frequenz/channels/_base_classes.py # src/frequenz/channels/_bidirectional.py --- src/frequenz/channels/_anycast.py | 27 +++++++++++++ src/frequenz/channels/_base_classes.py | 8 ++++ src/frequenz/channels/_bidirectional.py | 23 +++++++++++ src/frequenz/channels/_broadcast.py | 44 +++++++++++++++++++++ src/frequenz/channels/util/_file_watcher.py | 14 +++++++ src/frequenz/channels/util/_merge.py | 17 ++++++++ src/frequenz/channels/util/_merge_named.py | 18 ++++++++- 7 files changed, 150 insertions(+), 1 deletion(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index e76edc9e..f960541f 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -171,6 +171,17 @@ def new_receiver(self) -> Receiver[T]: """ return Receiver(self) + def __str__(self) -> str: + """Return a string representation of this channel.""" + return f"{type(self).__name__}:{self._name}" + + def __repr__(self) -> str: + """Return a string representation of this channel.""" + return ( + f"{type(self).__name__}(name={self._name!r}, limit={self.limit!r}):<" + f"current={len(self._deque)!r}, closed={self._closed!r}>" + ) + class Sender(BaseSender[T]): """A sender to send messages to an Anycast channel. @@ -217,6 +228,14 @@ async def send(self, msg: T) -> None: self._chan._recv_cv.notify(1) # pylint: enable=protected-access + def __str__(self) -> str: + """Return a string representation of this sender.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this sender.""" + return f"{type(self).__name__}({self._chan!r})" + class _Empty: """A sentinel value to indicate that a value has not been set.""" @@ -291,3 +310,11 @@ def consume(self) -> T: self._next = _Empty return next_val + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}({self._chan!r})" diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_base_classes.py index 38aa1649..55a13644 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_base_classes.py @@ -206,3 +206,11 @@ def consume(self) -> U: # noqa: DOC502 return self._transform( self._receiver.consume() ) # pylint: disable=protected-access + + def __str__(self) -> str: + """Return a string representation of the timer.""" + return f"{type(self).__name__}:{self._receiver}:{self._transform}" + + def __repr__(self) -> str: + """Return a string representation of the timer.""" + return f"{type(self).__name__}({self._receiver!r}, {self._transform!r})" diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index 18d1cff3..7011ebc3 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -112,6 +112,17 @@ def consume(self) -> W: err.__cause__ = this_chan_error raise err + def __str__(self) -> str: + """Return a string representation of this handle.""" + return f"{type(self).__name__}:{self._chan}" + + def __repr__(self) -> str: + """Return a string representation of this handle.""" + return ( + f"{type(self).__name__}(channel={self._chan!r}, " + f"sender={self._sender!r}, receiver={self._receiver!r})" + ) + def __init__(self, *, name: str | None = None) -> None: """Create a `Bidirectional` instance. @@ -180,3 +191,15 @@ def service_handle(self) -> Bidirectional.Handle[U, T]: Object to send/receive messages with. """ return self._service_handle + + def __str__(self) -> str: + """Return a string representation of this channel.""" + return f"{type(self).__name__}:{self._name}" + + def __repr__(self) -> str: + """Return a string representation of this channel.""" + return ( + f"{type(self).__name__}(name={self._name!r}):<" + f"request_channel={self._request_channel!r}, " + f"response_channel={self._response_channel!r}>" + ) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 536595ce..9e83e0df 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -200,6 +200,20 @@ def new_peekable(self) -> Peekable[T]: """ return Peekable(self) + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}:{self._name}" + + def __repr__(self) -> str: + """Return a string representation of this channel.""" + return ( + f"{type(self).__name__}(name={self._name!r}, " + f"resend_latest={self.resend_latest!r}):<" + f"latest={self._latest!r}, " + f"receivers={len(self._receivers)!r}, " + f"closed={self._closed!r}>" + ) + class Sender(BaseSender[T]): """A sender to send messages to the broadcast channel. @@ -248,6 +262,14 @@ async def send(self, msg: T) -> None: self._chan._recv_cv.notify_all() # pylint: enable=protected-access + def __str__(self) -> str: + """Return a string representation of this sender.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this sender.""" + return f"{type(self).__name__}({self._chan!r})" + class Receiver(BaseReceiver[T]): """A receiver to receive messages from the broadcast channel. @@ -396,6 +418,20 @@ def into_peekable(self) -> Peekable[T]: self._deactivate() return Peekable(self._chan) + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + limit = self._q.maxlen + assert limit is not None + return ( + f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, " + f"{self._chan!r}):" + ) + class Peekable(BasePeekable[T]): """A Peekable to peek into broadcast channels. @@ -422,3 +458,11 @@ def peek(self) -> T | None: has been sent to the channel yet, or if the channel is closed. """ return self._chan._latest # pylint: disable=protected-access + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + return f"{self._chan}:{type(self).__name__}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}({self._chan!r}):" diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/util/_file_watcher.py index e3d61808..5291a0f9 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/util/_file_watcher.py @@ -137,3 +137,17 @@ def consume(self) -> Event: return FileWatcher.Event( type=FileWatcher.EventType(change), path=pathlib.Path(path_str) ) + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + if len(self._paths) > 3: + paths = [str(p) for p in self._paths[:3]] + paths.append("…") + else: + paths = [str(p) for p in self._paths] + event_types = [event_type.name for event_type in self.event_types] + return f"{type(self).__name__}:{','.join(event_types)}:{','.join(paths)}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})" diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 8676e72d..5baeea31 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -4,6 +4,7 @@ """Merge messages coming from channels into a single stream.""" import asyncio +import itertools from collections import deque from typing import Any @@ -117,3 +118,19 @@ def consume(self) -> T: assert self._results, "`consume()` must be preceded by a call to `ready()`" return self._results.popleft() + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + if len(self._receivers) > 3: + receivers = [str(p) for p in itertools.islice(self._receivers.values(), 3)] + receivers.append("…") + else: + receivers = [str(p) for p in self._receivers.values()] + return f"{type(self).__name__}:{','.join(receivers)}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return ( + f"{type(self).__name__}(" + f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})" + ) diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index dcbec396..271f72da 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -2,8 +2,8 @@ # Copyright © 2022 Frequenz Energy-as-a-Service GmbH """Merge messages coming from channels into a single stream containing name of message.""" - import asyncio +import itertools from collections import deque from typing import Any @@ -102,3 +102,19 @@ def consume(self) -> tuple[str, T]: assert self._results, "`consume()` must be preceded by a call to `ready()`" return self._results.popleft() + + def __str__(self) -> str: + """Return a string representation of this receiver.""" + if len(self._receivers) > 3: + receivers = [str(p) for p in itertools.islice(self._receivers, 3)] + receivers.append("…") + else: + receivers = [str(p) for p in self._receivers] + return f"{type(self).__name__}:{','.join(receivers)}" + + def __repr__(self) -> str: + """Return a string representation of this receiver.""" + return ( + f"{type(self).__name__}(" + f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})" + ) From b2ea4faee8803accc0c000deb2ff88da7908fc05 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Sat, 28 Oct 2023 00:49:09 +0200 Subject: [PATCH 19/22] Use the string representation for logging Now that we have nice string representations, it is no longer needed to build one every time we log something. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_broadcast.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 9e83e0df..8b49004d 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -327,9 +327,8 @@ def enqueue(self, msg: T) -> None: if len(self._q) == self._q.maxlen: self._q.popleft() logger.warning( - "Broadcast receiver [%s:%s] is full. Oldest message was dropped.", - self._chan._name, # pylint: disable=protected-access - self._name, + "Broadcast receiver [%s] is full. Oldest message was dropped.", + self, ) self._q.append(msg) From f0801b94b18914e801a8b5f6f6ba202161b4e1a7 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 2 Nov 2023 11:09:04 +0100 Subject: [PATCH 20/22] Broadcast: Move the `resend_latest` attribute to the end This is so it is closer to the other public attributes (properties) of the class. Also remove the mention to the default, as the default belongs to the `__init__` argument docs. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_broadcast.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 8b49004d..c1d8304d 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -91,20 +91,6 @@ def __init__(self, *, name: str | None, resend_latest: bool = False) -> None: Only used for debugging purposes. """ - self.resend_latest: bool = resend_latest - """Whether to resend the latest value to new receivers. - - It is `False` by default. - - When `True`, every time a new receiver is created with `new_receiver`, it will - automatically get sent the latest value on the channel. This allows new - receivers on slow streams to get the latest value as soon as they are created, - without having to wait for the next message on the channel to arrive. - - It is safe to be set in data/reporting channels, but is not recommended for use - in channels that stream control instructions. - """ - self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" @@ -117,6 +103,18 @@ def __init__(self, *, name: str | None, resend_latest: bool = False) -> None: self._latest: T | None = None """The latest value sent to the channel.""" + self.resend_latest: bool = resend_latest + """Whether to resend the latest value to new receivers. + + When `True`, every time a new receiver is created with `new_receiver`, it will + automatically get sent the latest value on the channel. This allows new + receivers on slow streams to get the latest value as soon as they are created, + without having to wait for the next message on the channel to arrive. + + It is safe to be set in data/reporting channels, but is not recommended for use + in channels that stream control instructions. + """ + @property def name(self) -> str: """The name of this channel. From a698896511b8f47c0408bd24c791194c892b1970 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 2 Nov 2023 10:35:43 +0100 Subject: [PATCH 21/22] Update release notes Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 86 +++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 12 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7678e1d8..5220af66 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,26 +6,88 @@ The `Timer` now can be started with a delay. ## Upgrading -* Internal variable names in the `Anycast` and `Broadcast` implementations are now private. +* `Anycast` + + - `__init__`: The `maxsize` argument was renamed to `limit` and made keyword-only and a new optional, keyword-only `name` argument was added. If a `name` is not specified, it will be generated from the `id()` of the instance. + + You should instantiate using `Anycast(name=..., limit=...)` (or `Anycast()` if the defaults are enough) instead of `Anycast(...)` or `Anycast(maxsize=...)`. + +* `Bidirectional` + + - The `client_id` and `service_id` arguments were merged into an optional, keyword-only `name`. If a `name` is not specified, it will be generated from the `id()` of the instance. + + You should instantiate using `Bidirectional(name=...)` (or `Bidirectional()` if the default name is enough) instead of `Bidirectional(..., ...)` or `Bidirectional(client_id=..., service_id=...)`. + + - `new_receiver`: The `maxsize` argument was renamed to `limit` and made keyword-only; the `name` argument was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance instead of a random UUID. + + You should instantiate using `Broadcast(name=name, resend_latest=resend_latest)` (or `Broadcast()` if the defaults are enough) instead of `Broadcast(name)` or `Broadcast(name, resend_latest)`. + +* `Broadcast` + + - `__init__`: The `name` argument was made optional and keyword-only; `resend_latest` was also made keyword-only. If a `name` is not specified, it will be generated from the `id()` of the instance. + + You should instantiate using `Broadcast(name=name, resend_latest=resend_latest)` (or `Broadcast()` if the defaults are enough) instead of `Broadcast(name)` or `Broadcast(name, resend_latest)`. + +* `Event` + + - `__init__`: The `name` argument was made keyword-only. The default was changed to a more readable version of `id(self)`. + + You should instantiate using `Event(name=...)` instead of `Event(...)`. + +* All exceptions that took `Any` as the `message` argument now take `str` instead. + + If you were passing a non-`str` value to an exception, you should convert it using `str(value)` before passing it to the exception. ## New Features -* `Timer()`, `Timer.timeout()`, `Timer.periodic()` and `Timer.reset()` now take an optional `start_delay` option to make the timer start after some delay. +* `Anycast` + + - The following new read-only properties were added: + + - `name`: The name of the channel. + - `limit`: The maximum number of messages that can be sent to the channel. + - `is_closed`: Whether the channel is closed. + + - A more useful implementation of `__str__ and `__repr__` were added for the channel and its senders and receivers. + +* `Bidirectional` + + - The following new read-only properties were added: + + - `name`: The name of the channel (read-only). + - `is_closed`: Whether the channel is closed (read-only). + + - A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles. + +* `Broadcast` + + - The following new read-only properties were added: + + - `name`: The name of the channel. + - `latest`: The latest message sent to the channel. + - `is_closed`: Whether the channel is closed. + + - A more useful implementation of `__str__ and `__repr__` were added for the channel and the client and service handles. + +* `FileWatcher` + + - A more useful implementation of `__str__ and `__repr__` were added. + +* `Merge` + + - A more useful implementation of `__str__ and `__repr__` were added. + +* `MergeNamed` - This can be useful, for example, if the timer needs to be *aligned* to a particular time. The alternative to this would be to `sleep()` for the time needed to align the timer, but if the `sleep()` call gets delayed because the event loop is busy, then a re-alignment is needed and this could go on for a while. The only way to guarantee a certain alignment (with a reasonable precision) is to delay the timer start. + - A more useful implementation of `__str__ and `__repr__` were added. -* `Broadcast.resend_latest` is now a public attribute, allowing it to be changed after the channel is created. +* `Peekable` -* The arm64 architecture is now officially supported. + - A more useful implementation of `__str__ and `__repr__` were added. -* The documentation was improved to: +* `Receiver` - - Show signatures with types. - - Show the inherited members. - - Documentation for pre-releases are now published. - - Show the full tag name as the documentation version. - - All development branches now have their documentation published (there is no `next` version anymore). - - Fix the order of the documentation versions. + - `map()`: The returned map object now has a more useful implementation of `__str__ and `__repr__`. ## Bug Fixes From f7880b87cb68d707f5dee6bfbfe5254f06b2cbfe Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Thu, 2 Nov 2023 13:48:15 +0100 Subject: [PATCH 22/22] RFC: Revamp the repository modules structure These are the most notable changes: - The `_base_classes` modules is split into the `_receiver` and `_sender` modules. - Sender and receiver exceptions are moved from the `_exceptions` module to the new `_sender` and `_receiver` modules. - The `frequenz.channel` package now only exposes the new `_receiver` and `_sender` modules, and the `_exceptions` and `_select` modules. - All channels and receiver modules are moved to the `frequenz.channels` package and made public. - All public nested classes were moved to the top level of the corresponding module. Advantages of this structure: - It completely removes circular dependencies. - It avoids importing unnecessary code. In Python importing means code execution, so even when it is done only at startup, it adds some overhead. Also by not importing unnecessary code, we can potentially add real optional dependencies. For example, if a project doesn't need to use a file watcher, they could avoid pulling the unnecessary `awatch` dependency. This is not done in this PR, but it could be done in the future. - By having the channels and receivers on their own module we can move public nested classes were moved to the top level of the corresponding module withough having to add superflous prefixes for support classes. - Removing nested classes avoids having to use hacky constructions, like requiring the use of `from __future__ import annotations`, types as strings (nested classes) and confusing the `mkdocstrings` tools when extracting and cross-linking docs. - The `frequenz.channels` package exposes all classes that are used once you have setted up your channels, so the importing should still be pretty terse in most cases and only `frequenz.channels` would need to be imported in modules only taking some receivers and iterating or selecting over them. Signed-off-by: Leandro Lucarella --- .github/labeler.yml | 18 +- benchmarks/benchmark_anycast.py | 3 +- benchmarks/benchmark_broadcast.py | 3 +- src/frequenz/channels/__init__.py | 91 +++----- src/frequenz/channels/_bidirectional.py | 205 ----------------- src/frequenz/channels/_exceptions.py | 69 +----- .../{_base_classes.py => _receiver.py} | 98 +++++--- src/frequenz/channels/{util => }/_select.py | 45 ++-- src/frequenz/channels/_sender.py | 45 ++++ .../channels/{_anycast.py => anycast.py} | 53 +++-- src/frequenz/channels/bidirectional.py | 209 ++++++++++++++++++ .../channels/{_broadcast.py => broadcast.py} | 79 ++++--- .../channels/{util/_event.py => event.py} | 20 +- .../_file_watcher.py => file_watcher.py} | 47 ++-- .../channels/{util/_merge.py => merge.py} | 17 +- .../{util/_merge_named.py => merge_named.py} | 18 +- .../channels/{util/_timer.py => timer.py} | 52 +++-- src/frequenz/channels/util/__init__.py | 66 ------ tests/test_anycast.py | 2 +- tests/test_bidirectional.py | 6 +- tests/test_broadcast.py | 5 +- tests/{utils => }/test_event.py | 2 +- tests/{utils => }/test_file_watcher.py | 16 +- tests/{utils => }/test_integration.py | 16 +- tests/test_merge.py | 5 +- tests/test_mergenamed.py | 5 +- tests/{utils => }/test_select.py | 3 +- tests/{utils => }/test_select_integration.py | 7 +- tests/{utils => }/test_timer.py | 2 +- tests/utils/__init__.py | 4 - 30 files changed, 562 insertions(+), 649 deletions(-) delete mode 100644 src/frequenz/channels/_bidirectional.py rename src/frequenz/channels/{_base_classes.py => _receiver.py} (75%) rename src/frequenz/channels/{util => }/_select.py (90%) create mode 100644 src/frequenz/channels/_sender.py rename src/frequenz/channels/{_anycast.py => anycast.py} (89%) create mode 100644 src/frequenz/channels/bidirectional.py rename src/frequenz/channels/{_broadcast.py => broadcast.py} (90%) rename src/frequenz/channels/{util/_event.py => event.py} (88%) rename src/frequenz/channels/{util/_file_watcher.py => file_watcher.py} (81%) rename src/frequenz/channels/{util/_merge.py => merge.py} (93%) rename src/frequenz/channels/{util/_merge_named.py => merge_named.py} (91%) rename src/frequenz/channels/{util/_timer.py => timer.py} (94%) delete mode 100644 src/frequenz/channels/util/__init__.py rename tests/{utils => }/test_event.py (96%) rename tests/{utils => }/test_file_watcher.py (85%) rename tests/{utils => }/test_integration.py (87%) rename tests/{utils => }/test_select.py (93%) rename tests/{utils => }/test_select_integration.py (99%) rename tests/{utils => }/test_timer.py (99%) delete mode 100644 tests/utils/__init__.py diff --git a/.github/labeler.yml b/.github/labeler.yml index edfdd7a2..8923cb0e 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -6,7 +6,7 @@ # For more details on the configuration please see: # https://github.com/marketplace/actions/labeler -"part:docs": +"part:docs": - "**/*.md" - "docs/**" - "examples/**" @@ -31,14 +31,16 @@ - noxfile.py "part:channels": - - any: - - "src/frequenz/channels/**" - - "!src/frequenz/channels/util/**" + - "src/frequenz/channels/anycast.py" + - "src/frequenz/channels/bidirectional.py" + - "src/frequenz/channels/broadcast.py" "part:receivers": - - any: - - "src/frequenz/channels/util/**" - - "!src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/event.py" + - "src/frequenz/channels/file_watcher.py" + - "src/frequenz/channels/merge.py" + - "src/frequenz/channels/merge_named.py" + - "src/frequenz/channels/timer.py" "part:select": - - "src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/_select.py" diff --git a/benchmarks/benchmark_anycast.py b/benchmarks/benchmark_anycast.py index e468f1b8..462e1681 100644 --- a/benchmarks/benchmark_anycast.py +++ b/benchmarks/benchmark_anycast.py @@ -9,7 +9,8 @@ from collections.abc import Coroutine from typing import Any -from frequenz.channels import Anycast, Receiver, Sender +from frequenz.channels import Receiver, Sender +from frequenz.channels.anycast import Anycast async def send_msg(num_messages: int, chan: Sender[int]) -> None: diff --git a/benchmarks/benchmark_broadcast.py b/benchmarks/benchmark_broadcast.py index 08181dfc..b088ace4 100644 --- a/benchmarks/benchmark_broadcast.py +++ b/benchmarks/benchmark_broadcast.py @@ -10,7 +10,8 @@ from functools import partial from typing import Any -from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.channels import Receiver, Sender +from frequenz.channels.broadcast import Broadcast async def component_sender(num_messages: int, chan: Sender[int]) -> None: diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 3489452b..e351c30d 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -6,80 +6,50 @@ This package contains [channel](https://en.wikipedia.org/wiki/Channel_(programming)) implementations. -Channels: - -* [Anycast][frequenz.channels.Anycast]: A channel that supports multiple - senders and multiple receivers. A message sent through a sender will be - received by exactly one receiver. - -* [Bidirectional][frequenz.channels.Bidirectional]: A channel providing - a `client` and a `service` handle to send and receive bidirectionally. - -* [Broadcast][frequenz.channels.Broadcast]: A channel to broadcast messages - from multiple senders to multiple receivers. Each message sent through any of - the senders is received by all of the receivers. - -Other base classes: - -* [Peekable][frequenz.channels.Peekable]: An object to allow users to get - a peek at the latest value in the channel, without consuming anything. - -* [Receiver][frequenz.channels.Receiver]: An object that can wait for and - consume messages from a channel. +Main base classes and functions: * [Sender][frequenz.channels.Sender]: An object that can send messages to a channel. -Utilities: - -* [util][frequenz.channels.util]: A module with utilities, like special - receivers that implement timers, file watchers, merge receivers, or wait for - messages in multiple channels. - -Exception classes: - -* [Error][frequenz.channels.Error]: Base class for all errors in this - library. - -* [ChannelError][frequenz.channels.ChannelError]: Base class for all errors - related to channels. +* [Receiver][frequenz.channels.Receiver]: An object that can wait for and + consume messages from a channel. -* [ChannelClosedError][frequenz.channels.ChannelClosedError]: Error raised when - trying to operate (send, receive, etc.) through a closed channel. +* [selected()][frequenz.channels.select]: A function to wait on multiple + receivers at once. -* [SenderError][frequenz.channels.SenderError]: Base class for all errors - related to senders. +Channels: -* [ReceiverError][frequenz.channels.ReceiverError]: Base class for all errors - related to receivers. +* [Anycast][frequenz.channels.anycast.Anycast]: A channel that supports multiple + senders and multiple receivers. A message sent through a sender will be + received by exactly one receiver. -* [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver - stopped producing messages. +* [Bidirectional][frequenz.channels.bidirectional.Bidirectional]: A channel providing + a `client` and a `service` handle to send and receive bidirectionally. -* [ReceiverInvalidatedError][frequenz.channels.ReceiverInvalidatedError]: - A receiver is not longer valid (for example if it was converted into - a peekable. +* [Broadcast][frequenz.channels.broadcast.Broadcast]: A channel to broadcast messages + from multiple senders to multiple receivers. Each message sent through any of + the senders is received by all of the receivers. """ -from . import util -from ._anycast import Anycast -from ._base_classes import Peekable, Receiver, Sender -from ._bidirectional import Bidirectional -from ._broadcast import Broadcast -from ._exceptions import ( - ChannelClosedError, - ChannelError, - Error, +from ._exceptions import ChannelClosedError, ChannelError, Error +from ._receiver import ( + Peekable, + Receiver, ReceiverError, ReceiverInvalidatedError, ReceiverStoppedError, - SenderError, ) +from ._select import ( + Selected, + SelectError, + SelectErrorGroup, + UnhandledSelectedError, + select, + selected_from, +) +from ._sender import Sender, SenderError __all__ = [ - "Anycast", - "Bidirectional", - "Broadcast", "ChannelClosedError", "ChannelError", "Error", @@ -88,7 +58,12 @@ "ReceiverError", "ReceiverInvalidatedError", "ReceiverStoppedError", + "SelectError", + "SelectErrorGroup", + "Selected", "Sender", "SenderError", - "util", + "UnhandledSelectedError", + "select", + "selected_from", ] diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py deleted file mode 100644 index 7011ebc3..00000000 --- a/src/frequenz/channels/_bidirectional.py +++ /dev/null @@ -1,205 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""An abstraction to provide bi-directional communication between actors.""" - -from __future__ import annotations - -from typing import Generic, TypeVar - -from ._base_classes import Receiver, Sender, T, U -from ._broadcast import Broadcast -from ._exceptions import ChannelError, ReceiverError, SenderError - -V = TypeVar("V") -W = TypeVar("W") - - -class Bidirectional(Generic[T, U]): - """A wrapper class for simulating bidirectional channels.""" - - class Handle(Sender[V], Receiver[W]): - """A handle to a [Bidirectional][frequenz.channels.Bidirectional] instance. - - It can be used to send/receive values between the client and service. - """ - - def __init__( - self, - channel: Bidirectional[V, W] | Bidirectional[W, V], - sender: Sender[V], - receiver: Receiver[W], - ) -> None: - """Create a `Bidirectional.Handle` instance. - - Args: - channel: The underlying channel. - sender: A sender to send values with. - receiver: A receiver to receive values from. - """ - self._chan: Bidirectional[V, W] | Bidirectional[W, V] = channel - """The underlying channel.""" - - self._sender: Sender[V] = sender - """The sender to send values with.""" - - self._receiver: Receiver[W] = receiver - """The receiver to receive values from.""" - - async def send(self, msg: V) -> None: - """Send a value to the other side. - - Args: - msg: The value to send. - - Raises: - SenderError: if the underlying channel was closed. - A [ChannelClosedError][frequenz.channels.ChannelClosedError] - is set as the cause. - """ - try: - await self._sender.send(msg) - except SenderError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - async def ready(self) -> bool: - """Wait until the receiver is ready with a value or an error. - - Once a call to `ready()` has finished, the value should be read with - a call to `consume()` (`receive()` or iterated over). The receiver will - remain ready (this method will return immediately) until it is - consumed. - - Returns: - Whether the receiver is still active. - """ - return await self._receiver.ready() # pylint: disable=protected-access - - def consume(self) -> W: - """Return the latest value once `_ready` is complete. - - Returns: - The next value that was received. - - Raises: - ReceiverStoppedError: if there is some problem with the receiver. - ReceiverError: if there is some problem with the receiver. - """ - try: - return self._receiver.consume() # pylint: disable=protected-access - except ReceiverError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - def __str__(self) -> str: - """Return a string representation of this handle.""" - return f"{type(self).__name__}:{self._chan}" - - def __repr__(self) -> str: - """Return a string representation of this handle.""" - return ( - f"{type(self).__name__}(channel={self._chan!r}, " - f"sender={self._sender!r}, receiver={self._receiver!r})" - ) - - def __init__(self, *, name: str | None = None) -> None: - """Create a `Bidirectional` instance. - - Args: - name: A name for the client, used to name the channels. - """ - self._name: str = f"{id(self):_}" if name is None else name - """The name for the client, used to name the channels.""" - - self._request_channel: Broadcast[T] = Broadcast(name=f"{self._name}:request") - """The channel to send requests.""" - - self._response_channel: Broadcast[U] = Broadcast(name=f"{self._name}:response") - """The channel to send responses.""" - - self._client_handle: Bidirectional.Handle[T, U] = Bidirectional.Handle( - self, - self._request_channel.new_sender(), - self._response_channel.new_receiver(), - ) - """The handle for the client side to send/receive values.""" - - self._service_handle: Bidirectional.Handle[U, T] = Bidirectional.Handle( - self, - self._response_channel.new_sender(), - self._request_channel.new_receiver(), - ) - """The handle for the service side to send/receive values.""" - - @property - def name(self) -> str: - """The name of this channel. - - This is for debugging purposes, it will be shown in the string representation - of this channel. - """ - return self._name - - @property - def is_closed(self) -> bool: - """Whether this channel is closed. - - Any further attempts to use this channel after it is closed will result in an - exception. - - As long as there is a way to send or receive data, the channel is considered - open, even if the other side is closed, so this returns `False` if only both - underlying channels are closed. - """ - return self._request_channel.is_closed and self._response_channel.is_closed - - @property - def client_handle(self) -> Bidirectional.Handle[T, U]: - """Get a `Handle` for the client side to use. - - Returns: - Object to send/receive messages with. - """ - return self._client_handle - - @property - def service_handle(self) -> Bidirectional.Handle[U, T]: - """Get a `Handle` for the service side to use. - - Returns: - Object to send/receive messages with. - """ - return self._service_handle - - def __str__(self) -> str: - """Return a string representation of this channel.""" - return f"{type(self).__name__}:{self._name}" - - def __repr__(self) -> str: - """Return a string representation of this channel.""" - return ( - f"{type(self).__name__}(name={self._name!r}):<" - f"request_channel={self._request_channel!r}, " - f"response_channel={self._response_channel!r}>" - ) diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 921c6237..dbfa032f 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -3,14 +3,7 @@ """Exception classes.""" -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Generic, TypeVar - -if TYPE_CHECKING: - from . import _base_classes - -T = TypeVar("T") +from typing import Any class Error(RuntimeError): @@ -56,63 +49,3 @@ def __init__(self, channel: Any): channel: A reference to the channel that was closed. """ super().__init__(f"Channel {channel} was closed", channel) - - -class SenderError(Error, Generic[T]): - """An error produced in a [Sender][frequenz.channels.Sender]. - - All exceptions generated by senders inherit from this exception. - """ - - def __init__(self, message: str, sender: _base_classes.Sender[T]): - """Create an instance. - - Args: - message: An error message. - sender: The [Sender][frequenz.channels.Sender] where the error - happened. - """ - super().__init__(message) - self.sender: _base_classes.Sender[T] = sender - """The sender where the error happened.""" - - -class ReceiverError(Error, Generic[T]): - """An error produced in a [Receiver][frequenz.channels.Receiver]. - - All exceptions generated by receivers inherit from this exception. - """ - - def __init__(self, message: str, receiver: _base_classes.Receiver[T]): - """Create an instance. - - Args: - message: An error message. - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(message) - self.receiver: _base_classes.Receiver[T] = receiver - """The receiver where the error happened.""" - - -class ReceiverStoppedError(ReceiverError[T]): - """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" - - def __init__(self, receiver: _base_classes.Receiver[T]): - """Create an instance. - - Args: - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(f"Receiver {receiver} was stopped", receiver) - - -class ReceiverInvalidatedError(ReceiverError[T]): - """The [Receiver][frequenz.channels.Receiver] was invalidated. - - This happens when the Receiver is converted - [into][frequenz.channels.Receiver.into_peekable] - a [Peekable][frequenz.channels.Peekable]. - """ diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_receiver.py similarity index 75% rename from src/frequenz/channels/_base_classes.py rename to src/frequenz/channels/_receiver.py index 55a13644..b25020db 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_receiver.py @@ -7,33 +7,36 @@ from abc import ABC, abstractmethod from collections.abc import Callable -from typing import Generic, TypeVar +from typing import Generic, Self, TypeVar -from ._exceptions import ReceiverStoppedError +from ._exceptions import Error -T = TypeVar("T") -U = TypeVar("U") +_T = TypeVar("_T") +_U = TypeVar("_U") -class Sender(ABC, Generic[T]): - """A channel Sender.""" +class Peekable(ABC, Generic[_T]): + """A channel peekable. - @abstractmethod - async def send(self, msg: T) -> None: - """Send a message to the channel. + A Peekable provides a [peek()][frequenz.channels.Peekable] method that + allows the user to get a peek at the latest value in the channel, without + consuming anything. + """ - Args: - msg: The message to be sent. + @abstractmethod + def peek(self) -> _T | None: + """Return the latest value that was sent to the channel. - Raises: - SenderError: if there was an error sending the message. + Returns: + The latest value received by the channel, and `None`, if nothing + has been sent to the channel yet. """ -class Receiver(ABC, Generic[T]): +class Receiver(ABC, Generic[_T]): """A channel Receiver.""" - async def __anext__(self) -> T: + async def __anext__(self) -> _T: """Await the next value in the async iteration over received values. Returns: @@ -63,7 +66,7 @@ async def ready(self) -> bool: """ @abstractmethod - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. `ready()` must be called before each call to `consume()`. @@ -76,7 +79,7 @@ def consume(self) -> T: ReceiverError: if there is some problem with the receiver. """ - def __aiter__(self) -> Receiver[T]: + def __aiter__(self) -> Self: """Initialize the async iterator over received values. Returns: @@ -84,7 +87,7 @@ def __aiter__(self) -> Receiver[T]: """ return self - async def receive(self) -> T: + async def receive(self) -> _T: """Receive a message from the channel. Returns: @@ -111,7 +114,7 @@ async def receive(self) -> T: raise ReceiverStoppedError(self) from exc return received - def map(self, call: Callable[[T], U]) -> Receiver[U]: + def map(self, call: Callable[[_T], _U]) -> Receiver[_U]: """Return a receiver with `call` applied on incoming messages. Args: @@ -122,7 +125,7 @@ def map(self, call: Callable[[T], U]) -> Receiver[U]: """ return _Map(self, call) - def into_peekable(self) -> Peekable[T]: + def into_peekable(self) -> Peekable[_T]: """Convert the `Receiver` implementation into a `Peekable`. Once this function has been called, the receiver will no longer be @@ -139,25 +142,48 @@ def into_peekable(self) -> Peekable[T]: raise NotImplementedError("This receiver does not implement `into_peekable`") -class Peekable(ABC, Generic[T]): - """A channel peekable. +class ReceiverError(Error, Generic[_T]): + """An error produced in a [Receiver][frequenz.channels.Receiver]. - A Peekable provides a [peek()][frequenz.channels.Peekable] method that - allows the user to get a peek at the latest value in the channel, without - consuming anything. + All exceptions generated by receivers inherit from this exception. """ - @abstractmethod - def peek(self) -> T | None: - """Return the latest value that was sent to the channel. + def __init__(self, message: str, receiver: Receiver[_T]): + """Create an instance. - Returns: - The latest value received by the channel, and `None`, if nothing - has been sent to the channel yet. + Args: + message: An error message. + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. + """ + super().__init__(message) + self.receiver: Receiver[_T] = receiver + """The receiver where the error happened.""" + + +class ReceiverStoppedError(ReceiverError[_T]): + """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" + + def __init__(self, receiver: Receiver[_T]): + """Create an instance. + + Args: + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. """ + super().__init__(f"Receiver {receiver} was stopped", receiver) + + +class ReceiverInvalidatedError(ReceiverError[_T]): + """The [Receiver][frequenz.channels.Receiver] was invalidated. + + This happens when the Receiver is converted + [into][frequenz.channels.Receiver.into_peekable] + a [Peekable][frequenz.channels.Peekable]. + """ -class _Map(Receiver[U], Generic[T, U]): +class _Map(Receiver[_U], Generic[_T, _U]): """Apply a transform function on a channel receiver. Has two generic types: @@ -166,17 +192,17 @@ class _Map(Receiver[U], Generic[T, U]): - The output type: return type of the transform method. """ - def __init__(self, receiver: Receiver[T], transform: Callable[[T], U]) -> None: + def __init__(self, receiver: Receiver[_T], transform: Callable[[_T], _U]) -> None: """Create a `Transform` instance. Args: receiver: The input receiver. transform: The function to run on the input data. """ - self._receiver: Receiver[T] = receiver + self._receiver: Receiver[_T] = receiver """The input receiver.""" - self._transform: Callable[[T], U] = transform + self._transform: Callable[[_T], _U] = transform """The function to run on the input data.""" async def ready(self) -> bool: @@ -194,7 +220,7 @@ async def ready(self) -> bool: # We need a noqa here because the docs have a Raises section but the code doesn't # explicitly raise anything. - def consume(self) -> U: # noqa: DOC502 + def consume(self) -> _U: # noqa: DOC502 """Return a transformed value once `ready()` is complete. Returns: diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/_select.py similarity index 90% rename from src/frequenz/channels/util/_select.py rename to src/frequenz/channels/_select.py index ac6e59ad..00f594ce 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/_select.py @@ -12,24 +12,23 @@ from collections.abc import AsyncIterator from typing import Any, Generic, TypeGuard, TypeVar -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") class Selected(Generic[_T]): - """A result of a [`select()`][frequenz.channels.util.select] iteration. + """A result of a [`select()`][frequenz.channels.select] iteration. The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead. `Selected` instances should be used in conjunction with the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`selected_from()`][frequenz.channels.selected_from] function to determine which receiver was selected. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. """ class _EmptyResult: @@ -46,9 +45,9 @@ def __init__(self, receiver: Receiver[_T]) -> None: The receiver is consumed immediately when creating the instance and the received value is stored in the instance for later use as - [`value`][frequenz.channels.util.Selected.value]. If there was an exception + [`value`][frequenz.channels.Selected.value]. If there was an exception while receiving the value, then the exception is stored in the instance instead - (as [`exception`][frequenz.channels.util.Selected.exception]). + (as [`exception`][frequenz.channels.Selected.exception]). Args: receiver: The receiver that was selected. @@ -140,16 +139,16 @@ def __repr__(self) -> str: def selected_from( selected: Selected[Any], receiver: Receiver[_T] ) -> TypeGuard[Selected[_T]]: - """Check if the given receiver was selected by [`select()`][frequenz.channels.util.select]. + """Check if the given receiver was selected by [`select()`][frequenz.channels.select]. This function is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class to determine which receiver was + [`Selected`][frequenz.channels.Selected] class to determine which receiver was selected in `select()` iteration. It also works as a [type guard][typing.TypeGuard] to narrow the type of the `Selected` instance to the type of the receiver. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. Args: selected: The result of a `select()` iteration. @@ -164,21 +163,21 @@ def selected_from( class SelectError(BaseException): - """A base exception for [`select()`][frequenz.channels.util.select]. + """A base exception for [`select()`][frequenz.channels.select]. This exception is raised when a `select()` iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling `ready()` for example). It is raised as a group exception - ([`SelectErrorGroup`][frequenz.channels.util.SelectErrorGroup]) when a `select` loop + ([`SelectErrorGroup`][frequenz.channels.SelectErrorGroup]) when a `select` loop is cleaning up after it's done. """ class UnhandledSelectedError(SelectError, Generic[_T]): - """A receiver was not handled in a [`select()`][frequenz.channels.util.select] loop. + """A receiver was not handled in a [`select()`][frequenz.channels.select] loop. This exception is raised when a `select()` iteration finishes without a call to - [`selected_from()`][frequenz.channels.util.selected_from] for the selected receiver. + [`selected_from()`][frequenz.channels.selected_from] for the selected receiver. """ def __init__(self, selected: Selected[_T]) -> None: @@ -194,7 +193,7 @@ def __init__(self, selected: Selected[_T]) -> None: class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError): - """An exception group for [`select()`][frequenz.channels.util.select] operation. + """An exception group for [`select()`][frequenz.channels.select] operation. This exception group is raised when a `select()` loops fails while cleaning up running tests to check for ready receivers. @@ -243,8 +242,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: This function is used to iterate over the values of all receivers as they receive new values. It is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class and the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`Selected`][frequenz.channels.Selected] class and the + [`selected_from()`][frequenz.channels.selected_from] function to determine which function to determine which receiver was selected in a select operation. An exhaustiveness check is performed at runtime to make sure all selected receivers @@ -258,10 +257,10 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you: - * Use [`Merge`][frequenz.channels.util.Merge] or - [`MergeNamed`][frequenz.channels.util.MergeNamed]: this is useful when you - have and unknown number of receivers of the same type that can be handled as - a group. + * Use [`Merge`][frequenz.channels.merge.Merge] or + [`MergeNamed`][frequenz.channels.merge_named.MergeNamed]: this is useful when + you have and unknown number of receivers of the same type that can be handled + as a group. * Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers. * Break the `select()` loop and start a new one with the new set of receivers @@ -273,8 +272,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: import datetime from typing import assert_never - from frequenz.channels import ReceiverStoppedError - from frequenz.channels.util import select, selected_from, Timer + from frequenz.channels import ReceiverStoppedError, select, selected_from + from frequenz.channels.timer import Timer timer1 = Timer.periodic(datetime.timedelta(seconds=1)) timer2 = Timer.timeout(datetime.timedelta(seconds=0.5)) diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py new file mode 100644 index 00000000..a6d61453 --- /dev/null +++ b/src/frequenz/channels/_sender.py @@ -0,0 +1,45 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Base classes for Channel Sender and Receiver.""" + +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from ._exceptions import Error + +_T = TypeVar("_T") + + +class Sender(ABC, Generic[_T]): + """A channel Sender.""" + + @abstractmethod + async def send(self, msg: _T) -> None: + """Send a message to the channel. + + Args: + msg: The message to be sent. + + Raises: + SenderError: if there was an error sending the message. + """ + + +class SenderError(Error, Generic[_T]): + """An error produced in a [Sender][frequenz.channels.Sender]. + + All exceptions generated by senders inherit from this exception. + """ + + def __init__(self, message: str, sender: Sender[_T]): + """Create an instance. + + Args: + message: An error message. + sender: The [Sender][frequenz.channels.Sender] where the error + happened. + """ + super().__init__(message) + self.sender: Sender[_T] = sender + """The sender where the error happened.""" diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/anycast.py similarity index 89% rename from src/frequenz/channels/_anycast.py rename to src/frequenz/channels/anycast.py index f960541f..c69cae8b 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/anycast.py @@ -3,19 +3,18 @@ """A channel for sending data across async tasks.""" -from __future__ import annotations - from asyncio import Condition from collections import deque -from typing import Generic +from typing import Generic, TypeVar + +from ._exceptions import ChannelClosedError +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderError -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._base_classes import T -from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError +_T = TypeVar("_T") -class Anycast(Generic[T]): +class Anycast(Generic[_T]): """A channel for sending data across async tasks. Anycast channels support multiple senders and multiple receivers. A message sent @@ -28,15 +27,15 @@ class Anycast(Generic[T]): creation time via the `limit` argument. In cases where each message need to be received by every receiver, a - [Broadcast][frequenz.channels.Broadcast] channel may be used. + [Broadcast][frequenz.channels.broadcast.Broadcast] channel may be used. Uses an [deque][collections.deque] internally, so Anycast channels are not thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.merge.Merge] or + [MergeNamed][frequenz.channels.merge_named.MergeNamed]. Example: ``` python @@ -85,7 +84,7 @@ def __init__(self, *, name: str | None = None, limit: int = 10) -> None: of the channel. """ - self._deque: deque[T] = deque(maxlen=limit) + self._deque: deque[_T] = deque(maxlen=limit) """The channel's buffer.""" self._send_cv: Condition = Condition() @@ -155,21 +154,21 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new sender. Returns: A Sender instance attached to the Anycast channel. """ - return Sender(self) + return _Sender(self) - def new_receiver(self) -> Receiver[T]: + def new_receiver(self) -> Receiver[_T]: """Create a new receiver. Returns: A Receiver instance attached to the Anycast channel. """ - return Receiver(self) + return _Receiver(self) def __str__(self) -> str: """Return a string representation of this channel.""" @@ -183,23 +182,23 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[T]): +class _Sender(Sender[_T]): """A sender to send messages to an Anycast channel. Should not be created directly, but through the `Anycast.new_sender()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel sender. Args: chan: A reference to the channel that this sender belongs to. """ - self._chan: Anycast[T] = chan + self._chan: Anycast[_T] = chan """The channel that this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message across the channel. To send, this method inserts the message into the Anycast channel's @@ -241,23 +240,23 @@ class _Empty: """A sentinel value to indicate that a value has not been set.""" -class Receiver(BaseReceiver[T]): +class _Receiver(Receiver[_T]): """A receiver to receive messages from an Anycast channel. Should not be created directly, but through the `Anycast.new_receiver()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel receiver. Args: chan: A reference to the channel that this receiver belongs to. """ - self._chan: Anycast[T] = chan + self._chan: Anycast[_T] = chan """The channel that this receiver belongs to.""" - self._next: T | type[_Empty] = _Empty + self._next: _T | type[_Empty] = _Empty async def ready(self) -> bool: """Wait until the receiver is ready with a value or an error. @@ -286,7 +285,7 @@ async def ready(self) -> bool: # pylint: enable=protected-access return True - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. Returns: @@ -306,7 +305,7 @@ def consume(self) -> T: ), "`consume()` must be preceded by a call to `ready()`" # mypy doesn't understand that the assert above ensures that self._next is not # _Sentinel. So we have to use a type ignore here. - next_val: T = self._next # type: ignore[assignment] + next_val: _T = self._next # type: ignore[assignment] self._next = _Empty return next_val diff --git a/src/frequenz/channels/bidirectional.py b/src/frequenz/channels/bidirectional.py new file mode 100644 index 00000000..154b3cc6 --- /dev/null +++ b/src/frequenz/channels/bidirectional.py @@ -0,0 +1,209 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""An abstraction to provide bi-directional communication between actors.""" + +from __future__ import annotations + +from typing import Generic, TypeVar + +from ._exceptions import ChannelError +from ._receiver import Receiver, ReceiverError +from ._sender import Sender, SenderError +from .broadcast import Broadcast + +_T = TypeVar("_T") +_U = TypeVar("_U") +_V = TypeVar("_V") +_W = TypeVar("_W") + + +class Bidirectional(Generic[_T, _U]): + """A wrapper class for simulating bidirectional channels.""" + + def __init__(self, *, name: str | None = None) -> None: + """Create a `Bidirectional` instance. + + Args: + name: A name for the client, used to name the channels. + """ + self._name: str = f"{id(self):_}" if name is None else name + """The name for the client, used to name the channels.""" + + self._request_channel: Broadcast[_T] = Broadcast(name=f"{self._name}:request") + """The channel to send requests.""" + + self._response_channel: Broadcast[_U] = Broadcast(name=f"{self._name}:response") + """The channel to send responses.""" + + self._client_handle: Handle[_T, _U] = Handle( + self, + self._request_channel.new_sender(), + self._response_channel.new_receiver(), + ) + """The handle for the client side to send/receive values.""" + + self._service_handle: Handle[_U, _T] = Handle( + self, + self._response_channel.new_sender(), + self._request_channel.new_receiver(), + ) + """The handle for the service side to send/receive values.""" + + @property + def name(self) -> str: + """The name of this channel. + + This is for debugging purposes, it will be shown in the string representation + of this channel. + """ + return self._name + + @property + def is_closed(self) -> bool: + """Whether this channel is closed. + + Any further attempts to use this channel after it is closed will result in an + exception. + + As long as there is a way to send or receive data, the channel is considered + open, even if the other side is closed, so this returns `False` if only both + underlying channels are closed. + """ + return self._request_channel.is_closed and self._response_channel.is_closed + + @property + def client_handle(self) -> Handle[_T, _U]: + """Get a `Handle` for the client side to use. + + Returns: + Object to send/receive messages with. + """ + return self._client_handle + + @property + def service_handle(self) -> Handle[_U, _T]: + """Get a `Handle` for the service side to use. + + Returns: + Object to send/receive messages with. + """ + return self._service_handle + + def __str__(self) -> str: + """Return a string representation of this channel.""" + return f"{type(self).__name__}:{self._name}" + + def __repr__(self) -> str: + """Return a string representation of this channel.""" + return ( + f"{type(self).__name__}(name={self._name!r}):<" + f"request_channel={self._request_channel!r}, " + f"response_channel={self._response_channel!r}>" + ) + + +class Handle(Sender[_V], Receiver[_W]): + """A handle to a [Bidirectional][frequenz.channels.bidirectional.Bidirectional] instance. + + It can be used to send/receive values between the client and service. + """ + + def __init__( + self, + channel: Bidirectional[_V, _W] | Bidirectional[_W, _V], + sender: Sender[_V], + receiver: Receiver[_W], + ) -> None: + """Create a `Handle` instance. + + Args: + channel: The underlying channel. + sender: A sender to send values with. + receiver: A receiver to receive values from. + """ + self._chan: Bidirectional[_V, _W] | Bidirectional[_W, _V] = channel + """The underlying channel.""" + + self._sender: Sender[_V] = sender + """The sender to send values with.""" + + self._receiver: Receiver[_W] = receiver + """The receiver to receive values from.""" + + async def send(self, msg: _V) -> None: + """Send a value to the other side. + + Args: + msg: The value to send. + + Raises: + SenderError: if the underlying channel was closed. + A [ChannelClosedError][frequenz.channels.ChannelClosedError] + is set as the cause. + """ + try: + await self._sender.send(msg) + except SenderError as err: + # If this comes from a channel error, then we inject another + # ChannelError having the information about the Bidirectional + # channel to hide (at least partially) the underlying + # Broadcast channels we use. + if isinstance(err.__cause__, ChannelError): + this_chan_error = ChannelError( + f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", + self._chan, # pylint: disable=protected-access + ) + this_chan_error.__cause__ = err.__cause__ + err.__cause__ = this_chan_error + raise err + + async def ready(self) -> bool: + """Wait until the receiver is ready with a value or an error. + + Once a call to `ready()` has finished, the value should be read with + a call to `consume()` (`receive()` or iterated over). The receiver will + remain ready (this method will return immediately) until it is + consumed. + + Returns: + Whether the receiver is still active. + """ + return await self._receiver.ready() # pylint: disable=protected-access + + def consume(self) -> _W: + """Return the latest value once `_ready` is complete. + + Returns: + The next value that was received. + + Raises: + ReceiverStoppedError: if there is some problem with the receiver. + ReceiverError: if there is some problem with the receiver. + """ + try: + return self._receiver.consume() # pylint: disable=protected-access + except ReceiverError as err: + # If this comes from a channel error, then we inject another + # ChannelError having the information about the Bidirectional + # channel to hide (at least partially) the underlying + # Broadcast channels we use. + if isinstance(err.__cause__, ChannelError): + this_chan_error = ChannelError( + f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", + self._chan, # pylint: disable=protected-access + ) + this_chan_error.__cause__ = err.__cause__ + err.__cause__ = this_chan_error + raise err + + def __str__(self) -> str: + """Return a string representation of this handle.""" + return f"{type(self).__name__}:{self._chan}" + + def __repr__(self) -> str: + """Return a string representation of this handle.""" + return ( + f"{type(self).__name__}(channel={self._chan!r}, " + f"sender={self._sender!r}, receiver={self._receiver!r})" + ) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/broadcast.py similarity index 90% rename from src/frequenz/channels/_broadcast.py rename to src/frequenz/channels/broadcast.py index c1d8304d..c37157fd 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/broadcast.py @@ -3,29 +3,28 @@ """A channel to broadcast messages to all receivers.""" -from __future__ import annotations - import logging import weakref from asyncio import Condition from collections import deque -from typing import Generic - -from ._base_classes import Peekable as BasePeekable -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._base_classes import T -from ._exceptions import ( - ChannelClosedError, +from typing import Generic, TypeVar + +from ._exceptions import ChannelClosedError +from ._receiver import ( + Peekable, + Receiver, ReceiverInvalidatedError, ReceiverStoppedError, - SenderError, ) +from ._sender import Sender, SenderError logger = logging.Logger(__name__) -class Broadcast(Generic[T]): +_T = TypeVar("_T") + + +class Broadcast(Generic[_T]): """A channel to broadcast messages to multiple receivers. `Broadcast` channels can have multiple senders and multiple receivers. Each @@ -37,9 +36,9 @@ class Broadcast(Generic[T]): are thread-safe. Because of this, `Broadcast` channels are thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.merge.Merge] or + [MergeNamed][frequenz.channels.merge_named.MergeNamed]. Example: ``` python @@ -94,13 +93,13 @@ def __init__(self, *, name: str | None, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" - self._receivers: dict[int, weakref.ReferenceType[Receiver[T]]] = {} + self._receivers: dict[int, weakref.ReferenceType[_Receiver[_T]]] = {} """The receivers attached to the channel, indexed by their hash().""" self._closed: bool = False """Whether the channel is closed.""" - self._latest: T | None = None + self._latest: _T | None = None """The latest value sent to the channel.""" self.resend_latest: bool = resend_latest @@ -125,7 +124,7 @@ def name(self) -> str: return self._name @property - def latest(self) -> T | None: + def latest(self) -> _T | None: """The latest value sent to the channel. Returns: @@ -158,15 +157,15 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new broadcast sender. Returns: A Sender instance attached to the broadcast channel. """ - return Sender(self) + return _Sender(self) - def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[T]: + def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -180,13 +179,13 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[ Returns: A Receiver instance attached to the broadcast channel. """ - recv: Receiver[T] = Receiver(name, limit, self) + recv: _Receiver[_T] = _Receiver(name, limit, self) self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) return recv - def new_peekable(self) -> Peekable[T]: + def new_peekable(self) -> Peekable[_T]: """Create a new Peekable for the broadcast channel. A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method @@ -196,7 +195,7 @@ def new_peekable(self) -> Peekable[T]: Returns: A Peekable to peek into the broadcast channel with. """ - return Peekable(self) + return _Peekable(self) def __str__(self) -> str: """Return a string representation of this receiver.""" @@ -213,7 +212,7 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[T]): +class _Sender(Sender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -221,16 +220,16 @@ class Sender(BaseSender[T]): method. """ - def __init__(self, chan: Broadcast[T]) -> None: + def __init__(self, chan: Broadcast[_T]) -> None: """Create a Broadcast sender. Args: chan: A reference to the broadcast channel this sender belongs to. """ - self._chan: Broadcast[T] = chan + self._chan: Broadcast[_T] = chan """The broadcast channel this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message to all broadcast receivers. Args: @@ -269,7 +268,7 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._chan!r})" -class Receiver(BaseReceiver[T]): +class _Receiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the @@ -277,7 +276,7 @@ class Receiver(BaseReceiver[T]): method. """ - def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: + def __init__(self, name: str | None, limit: int, chan: Broadcast[_T]) -> None: """Create a broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -299,10 +298,10 @@ def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: Only used for debugging purposes. """ - self._chan: Broadcast[T] = chan + self._chan: Broadcast[_T] = chan """The broadcast channel that this receiver belongs to.""" - self._q: deque[T] = deque(maxlen=limit) + self._q: deque[_T] = deque(maxlen=limit) """The receiver's internal message queue.""" self._active: bool = True @@ -312,7 +311,7 @@ def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: considered valid nor active. """ - def enqueue(self, msg: T) -> None: + def enqueue(self, msg: _T) -> None: """Put a message into this receiver's queue. To be called by broadcast senders. If the receiver's queue is already @@ -379,7 +378,7 @@ def _deactivate(self) -> None: del self._chan._receivers[_hash] # pylint: enable=protected-access - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: @@ -402,7 +401,7 @@ def consume(self) -> T: assert self._q, "`consume()` must be preceded by a call to `ready()`" return self._q.popleft() - def into_peekable(self) -> Peekable[T]: + def into_peekable(self) -> Peekable[_T]: """Convert the `Receiver` implementation into a `Peekable`. Once this function has been called, the receiver will no longer be @@ -413,7 +412,7 @@ def into_peekable(self) -> Peekable[T]: A `Peekable` instance. """ self._deactivate() - return Peekable(self._chan) + return _Peekable(self._chan) def __str__(self) -> str: """Return a string representation of this receiver.""" @@ -430,7 +429,7 @@ def __repr__(self) -> str: ) -class Peekable(BasePeekable[T]): +class _Peekable(Peekable[_T]): """A Peekable to peek into broadcast channels. A Peekable provides a [peek()][frequenz.channels.Peekable] method that @@ -438,16 +437,16 @@ class Peekable(BasePeekable[T]): consuming anything. """ - def __init__(self, chan: Broadcast[T]) -> None: + def __init__(self, chan: Broadcast[_T]) -> None: """Create a `Peekable` instance. Args: chan: The broadcast channel this Peekable will try to peek into. """ - self._chan: Broadcast[T] = chan + self._chan: Broadcast[_T] = chan """The broadcast channel this Peekable will try to peek into.""" - def peek(self) -> T | None: + def peek(self) -> _T | None: """Return the latest value that was sent to the channel. Returns: diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/event.py similarity index 88% rename from src/frequenz/channels/util/_event.py rename to src/frequenz/channels/event.py index ca43e262..27e20054 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/event.py @@ -4,27 +4,27 @@ """A receiver that can be made ready through an event.""" -import asyncio as _asyncio +import asyncio -from frequenz.channels import _base_classes, _exceptions +from ._receiver import Receiver, ReceiverStoppedError -class Event(_base_classes.Receiver[None]): +class Event(Receiver[None]): """A receiver that can be made ready through an event. - The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait - until [`set()`][frequenz.channels.util.Event.set] is called. At that point the + The receiver (the [`ready()`][frequenz.channels.event.Event.ready] method) will wait + until [`set()`][frequenz.channels.event.Event.set] is called. At that point the receiver will wait again after the event is [`consume()`][frequenz.channels.Receiver.consume]d. The receiver can be completely stopped by calling - [`stop()`][frequenz.channels.util.Event.stop]. + [`stop()`][frequenz.channels.event.Event.stop]. Example: ```python import asyncio - from frequenz.channels import Receiver - from frequenz.channels.util import Event, select, selected_from + from frequenz.channels import Receiver, select, selected_from + from frequenz.channels.event import Event other_receiver: Receiver[int] = ... exit_event = Event() @@ -53,7 +53,7 @@ def __init__(self, *, name: str | None = None) -> None: used. This is only for debugging purposes, it will be shown in the string representation of the receiver. """ - self._event: _asyncio.Event = _asyncio.Event() + self._event: asyncio.Event = asyncio.Event() """The event that is set when the receiver is ready.""" self._name: str = f"{id(self):_}" if name is None else name @@ -134,7 +134,7 @@ def consume(self) -> None: ReceiverStoppedError: If this receiver is stopped. """ if not self._is_set and self._is_stopped: - raise _exceptions.ReceiverStoppedError(self) + raise ReceiverStoppedError(self) assert self._is_set, "calls to `consume()` must be follow a call to `ready()`" diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/file_watcher.py similarity index 81% rename from src/frequenz/channels/util/_file_watcher.py rename to src/frequenz/channels/file_watcher.py index 5291a0f9..64bc62e1 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -3,8 +3,6 @@ """A Channel receiver for watching for new, modified or deleted files.""" -from __future__ import annotations - import asyncio import pathlib from collections import abc @@ -14,33 +12,34 @@ from watchfiles import Change, awatch from watchfiles.main import FileChange -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError -class FileWatcher(Receiver["FileWatcher.Event"]): - """A channel receiver that watches for file events.""" +class EventType(Enum): + """Available types of changes to watch for.""" + + CREATE = Change.added + """A new file was created.""" - class EventType(Enum): - """Available types of changes to watch for.""" + MODIFY = Change.modified + """An existing file was modified.""" - CREATE = Change.added - """A new file was created.""" + DELETE = Change.deleted + """An existing file was deleted.""" - MODIFY = Change.modified - """An existing file was modified.""" - DELETE = Change.deleted - """An existing file was deleted.""" +@dataclass(frozen=True) +class Event: + """A file change event.""" - @dataclass(frozen=True) - class Event: - """A file change event.""" + type: EventType + """The type of change that was observed.""" + path: pathlib.Path + """The path where the change was observed.""" - type: FileWatcher.EventType - """The type of change that was observed.""" - path: pathlib.Path - """The path where the change was observed.""" + +class FileWatcher(Receiver[Event]): + """A channel receiver that watches for file events.""" def __init__( self, @@ -54,7 +53,7 @@ def __init__( event_types: Types of events to watch for. Defaults to watch for all event types. """ - self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types) + self.event_types: frozenset[EventType] = frozenset(event_types) """The types of events to watch for.""" self._stop_event: asyncio.Event = asyncio.Event() @@ -134,9 +133,7 @@ def consume(self) -> Event: assert self._changes, "`consume()` must be preceded by a call to `ready()`" # Tuple of (Change, path) returned by watchfiles change, path_str = self._changes.pop() - return FileWatcher.Event( - type=FileWatcher.EventType(change), path=pathlib.Path(path_str) - ) + return Event(type=EventType(change), path=pathlib.Path(path_str)) def __str__(self) -> str: """Return a string representation of this receiver.""" diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/merge.py similarity index 93% rename from src/frequenz/channels/util/_merge.py rename to src/frequenz/channels/merge.py index 5baeea31..00461ed9 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/merge.py @@ -6,13 +6,14 @@ import asyncio import itertools from collections import deque -from typing import Any +from typing import Any, TypeVar -from .._base_classes import Receiver, T -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError +_T = TypeVar("_T") -class Merge(Receiver[T]): + +class Merge(Receiver[_T]): """Merge messages coming from multiple channels into a single stream. Example: @@ -38,20 +39,20 @@ class Merge(Receiver[T]): `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, *args: Receiver[T]) -> None: + def __init__(self, *args: Receiver[_T]) -> None: """Create a `Merge` instance. Args: *args: sequence of channel receivers. """ - self._receivers: dict[str, Receiver[T]] = { + self._receivers: dict[str, Receiver[_T]] = { str(id): recv for id, recv in enumerate(args) } self._pending: set[asyncio.Task[Any]] = { asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() } - self._results: deque[T] = deque(maxlen=len(self._receivers)) + self._results: deque[_T] = deque(maxlen=len(self._receivers)) def __del__(self) -> None: """Cleanup any pending tasks.""" @@ -102,7 +103,7 @@ async def ready(self) -> bool: asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/merge_named.py similarity index 91% rename from src/frequenz/channels/util/_merge_named.py rename to src/frequenz/channels/merge_named.py index 271f72da..3060bcc6 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/merge_named.py @@ -2,29 +2,31 @@ # Copyright © 2022 Frequenz Energy-as-a-Service GmbH """Merge messages coming from channels into a single stream containing name of message.""" + import asyncio import itertools from collections import deque -from typing import Any +from typing import Any, TypeVar + +from ._receiver import Receiver, ReceiverStoppedError -from .._base_classes import Receiver, T -from .._exceptions import ReceiverStoppedError +_T = TypeVar("_T") -class MergeNamed(Receiver[tuple[str, T]]): +class MergeNamed(Receiver[tuple[str, _T]]): """Merge messages coming from multiple named channels into a single stream. When `MergeNamed` is no longer needed, then it should be stopped using `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, **kwargs: Receiver[T]) -> None: + def __init__(self, **kwargs: Receiver[_T]) -> None: """Create a `MergeNamed` instance. Args: **kwargs: sequence of channel receivers. """ - self._receivers: dict[str, Receiver[T]] = kwargs + self._receivers: dict[str, Receiver[_T]] = kwargs """The sequence of channel receivers to get the messages to merge.""" self._pending: set[asyncio.Task[Any]] = { @@ -33,7 +35,7 @@ def __init__(self, **kwargs: Receiver[T]) -> None: } """The set of pending tasks to merge messages.""" - self._results: deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) + self._results: deque[tuple[str, _T]] = deque(maxlen=len(self._receivers)) """The internal buffer of merged messages.""" def __del__(self) -> None: @@ -86,7 +88,7 @@ async def ready(self) -> bool: asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> tuple[str, T]: + def consume(self) -> tuple[str, _T]: """Return the latest value once `ready` is complete. Returns: diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/timer.py similarity index 94% rename from src/frequenz/channels/util/_timer.py rename to src/frequenz/channels/timer.py index eeed4410..69dd4e30 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/timer.py @@ -11,14 +11,12 @@ time, which can lead to very hard to reproduce, and debug, issues. """ -from __future__ import annotations - import abc import asyncio from datetime import timedelta +from typing import Self -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError def _to_microseconds(time: float | timedelta) -> int: @@ -271,7 +269,7 @@ class Timer(Receiver[timedelta]): """A timer receiver that triggers every `interval` time. The timer has microseconds resolution, so the - [`interval`][frequenz.channels.util.Timer.interval] must be at least + [`interval`][frequenz.channels.timer.Timer.interval] must be at least 1 microsecond. The message it produces is a [`timedelta`][datetime.timedelta] containing the drift @@ -284,34 +282,34 @@ class Timer(Receiver[timedelta]): as the timer uses [`asyncio`][asyncio]s loop monotonic clock. If the timer is delayed too much, then it will behave according to the - [`missed_tick_policy`][frequenz.channels.util.Timer.missed_tick_policy]. Missing + [`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy. These are the currently built-in available policies: - * [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] - * [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] - * [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] + * [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] + * [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] + * [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] For the most common cases, a specialized constructor is provided: - * [`periodic()`][frequenz.channels.util.Timer.periodic] (uses the - [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] or - [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] policy) - * [`timeout()`][frequenz.channels.util.Timer.timeout] (uses the - [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] policy) + * [`periodic()`][frequenz.channels.timer.Timer.periodic] (uses the + [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] or + [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy) + * [`timeout()`][frequenz.channels.timer.Timer.timeout] (uses the + [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy) - The timer accepts an optional [`loop`][frequenz.channels.util.Timer.loop], which + The timer accepts an optional [`loop`][frequenz.channels.timer.Timer.loop], which will be used to track the time. If `loop` is `None`, then the running loop will be used (if there is no running loop most calls will raise a [`RuntimeError`][RuntimeError]). Starting the timer can be delayed if necessary by using `auto_start=False` (for example until we have a running loop). A call to - [`reset()`][frequenz.channels.util.Timer.reset], - [`ready()`][frequenz.channels.util.Timer.ready], - [`receive()`][frequenz.channels.util.Timer.receive] or the async iterator interface + [`reset()`][frequenz.channels.timer.Timer.reset], + [`ready()`][frequenz.channels.timer.Timer.ready], + [`receive()`][frequenz.channels.timer.Timer.receive] or the async iterator interface to await for a new message will start the timer. Example: Periodic timer example @@ -320,13 +318,13 @@ class Timer(Receiver[timedelta]): print(f"The timer has triggered {drift=}") ``` - But you can also use a [`select`][frequenz.channels.util.select] to combine + But you can also use a [`select`][frequenz.channels.select] to combine it with other receivers, and even start it (semi) manually: ```python import logging - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import select, selected_from + from frequenz.channels.broadcast import Broadcast timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) chan = Broadcast[int](name="input-chan") @@ -349,8 +347,8 @@ class Timer(Receiver[timedelta]): Example: Timeout example ```python import logging - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import select, selected_from + from frequenz.channels.broadcast import Broadcast def process_data(data: int): logging.info("Processing data: %d", data) @@ -493,7 +491,7 @@ def timeout( # noqa: DOC502 auto_start: bool = True, start_delay: timedelta = timedelta(0), loop: asyncio.AbstractEventLoop | None = None, - ) -> Timer: + ) -> Self: """Create a timer useful for tracking timeouts. This is basically a shortcut to create a timer with @@ -524,7 +522,7 @@ def timeout( # noqa: DOC502 microsecond; if `start_delay` is negative or `start_delay` was specified but `auto_start` is `False`. """ - return Timer( + return cls( delay, SkipMissedAndDrift(delay_tolerance=timedelta(0)), auto_start=auto_start, @@ -544,7 +542,7 @@ def periodic( # noqa: DOC502 auto_start: bool = True, start_delay: timedelta = timedelta(0), loop: asyncio.AbstractEventLoop | None = None, - ) -> Timer: + ) -> Self: """Create a periodic timer. This is basically a shortcut to create a timer with either @@ -581,7 +579,7 @@ def periodic( # noqa: DOC502 missed_tick_policy = ( SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed() ) - return Timer( + return cls( period, missed_tick_policy, auto_start=auto_start, diff --git a/src/frequenz/channels/util/__init__.py b/src/frequenz/channels/util/__init__.py deleted file mode 100644 index 515e1ac2..00000000 --- a/src/frequenz/channels/util/__init__.py +++ /dev/null @@ -1,66 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Channel utilities. - -A module with several utilities to work with channels: - -* [Event][frequenz.channels.util.Event]: - A [receiver][frequenz.channels.Receiver] that can be made ready through an event. - -* [FileWatcher][frequenz.channels.util.FileWatcher]: - A [receiver][frequenz.channels.Receiver] that watches for file events. - -* [Merge][frequenz.channels.util.Merge]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single stream. - -* [MergeNamed][frequenz.channels.util.MergeNamed]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single named stream, allowing to identify the - origin of each message. - -* [Timer][frequenz.channels.util.Timer]: - A [receiver][frequenz.channels.Receiver] that ticks at certain intervals. - -* [select][frequenz.channels.util.select]: Iterate over the values of all - [receivers][frequenz.channels.Receiver] as new values become available. -""" - -from ._event import Event -from ._file_watcher import FileWatcher -from ._merge import Merge -from ._merge_named import MergeNamed -from ._select import ( - Selected, - SelectError, - SelectErrorGroup, - UnhandledSelectedError, - select, - selected_from, -) -from ._timer import ( - MissedTickPolicy, - SkipMissedAndDrift, - SkipMissedAndResync, - Timer, - TriggerAllMissed, -) - -__all__ = [ - "Event", - "FileWatcher", - "Merge", - "MergeNamed", - "MissedTickPolicy", - "SelectError", - "SelectErrorGroup", - "Selected", - "SkipMissedAndDrift", - "SkipMissedAndResync", - "Timer", - "TriggerAllMissed", - "UnhandledSelectedError", - "select", - "selected_from", -] diff --git a/tests/test_anycast.py b/tests/test_anycast.py index 488877f0..70ac5946 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -9,13 +9,13 @@ import pytest from frequenz.channels import ( - Anycast, ChannelClosedError, Receiver, ReceiverStoppedError, Sender, SenderError, ) +from frequenz.channels.anycast import Anycast async def test_anycast() -> None: diff --git a/tests/test_bidirectional.py b/tests/test_bidirectional.py index 30d59335..8e77aad9 100644 --- a/tests/test_bidirectional.py +++ b/tests/test_bidirectional.py @@ -8,19 +8,19 @@ import pytest from frequenz.channels import ( - Bidirectional, ChannelClosedError, ChannelError, ReceiverError, SenderError, ) +from frequenz.channels.bidirectional import Bidirectional, Handle async def test_request_response() -> None: """Ensure bi-directional communication is possible.""" req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - async def service(handle: Bidirectional.Handle[str, int]) -> None: + async def service(handle: Handle[str, int]) -> None: while True: num = await handle.receive() if num is None: @@ -36,7 +36,7 @@ async def service(handle: Bidirectional.Handle[str, int]) -> None: service(req_resp.service_handle), ) - client_handle: Bidirectional.Handle[int, str] = req_resp.client_handle + client_handle: Handle[int, str] = req_resp.client_handle for ctr in range(-5, 5): await client_handle.send(ctr) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 8d1f4aec..dc02378e 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -9,7 +9,6 @@ import pytest from frequenz.channels import ( - Broadcast, ChannelClosedError, Receiver, ReceiverInvalidatedError, @@ -17,6 +16,8 @@ Sender, SenderError, ) +from frequenz.channels.broadcast import Broadcast +from frequenz.channels.broadcast import _Receiver as BroadcastReceiver async def test_broadcast() -> None: @@ -112,7 +113,9 @@ async def test_broadcast_overflow() -> None: sender = bcast.new_sender() big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) + assert isinstance(big_receiver, BroadcastReceiver) small_receiver = bcast.new_receiver(limit=small_recv_size) + assert isinstance(small_receiver, BroadcastReceiver) async def drain_receivers() -> tuple[int, int]: big_sum = 0 diff --git a/tests/utils/test_event.py b/tests/test_event.py similarity index 96% rename from tests/utils/test_event.py rename to tests/test_event.py index 0cda9d23..950720d0 100644 --- a/tests/utils/test_event.py +++ b/tests/test_event.py @@ -8,7 +8,7 @@ import pytest as _pytest from frequenz.channels import ReceiverStoppedError -from frequenz.channels.util import Event +from frequenz.channels.event import Event async def test_event() -> None: diff --git a/tests/utils/test_file_watcher.py b/tests/test_file_watcher.py similarity index 85% rename from tests/utils/test_file_watcher.py rename to tests/test_file_watcher.py index bed75bcb..c1a65838 100644 --- a/tests/utils/test_file_watcher.py +++ b/tests/test_file_watcher.py @@ -15,7 +15,7 @@ from watchfiles import Change from watchfiles.main import FileChange -from frequenz.channels.util import FileWatcher +from frequenz.channels.file_watcher import Event, EventType, FileWatcher class _FakeAwatch: @@ -52,7 +52,7 @@ def fake_awatch() -> Iterator[_FakeAwatch]: """Fixture to mock the awatch function.""" fake = _FakeAwatch() with mock.patch( - "frequenz.channels.util._file_watcher.awatch", + "frequenz.channels.file_watcher.awatch", autospec=True, side_effect=fake.fake_awatch, ): @@ -74,14 +74,14 @@ async def test_file_watcher_receive_updates( for change in changes: recv_changes = await file_watcher.receive() - event_type = FileWatcher.EventType(change[0]) + event_type = EventType(change[0]) path = pathlib.Path(change[1]) - assert recv_changes == FileWatcher.Event(type=event_type, path=path) + assert recv_changes == Event(type=event_type, path=path) -@hypothesis.given(event_types=st.sets(st.sampled_from(FileWatcher.EventType))) +@hypothesis.given(event_types=st.sets(st.sampled_from(EventType))) async def test_file_watcher_filter_events( - event_types: set[FileWatcher.EventType], + event_types: set[EventType], ) -> None: """Test the file watcher events filtering.""" good_path = "good-file" @@ -89,7 +89,7 @@ async def test_file_watcher_filter_events( # We need to reset the mock explicitly because hypothesis runs all the produced # inputs in the same context. with mock.patch( - "frequenz.channels.util._file_watcher.awatch", autospec=True + "frequenz.channels.file_watcher.awatch", autospec=True ) as awatch_mock: file_watcher = FileWatcher(paths=[good_path], event_types=event_types) @@ -100,7 +100,7 @@ async def test_file_watcher_filter_events( pathlib.Path(good_path), stop_event=mock.ANY, watch_filter=filter_events ) ] - for event_type in FileWatcher.EventType: + for event_type in EventType: assert filter_events(event_type.value, good_path) == ( event_type in event_types ) diff --git a/tests/utils/test_integration.py b/tests/test_integration.py similarity index 87% rename from tests/utils/test_integration.py rename to tests/test_integration.py index e61cb620..754aca5f 100644 --- a/tests/utils/test_integration.py +++ b/tests/test_integration.py @@ -9,7 +9,9 @@ import pytest -from frequenz.channels.util import FileWatcher, Timer, select, selected_from +from frequenz.channels import select, selected_from +from frequenz.channels.file_watcher import Event, EventType, FileWatcher +from frequenz.channels.timer import Timer @pytest.mark.integration @@ -31,12 +33,8 @@ async def test_file_watcher(tmp_path: pathlib.Path) -> None: if selected_from(selected, timer): filename.write_text(f"{selected.value}") elif selected_from(selected, file_watcher): - event_type = ( - FileWatcher.EventType.CREATE - if number_of_writes == 0 - else FileWatcher.EventType.MODIFY - ) - assert selected.value == FileWatcher.Event(type=event_type, path=filename) + event_type = EventType.CREATE if number_of_writes == 0 else EventType.MODIFY + assert selected.value == Event(type=event_type, path=filename) number_of_writes += 1 # After receiving a write 3 times, unsubscribe from the writes channel if number_of_writes == expected_number_of_writes: @@ -56,9 +54,7 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None: tmp_path: A tmp directory to run the file watcher on. Created by pytest. """ filename = tmp_path / "test-file" - file_watcher = FileWatcher( - paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE} - ) + file_watcher = FileWatcher(paths=[str(tmp_path)], event_types={EventType.DELETE}) write_timer = Timer.timeout(timedelta(seconds=0.1)) deletion_timer = Timer.timeout(timedelta(seconds=0.25)) diff --git a/tests/test_merge.py b/tests/test_merge.py index 9cc920f2..3d3cf3ab 100644 --- a/tests/test_merge.py +++ b/tests/test_merge.py @@ -5,8 +5,9 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import Merge +from frequenz.channels import Sender +from frequenz.channels.anycast import Anycast +from frequenz.channels.merge import Merge async def test_merge() -> None: diff --git a/tests/test_mergenamed.py b/tests/test_mergenamed.py index 1c06bbe9..1ddcb510 100644 --- a/tests/test_mergenamed.py +++ b/tests/test_mergenamed.py @@ -5,8 +5,9 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import MergeNamed +from frequenz.channels import Sender +from frequenz.channels.anycast import Anycast +from frequenz.channels.merge_named import MergeNamed async def test_mergenamed() -> None: diff --git a/tests/utils/test_select.py b/tests/test_select.py similarity index 93% rename from tests/utils/test_select.py rename to tests/test_select.py index a9a46921..9eb001c5 100644 --- a/tests/utils/test_select.py +++ b/tests/test_select.py @@ -7,8 +7,7 @@ import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import Selected, selected_from +from frequenz.channels import Receiver, ReceiverStoppedError, Selected, selected_from class TestSelected: diff --git a/tests/utils/test_select_integration.py b/tests/test_select_integration.py similarity index 99% rename from tests/utils/test_select_integration.py rename to tests/test_select_integration.py index a4984f62..6d676528 100644 --- a/tests/utils/test_select_integration.py +++ b/tests/test_select_integration.py @@ -15,14 +15,15 @@ class at a time. import async_solipsism import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import ( - Event, +from frequenz.channels import ( + Receiver, + ReceiverStoppedError, Selected, UnhandledSelectedError, select, selected_from, ) +from frequenz.channels.event import Event @pytest.mark.integration diff --git a/tests/utils/test_timer.py b/tests/test_timer.py similarity index 99% rename from tests/utils/test_timer.py rename to tests/test_timer.py index dd5e5109..73fe28f6 100644 --- a/tests/utils/test_timer.py +++ b/tests/test_timer.py @@ -14,7 +14,7 @@ import pytest from hypothesis import strategies as st -from frequenz.channels.util import ( +from frequenz.channels.timer import ( SkipMissedAndDrift, SkipMissedAndResync, Timer, diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py deleted file mode 100644 index 25e1e6d9..00000000 --- a/tests/utils/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for channel utils."""