Skip to content

Commit c700249

Browse files
committed
Don't raise exceptions in Receiver.ready()
Instead, exceptions are raised in Receiver.consume(). This improves the API for several reasons: * There is no need to try: a whole select.ready() loop. Exceptions will be raised when something is actually consumed. * It removes exception handling from tasks monitoring pending receivers. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent d0d87e3 commit c700249

File tree

11 files changed

+210
-92
lines changed

11 files changed

+210
-92
lines changed

RELEASE_NOTES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
* `ChannelClosedError` now requires the argument `channel` (before it was optional).
2020

21+
* Now exceptions are not raised in Receiver.ready() but in Receiver.consume() (receive() or the async iterator `anext`).
22+
2123
## New Features
2224

2325
* New exceptions were added:

src/frequenz/channels/_anycast.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,32 +167,44 @@ def __init__(self, chan: Anycast[T]) -> None:
167167
self._chan = chan
168168
self._next: Optional[T] = None
169169

170-
async def ready(self) -> None:
171-
"""Wait until the receiver is ready with a value.
170+
async def ready(self) -> bool:
171+
"""Wait until the receiver is ready with a value or an error.
172172
173-
Raises:
174-
ReceiverStoppedError: if the receiver stopped producing messages.
175-
ReceiverError: if there is some problem with the receiver.
173+
Once a call to `ready()` has finished, the value should be read with
174+
a call to `consume()` (`receive()` or iterated over). The receiver will
175+
remain ready (this method will return immediately) until it is
176+
consumed.
177+
178+
Returns:
179+
Whether the receiver is still active.
176180
"""
177181
# if a message is already ready, then return immediately.
178182
if self._next is not None:
179-
return
183+
return True
180184

181185
while len(self._chan.deque) == 0:
182186
if self._chan.closed:
183-
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
187+
return False
184188
async with self._chan.recv_cv:
185189
await self._chan.recv_cv.wait()
186190
self._next = self._chan.deque.popleft()
187191
async with self._chan.send_cv:
188192
self._chan.send_cv.notify(1)
193+
return True
189194

190195
def consume(self) -> T:
191196
"""Return the latest value once `ready()` is complete.
192197
193198
Returns:
194199
The next value that was received.
200+
201+
Raises:
202+
ReceiverStoppedError: if the receiver stopped producing messages.
203+
ReceiverError: if there is some problem with the receiver.
195204
"""
205+
if self._next is None and self._chan.closed:
206+
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
207+
196208
assert (
197209
self._next is not None
198210
), "calls to `consume()` must be follow a call to `ready()`"

src/frequenz/channels/_base_classes.py

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,16 @@ async def __anext__(self) -> T:
4949
raise StopAsyncIteration() from exc
5050

5151
@abstractmethod
52-
async def ready(self) -> None:
53-
"""Wait until the receiver is ready with a value.
52+
async def ready(self) -> bool:
53+
"""Wait until the receiver is ready with a value or an error.
5454
55-
Once a call to `ready()` has finished, the value should be read with a call to
56-
`consume()`.
55+
Once a call to `ready()` has finished, the value should be read with
56+
a call to `consume()` (`receive()` or iterated over). The receiver will
57+
remain ready (this method will return immediately) until it is
58+
consumed.
5759
58-
Raises:
59-
ReceiverStoppedError: if the receiver stopped producing messages.
60-
ReceiverError: if there is some problem with the receiver.
60+
Returns:
61+
Whether the receiver is still active.
6162
"""
6263

6364
@abstractmethod
@@ -68,6 +69,10 @@ def consume(self) -> T:
6869
6970
Returns:
7071
The next value received.
72+
73+
Raises:
74+
ReceiverStoppedError: if the receiver stopped producing messages.
75+
ReceiverError: if there is some problem with the receiver.
7176
"""
7277

7378
def __aiter__(self) -> Receiver[T]:
@@ -81,13 +86,13 @@ def __aiter__(self) -> Receiver[T]:
8186
async def receive(self) -> T:
8287
"""Receive a message from the channel.
8388
89+
Returns:
90+
The received message.
91+
8492
Raises:
8593
ReceiverStoppedError: if there is some problem with the receiver.
8694
ReceiverError: if there is some problem with the receiver.
8795
88-
Returns:
89-
The received message.
90-
9196
# noqa: DAR401 __cause__ (https://github.com/terrencepreilly/darglint/issues/181)
9297
"""
9398
try:
@@ -169,14 +174,26 @@ def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None:
169174
self._recv = recv
170175
self._transform = transform
171176

172-
async def ready(self) -> None:
173-
"""Wait until the receiver is ready with a value."""
174-
await self._recv.ready() # pylint: disable=protected-access
177+
async def ready(self) -> bool:
178+
"""Wait until the receiver is ready with a value or an error.
179+
180+
Once a call to `ready()` has finished, the value should be read with
181+
a call to `consume()` (`receive()` or iterated over). The receiver will
182+
remain ready (this method will return immediately) until it is
183+
consumed.
184+
185+
Returns:
186+
Whether the receiver is still active.
187+
"""
188+
return await self._recv.ready() # pylint: disable=protected-access
175189

176190
def consume(self) -> U:
177191
"""Return a transformed value once `ready()` is complete.
178192
179193
Returns:
180194
The next value that was received.
195+
196+
Raises:
197+
ChannelClosedError: if the underlying channel is closed.
181198
"""
182199
return self._transform(self._recv.consume()) # pylint: disable=protected-access

src/frequenz/channels/_bidirectional.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,33 @@ async def send(self, msg: V) -> None:
6868
err.__cause__ = this_chan_error
6969
raise err
7070

71-
async def ready(self) -> None:
72-
"""Wait until the receiver is ready with a value.
71+
async def ready(self) -> bool:
72+
"""Wait until the receiver is ready with a value or an error.
73+
74+
Once a call to `ready()` has finished, the value should be read with
75+
a call to `consume()` (`receive()` or iterated over). The receiver will
76+
remain ready (this method will return immediately) until it is
77+
consumed.
78+
79+
Returns:
80+
Whether the receiver is still active.
81+
"""
82+
return await self._receiver.ready() # pylint: disable=protected-access
83+
84+
def consume(self) -> W:
85+
"""Return the latest value once `_ready` is complete.
86+
87+
Returns:
88+
The next value that was received.
7389
7490
Raises:
75-
ReceiverStoppedError: if the receiver stopped producing messages.
91+
ReceiverStoppedError: if there is some problem with the receiver.
7692
ReceiverError: if there is some problem with the receiver.
93+
94+
# noqa: DAR401 err (https://github.com/terrencepreilly/darglint/issues/181)
7795
"""
7896
try:
79-
await self._receiver.ready() # pylint: disable=protected-access
97+
return self._receiver.consume() # pylint: disable=protected-access
8098
except ReceiverError as err:
8199
# If this comes from a channel error, then we inject another
82100
# ChannelError having the information about the Bidirectional
@@ -91,14 +109,6 @@ async def ready(self) -> None:
91109
err.__cause__ = this_chan_error
92110
raise err
93111

94-
def consume(self) -> W:
95-
"""Return the latest value once `_ready` is complete.
96-
97-
Returns:
98-
The next value that was received.
99-
"""
100-
return self._receiver.consume() # pylint: disable=protected-access
101-
102112
def __init__(self, client_id: str, service_id: str) -> None:
103113
"""Create a `Bidirectional` instance.
104114

src/frequenz/channels/_broadcast.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -257,29 +257,35 @@ def __len__(self) -> int:
257257
"""
258258
return len(self._q)
259259

260-
async def ready(self) -> None:
261-
"""Wait until the receiver is ready with a value.
260+
async def ready(self) -> bool:
261+
"""Wait until the receiver is ready with a value or an error.
262262
263-
Raises:
264-
ReceiverStoppedError: if there is some problem with the receiver.
265-
ReceiverInvalidatedError: if the receiver was converted into
266-
a peekable.
263+
Once a call to `ready()` has finished, the value should be read with
264+
a call to `consume()` (`receive()` or iterated over). The receiver will
265+
remain ready (this method will return immediately) until it is
266+
consumed.
267+
268+
Returns:
269+
Whether the receiver is still active.
267270
"""
271+
# if there are still messages to consume from the queue, return immediately
272+
if self._q:
273+
return True
274+
275+
# if it is not longer active, return immediately
268276
if not self._active:
269-
raise ReceiverInvalidatedError(
270-
"This receiver was converted into a Peekable so it is not longer valid.",
271-
self,
272-
)
277+
return False
273278

274279
# Use a while loop here, to handle spurious wakeups of condition variables.
275280
#
276281
# The condition also makes sure that if there are already messages ready to be
277282
# consumed, then we return immediately.
278283
while len(self._q) == 0:
279284
if self._chan.closed:
280-
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
285+
return False
281286
async with self._chan.recv_cv:
282287
await self._chan.recv_cv.wait()
288+
return True
283289

284290
def _deactivate(self) -> None:
285291
"""Set the receiver as inactive and remove it from the channel."""
@@ -292,10 +298,23 @@ def consume(self) -> T:
292298
293299
Returns:
294300
The next value that was received.
301+
302+
Raises:
303+
ReceiverStoppedError: if there is some problem with the receiver.
304+
ReceiverInvalidatedError: if the receiver was converted into
305+
a peekable.
295306
"""
307+
if not self._q and not self._active:
308+
raise ReceiverInvalidatedError(
309+
"This receiver was converted into a Peekable so it is not longer valid.",
310+
self,
311+
)
312+
313+
if not self._q and self._chan.closed:
314+
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
315+
296316
assert self._q, "calls to `consume()` must be follow a call to `ready()`"
297-
ret = self._q.popleft()
298-
return ret
317+
return self._q.popleft()
299318

300319
def into_peekable(self) -> Peekable[T]:
301320
"""Convert the `Receiver` implementation into a `Peekable`.

src/frequenz/channels/util/_file_watcher.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def __init__(
5353
and pathlib.Path(path_str).is_file()
5454
),
5555
)
56+
self._awatch_stopped_exc: Optional[Exception] = None
5657
self._changes: Set[FileChange] = set()
5758

5859
def __del__(self) -> None:
@@ -64,31 +65,44 @@ def __del__(self) -> None:
6465
"""
6566
self._stop_event.set()
6667

67-
async def ready(self) -> None:
68-
"""Wait for the next file event and return its path.
68+
async def ready(self) -> bool:
69+
"""Wait until the receiver is ready with a value or an error.
6970
70-
Returns:
71-
Path of next file.
71+
Once a call to `ready()` has finished, the value should be read with
72+
a call to `consume()` (`receive()` or iterated over). The receiver will
73+
remain ready (this method will return immediately) until it is
74+
consumed.
7275
73-
Raises:
74-
ReceiverStoppedError: if the receiver stopped producing messages.
75-
ReceiverError: if there is some problem with the receiver.
76+
Returns:
77+
Whether the receiver is still active.
7678
"""
7779
# if there are messages waiting to be consumed, return immediately.
7880
if self._changes:
79-
return
81+
return True
82+
83+
# if it was already stopped, return immediately.
84+
if self._awatch_stopped_exc is not None:
85+
return False
8086

8187
try:
8288
self._changes = await self._awatch.__anext__()
8389
except StopAsyncIteration as err:
84-
raise ReceiverStoppedError(self) from err
90+
self._awatch_stopped_exc = err
91+
92+
return True
8593

8694
def consume(self) -> pathlib.Path:
8795
"""Return the latest change once `ready` is complete.
8896
8997
Returns:
9098
The next change that was received.
99+
100+
Raises:
101+
ReceiverStoppedError: if there is some problem with the receiver.
91102
"""
103+
if not self._changes and self._awatch_stopped_exc is not None:
104+
raise ReceiverStoppedError(self) from self._awatch_stopped_exc
105+
92106
assert self._changes, "calls to `consume()` must be follow a call to `ready()`"
93107
change = self._changes.pop()
94108
# Tuple of (Change, path) returned by watchfiles

src/frequenz/channels/util/_merge.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,28 @@ async def stop(self) -> None:
5555
await asyncio.gather(*self._pending, return_exceptions=True)
5656
self._pending = set()
5757

58-
async def ready(self) -> None:
59-
"""Wait until the receiver is ready with a value.
58+
async def ready(self) -> bool:
59+
"""Wait until the receiver is ready with a value or an error.
6060
61-
Raises:
62-
ReceiverStoppedError: if the receiver stopped producing messages.
63-
ReceiverError: if there is some problem with the receiver.
61+
Once a call to `ready()` has finished, the value should be read with
62+
a call to `consume()` (`receive()` or iterated over). The receiver will
63+
remain ready (this method will return immediately) until it is
64+
consumed.
65+
66+
Returns:
67+
Whether the receiver is still active.
6468
"""
6569
# we use a while loop to continue to wait for new data, in case the
6670
# previous `wait` completed because a channel was closed.
6771
while True:
6872
# if there are messages waiting to be consumed, return immediately.
6973
if len(self._results) > 0:
70-
return
74+
return True
7175

76+
# if there are no more pending receivers, we return immediately.
7277
if len(self._pending) == 0:
73-
raise ReceiverStoppedError(self)
78+
return False
79+
7480
done, self._pending = await asyncio.wait(
7581
self._pending, return_when=asyncio.FIRST_COMPLETED
7682
)
@@ -91,7 +97,14 @@ def consume(self) -> T:
9197
9298
Returns:
9399
The next value that was received.
100+
101+
Raises:
102+
ReceiverStoppedError: if the receiver stopped producing messages.
103+
ReceiverError: if there is some problem with the receiver.
94104
"""
105+
if not self._results and not self._pending:
106+
raise ReceiverStoppedError(self)
107+
95108
assert self._results, "calls to `consume()` must be follow a call to `ready()`"
96109

97110
return self._results.popleft()

0 commit comments

Comments
 (0)