Skip to content

Commit b425905

Browse files
authored
Use a sentinel in LatestValueCache to denote if the cache is empty (#846)
This is necessary because `None` is a valid value that could have been sent through a channel, so the having a `None` value might not always mean that the cache is empty. Closes #820
2 parents e27e4ec + 2d915c4 commit b425905

File tree

3 files changed

+41
-17
lines changed

3 files changed

+41
-17
lines changed

src/frequenz/sdk/_internal/_channels.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ def new_receiver(self, *, maxsize: int = 50) -> Receiver[T]:
2727
"""
2828

2929

30+
class _Sentinel:
31+
"""A sentinel to denote that no value has been received yet."""
32+
33+
3034
class LatestValueCache(typing.Generic[T]):
3135
"""A cache that stores the latest value in a receiver."""
3236

@@ -37,14 +41,34 @@ def __init__(self, receiver: Receiver[T]) -> None:
3741
receiver: The receiver to cache.
3842
"""
3943
self._receiver = receiver
40-
self._latest_value: T | None = None
44+
self._latest_value: T | _Sentinel = _Sentinel()
4145
self._task = asyncio.create_task(self._run())
4246

43-
@property
44-
def latest_value(self) -> T | None:
45-
"""Get the latest value in the cache."""
47+
def get(self) -> T:
48+
"""Return the latest value that has been received.
49+
50+
This raises a `ValueError` if no value has been received yet. Use `has_value` to
51+
check whether a value has been received yet, before trying to access the value,
52+
to avoid the exception.
53+
54+
Returns:
55+
The latest value that has been received.
56+
57+
Raises:
58+
ValueError: If no value has been received yet.
59+
"""
60+
if isinstance(self._latest_value, _Sentinel):
61+
raise ValueError("No value has been received yet.")
4662
return self._latest_value
4763

64+
def has_value(self) -> bool:
65+
"""Check whether a value has been received yet.
66+
67+
Returns:
68+
`True` if a value has been received, `False` otherwise.
69+
"""
70+
return not isinstance(self._latest_value, _Sentinel)
71+
4872
async def _run(self) -> None:
4973
async for value in self._receiver:
5074
self._latest_value = value

src/frequenz/sdk/actor/power_distributing/_component_managers/_battery_manager.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -419,27 +419,27 @@ def _get_battery_inverter_data(
419419
Data for the battery and adjacent inverter without NaN values.
420420
Return None if we could not replace NaN values.
421421
"""
422-
battery_data_none = [
423-
self._battery_caches[battery_id].latest_value for battery_id in battery_ids
424-
]
425-
inverter_data_none = [
426-
self._inverter_caches[inverter_id].latest_value
427-
for inverter_id in inverter_ids
428-
]
429-
430-
# It means that nothing has been send on this channels, yet.
422+
# It means that nothing has been send on these channels, yet.
431423
# This should be handled by BatteryStatus. BatteryStatus should not return
432424
# this batteries as working.
433-
if not all(battery_data_none) or not all(inverter_data_none):
425+
if not all(
426+
self._battery_caches[bat_id].has_value for bat_id in battery_ids
427+
) or not all(
428+
self._inverter_caches[inv_id].has_value for inv_id in inverter_ids
429+
):
434430
_logger.error(
435431
"Battery %s or inverter %s send no data, yet. They should be not used.",
436432
battery_ids,
437433
inverter_ids,
438434
)
439435
return None
440436

441-
battery_data = typing.cast(list[BatteryData], battery_data_none)
442-
inverter_data = typing.cast(list[InverterData], inverter_data_none)
437+
battery_data = [
438+
self._battery_caches[battery_id].get() for battery_id in battery_ids
439+
]
440+
inverter_data = [
441+
self._inverter_caches[inverter_id].get() for inverter_id in inverter_ids
442+
]
443443

444444
DataType = typing.TypeVar("DataType", BatteryData, InverterData)
445445

src/frequenz/sdk/timeseries/ev_charger_pool/_set_current_bounds.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ async def _run(self) -> None:
103103
timer = Timer.timeout(timedelta(self._repeat_interval.total_seconds()))
104104

105105
async for selected in select(bound_chan, timer):
106-
meter = meter_data.latest_value
106+
meter = meter_data.get()
107107
if meter is None:
108108
raise ValueError("Meter channel closed.")
109109

0 commit comments

Comments
 (0)