diff --git a/benchmarks/benchmark_anycast.py b/benchmarks/benchmark_anycast.py index a71bae9a..2495b4a6 100644 --- a/benchmarks/benchmark_anycast.py +++ b/benchmarks/benchmark_anycast.py @@ -6,7 +6,8 @@ import asyncio import csv import timeit -from typing import Any, Coroutine, Dict, List, Tuple +from collections.abc import Coroutine +from typing import Any from frequenz.channels import Anycast, Receiver, Sender @@ -40,7 +41,7 @@ async def benchmark_anycast( Returns: int: Total number of messages received by all channels. """ - channels: List[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)] + channels: list[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)] senders = [ asyncio.create_task(send_msg(num_messages, bcast.new_sender())) for bcast in channels @@ -68,7 +69,7 @@ async def update_tracker_on_receive(chan: Receiver[int]) -> None: return recv_trackers[0] -def time_async_task(task: Coroutine[Any, Any, int]) -> Tuple[float, Any]: +def time_async_task(task: Coroutine[Any, Any, int]) -> tuple[float, Any]: """Run a task and return the time taken and the result. Args: @@ -87,7 +88,7 @@ def run_one( num_messages: int, num_receivers: int, buffer_size: int, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Run a single benchmark.""" runtime, total_msgs = time_async_task( benchmark_anycast(num_channels, num_messages, num_receivers, buffer_size) diff --git a/benchmarks/benchmark_broadcast.py b/benchmarks/benchmark_broadcast.py index 959ab14e..7934e408 100644 --- a/benchmarks/benchmark_broadcast.py +++ b/benchmarks/benchmark_broadcast.py @@ -6,8 +6,9 @@ import asyncio import csv import timeit +from collections.abc import Callable, Coroutine from functools import partial -from typing import Any, Callable, Coroutine, Dict, List, Tuple +from typing import Any from frequenz.channels import Broadcast, Receiver, Sender @@ -60,8 +61,8 @@ async def benchmark_broadcast( Returns: int: Total number of messages received by all receivers. """ - channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] - senders: List[asyncio.Task[Any]] = [ + channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] + senders: list[asyncio.Task[Any]] = [ asyncio.create_task(send_msg(num_messages, bcast.new_sender())) for bcast in channels ] @@ -103,7 +104,7 @@ async def benchmark_single_task_broadcast( Returns: int: Total number of messages received by all receivers. """ - channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] + channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)] senders = [b.new_sender() for b in channels] recv_tracker = 0 @@ -122,7 +123,7 @@ async def benchmark_single_task_broadcast( return recv_tracker -def time_async_task(task: Coroutine[Any, Any, int]) -> Tuple[float, Any]: +def time_async_task(task: Coroutine[Any, Any, int]) -> tuple[float, Any]: """Run a task and return the time taken and the result. Args: @@ -144,7 +145,7 @@ def run_one( num_receivers: int, tasks_used: str, interval_between_messages: float, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Run a single benchmark.""" runtime, total_msgs = time_async_task( benchmark_method(num_channels, num_messages, num_receivers) diff --git a/src/conftest.py b/src/conftest.py index a4f76bc6..a5c6b352 100644 --- a/src/conftest.py +++ b/src/conftest.py @@ -7,7 +7,6 @@ This plugin extracts these code examples and validates them using pylint. """ -from __future__ import annotations import ast import os diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index cbdf3d1e..edc665ac 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -7,7 +7,7 @@ from asyncio import Condition from collections import deque -from typing import Deque, Generic, Type +from typing import Deque, Generic from ._base_classes import Receiver as BaseReceiver from ._base_classes import Sender as BaseSender @@ -169,7 +169,7 @@ def __init__(self, chan: Anycast[T]) -> None: chan: A reference to the channel that this receiver belongs to. """ self._chan = chan - self._next: T | Type[_Empty] = _Empty + self._next: T | type[_Empty] = _Empty async def ready(self) -> bool: """Wait until the receiver is ready with a value or an error. diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_base_classes.py index d2090429..3518f116 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_base_classes.py @@ -6,7 +6,8 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import Callable, Generic, Optional, TypeVar +from collections.abc import Callable +from typing import Generic, TypeVar from ._exceptions import ReceiverStoppedError @@ -145,7 +146,7 @@ class Peekable(ABC, Generic[T]): """ @abstractmethod - def peek(self) -> Optional[T]: + def peek(self) -> T | None: """Return the latest value that was sent to the channel. Returns: diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 33f61a80..ef814e22 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -9,7 +9,7 @@ import weakref from asyncio import Condition from collections import deque -from typing import Deque, Dict, Generic, Optional +from typing import Deque, Generic from uuid import UUID, uuid4 from ._base_classes import Peekable as BasePeekable @@ -88,9 +88,9 @@ def __init__(self, name: str, resend_latest: bool = False) -> None: self._resend_latest = resend_latest self.recv_cv: Condition = Condition() - self.receivers: Dict[UUID, weakref.ReferenceType[Receiver[T]]] = {} + self.receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {} self.closed: bool = False - self._latest: Optional[T] = None + self._latest: T | None = None async def close(self) -> None: """Close the Broadcast channel. @@ -116,9 +116,7 @@ def new_sender(self) -> Sender[T]: """ return Sender(self) - def new_receiver( - self, name: Optional[str] = None, maxsize: int = 50 - ) -> Receiver[T]: + def new_receiver(self, name: str | None = None, maxsize: int = 50) -> Receiver[T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -346,7 +344,7 @@ def __init__(self, chan: Broadcast[T]) -> None: """ self._chan = chan - def peek(self) -> Optional[T]: + def peek(self) -> T | None: """Return the latest value that was sent to the channel. Returns: diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 166b2f5f..c1170ffe 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -5,7 +5,7 @@ import asyncio from collections import deque -from typing import Any, Deque, Set +from typing import Any, Deque from .._base_classes import Receiver, T from .._exceptions import ReceiverStoppedError @@ -44,7 +44,7 @@ def __init__(self, *args: Receiver[T]) -> None: *args: sequence of channel receivers. """ self._receivers = {str(id): recv for id, recv in enumerate(args)} - self._pending: Set[asyncio.Task[Any]] = { + self._pending: set[asyncio.Task[Any]] = { asyncio.create_task(recv.__anext__(), name=name) for name, recv in self._receivers.items() } diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index 2267ef6a..7fa06585 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -5,13 +5,13 @@ import asyncio from collections import deque -from typing import Any, Deque, Set, Tuple +from typing import Any, Deque from .._base_classes import Receiver, T from .._exceptions import ReceiverStoppedError -class MergeNamed(Receiver[Tuple[str, T]]): +class MergeNamed(Receiver[tuple[str, T]]): """Merge messages coming from multiple named channels into a single stream. When `MergeNamed` is no longer needed, then it should be stopped using @@ -25,11 +25,11 @@ def __init__(self, **kwargs: Receiver[T]) -> None: **kwargs: sequence of channel receivers. """ self._receivers = kwargs - self._pending: Set[asyncio.Task[Any]] = { + self._pending: set[asyncio.Task[Any]] = { asyncio.create_task(recv.__anext__(), name=name) for name, recv in self._receivers.items() } - self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers)) + self._results: Deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) def __del__(self) -> None: """Cleanup any pending tasks.""" @@ -81,7 +81,7 @@ async def ready(self) -> bool: asyncio.create_task(self._receivers[name].__anext__(), name=name) ) - def consume(self) -> Tuple[str, T]: + def consume(self) -> tuple[str, T]: """Return the latest value once `ready` is complete. Returns: diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/util/_select.py index aac62a75..f2d0d084 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/util/_select.py @@ -9,7 +9,8 @@ """ import asyncio -from typing import Any, AsyncIterator, Generic, TypeGuard, TypeVar +from collections.abc import AsyncIterator +from typing import Any, Generic, TypeGuard, TypeVar from .._base_classes import Receiver from .._exceptions import ReceiverStoppedError diff --git a/tests/test_anycast.py b/tests/test_anycast.py index 5a5e8c1e..bb063ea5 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -3,7 +3,6 @@ """Tests for the Channel implementation.""" -from __future__ import annotations import asyncio diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 566252c0..936fe20d 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -3,10 +3,8 @@ """Tests for the Broadcast implementation.""" -from __future__ import annotations import asyncio -from typing import Tuple import pytest @@ -117,7 +115,7 @@ async def test_broadcast_overflow() -> None: big_receiver = bcast.new_receiver("named-recv", big_recv_size) small_receiver = bcast.new_receiver(None, small_recv_size) - async def drain_receivers() -> Tuple[int, int]: + async def drain_receivers() -> tuple[int, int]: big_sum = 0 small_sum = 0 while len(big_receiver) > 0: diff --git a/tests/test_merge.py b/tests/test_merge.py index 42643727..e9516b29 100644 --- a/tests/test_merge.py +++ b/tests/test_merge.py @@ -4,7 +4,6 @@ """Tests for the Merge implementation.""" import asyncio -from typing import List from frequenz.channels import Anycast, Sender from frequenz.channels.util import Merge @@ -26,7 +25,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: senders = asyncio.create_task(send(chan1.new_sender(), chan2.new_sender())) merge = Merge(chan1.new_receiver(), chan2.new_receiver()) - results: List[int] = [] + results: list[int] = [] async for item in merge: results.append(item) await senders @@ -37,5 +36,5 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: # order, where N is the number of channels. This only works in this # example because the `send` method sends values in immeidate # succession. - assert set((results[idx : idx + 2])) == {ctr + 1, ctr + 101} + assert set(results[idx : idx + 2]) == {ctr + 1, ctr + 101} assert results[-1] == 1000 diff --git a/tests/test_mergenamed.py b/tests/test_mergenamed.py index 04028e9c..f51868e1 100644 --- a/tests/test_mergenamed.py +++ b/tests/test_mergenamed.py @@ -4,7 +4,6 @@ """Tests for the MergeNamed implementation.""" import asyncio -from typing import List, Tuple from frequenz.channels import Anycast, Sender from frequenz.channels.util import MergeNamed @@ -27,7 +26,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: recvs = {"chan1": chan1.new_receiver(), "chan2": chan2.new_receiver()} merge = MergeNamed(**recvs) - results: List[Tuple[str, int]] = [] + results: list[tuple[str, int]] = [] async for item in merge: results.append(item) await senders @@ -38,7 +37,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: # order, where N is the number of channels. This only works in this # example because the `send` method sends values in immeidate # succession. - assert set((results[idx : idx + 2])) == { + assert set(results[idx : idx + 2]) == { ("chan1", ctr + 1), ("chan2", ctr + 101), } diff --git a/tests/utils/test_file_watcher.py b/tests/utils/test_file_watcher.py index c7f4de5e..789427b9 100644 --- a/tests/utils/test_file_watcher.py +++ b/tests/utils/test_file_watcher.py @@ -3,7 +3,6 @@ """Tests for `channel.FileWatcher`.""" -from __future__ import annotations import pathlib from collections.abc import AsyncGenerator, Iterator, Sequence diff --git a/tests/utils/test_timer.py b/tests/utils/test_timer.py index d5330d83..ecf774a5 100644 --- a/tests/utils/test_timer.py +++ b/tests/utils/test_timer.py @@ -3,7 +3,6 @@ """Tests for the timer.""" -from __future__ import annotations import asyncio import enum