Skip to content

Commit 89cde6d

Browse files
committed
Make receivers raise ReceiverError
The `Receiver.ready()` method (and related `receive()` and `__anext__` when used as an async iterator) now `raise`s a `ReceiverError` and in particular a `ReceiverStoppedError` when the receiver has no more messages to receive. `Receiver.consume()` doesn't raise any exceptions now (except for `AssertionError` if there are programming errors). Receivers raising `EOFError` now raise `ReceiverError` instead. For channels which receivers stop receiving when the channel is closed, the `ReceiverStoppedError` is chained with a `__cause__` that is a `ChannelClosedError` with the channel that was closed. Since now there is no need to use `ChannelClosedError` without specifying a channel, now it requires the argument `channel` (before it was optional). Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 8f3c763 commit 89cde6d

File tree

14 files changed

+190
-61
lines changed

14 files changed

+190
-61
lines changed

RELEASE_NOTES.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@
88

99
* The `Sender.send()` method now `raise`s a `SenderError` instead of returning `False`. The `SenderError` will typically have a `ChannelClosedError` and the underlying reason as a chained exception.
1010

11+
* The `Receiver.ready()` method (and related `receive()` and `__anext__` when used as an async iterator) now `raise`s a `ReceiverError` and in particular a `ReceiverStoppedError` when the receiver has no more messages to receive.
12+
13+
`Receiver.consume()` doesn't raise any exceptions.
14+
15+
Receivers raising `EOFError` now raise `ReceiverError` instead.
16+
17+
* For channels which senders raise an error when the channel is closed or which receivers stop receiving when the channel is closed, the `SenderError` and `ReceiverStoppedError` are chained with a `__cause__` that is a `ChannelClosedError` with the channel that was closed.
18+
19+
* `ChannelClosedError` now requires the argument `channel` (before it was optional).
20+
1121
## New Features
1222

1323
* New exceptions were added:
@@ -16,6 +26,10 @@
1626

1727
* `SendError`: Raised for errors when sending messages.
1828

29+
* `ReceiverError`: Raised for errors when receiving messages.
30+
31+
* `ReceiverClosedError`: Raised when a receiver don't have more messages to receive.
32+
1933
## Bug Fixes
2034

2135
<!-- Here goes notable bug fixes that are worth a special mention or explanation -->

src/frequenz/channels/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@
4949
5050
* [SenderError][frequenz.channels.SenderError]: Base class for all errors
5151
related to senders.
52+
53+
* [ReceiverError][frequenz.channels.ReceiverError]: Base class for all errors
54+
related to receivers.
55+
56+
* [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver
57+
stopped producing messages.
5258
"""
5359

5460
from . import util
@@ -59,6 +65,8 @@
5965
Error,
6066
Peekable,
6167
Receiver,
68+
ReceiverError,
69+
ReceiverStoppedError,
6270
Sender,
6371
SenderError,
6472
)
@@ -74,6 +82,8 @@
7482
"Error",
7583
"Peekable",
7684
"Receiver",
85+
"ReceiverError",
86+
"ReceiverStoppedError",
7787
"Sender",
7888
"SenderError",
7989
"util",

src/frequenz/channels/_anycast.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from ._base_classes import ChannelClosedError
1313
from ._base_classes import Receiver as BaseReceiver
14+
from ._base_classes import ReceiverStoppedError
1415
from ._base_classes import Sender as BaseSender
1516
from ._base_classes import SenderError, T
1617

@@ -171,15 +172,16 @@ async def ready(self) -> None:
171172
"""Wait until the receiver is ready with a value.
172173
173174
Raises:
174-
ChannelClosedError: if the underlying channel is closed.
175+
ReceiverStoppedError: if the receiver stopped producing messages.
176+
ReceiverError: if there is some problem with the receiver.
175177
"""
176178
# if a message is already ready, then return immediately.
177179
if self._next is not None:
178180
return
179181

180182
while len(self._chan.deque) == 0:
181183
if self._chan.closed:
182-
raise ChannelClosedError()
184+
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
183185
async with self._chan.recv_cv:
184186
await self._chan.recv_cv.wait()
185187
self._next = self._chan.deque.popleft()

src/frequenz/channels/_base_classes.py

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class ChannelError(Error):
3333
All exceptions generated by channels inherit from this exception.
3434
"""
3535

36-
def __init__(self, message: Any, channel: Any = None):
36+
def __init__(self, message: Any, channel: Any):
3737
"""Create a ChannelError instance.
3838
3939
Args:
@@ -47,7 +47,7 @@ def __init__(self, message: Any, channel: Any = None):
4747
class ChannelClosedError(ChannelError):
4848
"""Error raised when trying to operate on a closed channel."""
4949

50-
def __init__(self, channel: Any = None):
50+
def __init__(self, channel: Any):
5151
"""Create a `ChannelClosedError` instance.
5252
5353
Args:
@@ -74,6 +74,37 @@ def __init__(self, message: Any, sender: Sender[T]):
7474
self.sender: Sender[T] = sender
7575

7676

77+
class ReceiverError(Error, Generic[T]):
78+
"""An error produced in a [Receiver][frequenz.channels.Receiver].
79+
80+
All exceptions generated by receivers inherit from this exception.
81+
"""
82+
83+
def __init__(self, message: Any, receiver: Receiver[T]):
84+
"""Create an instance.
85+
86+
Args:
87+
message: An error message.
88+
receiver: The [Receiver][frequenz.channels.Receiver] where the
89+
error happened.
90+
"""
91+
super().__init__(message)
92+
self.receiver: Receiver[T] = receiver
93+
94+
95+
class ReceiverStoppedError(ReceiverError[T]):
96+
"""The [Receiver][frequenz.channels.Receiver] stopped producing messages."""
97+
98+
def __init__(self, receiver: Receiver[T]):
99+
"""Create an instance.
100+
101+
Args:
102+
receiver: The [Receiver][frequenz.channels.Receiver] where the
103+
error happened.
104+
"""
105+
super().__init__(f"Receiver {receiver} was stopped", receiver)
106+
107+
77108
class Sender(ABC, Generic[T]):
78109
"""A channel Sender."""
79110

@@ -99,12 +130,13 @@ async def __anext__(self) -> T:
99130
The next value received.
100131
101132
Raises:
102-
StopAsyncIteration: if the underlying channel is closed.
133+
StopAsyncIteration: if the receiver stopped producing messages.
134+
ReceiverError: if there is some problem with the receiver.
103135
"""
104136
try:
105137
await self.ready()
106138
return self.consume()
107-
except ChannelClosedError as exc:
139+
except ReceiverStoppedError as exc:
108140
raise StopAsyncIteration() from exc
109141

110142
@abstractmethod
@@ -115,7 +147,8 @@ async def ready(self) -> None:
115147
`consume()`.
116148
117149
Raises:
118-
ChannelClosedError: if the underlying channel is closed.
150+
ReceiverStoppedError: if the receiver stopped producing messages.
151+
ReceiverError: if there is some problem with the receiver.
119152
"""
120153

121154
@abstractmethod
@@ -126,9 +159,6 @@ def consume(self) -> T:
126159
127160
Returns:
128161
The next value received.
129-
130-
Raises:
131-
ChannelClosedError: if the underlying channel is closed.
132162
"""
133163

134164
def __aiter__(self) -> Receiver[T]:
@@ -143,15 +173,29 @@ async def receive(self) -> T:
143173
"""Receive a message from the channel.
144174
145175
Raises:
146-
ChannelClosedError: if the underlying channel is closed.
176+
ReceiverStoppedError: if there is some problem with the receiver.
177+
ReceiverError: if there is some problem with the receiver.
147178
148179
Returns:
149180
The received message.
181+
182+
# noqa: DAR401 __cause__ (https://github.com/terrencepreilly/darglint/issues/181)
150183
"""
151184
try:
152185
received = await self.__anext__() # pylint: disable=unnecessary-dunder-call
153186
except StopAsyncIteration as exc:
154-
raise ChannelClosedError() from exc
187+
# If we already had a cause and it was the receiver was stopped,
188+
# then reuse that error, as StopAsyncIteration is just an artifact
189+
# introduced by __anext__.
190+
if (
191+
isinstance(exc.__cause__, ReceiverStoppedError)
192+
# pylint is not smart enough to figure out we checked above
193+
# this is a ReceiverStoppedError and thus it does have
194+
# a receiver member
195+
and exc.__cause__.receiver is self # pylint: disable=no-member
196+
):
197+
raise exc.__cause__
198+
raise ReceiverStoppedError(self) from exc
155199
return received
156200

157201
def map(self, call: Callable[[T], U]) -> Receiver[U]:

src/frequenz/channels/_bidirectional.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,15 @@
77

88
from typing import Generic, TypeVar
99

10-
from ._base_classes import ChannelError, Receiver, Sender, SenderError, T, U
10+
from ._base_classes import (
11+
ChannelError,
12+
Receiver,
13+
ReceiverError,
14+
Sender,
15+
SenderError,
16+
T,
17+
U,
18+
)
1119
from ._broadcast import Broadcast
1220

1321
V = TypeVar("V")
@@ -68,8 +76,27 @@ async def send(self, msg: V) -> None:
6876
raise err
6977

7078
async def ready(self) -> None:
71-
"""Wait until the receiver is ready with a value."""
72-
await self._receiver.ready() # pylint: disable=protected-access
79+
"""Wait until the receiver is ready with a value.
80+
81+
Raises:
82+
ReceiverStoppedError: if the receiver stopped producing messages.
83+
ReceiverError: if there is some problem with the receiver.
84+
"""
85+
try:
86+
await self._receiver.ready() # pylint: disable=protected-access
87+
except ReceiverError as err:
88+
# If this comes from a channel error, then we inject another
89+
# ChannelError having the information about the Bidirectional
90+
# channel to hide (at least partially) the underlaying
91+
# Broadcast channels we use.
92+
if isinstance(err.__cause__, ChannelError):
93+
this_chan_error = ChannelError(
94+
f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
95+
self._chan, # pylint: disable=protected-access
96+
)
97+
this_chan_error.__cause__ = err.__cause__
98+
err.__cause__ = this_chan_error
99+
raise err
73100

74101
def consume(self) -> W:
75102
"""Return the latest value once `_ready` is complete.

src/frequenz/channels/_broadcast.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from ._base_classes import ChannelClosedError
1616
from ._base_classes import Peekable as BasePeekable
1717
from ._base_classes import Receiver as BaseReceiver
18+
from ._base_classes import ReceiverError, ReceiverStoppedError
1819
from ._base_classes import Sender as BaseSender
1920
from ._base_classes import SenderError, T
2021

@@ -256,19 +257,19 @@ async def ready(self) -> None:
256257
"""Wait until the receiver is ready with a value.
257258
258259
Raises:
259-
EOFError: if this receiver is no longer active.
260-
ChannelClosedError: if the underlying channel is closed.
260+
ReceiverStoppedError: if there is some problem with the receiver.
261+
ReceiverError: if the receiver is not longer active.
261262
"""
262263
if not self._active:
263-
raise EOFError("This receiver is no longer active.")
264+
raise ReceiverError("This receiver is no longer active.", self)
264265

265266
# Use a while loop here, to handle spurious wakeups of condition variables.
266267
#
267268
# The condition also makes sure that if there are already messages ready to be
268269
# consumed, then we return immediately.
269270
while len(self._q) == 0:
270271
if self._chan.closed:
271-
raise ChannelClosedError()
272+
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
272273
async with self._chan.recv_cv:
273274
await self._chan.recv_cv.wait()
274275

src/frequenz/channels/util/_file_watcher.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from watchfiles import Change, awatch
1111
from watchfiles.main import FileChange
1212

13-
from .._base_classes import Receiver
13+
from .._base_classes import Receiver, ReceiverStoppedError
1414

1515

1616
class FileWatcher(Receiver[pathlib.Path]):
@@ -66,24 +66,25 @@ def __del__(self) -> None:
6666
async def ready(self) -> None:
6767
"""Wait for the next file event and return its path.
6868
69-
Raises:
70-
StopAsyncIteration: When the channel is closed.
71-
7269
Returns:
7370
Path of next file.
71+
72+
Raises:
73+
ReceiverStoppedError: if the receiver stopped producing messages.
74+
ReceiverError: if there is some problem with the receiver.
7475
"""
7576
# if there are messages waiting to be consumed, return immediately.
7677
if self._changes:
7778
return
7879

79-
self._changes = await self._awatch.__anext__()
80+
try:
81+
self._changes = await self._awatch.__anext__()
82+
except StopAsyncIteration as err:
83+
raise ReceiverStoppedError(self) from err
8084

8185
def consume(self) -> pathlib.Path:
8286
"""Return the latest change once `ready` is complete.
8387
84-
Raises:
85-
ChannelClosedError: When the channel is closed.
86-
8788
Returns:
8889
The next change that was received.
8990
"""

src/frequenz/channels/util/_merge.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from collections import deque
88
from typing import Any, Deque, Set
99

10-
from .._base_classes import ChannelClosedError, Receiver, T
10+
from .._base_classes import Receiver, ReceiverStoppedError, T
1111

1212

1313
class Merge(Receiver[T]):
@@ -58,7 +58,8 @@ async def ready(self) -> None:
5858
"""Wait until the receiver is ready with a value.
5959
6060
Raises:
61-
ChannelClosedError: if the underlying channel is closed.
61+
ReceiverStoppedError: if the receiver stopped producing messages.
62+
ReceiverError: if there is some problem with the receiver.
6263
"""
6364
# we use a while loop to continue to wait for new data, in case the
6465
# previous `wait` completed because a channel was closed.
@@ -68,7 +69,7 @@ async def ready(self) -> None:
6869
return
6970

7071
if len(self._pending) == 0:
71-
raise ChannelClosedError()
72+
raise ReceiverStoppedError(self)
7273
done, self._pending = await asyncio.wait(
7374
self._pending, return_when=asyncio.FIRST_COMPLETED
7475
)
@@ -87,9 +88,6 @@ async def ready(self) -> None:
8788
def consume(self) -> T:
8889
"""Return the latest value once `ready` is complete.
8990
90-
Raises:
91-
EOFError: When called before a call to `ready()` finishes.
92-
9391
Returns:
9492
The next value that was received.
9593
"""

0 commit comments

Comments
 (0)