Skip to content

Commit 9266229

Browse files
authored
Replace Merge and MergeNamed with merge() (#238)
- Remove `MergeNamed` - Add a `merge()` wrapper function that replaces `Merge` - Rename `Merge` to `_Merge` to make it private more explicitly - Rename `_Merge` variable `args` to `receivers` - Improve string representation of merge receiver - Raise an exception is `merge()` is called with less than two receivers Fixes #236, fixes #237.
2 parents 39ade4e + 0858233 commit 9266229

File tree

10 files changed

+91
-215
lines changed

10 files changed

+91
-215
lines changed

.github/ISSUE_TEMPLATE/bug.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ body:
5252
- Build script, CI, dependencies, etc. (part:tooling)
5353
- Channels, `Broadcast`, `Anycast`, etc. (part:channels)
5454
- Select (part:select)
55-
- Utility receivers, `Merge`, etc. (part:receivers)
55+
- Utility receivers, `Event`, `FileWatcher`, `Timer`, etc. (part:receivers)
5656
validations:
5757
required: true
5858
- type: textarea

RELEASE_NOTES.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@
5353

5454
* The following symbols were moved to the top-level `frequenz.channels` package:
5555

56-
- `Merge`
57-
- `MergeNamed`
5856
- `Selected`
5957
- `SelectError`
6058
- `SelectErrorGroup`
@@ -68,6 +66,14 @@
6866

6967
This channel was removed as it is not recommended practice and was a niche use case. If you need to use it, you can set up two channels or copy the `Bidirectional` class from the previous version to your project.
7068

69+
* `Merge`
70+
71+
Replaced by the new `merge()` function. When replacing `Merge` with `merge()` please keep in mind that this new function will raise a `ValueError` if no receivers are passed to it.
72+
73+
* `MergeNamed`
74+
75+
This class was redundant, use either the new `merge()` function or `select()` instead.
76+
7177
* `Peekable`
7278

7379
This class was removed because it was merely a shortcut to a receiver that caches the last value received. It did not fit the channel abstraction well and was infrequently used.
@@ -92,6 +98,8 @@
9298

9399
## New Features
94100

101+
* A new `merge()` function was added to replace `Merge`.
102+
95103
* `Anycast`
96104

97105
- The following new read-only properties were added:
@@ -126,14 +134,6 @@
126134

127135
- A more useful implementation of `__str__ and `__repr__` were added.
128136

129-
* `Merge`
130-
131-
- A more useful implementation of `__str__ and `__repr__` were added.
132-
133-
* `MergeNamed`
134-
135-
- A more useful implementation of `__str__ and `__repr__` were added.
136-
137137
* `Peekable`
138138

139139
- A more useful implementation of `__str__ and `__repr__` were added.

src/frequenz/channels/__init__.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,10 @@
2626
2727
Utilities to work with channels:
2828
29-
* [Merge][frequenz.channels.Merge] and [MergeNamed][frequenz.channels.MergeNamed]:
30-
[Receivers][frequenz.channels.Receiver] that merge messages coming from multiple
31-
receivers into a single stream.
29+
* [merge][frequenz.channels.merge]: Merge messages coming from multiple receivers into
30+
a single stream.
3231
33-
* [select][frequenz.channels.select]: Iterate over the values of all
32+
* [select][frequenz.channels.select]: Iterate over the values of all
3433
[receivers][frequenz.channels.Receiver] as new values become available.
3534
3635
Exception classes:
@@ -78,8 +77,7 @@
7877
from ._anycast import Anycast
7978
from ._broadcast import Broadcast
8079
from ._exceptions import ChannelClosedError, ChannelError, Error
81-
from ._merge import Merge
82-
from ._merge_named import MergeNamed
80+
from ._merge import merge
8381
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
8482
from ._select import (
8583
Selected,
@@ -97,8 +95,6 @@
9795
"ChannelClosedError",
9896
"ChannelError",
9997
"Error",
100-
"Merge",
101-
"MergeNamed",
10298
"Receiver",
10399
"ReceiverError",
104100
"ReceiverStoppedError",
@@ -108,6 +104,7 @@
108104
"Sender",
109105
"SenderError",
110106
"UnhandledSelectedError",
107+
"merge",
111108
"select",
112109
"selected_from",
113110
]

src/frequenz/channels/_anycast.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,8 @@ class Anycast(Generic[_T]):
3838
thread-safe.
3939
4040
When there are multiple channel receivers, they can be awaited
41-
simultaneously using [select][frequenz.channels.select],
42-
[Merge][frequenz.channels.Merge] or
43-
[MergeNamed][frequenz.channels.MergeNamed].
41+
simultaneously using [select][frequenz.channels.select] or
42+
[merge][frequenz.channels.merge].
4443
4544
Example:
4645
``` python

src/frequenz/channels/_broadcast.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ class Broadcast(Generic[_T]):
3232
are thread-safe. Because of this, `Broadcast` channels are thread-safe.
3333
3434
When there are multiple channel receivers, they can be awaited
35-
simultaneously using [select][frequenz.channels.select],
36-
[Merge][frequenz.channels.Merge] or
37-
[MergeNamed][frequenz.channels.MergeNamed].
35+
simultaneously using [select][frequenz.channels.select] or
36+
[merge][frequenz.channels.merge].
3837
3938
Example:
4039
``` python

src/frequenz/channels/_merge.py

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@
1313
_T = TypeVar("_T")
1414

1515

16-
class Merge(Receiver[_T]):
17-
"""Merge messages coming from multiple channels into a single stream.
16+
def merge(*receivers: Receiver[_T]) -> Receiver[_T]:
17+
"""Merge messages coming from multiple receivers into a single stream.
1818
1919
Example:
2020
For example, if there are two channel receivers with the same type,
2121
they can be awaited together, and their results merged into a single
22-
stream, by using `Merge` like this:
22+
stream like this:
2323
2424
```python
2525
from frequenz.channels import Broadcast
@@ -29,25 +29,46 @@ class Merge(Receiver[_T]):
2929
receiver1 = channel1.new_receiver()
3030
receiver2 = channel2.new_receiver()
3131
32-
merge = Merge(receiver1, receiver2)
33-
while msg := await merge.receive():
34-
# do something with msg
35-
pass
32+
async for msg in merge(receiver1, receiver2):
33+
print(f"received {msg}")
3634
```
3735
38-
When `merge` is no longer needed, then it should be stopped using
39-
`self.stop()` method. This will cleanup any internal pending async tasks.
36+
Args:
37+
*receivers: The receivers to merge.
38+
39+
Returns:
40+
A receiver that merges the messages coming from multiple receivers into a
41+
single stream.
42+
43+
Raises:
44+
ValueError: if no receivers are provided.
4045
"""
46+
if not receivers:
47+
raise ValueError("At least one receiver must be provided")
48+
49+
# This is just a small optimization to avoid creating a merge receiver when it is
50+
# not really needed.
51+
if len(receivers) == 1:
52+
return receivers[0]
53+
54+
return _Merge(*receivers, name="merge")
55+
56+
57+
class _Merge(Receiver[_T]):
58+
"""A receiver that merges messages coming from multiple receivers into a single stream."""
4159

42-
def __init__(self, *args: Receiver[_T]) -> None:
43-
"""Create a `Merge` instance.
60+
def __init__(self, *receivers: Receiver[_T], name: str | None) -> None:
61+
"""Create a `_Merge` instance.
4462
4563
Args:
46-
*args: sequence of channel receivers.
64+
*receivers: The receivers to merge.
65+
name: The name of the receiver. Used to create the string representation
66+
of the receiver.
4767
"""
4868
self._receivers: dict[str, Receiver[_T]] = {
49-
str(id): recv for id, recv in enumerate(args)
69+
str(id): recv for id, recv in enumerate(receivers)
5070
}
71+
self._name: str = name if name is not None else type(self).__name__
5172
self._pending: set[asyncio.Task[Any]] = {
5273
asyncio.create_task(anext(recv), name=name)
5374
for name, recv in self._receivers.items()
@@ -61,7 +82,7 @@ def __del__(self) -> None:
6182
task.cancel()
6283

6384
async def stop(self) -> None:
64-
"""Stop the `Merge` instance and cleanup any pending tasks."""
85+
"""Stop the `_Merge` instance and cleanup any pending tasks."""
6586
for task in self._pending:
6687
task.cancel()
6788
await asyncio.gather(*self._pending, return_exceptions=True)
@@ -127,11 +148,11 @@ def __str__(self) -> str:
127148
receivers.append("…")
128149
else:
129150
receivers = [str(p) for p in self._receivers.values()]
130-
return f"{type(self).__name__}:{','.join(receivers)}"
151+
return f"{self._name}:{','.join(receivers)}"
131152

132153
def __repr__(self) -> str:
133154
"""Return a string representation of this receiver."""
134155
return (
135-
f"{type(self).__name__}("
156+
f"{self._name}("
136157
f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})"
137158
)

src/frequenz/channels/_merge_named.py

Lines changed: 0 additions & 121 deletions
This file was deleted.

src/frequenz/channels/_select.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,10 +258,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]:
258258
receivers from a select loop, there are a few alternatives. Depending on your
259259
use case, one or the other could work better for you:
260260
261-
* Use [`Merge`][frequenz.channels.Merge] or
262-
[`MergeNamed`][frequenz.channels.MergeNamed]: this is useful when you
263-
have and unknown number of receivers of the same type that can be handled as
264-
a group.
261+
* Use [`merge()`][frequenz.channels.merge]: this is useful when you have an
262+
unknown number of receivers of the same type that can be handled as a group.
265263
* Use tasks to manage each receiver individually: this is better if there are no
266264
relationships between the receivers.
267265
* Break the `select()` loop and start a new one with the new set of receivers

0 commit comments

Comments
 (0)