Skip to content

Commit 55f3608

Browse files
committed
Improve the _receiver module description
Give a general introduction to receivers, explaining how one usually create them, how to map values, how to handle receiving errors and how to use the low-level methods. This module is not publicly available, so users won't be able to access the information in IDEs for example, but it will be rendered as part of the User Guide. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent b8e4624 commit 55f3608

File tree

1 file changed

+131
-2
lines changed

1 file changed

+131
-2
lines changed

src/frequenz/channels/_receiver.py

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,136 @@
11
# License: MIT
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33

4-
"""Channel receiver and associated exceptions."""
4+
"""Receiver interface and related exceptions.
5+
6+
# Receivers
7+
8+
Messages are received from [channels](/user-guide/channels/index.md) through
9+
[Receiver][frequenz.channels.Receiver] objects. [Receivers][frequenz.channels.Receiver]
10+
are usually created by calling `channel.new_receiver()` and are [async
11+
iterators][typing.AsyncIterator], so the easiest way to receive values from them as
12+
a stream is to use `async for`:
13+
14+
```python
15+
from frequenz.channels import Anycast
16+
17+
channel = Anycast[int](name="test-channel")
18+
receiver = channel.new_receiver()
19+
20+
async for value in receiver:
21+
print(value)
22+
```
23+
24+
If you need to receive values in different places or expecting a particular
25+
sequence, you can use the [`receive()`][frequenz.channels.Receiver.receive] method:
26+
27+
```python
28+
from frequenz.channels import Anycast
29+
30+
channel = Anycast[int](name="test-channel")
31+
receiver = channel.new_receiver()
32+
33+
first_value = await receiver.receive()
34+
print(f"First value: {first_value}")
35+
36+
second_value = await receiver.receive()
37+
print(f"Second value: {second_value}")
38+
```
39+
40+
# Value Transformation
41+
42+
If you need to transform the received values, receivers provide a
43+
[`map()`][frequenz.channels.Receiver.map] method to easily do so:
44+
45+
```python
46+
from frequenz.channels import Anycast
47+
48+
channel = Anycast[int](name="test-channel")
49+
receiver = channel.new_receiver()
50+
51+
async for value in receiver.map(lambda x: x + 1):
52+
print(value)
53+
```
54+
55+
[`map()`][frequenz.channels.Receiver.map] returns a new full receiver, so you can
56+
use it in any of the ways described above.
57+
58+
# Error Handling
59+
60+
!!! Tip inline end
61+
62+
For more information about handling errors, please refer to the
63+
[Error Handling](/user-guide/error-handling/) section of the user guide.
64+
65+
If there is an error while receiving a message,
66+
a [`ReceiverError`][frequenz.channels.ReceiverError] exception is raised for both
67+
[`receive()`][frequenz.channels.Receiver.receive] method and async iteration
68+
interface.
69+
70+
If the receiver has completely stopped (for example the underlying channel was
71+
closed), a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception
72+
is raised by [`receive()`][frequenz.channels.Receiver.receive] method.
73+
74+
```python
75+
from frequenz.channels import Anycast
76+
77+
channel = Anycast[int](name="test-channel")
78+
receiver = channel.new_receiver()
79+
80+
try:
81+
await receiver.receive()
82+
except ReceiverStoppedError as error:
83+
print("The receiver was stopped")
84+
except ReceiverError as error:
85+
print(f"There was an error trying to receive: {error}")
86+
```
87+
88+
When used as an async iterator, the iteration will just stop without raising an
89+
exception:
90+
91+
```python
92+
from frequenz.channels import Anycast
93+
94+
channel = Anycast[int](name="test-channel")
95+
receiver = channel.new_receiver()
96+
97+
try:
98+
async for value in receiver:
99+
print(value)
100+
except ReceiverStoppedError as error:
101+
print("Will never happen")
102+
except ReceiverError as error:
103+
print(f"There was an error trying to receive: {error}")
104+
# If we get here, the receiver was stopped
105+
```
106+
107+
# Advanced Usage
108+
109+
!!! Warning inline end
110+
111+
This section is intended for library developers that want to build other low-level
112+
abstractions on top of channels. If you are just using channels, you can safely
113+
ignore this section.
114+
115+
Receivers extend on the [async iterator protocol][typing.AsyncIterator] by providing
116+
a [`ready()`][frequenz.channels.Receiver.ready] and
117+
a [`consume()`][frequenz.channels.Receiver.consume] method.
118+
119+
The [`ready()`][frequenz.channels.Receiver.ready] method is used to await until the
120+
receiver has a new value available, but without actually consuming it. The
121+
[`consume()`][frequenz.channels.Receiver.consume] method consumes the next available
122+
value and returns it.
123+
124+
[`ready()`][frequenz.channels.Receiver.ready] can be called multiple times, and it
125+
will return immediately if the receiver is already ready.
126+
[`consume()`][frequenz.channels.Receiver.consume] must be called only after
127+
[`ready()`][frequenz.channels.Receiver.ready] is done and only once, until the next
128+
call to [`ready()`][frequenz.channels.Receiver.ready].
129+
130+
Exceptions are never raised by [`ready()`][frequenz.channels.Receiver.ready], they
131+
are always delayed until [`consume()`][frequenz.channels.Receiver.consume] is
132+
called.
133+
"""
5134

6135
from __future__ import annotations
7136

@@ -16,7 +145,7 @@
16145

17146

18147
class Receiver(ABC, Generic[_T]):
19-
"""A channel Receiver."""
148+
"""An endpoint to receive messages."""
20149

21150
async def __anext__(self) -> _T:
22151
"""Await the next value in the async iteration over received values.

0 commit comments

Comments
 (0)