|
1 | 1 | # License: MIT |
2 | 2 | # Copyright © 2022 Frequenz Energy-as-a-Service GmbH |
3 | 3 |
|
4 | | -"""Merge messages coming from channels into a single stream.""" |
| 4 | +"""Merge messages coming from multiple receivers into a single receiver. |
| 5 | +
|
| 6 | +# Usage |
| 7 | +
|
| 8 | +If you just need to receive the same type of messages but from multiple sources in one |
| 9 | +stream, you can use [`merge()`][frequenz.channels.merge] to create a new receiver that |
| 10 | +will receive messages from all the given receivers: |
| 11 | +
|
| 12 | +```python |
| 13 | +from frequenz.channels import Anycast, Receiver, merge |
| 14 | +
|
| 15 | +channel1: Anycast[int] = Anycast(name="channel1") |
| 16 | +channel2: Anycast[int] = Anycast(name="channel2") |
| 17 | +receiver1 = channel1.new_receiver() |
| 18 | +receiver2 = channel2.new_receiver() |
| 19 | +
|
| 20 | +async for value in merge(receiver1, receiver2): |
| 21 | + print(value) |
| 22 | +``` |
| 23 | +
|
| 24 | +If the first message comes from `channel2` and the second message from `channel1`, the |
| 25 | +first message will be received immediately, and the second message will be received as |
| 26 | +soon as it is available. |
| 27 | +
|
| 28 | +This can be helpful when you just need to receive messages and don't care about |
| 29 | +where are they coming from specifically. If you need to know where the message came |
| 30 | +from, you can use [`select()`][frequenz.channels.select] instead. |
| 31 | +
|
| 32 | +# Stopping |
| 33 | +
|
| 34 | +A merge receiver will be stopped automatically when all the receivers that it merges are |
| 35 | +stopped. When using the async iterator interface, this means that the iterator will stop |
| 36 | +as soon as all the receivers are stopped. When using |
| 37 | +[`receive()`][frequenz.channels.Receiver.receive], this means that the method will raise |
| 38 | +a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception as soon as |
| 39 | +all the receivers are stopped. |
| 40 | +
|
| 41 | +If you want to stop a merge receiver manually, you can use the |
| 42 | +[`stop()`][frequenz.channels.Merger.stop] method. |
| 43 | +
|
| 44 | +When using [`receive()`][frequenz.channels.Receiver.receive], you should make sure to |
| 45 | +either stop all the receivers that you are merging, or to stop the merge receiver |
| 46 | +manually. This is to make sure that all the tasks created by the merge receiver are |
| 47 | +cleaned up properly. |
| 48 | +""" |
5 | 49 |
|
6 | 50 | from __future__ import annotations |
7 | 51 |
|
|
0 commit comments