Skip to content

Commit 8f3c763

Browse files
committed
Make send() raise a SenderError
Instead of returning a `bool`, which can be easily overlooked, it now raises a new `SenerError`. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent dada006 commit 8f3c763

File tree

9 files changed

+123
-35
lines changed

9 files changed

+123
-35
lines changed

RELEASE_NOTES.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66

77
## Upgrading
88

9-
<!-- Here goes notes on how to upgrade from previous versions, including if there are any depractions and what they should be replaced with -->
9+
* The `Sender.send()` method now `raise`s a `SenderError` instead of returning `False`. The `SenderError` will typically have a `ChannelClosedError` and the underlying reason as a chained exception.
1010

1111
## New Features
1212

13-
* A new base exception `frequenz.channels.Error` was added. All exceptions from
14-
this library inherit from this exception.
13+
* New exceptions were added:
14+
15+
* `Error`: A base exception from which all exceptions from this library inherit.
16+
17+
* `SendError`: Raised for errors when sending messages.
1518

1619
## Bug Fixes
1720

src/frequenz/channels/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
4747
* [ChannelClosedError][frequenz.channels.ChannelClosedError]: Error raised when
4848
trying to operate (send, receive, etc.) through a closed channel.
49+
50+
* [SenderError][frequenz.channels.SenderError]: Base class for all errors
51+
related to senders.
4952
"""
5053

5154
from . import util
@@ -57,6 +60,7 @@
5760
Peekable,
5861
Receiver,
5962
Sender,
63+
SenderError,
6064
)
6165
from ._bidirectional import Bidirectional
6266
from ._broadcast import Broadcast
@@ -71,5 +75,6 @@
7175
"Peekable",
7276
"Receiver",
7377
"Sender",
78+
"SenderError",
7479
"util",
7580
]

src/frequenz/channels/_anycast.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from ._base_classes import ChannelClosedError
1313
from ._base_classes import Receiver as BaseReceiver
1414
from ._base_classes import Sender as BaseSender
15-
from ._base_classes import T
15+
from ._base_classes import SenderError, T
1616

1717

1818
class Anycast(Generic[T]):
@@ -123,7 +123,7 @@ def __init__(self, chan: Anycast[T]) -> None:
123123
"""
124124
self._chan = chan
125125

126-
async def send(self, msg: T) -> bool:
126+
async def send(self, msg: T) -> None:
127127
"""Send a message across the channel.
128128
129129
To send, this method inserts the message into the Anycast channel's
@@ -134,19 +134,21 @@ async def send(self, msg: T) -> bool:
134134
Args:
135135
msg: The message to be sent.
136136
137-
Returns:
138-
Whether the message was sent, based on whether the channel is open
139-
or not.
137+
Raises:
138+
SenderError: if the underlying channel was closed.
139+
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
140+
set as the cause.
140141
"""
141142
if self._chan.closed:
142-
return False
143+
raise SenderError("The channel was closed", self) from ChannelClosedError(
144+
self._chan
145+
)
143146
while len(self._chan.deque) == self._chan.deque.maxlen:
144147
async with self._chan.send_cv:
145148
await self._chan.send_cv.wait()
146149
self._chan.deque.append(msg)
147150
async with self._chan.recv_cv:
148151
self._chan.recv_cv.notify(1)
149-
return True
150152

151153

152154
class Receiver(BaseReceiver[T]):

src/frequenz/channels/_base_classes.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(self, message: Any):
2828

2929

3030
class ChannelError(Error):
31-
"""Base channel error.
31+
"""An error produced in a channel.
3232
3333
All exceptions generated by channels inherit from this exception.
3434
"""
@@ -56,19 +56,36 @@ def __init__(self, channel: Any = None):
5656
super().__init__(f"Channel {channel} was closed", channel)
5757

5858

59+
class SenderError(Error, Generic[T]):
60+
"""An error produced in a [Sender][frequenz.channels.Sender].
61+
62+
All exceptions generated by senders inherit from this exception.
63+
"""
64+
65+
def __init__(self, message: Any, sender: Sender[T]):
66+
"""Create an instance.
67+
68+
Args:
69+
message: An error message.
70+
sender: The [Sender][frequenz.channels.Sender] where the error
71+
happened.
72+
"""
73+
super().__init__(message)
74+
self.sender: Sender[T] = sender
75+
76+
5977
class Sender(ABC, Generic[T]):
6078
"""A channel Sender."""
6179

6280
@abstractmethod
63-
async def send(self, msg: T) -> bool:
81+
async def send(self, msg: T) -> None:
6482
"""Send a message to the channel.
6583
6684
Args:
6785
msg: The message to be sent.
6886
69-
Returns:
70-
Whether the message was sent, based on whether the channel is open
71-
or not.
87+
Raises:
88+
SenderError: if there was an error sending the message.
7289
"""
7390

7491

src/frequenz/channels/_bidirectional.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from typing import Generic, TypeVar
99

10-
from ._base_classes import Receiver, Sender, T, U
10+
from ._base_classes import ChannelError, Receiver, Sender, SenderError, T, U
1111
from ._broadcast import Broadcast
1212

1313
V = TypeVar("V")
@@ -23,26 +23,49 @@ class Handle(Sender[V], Receiver[W]):
2323
It can be used to send/receive values between the client and service.
2424
"""
2525

26-
def __init__(self, sender: Sender[V], receiver: Receiver[W]) -> None:
26+
def __init__(
27+
self,
28+
channel: Bidirectional[V, W] | Bidirectional[W, V],
29+
sender: Sender[V],
30+
receiver: Receiver[W],
31+
) -> None:
2732
"""Create a `Bidirectional.Handle` instance.
2833
2934
Args:
35+
channel: The underlying channel.
3036
sender: A sender to send values with.
3137
receiver: A receiver to receive values from.
3238
"""
39+
self._chan = channel
3340
self._sender = sender
3441
self._receiver = receiver
3542

36-
async def send(self, msg: V) -> bool:
43+
async def send(self, msg: V) -> None:
3744
"""Send a value to the other side.
3845
3946
Args:
4047
msg: The value to send.
4148
42-
Returns:
43-
Whether the send was successful or not.
49+
Raises:
50+
SenderError: if the underlying channel was closed.
51+
A [ChannelClosedError][frequenz.channels.ChannelClosedError]
52+
is set as the cause.
4453
"""
45-
return await self._sender.send(msg)
54+
try:
55+
await self._sender.send(msg)
56+
except SenderError as err:
57+
# If this comes from a channel error, then we inject another
58+
# ChannelError having the information about the Bidirectional
59+
# channel to hide (at least partially) the underlaying
60+
# Broadcast channels we use.
61+
if isinstance(err.__cause__, ChannelError):
62+
this_chan_error = ChannelError(
63+
f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}",
64+
self._chan, # pylint: disable=protected-access
65+
)
66+
this_chan_error.__cause__ = err.__cause__
67+
err.__cause__ = this_chan_error
68+
raise err
4669

4770
async def ready(self) -> None:
4871
"""Wait until the receiver is ready with a value."""
@@ -70,10 +93,12 @@ def __init__(self, client_id: str, service_id: str) -> None:
7093
)
7194

7295
self._client_handle = Bidirectional.Handle(
96+
self,
7397
self._request_channel.new_sender(),
7498
self._response_channel.new_receiver(),
7599
)
76100
self._service_handle = Bidirectional.Handle(
101+
self,
77102
self._response_channel.new_sender(),
78103
self._request_channel.new_receiver(),
79104
)

src/frequenz/channels/_broadcast.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from ._base_classes import Peekable as BasePeekable
1717
from ._base_classes import Receiver as BaseReceiver
1818
from ._base_classes import Sender as BaseSender
19-
from ._base_classes import T
19+
from ._base_classes import SenderError, T
2020

2121
logger = logging.Logger(__name__)
2222

@@ -165,18 +165,21 @@ def __init__(self, chan: Broadcast[T]) -> None:
165165
"""
166166
self._chan = chan
167167

168-
async def send(self, msg: T) -> bool:
168+
async def send(self, msg: T) -> None:
169169
"""Send a message to all broadcast receivers.
170170
171171
Args:
172172
msg: The message to be broadcast.
173173
174-
Returns:
175-
Whether the message was sent, based on whether the broadcast
176-
channel is open or not.
174+
Raises:
175+
SenderError: if the underlying channel was closed.
176+
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
177+
set as the cause.
177178
"""
178179
if self._chan.closed:
179-
return False
180+
raise SenderError("The channel was closed", self) from ChannelClosedError(
181+
self._chan
182+
)
180183
# pylint: disable=protected-access
181184
self._chan._latest = msg
182185
stale_refs = []
@@ -190,7 +193,6 @@ async def send(self, msg: T) -> bool:
190193
del self._chan.receivers[name]
191194
async with self._chan.recv_cv:
192195
self._chan.recv_cv.notify_all()
193-
return True
194196

195197

196198
class Receiver(BaseReceiver[T]):

tests/test_anycast.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import pytest
99

10-
from frequenz.channels import Anycast, ChannelClosedError, Receiver, Sender
10+
from frequenz.channels import Anycast, ChannelClosedError, Receiver, Sender, SenderError
1111

1212

1313
async def test_anycast() -> None:
@@ -58,7 +58,8 @@ async def update_tracker_on_receive(receiver_id: int, chan: Receiver[int]) -> No
5858
await acast.close()
5959
await receivers_runs
6060

61-
assert await after_close_sender.send(5) is False
61+
with pytest.raises(SenderError):
62+
await after_close_sender.send(5)
6263
with pytest.raises(ChannelClosedError):
6364
await after_close_receiver.receive()
6465

@@ -77,11 +78,13 @@ async def test_anycast_after_close() -> None:
7778
receiver = acast.new_receiver()
7879
sender = acast.new_sender()
7980

80-
assert await sender.send(2) is True
81+
await sender.send(2)
8182

8283
await acast.close()
8384

84-
assert await sender.send(5) is False
85+
with pytest.raises(SenderError):
86+
await sender.send(5)
87+
8588
assert await receiver.receive() == 2
8689
with pytest.raises(ChannelClosedError):
8790
await receiver.receive()

tests/test_bidirectional.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,14 @@
55

66
import asyncio
77

8-
from frequenz.channels import Bidirectional
8+
import pytest
9+
10+
from frequenz.channels import (
11+
Bidirectional,
12+
ChannelClosedError,
13+
ChannelError,
14+
SenderError,
15+
)
916

1017

1118
async def test_request_response() -> None:
@@ -41,3 +48,20 @@ async def service(handle: Bidirectional.Handle[str, int]) -> None:
4148

4249
await client_handle.send(42) # Stop the service task
4350
await service_task
51+
52+
53+
async def test_sender_error_chaining() -> None:
54+
"""Ensure bi-directional communication is possible."""
55+
56+
req_resp: Bidirectional[int, str] = Bidirectional("test_client", "test_service")
57+
58+
await req_resp._response_channel.close() # pylint: disable=protected-access
59+
60+
with pytest.raises(SenderError, match="The channel was closed") as exc_info:
61+
await req_resp.service_handle.send("I'm closed!")
62+
63+
err = exc_info.value
64+
cause = err.__cause__
65+
assert isinstance(cause, ChannelError)
66+
assert cause.args[0].startswith("Error in the underlying channel")
67+
assert isinstance(cause.__cause__, ChannelClosedError)

tests/test_broadcast.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@
88

99
import pytest
1010

11-
from frequenz.channels import Broadcast, ChannelClosedError, Receiver, Sender
11+
from frequenz.channels import (
12+
Broadcast,
13+
ChannelClosedError,
14+
Receiver,
15+
Sender,
16+
SenderError,
17+
)
1218

1319

1420
async def test_broadcast() -> None:
@@ -68,7 +74,8 @@ async def test_broadcast_after_close() -> None:
6874

6975
await bcast.close()
7076

71-
assert await sender.send(5) is False
77+
with pytest.raises(SenderError):
78+
await sender.send(5)
7279
with pytest.raises(ChannelClosedError):
7380
await receiver.receive()
7481

0 commit comments

Comments
 (0)