Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions .darglint

This file was deleted.

2 changes: 2 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@

Replaced by the new `merge()` function. When replacing `Merge` with `merge()` please keep in mind that this new function will raise a `ValueError` if no receivers are passed to it.

Please note that the old `Merge` class is still also available but it was renamed to `Merger` to avoid confusion with the new `merge()` function, but it is only present for typing reasons and should not be used directly.

* `MergeNamed`

This class was redundant, use either the new `merge()` function or `select()` instead.
Expand Down
2 changes: 0 additions & 2 deletions docs/_scripts/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,5 @@ def define_env(env: macros.MacrosPlugin) -> None:
# https://squidfunk.github.io/mkdocs-material/reference/code-blocks/#adding-annotations
env.variables["code_annotation_marker"] = _CODE_ANNOTATION_MARKER

# TODO(cookiecutter): Add any other macros, variables and filters here.

# This hook needs to be done at the end of the `define_env` function.
_hook_macros_plugin(env)
3 changes: 2 additions & 1 deletion src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
from ._anycast import Anycast
from ._broadcast import Broadcast
from ._exceptions import ChannelClosedError, ChannelError, Error
from ._merge import merge
from ._merge import Merger, merge
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
from ._select import (
Selected,
Expand All @@ -95,6 +95,7 @@
"ChannelClosedError",
"ChannelError",
"Error",
"Merger",
"Receiver",
"ReceiverError",
"ReceiverStoppedError",
Expand Down
22 changes: 12 additions & 10 deletions src/frequenz/channels/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

"""Merge messages coming from channels into a single stream."""

from __future__ import annotations

import asyncio
import itertools
from collections import deque
Expand All @@ -13,7 +15,7 @@
_T = TypeVar("_T")


def merge(*receivers: Receiver[_T]) -> Receiver[_T]:
def merge(*receivers: Receiver[_T]) -> Merger[_T]:
"""Merge messages coming from multiple receivers into a single stream.

Example:
Expand Down Expand Up @@ -46,19 +48,19 @@ def merge(*receivers: Receiver[_T]) -> Receiver[_T]:
if not receivers:
raise ValueError("At least one receiver must be provided")

# This is just a small optimization to avoid creating a merge receiver when it is
# not really needed.
if len(receivers) == 1:
return receivers[0]
return Merger(*receivers, name="merge")

return _Merge(*receivers, name="merge")

class Merger(Receiver[_T]):
"""A receiver that merges messages coming from multiple receivers into a single stream.

class _Merge(Receiver[_T]):
"""A receiver that merges messages coming from multiple receivers into a single stream."""
Tip:
Please consider using the more idiomatic [`merge()`][frequenz.channels.merge]
function instead of creating a `Merger` instance directly.
"""

def __init__(self, *receivers: Receiver[_T], name: str | None) -> None:
"""Create a `_Merge` instance.
"""Create a `Merger` instance.

Args:
*receivers: The receivers to merge.
Expand All @@ -82,7 +84,7 @@ def __del__(self) -> None:
task.cancel()

async def stop(self) -> None:
"""Stop the `_Merge` instance and cleanup any pending tasks."""
"""Stop the `Merger` instance and cleanup any pending tasks."""
for task in self._pending:
task.cancel()
await asyncio.gather(*self._pending, return_exceptions=True)
Expand Down
1 change: 0 additions & 1 deletion src/frequenz/channels/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def calculate_next_tick_time(
The next tick time (in microseconds) according to
`missed_tick_policy`.
"""
return 0 # dummy value to avoid darglint warnings

def __repr__(self) -> str:
"""Return a string representation of the instance.
Expand Down
12 changes: 1 addition & 11 deletions tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,12 @@

"""Tests for the merge implementation."""

from unittest import mock

import pytest

from frequenz.channels import Receiver, merge
from frequenz.channels import merge


async def test_empty() -> None:
"""Ensure merge() raises an exception when no receivers are provided."""
with pytest.raises(ValueError, match="At least one receiver must be provided"):
merge()


async def test_one() -> None:
"""Ensure merge() returns the same receiver when only one is provided."""
receiver = mock.MagicMock(spec=Receiver[int])

merge_receiver: Receiver[int] = merge(receiver)
assert merge_receiver is receiver