Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/benchmark_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def time_async_task(task: Coroutine[Any, Any, int]) -> tuple[float, Any]:
return timeit.default_timer() - start, ret


# pylint: disable=too-many-arguments
# pylint: disable=too-many-arguments,too-many-positional-arguments
def run_one(
benchmark_method: Callable[[int, int, int], Coroutine[Any, Any, int]],
num_channels: int,
Expand Down
16 changes: 8 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ dev-flake8 = [
"flake8 == 7.1.1",
"flake8-docstrings == 1.7.0",
"flake8-pyproject == 1.2.3", # For reading the flake8 config from pyproject.toml
"pydoclint == 0.5.6",
"pydoclint == 0.5.9",
"pydocstyle == 6.3.0",
]
dev-formatting = ["black == 24.8.0", "isort == 5.13.2"]
Expand All @@ -54,10 +54,10 @@ dev-mkdocs = [
"mkdocs-gen-files == 0.5.0",
"mkdocs-include-markdown-plugin == 6.2.2",
"mkdocs-literate-nav == 0.6.1",
"mkdocs-macros-plugin == 1.0.5",
"mkdocs-material == 9.5.34",
"mkdocstrings[python] == 0.26.0",
"mkdocstrings-python == 1.10.9",
"mkdocs-macros-plugin == 1.2.0",
"mkdocs-material == 9.5.39",
"mkdocstrings[python] == 0.26.1",
"mkdocstrings-python == 1.11.1",
"pymdownx-superfence-filter-lines == 0.1.0",
]
dev-mypy = [
Expand All @@ -70,13 +70,13 @@ dev-noxfile = ["nox == 2024.4.15", "frequenz-repo-config[lib] == 0.10.0"]
dev-pylint = [
# For checking the noxfile, docs/ script, and tests
"frequenz-channels[dev-mkdocs,dev-noxfile,dev-pytest]",
"pylint == 3.2.7",
"pylint == 3.3.1",
]
dev-pytest = [
"async-solipsism == 0.7",
"frequenz-repo-config[extra-lint-examples] == 0.10.0",
"hypothesis == 6.111.2",
"pytest == 8.3.2",
"hypothesis == 6.112.2",
"pytest == 8.3.3",
"pytest-asyncio == 0.24.0",
"pytest-mock == 3.14.0",
]
Expand Down
1 change: 0 additions & 1 deletion src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,6 @@ def consume(self) -> _T:

Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the receiver.
"""
if ( # pylint: disable=protected-access
self._next is _Empty and self._channel._closed
Expand Down
1 change: 0 additions & 1 deletion src/frequenz/channels/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def consume(self) -> ReceiverMessageT_co:

Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the receiver.
"""
if not self._results and not self._pending:
raise ReceiverStoppedError(self)
Expand Down
20 changes: 11 additions & 9 deletions src/frequenz/channels/_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@
class Receiver(ABC, Generic[ReceiverMessageT_co]):
"""An endpoint to receive messages."""

async def __anext__(self) -> ReceiverMessageT_co:
# We need the noqa here because ReceiverError can be raised by ready() and consume()
# implementations.
async def __anext__(self) -> ReceiverMessageT_co: # noqa: DOC503
"""Await the next message in the async iteration over received messages.

Returns:
Expand Down Expand Up @@ -215,7 +217,9 @@ def __aiter__(self) -> Self:
"""
return self

async def receive(self) -> ReceiverMessageT_co:
# We need the noqa here because ReceiverError can be raised by consume()
# implementations.
async def receive(self) -> ReceiverMessageT_co: # noqa: DOC503
"""Receive a message.

Returns:
Expand All @@ -226,19 +230,18 @@ async def receive(self) -> ReceiverMessageT_co:
ReceiverError: If there is some problem with the receiver.
"""
try:
received = await self.__anext__() # pylint: disable=unnecessary-dunder-call
received = await anext(self)
except StopAsyncIteration as exc:
# If we already had a cause and it was the receiver was stopped,
# then reuse that error, as StopAsyncIteration is just an artifact
# introduced by __anext__.
if (
isinstance(exc.__cause__, ReceiverStoppedError)
# pylint is not smart enough to figure out we checked above
# this is a ReceiverStoppedError and thus it does have
# a receiver member
and exc.__cause__.receiver is self # pylint: disable=no-member
and exc.__cause__.receiver is self
):
raise exc.__cause__
# This is a false positive, we are actually checking __cause__ is a
# ReceiverStoppedError which is an exception.
raise exc.__cause__ # pylint: disable=raising-non-exception
raise ReceiverStoppedError(self) from exc
return received

Expand Down Expand Up @@ -450,7 +453,6 @@ def consume(self) -> ReceiverMessageT_co:

Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is a problem with the receiver.
"""
if self._recv_closed:
raise ReceiverStoppedError(self)
Expand Down
10 changes: 8 additions & 2 deletions src/frequenz/channels/_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,10 @@ def __init__(self, receiver: Receiver[ReceiverMessageT_co], /) -> None:
self._handled: bool = False
"""Flag to indicate if this selected has been handled in the if-chain."""

# We need the noqa here because pydoclint can't figure out raise self._exception
# actually raise an Exception.
@property
def message(self) -> ReceiverMessageT_co:
def message(self) -> ReceiverMessageT_co: # noqa: DOC503
"""The message that was received, if any.

Returns:
Expand Down Expand Up @@ -339,7 +341,11 @@ def __init__(self, selected: Selected[ReceiverMessageT_co]) -> None:
# https://github.com/python/mypy/issues/13597


async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]:
# We need the noqa here because BaseExceptionGroup can be raised indirectly by
# _stop_pending_tasks.
async def select( # noqa: DOC503
*receivers: Receiver[Any],
) -> AsyncIterator[Selected[Any]]:
"""Iterate over the messages of all receivers as they receive new messages.

This function is used to iterate over the messages of all receivers as they receive
Expand Down
6 changes: 3 additions & 3 deletions src/frequenz/channels/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

import asyncio as _asyncio

from frequenz.channels import _receiver
from frequenz.channels._receiver import Receiver, ReceiverStoppedError


class Event(_receiver.Receiver[None]):
class Event(Receiver[None]):
"""A receiver that can be made ready directly.

# Usage
Expand Down Expand Up @@ -161,7 +161,7 @@ def consume(self) -> None:
ReceiverStoppedError: If this receiver is stopped.
"""
if not self._is_set and self._is_stopped:
raise _receiver.ReceiverStoppedError(self)
raise ReceiverStoppedError(self)

assert self._is_set, "calls to `consume()` must be follow a call to `ready()`"

Expand Down
6 changes: 4 additions & 2 deletions src/frequenz/channels/timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ class Timer(Receiver[timedelta]):
depending on the chosen policy.
"""

def __init__( # pylint: disable=too-many-arguments
# We need the noqa here because RuntimeError is raised indirectly.
def __init__( # noqa: DOC503 pylint: disable=too-many-arguments
self,
interval: timedelta,
missed_tick_policy: MissedTickPolicy,
Expand Down Expand Up @@ -586,7 +587,8 @@ def is_running(self) -> bool:
"""Whether the timer is running."""
return not self._stopped

def reset(
# We need the noqa here because RuntimeError is raised indirectly.
def reset( # noqa: DOC503
self,
*,
interval: timedelta | None = None,
Expand Down
Loading