Skip to content

Commit d237753

Browse files
committed
Make Merge public again
The `Merge` receiver was made private to try to hide the implementation details to the user, but it was a bad idea because the `Merge` receiver has more methods than a plain `Receiver`, in particular `stop()`, which users need to use for guaranteed cleanup. This also means now the `merge()` function returns the `Merge` instance instead of an abstract `Receiver`. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent 77e8c89 commit d237753

File tree

3 files changed

+15
-22
lines changed

3 files changed

+15
-22
lines changed

src/frequenz/channels/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
from ._anycast import Anycast
7878
from ._broadcast import Broadcast
7979
from ._exceptions import ChannelClosedError, ChannelError, Error
80-
from ._merge import merge
80+
from ._merge import Merger, merge
8181
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
8282
from ._select import (
8383
Selected,
@@ -95,6 +95,7 @@
9595
"ChannelClosedError",
9696
"ChannelError",
9797
"Error",
98+
"Merger",
9899
"Receiver",
99100
"ReceiverError",
100101
"ReceiverStoppedError",

src/frequenz/channels/_merge.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
"""Merge messages coming from channels into a single stream."""
55

6+
from __future__ import annotations
7+
68
import asyncio
79
import itertools
810
from collections import deque
@@ -13,7 +15,7 @@
1315
_T = TypeVar("_T")
1416

1517

16-
def merge(*receivers: Receiver[_T]) -> Receiver[_T]:
18+
def merge(*receivers: Receiver[_T]) -> Merger[_T]:
1719
"""Merge messages coming from multiple receivers into a single stream.
1820
1921
Example:
@@ -46,19 +48,19 @@ def merge(*receivers: Receiver[_T]) -> Receiver[_T]:
4648
if not receivers:
4749
raise ValueError("At least one receiver must be provided")
4850

49-
# This is just a small optimization to avoid creating a merge receiver when it is
50-
# not really needed.
51-
if len(receivers) == 1:
52-
return receivers[0]
51+
return Merger(*receivers, name="merge")
5352

54-
return _Merge(*receivers, name="merge")
5553

54+
class Merger(Receiver[_T]):
55+
"""A receiver that merges messages coming from multiple receivers into a single stream.
5656
57-
class _Merge(Receiver[_T]):
58-
"""A receiver that merges messages coming from multiple receivers into a single stream."""
57+
Tip:
58+
Please consider using the more idiomatic [`merge()`][frequenz.channels.merge]
59+
function instead of creating a `Merger` instance directly.
60+
"""
5961

6062
def __init__(self, *receivers: Receiver[_T], name: str | None) -> None:
61-
"""Create a `_Merge` instance.
63+
"""Create a `Merger` instance.
6264
6365
Args:
6466
*receivers: The receivers to merge.
@@ -82,7 +84,7 @@ def __del__(self) -> None:
8284
task.cancel()
8385

8486
async def stop(self) -> None:
85-
"""Stop the `_Merge` instance and cleanup any pending tasks."""
87+
"""Stop the `Merger` instance and cleanup any pending tasks."""
8688
for task in self._pending:
8789
task.cancel()
8890
await asyncio.gather(*self._pending, return_exceptions=True)

tests/test_merge.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,12 @@
33

44
"""Tests for the merge implementation."""
55

6-
from unittest import mock
7-
86
import pytest
97

10-
from frequenz.channels import Receiver, merge
8+
from frequenz.channels import merge
119

1210

1311
async def test_empty() -> None:
1412
"""Ensure merge() raises an exception when no receivers are provided."""
1513
with pytest.raises(ValueError, match="At least one receiver must be provided"):
1614
merge()
17-
18-
19-
async def test_one() -> None:
20-
"""Ensure merge() returns the same receiver when only one is provided."""
21-
receiver = mock.MagicMock(spec=Receiver[int])
22-
23-
merge_receiver: Receiver[int] = merge(receiver)
24-
assert merge_receiver is receiver

0 commit comments

Comments
 (0)