Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions src/frequenz/sdk/_internal/_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,36 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[T_co]:
class _Sentinel:
"""A sentinel to denote that no value has been received yet."""

def __str__(self) -> str:
"""Return a string representation of this sentinel."""
return "<no value received yet>"


class LatestValueCache(typing.Generic[T_co]):
"""A cache that stores the latest value in a receiver."""

def __init__(self, receiver: Receiver[T_co]) -> None:
def __init__(
self, receiver: Receiver[T_co], *, unique_id: str | None = None
) -> None:
"""Create a new cache.

Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][]. It is used mostly for debugging purposes.
"""
self._receiver = receiver
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value: T_co | _Sentinel = _Sentinel()
self._task = asyncio.create_task(self._run())
self._task = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)

@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id

def get(self) -> T_co:
"""Return the latest value that has been received.
Expand Down Expand Up @@ -78,3 +95,14 @@ async def _run(self) -> None:
async def stop(self) -> None:
"""Stop the cache."""
await cancel_and_await(self._task)

def __repr__(self) -> str:
"""Return a string representation of this cache."""
return (
f"<LatestValueCache latest_value={self._latest_value!r}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)

def __str__(self) -> str:
"""Return the last value seen by this cache."""
return str(self._latest_value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the docstring should say the latest value in the cache instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I tend to use just a generic docstring for __str__ and __repr__ but maybe in this case it is worth to be more explicit.

Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,19 @@ async def _distribute_power(
async def _create_channels(self) -> None:
"""Create channels to get data of components in microgrid."""
api = connection_manager.get().api_client
manager_id = f"{type(self).__name__}»{hex(id(self))}»"
for battery_id, inverter_ids in self._bat_invs_map.items():
bat_recv: Receiver[BatteryData] = await api.battery_data(battery_id)
self._battery_caches[battery_id] = LatestValueCache(bat_recv)
self._battery_caches[battery_id] = LatestValueCache(
bat_recv,
unique_id=f"{manager_id}:battery«{battery_id}»",
)

for inverter_id in inverter_ids:
inv_recv: Receiver[InverterData] = await api.inverter_data(inverter_id)
self._inverter_caches[inverter_id] = LatestValueCache(inv_recv)
self._inverter_caches[inverter_id] = LatestValueCache(
inv_recv, unique_id=f"{manager_id}:inverter«{inverter_id}»"
)

def _get_bounds(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ def __init__(
self._ev_charger_ids = self._get_ev_charger_ids()
self._evc_states = EvcStates()
self._voltage_cache: LatestValueCache[Sample3Phase[Voltage]] = LatestValueCache(
microgrid.voltage().new_receiver()
microgrid.voltage().new_receiver(),
unique_id=f"{type(self).__name__}«{hex(id(self))}»:voltage_cache",
)
self._config = EVDistributionConfig(component_ids=self._ev_charger_ids)
self._component_pool_status_tracker = ComponentPoolStatusTracker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ async def start(self) -> None:
"""Start the PV inverter manager."""
self._component_data_caches = {
inv_id: LatestValueCache(
await connection_manager.get().api_client.inverter_data(inv_id)
await connection_manager.get().api_client.inverter_data(inv_id),
unique_id=f"{type(self).__name__}«{hex(id(self))}»:inverter«{inv_id}»",
)
for inv_id in self._pv_inverter_ids
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from frequenz.channels import Receiver, Sender
from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType
from typing_extensions import override

from ...actor._actor import Actor
from ._component_managers import (
Expand Down Expand Up @@ -113,6 +114,7 @@ def __init__( # pylint: disable=too-many-arguments
f"PowerDistributor doesn't support controlling: {component_category}"
)

@override
async def _run(self) -> None: # pylint: disable=too-many-locals
"""Run actor main function.

Expand All @@ -127,6 +129,7 @@ async def _run(self) -> None: # pylint: disable=too-many-locals
async for request in self._requests_receiver:
await self._component_manager.distribute_power(request)

@override
async def stop(self, msg: str | None = None) -> None:
"""Stop this actor.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ async def _run(self) -> None:
_logger.error(err)
raise RuntimeError(err)

meter_id = next(iter(meters)).component_id
self._meter_data_cache = LatestValueCache(
await api_client.meter_data(next(iter(meters)).component_id)
await api_client.meter_data(meter_id),
unique_id=f"{type(self).__name__}«{hex(id(self))}»:meter«{meter_id}»",
)
latest_bound: dict[int, ComponentCurrentLimit] = {}

Expand Down