From 71dde0f7add2edd99c59993dc43691c974b30fbb Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jul 2024 16:17:43 +0200 Subject: [PATCH 1/2] Switch to the `LatestValueCache` from the channels package Signed-off-by: Sahas Subramanian --- .../_component_managers/_battery_manager.py | 3 +-- .../_ev_charger_manager/_ev_charger_manager.py | 10 ++++++++-- .../_pv_inverter_manager/_pv_inverter_manager.py | 3 +-- .../timeseries/ev_charger_pool/_set_current_bounds.py | 3 +-- .../_battery_pool/test_battery_pool_control_methods.py | 3 +-- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py index 3bf7b10e3..601870a7b 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py @@ -10,7 +10,7 @@ import typing from datetime import timedelta -from frequenz.channels import Receiver, Sender +from frequenz.channels import LatestValueCache, Receiver, Sender from frequenz.client.microgrid import ( BatteryData, ClientError, @@ -21,7 +21,6 @@ from typing_extensions import override from .... import microgrid -from ...._internal._channels import LatestValueCache from ...._internal._math import is_close_to_zero from ....microgrid import connection_manager from ....timeseries._quantities import Power diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py index e4c929097..e5d9e3249 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -8,7 +8,14 @@ import logging from datetime import datetime, timedelta, timezone -from frequenz.channels import Broadcast, Sender, merge, select, selected_from +from frequenz.channels import ( + Broadcast, + LatestValueCache, + Sender, + merge, + select, + selected_from, +) from frequenz.client.microgrid import ( ApiClient, ClientError, @@ -19,7 +26,6 @@ from frequenz.sdk import microgrid -from ....._internal._channels import LatestValueCache from ....._internal._math import is_close_to_zero from .....timeseries import Power, Sample3Phase, Voltage from ..._component_pool_status_tracker import ComponentPoolStatusTracker diff --git a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py index b09690802..2c416636f 100644 --- a/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py +++ b/src/frequenz/sdk/actor/power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py @@ -8,7 +8,7 @@ import logging from datetime import timedelta -from frequenz.channels import Broadcast, Sender +from frequenz.channels import Broadcast, LatestValueCache, Sender from frequenz.client.microgrid import ( ClientError, ComponentCategory, @@ -17,7 +17,6 @@ ) from typing_extensions import override -from ....._internal._channels import LatestValueCache from ....._internal._math import is_close_to_zero from .....microgrid import connection_manager from .....timeseries import Power diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py index 81bd54630..7ff92e950 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py @@ -8,12 +8,11 @@ from dataclasses import dataclass from datetime import timedelta -from frequenz.channels import Broadcast, Sender, select, selected_from +from frequenz.channels import Broadcast, LatestValueCache, Sender, select, selected_from from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.microgrid import ComponentCategory, MeterData from ..._internal._asyncio import cancel_and_await -from ..._internal._channels import LatestValueCache from ...microgrid import connection_manager _logger = logging.getLogger(__name__) diff --git a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py index 0f5fd29d2..8797befef 100644 --- a/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py +++ b/tests/timeseries/_battery_pool/test_battery_pool_control_methods.py @@ -10,11 +10,10 @@ from unittest.mock import AsyncMock, MagicMock import pytest -from frequenz.channels import Sender +from frequenz.channels import LatestValueCache, Sender from pytest_mock import MockerFixture from frequenz.sdk import microgrid, timeseries -from frequenz.sdk._internal._channels import LatestValueCache from frequenz.sdk.actor import ResamplerConfig, power_distributing from frequenz.sdk.actor.power_distributing import ComponentPoolStatus from frequenz.sdk.actor.power_distributing._component_pool_status_tracker import ( From 54ed0d03853337c4042d8243556a3a773d613ef7 Mon Sep 17 00:00:00 2001 From: Sahas Subramanian Date: Thu, 11 Jul 2024 16:18:53 +0200 Subject: [PATCH 2/2] Remove internal `LatestValueCache` implementation This has been moved to the channels repo, and the SDK has switched to using that implementation instead. So the internal one is no longer necessary. Signed-off-by: Sahas Subramanian --- src/frequenz/sdk/_internal/_channels.py | 82 ------------------------- 1 file changed, 82 deletions(-) diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 6eadca45e..945d7cf30 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -4,13 +4,10 @@ """General purpose classes for use with channels.""" import abc -import asyncio import typing from frequenz.channels import Receiver -from ._asyncio import cancel_and_await - T_co = typing.TypeVar("T_co", covariant=True) U_co = typing.TypeVar("U_co", covariant=True) @@ -58,82 +55,3 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: A receiver instance. """ return self._mapping_function(self._fetcher.new_receiver(limit=limit)) - - -class _Sentinel: - """A sentinel to denote that no value has been received yet.""" - - def __str__(self) -> str: - """Return a string representation of this sentinel.""" - return "" - - -class LatestValueCache(typing.Generic[T_co]): - """A cache that stores the latest value in a receiver.""" - - def __init__( - self, receiver: Receiver[T_co], *, unique_id: str | None = None - ) -> None: - """Create a new cache. - - Args: - receiver: The receiver to cache. - unique_id: A string to help uniquely identify this instance. If not - provided, a unique identifier will be generated from the object's - [`id()`][]. It is used mostly for debugging purposes. - """ - self._receiver = receiver - self._unique_id: str = hex(id(self)) if unique_id is None else unique_id - self._latest_value: T_co | _Sentinel = _Sentinel() - self._task = asyncio.create_task( - self._run(), name=f"LatestValueCache«{self._unique_id}»" - ) - - @property - def unique_id(self) -> str: - """The unique identifier of this instance.""" - return self._unique_id - - def get(self) -> T_co: - """Return the latest value that has been received. - - This raises a `ValueError` if no value has been received yet. Use `has_value` to - check whether a value has been received yet, before trying to access the value, - to avoid the exception. - - Returns: - The latest value that has been received. - - Raises: - ValueError: If no value has been received yet. - """ - if isinstance(self._latest_value, _Sentinel): - raise ValueError("No value has been received yet.") - return self._latest_value - - def has_value(self) -> bool: - """Check whether a value has been received yet. - - Returns: - `True` if a value has been received, `False` otherwise. - """ - return not isinstance(self._latest_value, _Sentinel) - - async def _run(self) -> None: - async for value in self._receiver: - self._latest_value = value - - async def stop(self) -> None: - """Stop the cache.""" - await cancel_and_await(self._task) - - def __repr__(self) -> str: - """Return a string representation of this cache.""" - return ( - f"" - ) - - def __str__(self) -> str: - """Return the last value seen by this cache.""" - return str(self._latest_value)