Skip to content

Commit 8811205

Browse files
committed
Improve the _merge module description
Explain how to use the `merge()` function and the `Merger` type on a higher level than the function documentation. This module is not publicly available, so users won't be able to access the information in IDEs for example, but it will be rendered as part of the User Guide. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 6ec641a commit 8811205

File tree

1 file changed

+45
-1
lines changed

1 file changed

+45
-1
lines changed

src/frequenz/channels/_merge.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,51 @@
11
# License: MIT
22
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
33

4-
"""Merge messages coming from channels into a single stream."""
4+
"""Merge messages coming from multiple receivers into a single receiver.
5+
6+
# Usage
7+
8+
If you just need to receive the same type of messages but from multiple sources in one
9+
stream, you can use [`merge()`][frequenz.channels.merge] to create a new receiver that
10+
will receive messages from all the given receivers:
11+
12+
```python
13+
from frequenz.channels import Anycast, Receiver, merge
14+
15+
channel1: Anycast[int] = Anycast(name="channel1")
16+
channel2: Anycast[int] = Anycast(name="channel2")
17+
receiver1 = channel1.new_receiver()
18+
receiver2 = channel2.new_receiver()
19+
20+
async for value in merge(receiver1, receiver2):
21+
print(value)
22+
```
23+
24+
If the first message comes from `channel2` and the second message from `channel1`, the
25+
first message will be received immediately, and the second message will be received as
26+
soon as it is available.
27+
28+
This is extremely convenient when you just need to receive messages and don't care about
29+
where are they coming from specifically. If you need to know where the message came
30+
from, you can use [`select()`][frequenz.channels.select] instead.
31+
32+
# Stopping
33+
34+
A merge receiver will be stopped automatically when all the receivers that it merges are
35+
stopped. When using the async iterator interface, this means that the iterator will stop
36+
as soon as all the receivers are stopped. When using
37+
[`receive()`][frequenz.channels.Receiver.receive], this means that the method will raise
38+
a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception as soon as
39+
all the receivers are stopped.
40+
41+
If you want to stop a merge receiver manually, you can use the
42+
[`stop()`][frequenz.channels.Merger.stop] method.
43+
44+
When using [`receive()`][frequenz.channels.Receiver.receive], you should make sure to
45+
either stop all the receivers that you are merging, or to stop the merge receiver
46+
manually. This is to make sure that all the tasks created by the merge receiver are
47+
cleaned up properly.
48+
"""
549

650
from __future__ import annotations
751

0 commit comments

Comments
 (0)