Skip to content

Commit 92f1a01

Browse files
authored
Improve and add exceptions (#61)
- Properly await for service task - Make output more verbose for darglint - Add base exception - Make `send()` raise a `SenderError` - Make receivers raise `ReceiverError` - Move exceptions into its own module - Add a `ReceiverInvalidatedError` - Don't raise exceptions in Receiver.ready() Fixes #52.
2 parents ff7e347 + c700249 commit 92f1a01

File tree

16 files changed

+568
-166
lines changed

16 files changed

+568
-166
lines changed

RELEASE_NOTES.md

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,33 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including if there are any depractions and what they should be replaced with -->
9+
* The `Sender.send()` method now `raise`s a `SenderError` instead of returning `False`. The `SenderError` will typically have a `ChannelClosedError` and the underlying reason as a chained exception.
10+
11+
* The `Receiver.ready()` method (and related `receive()` and `__anext__` when used as an async iterator) now `raise`s a `ReceiverError` and in particular a `ReceiverStoppedError` when the receiver has no more messages to receive.
12+
13+
`Receiver.consume()` doesn't raise any exceptions.
14+
15+
Receivers raising `EOFError` now raise `ReceiverInvalidatedError` instead.
16+
17+
* For channels which senders raise an error when the channel is closed or which receivers stop receiving when the channel is closed, the `SenderError` and `ReceiverStoppedError` are chained with a `__cause__` that is a `ChannelClosedError` with the channel that was closed.
18+
19+
* `ChannelClosedError` now requires the argument `channel` (before it was optional).
20+
21+
* Now exceptions are not raised in Receiver.ready() but in Receiver.consume() (receive() or the async iterator `anext`).
1022

1123
## New Features
1224

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
25+
* New exceptions were added:
26+
27+
* `Error`: A base exception from which all exceptions from this library inherit.
28+
29+
* `SendError`: Raised for errors when sending messages.
30+
31+
* `ReceiverError`: Raised for errors when receiving messages.
32+
33+
* `ReceiverClosedError`: Raised when a receiver don't have more messages to receive.
34+
35+
* `ReceiverInvalidatedError`: Raised when a receiver was invalidated (for example it was converted into a `Peekable`).
1436

1537
## Bug Fixes
1638

noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def docstrings(session: nox.Session) -> None:
6666
# Darglint checks that function argument and return values are documented.
6767
# This is needed only for the `src` dir, so we exclude the other top level
6868
# dirs that contain code.
69-
session.run("darglint", "src")
69+
session.run("darglint", "-v2", "src")
7070

7171

7272
@nox.session

src/frequenz/channels/__init__.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,27 +38,57 @@
3838
3939
Exception classes:
4040
41+
* [Error][frequenz.channels.Error]: Base class for all errors in this
42+
library.
43+
4144
* [ChannelError][frequenz.channels.ChannelError]: Base class for all errors
4245
related to channels.
4346
4447
* [ChannelClosedError][frequenz.channels.ChannelClosedError]: Error raised when
4548
trying to operate (send, receive, etc.) through a closed channel.
49+
50+
* [SenderError][frequenz.channels.SenderError]: Base class for all errors
51+
related to senders.
52+
53+
* [ReceiverError][frequenz.channels.ReceiverError]: Base class for all errors
54+
related to receivers.
55+
56+
* [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver
57+
stopped producing messages.
58+
59+
* [ReceiverInvalidatedError][frequenz.channels.ReceiverInvalidatedError]:
60+
A receiver is not longer valid (for example if it was converted into
61+
a peekable.
4662
"""
4763

4864
from . import util
4965
from ._anycast import Anycast
50-
from ._base_classes import ChannelClosedError, ChannelError, Peekable, Receiver, Sender
66+
from ._base_classes import Peekable, Receiver, Sender
5167
from ._bidirectional import Bidirectional
5268
from ._broadcast import Broadcast
69+
from ._exceptions import (
70+
ChannelClosedError,
71+
ChannelError,
72+
Error,
73+
ReceiverError,
74+
ReceiverInvalidatedError,
75+
ReceiverStoppedError,
76+
SenderError,
77+
)
5378

5479
__all__ = [
5580
"Anycast",
5681
"Bidirectional",
5782
"Broadcast",
5883
"ChannelClosedError",
5984
"ChannelError",
85+
"Error",
6086
"Peekable",
6187
"Receiver",
88+
"ReceiverError",
89+
"ReceiverInvalidatedError",
90+
"ReceiverStoppedError",
6291
"Sender",
92+
"SenderError",
6393
"util",
6494
]

src/frequenz/channels/_anycast.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
from collections import deque
1010
from typing import Deque, Generic, Optional
1111

12-
from ._base_classes import ChannelClosedError
1312
from ._base_classes import Receiver as BaseReceiver
1413
from ._base_classes import Sender as BaseSender
1514
from ._base_classes import T
15+
from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError
1616

1717

1818
class Anycast(Generic[T]):
@@ -123,7 +123,7 @@ def __init__(self, chan: Anycast[T]) -> None:
123123
"""
124124
self._chan = chan
125125

126-
async def send(self, msg: T) -> bool:
126+
async def send(self, msg: T) -> None:
127127
"""Send a message across the channel.
128128
129129
To send, this method inserts the message into the Anycast channel's
@@ -134,19 +134,21 @@ async def send(self, msg: T) -> bool:
134134
Args:
135135
msg: The message to be sent.
136136
137-
Returns:
138-
Whether the message was sent, based on whether the channel is open
139-
or not.
137+
Raises:
138+
SenderError: if the underlying channel was closed.
139+
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
140+
set as the cause.
140141
"""
141142
if self._chan.closed:
142-
return False
143+
raise SenderError("The channel was closed", self) from ChannelClosedError(
144+
self._chan
145+
)
143146
while len(self._chan.deque) == self._chan.deque.maxlen:
144147
async with self._chan.send_cv:
145148
await self._chan.send_cv.wait()
146149
self._chan.deque.append(msg)
147150
async with self._chan.recv_cv:
148151
self._chan.recv_cv.notify(1)
149-
return True
150152

151153

152154
class Receiver(BaseReceiver[T]):
@@ -165,31 +167,44 @@ def __init__(self, chan: Anycast[T]) -> None:
165167
self._chan = chan
166168
self._next: Optional[T] = None
167169

168-
async def ready(self) -> None:
169-
"""Wait until the receiver is ready with a value.
170+
async def ready(self) -> bool:
171+
"""Wait until the receiver is ready with a value or an error.
170172
171-
Raises:
172-
ChannelClosedError: if the underlying channel is closed.
173+
Once a call to `ready()` has finished, the value should be read with
174+
a call to `consume()` (`receive()` or iterated over). The receiver will
175+
remain ready (this method will return immediately) until it is
176+
consumed.
177+
178+
Returns:
179+
Whether the receiver is still active.
173180
"""
174181
# if a message is already ready, then return immediately.
175182
if self._next is not None:
176-
return
183+
return True
177184

178185
while len(self._chan.deque) == 0:
179186
if self._chan.closed:
180-
raise ChannelClosedError()
187+
return False
181188
async with self._chan.recv_cv:
182189
await self._chan.recv_cv.wait()
183190
self._next = self._chan.deque.popleft()
184191
async with self._chan.send_cv:
185192
self._chan.send_cv.notify(1)
193+
return True
186194

187195
def consume(self) -> T:
188196
"""Return the latest value once `ready()` is complete.
189197
190198
Returns:
191199
The next value that was received.
200+
201+
Raises:
202+
ReceiverStoppedError: if the receiver stopped producing messages.
203+
ReceiverError: if there is some problem with the receiver.
192204
"""
205+
if self._next is None and self._chan.closed:
206+
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
207+
193208
assert (
194209
self._next is not None
195210
), "calls to `consume()` must be follow a call to `ready()`"

src/frequenz/channels/_base_classes.py

Lines changed: 52 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,54 +6,26 @@
66
from __future__ import annotations
77

88
from abc import ABC, abstractmethod
9-
from typing import Any, Callable, Generic, Optional, TypeVar
9+
from typing import Callable, Generic, Optional, TypeVar
10+
11+
from ._exceptions import ReceiverStoppedError
1012

1113
T = TypeVar("T")
1214
U = TypeVar("U")
1315

1416

15-
class ChannelError(RuntimeError):
16-
"""Base channel error.
17-
18-
All exceptions generated by channels inherit from this exception.
19-
"""
20-
21-
def __init__(self, message: Any, channel: Any = None):
22-
"""Create a ChannelError instance.
23-
24-
Args:
25-
message: An error message.
26-
channel: A reference to the channel that encountered the error.
27-
"""
28-
super().__init__(message)
29-
self.channel: Any = channel
30-
31-
32-
class ChannelClosedError(ChannelError):
33-
"""Error raised when trying to operate on a closed channel."""
34-
35-
def __init__(self, channel: Any = None):
36-
"""Create a `ChannelClosedError` instance.
37-
38-
Args:
39-
channel: A reference to the channel that was closed.
40-
"""
41-
super().__init__(f"Channel {channel} was closed", channel)
42-
43-
4417
class Sender(ABC, Generic[T]):
4518
"""A channel Sender."""
4619

4720
@abstractmethod
48-
async def send(self, msg: T) -> bool:
21+
async def send(self, msg: T) -> None:
4922
"""Send a message to the channel.
5023
5124
Args:
5225
msg: The message to be sent.
5326
54-
Returns:
55-
Whether the message was sent, based on whether the channel is open
56-
or not.
27+
Raises:
28+
SenderError: if there was an error sending the message.
5729
"""
5830

5931

@@ -67,23 +39,26 @@ async def __anext__(self) -> T:
6739
The next value received.
6840
6941
Raises:
70-
StopAsyncIteration: if the underlying channel is closed.
42+
StopAsyncIteration: if the receiver stopped producing messages.
43+
ReceiverError: if there is some problem with the receiver.
7144
"""
7245
try:
7346
await self.ready()
7447
return self.consume()
75-
except ChannelClosedError as exc:
48+
except ReceiverStoppedError as exc:
7649
raise StopAsyncIteration() from exc
7750

7851
@abstractmethod
79-
async def ready(self) -> None:
80-
"""Wait until the receiver is ready with a value.
52+
async def ready(self) -> bool:
53+
"""Wait until the receiver is ready with a value or an error.
8154
82-
Once a call to `ready()` has finished, the value should be read with a call to
83-
`consume()`.
55+
Once a call to `ready()` has finished, the value should be read with
56+
a call to `consume()` (`receive()` or iterated over). The receiver will
57+
remain ready (this method will return immediately) until it is
58+
consumed.
8459
85-
Raises:
86-
ChannelClosedError: if the underlying channel is closed.
60+
Returns:
61+
Whether the receiver is still active.
8762
"""
8863

8964
@abstractmethod
@@ -96,7 +71,8 @@ def consume(self) -> T:
9671
The next value received.
9772
9873
Raises:
99-
ChannelClosedError: if the underlying channel is closed.
74+
ReceiverStoppedError: if the receiver stopped producing messages.
75+
ReceiverError: if there is some problem with the receiver.
10076
"""
10177

10278
def __aiter__(self) -> Receiver[T]:
@@ -110,16 +86,30 @@ def __aiter__(self) -> Receiver[T]:
11086
async def receive(self) -> T:
11187
"""Receive a message from the channel.
11288
113-
Raises:
114-
ChannelClosedError: if the underlying channel is closed.
115-
11689
Returns:
11790
The received message.
91+
92+
Raises:
93+
ReceiverStoppedError: if there is some problem with the receiver.
94+
ReceiverError: if there is some problem with the receiver.
95+
96+
# noqa: DAR401 __cause__ (https://github.com/terrencepreilly/darglint/issues/181)
11897
"""
11998
try:
12099
received = await self.__anext__() # pylint: disable=unnecessary-dunder-call
121100
except StopAsyncIteration as exc:
122-
raise ChannelClosedError() from exc
101+
# If we already had a cause and it was the receiver was stopped,
102+
# then reuse that error, as StopAsyncIteration is just an artifact
103+
# introduced by __anext__.
104+
if (
105+
isinstance(exc.__cause__, ReceiverStoppedError)
106+
# pylint is not smart enough to figure out we checked above
107+
# this is a ReceiverStoppedError and thus it does have
108+
# a receiver member
109+
and exc.__cause__.receiver is self # pylint: disable=no-member
110+
):
111+
raise exc.__cause__
112+
raise ReceiverStoppedError(self) from exc
123113
return received
124114

125115
def map(self, call: Callable[[T], U]) -> Receiver[U]:
@@ -184,14 +174,26 @@ def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None:
184174
self._recv = recv
185175
self._transform = transform
186176

187-
async def ready(self) -> None:
188-
"""Wait until the receiver is ready with a value."""
189-
await self._recv.ready() # pylint: disable=protected-access
177+
async def ready(self) -> bool:
178+
"""Wait until the receiver is ready with a value or an error.
179+
180+
Once a call to `ready()` has finished, the value should be read with
181+
a call to `consume()` (`receive()` or iterated over). The receiver will
182+
remain ready (this method will return immediately) until it is
183+
consumed.
184+
185+
Returns:
186+
Whether the receiver is still active.
187+
"""
188+
return await self._recv.ready() # pylint: disable=protected-access
190189

191190
def consume(self) -> U:
192191
"""Return a transformed value once `ready()` is complete.
193192
194193
Returns:
195194
The next value that was received.
195+
196+
Raises:
197+
ChannelClosedError: if the underlying channel is closed.
196198
"""
197199
return self._transform(self._recv.consume()) # pylint: disable=protected-access

0 commit comments

Comments
 (0)