Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 0 additions & 82 deletions src/frequenz/sdk/_internal/_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@
"""General purpose classes for use with channels."""

import abc
import asyncio
import typing

from frequenz.channels import Receiver

from ._asyncio import cancel_and_await

T_co = typing.TypeVar("T_co", covariant=True)
U_co = typing.TypeVar("U_co", covariant=True)

Expand Down Expand Up @@ -58,82 +55,3 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
A receiver instance.
"""
return self._mapping_function(self._fetcher.new_receiver(limit=limit))


class _Sentinel:
"""A sentinel to denote that no value has been received yet."""

def __str__(self) -> str:
"""Return a string representation of this sentinel."""
return "<no value received yet>"


class LatestValueCache(typing.Generic[T_co]):
"""A cache that stores the latest value in a receiver."""

def __init__(
self, receiver: Receiver[T_co], *, unique_id: str | None = None
) -> None:
"""Create a new cache.

Args:
receiver: The receiver to cache.
unique_id: A string to help uniquely identify this instance. If not
provided, a unique identifier will be generated from the object's
[`id()`][]. It is used mostly for debugging purposes.
"""
self._receiver = receiver
self._unique_id: str = hex(id(self)) if unique_id is None else unique_id
self._latest_value: T_co | _Sentinel = _Sentinel()
self._task = asyncio.create_task(
self._run(), name=f"LatestValueCache«{self._unique_id}»"
)

@property
def unique_id(self) -> str:
"""The unique identifier of this instance."""
return self._unique_id

def get(self) -> T_co:
"""Return the latest value that has been received.

This raises a `ValueError` if no value has been received yet. Use `has_value` to
check whether a value has been received yet, before trying to access the value,
to avoid the exception.

Returns:
The latest value that has been received.

Raises:
ValueError: If no value has been received yet.
"""
if isinstance(self._latest_value, _Sentinel):
raise ValueError("No value has been received yet.")
return self._latest_value

def has_value(self) -> bool:
"""Check whether a value has been received yet.

Returns:
`True` if a value has been received, `False` otherwise.
"""
return not isinstance(self._latest_value, _Sentinel)

async def _run(self) -> None:
async for value in self._receiver:
self._latest_value = value

async def stop(self) -> None:
"""Stop the cache."""
await cancel_and_await(self._task)

def __repr__(self) -> str:
"""Return a string representation of this cache."""
return (
f"<LatestValueCache latest_value={self._latest_value!r}, "
f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>"
)

def __str__(self) -> str:
"""Return the last value seen by this cache."""
return str(self._latest_value)
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import typing
from datetime import timedelta

from frequenz.channels import Receiver, Sender
from frequenz.channels import LatestValueCache, Receiver, Sender
from frequenz.client.microgrid import (
BatteryData,
ClientError,
Expand All @@ -21,7 +21,6 @@
from typing_extensions import override

from .... import microgrid
from ...._internal._channels import LatestValueCache
from ...._internal._math import is_close_to_zero
from ....microgrid import connection_manager
from ....timeseries._quantities import Power
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
import logging
from datetime import datetime, timedelta, timezone

from frequenz.channels import Broadcast, Sender, merge, select, selected_from
from frequenz.channels import (
Broadcast,
LatestValueCache,
Sender,
merge,
select,
selected_from,
)
from frequenz.client.microgrid import (
ApiClient,
ClientError,
Expand All @@ -19,7 +26,6 @@

from frequenz.sdk import microgrid

from ....._internal._channels import LatestValueCache
from ....._internal._math import is_close_to_zero
from .....timeseries import Power, Sample3Phase, Voltage
from ..._component_pool_status_tracker import ComponentPoolStatusTracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import logging
from datetime import timedelta

from frequenz.channels import Broadcast, Sender
from frequenz.channels import Broadcast, LatestValueCache, Sender
from frequenz.client.microgrid import (
ClientError,
ComponentCategory,
Expand All @@ -17,7 +17,6 @@
)
from typing_extensions import override

from ....._internal._channels import LatestValueCache
from ....._internal._math import is_close_to_zero
from .....microgrid import connection_manager
from .....timeseries import Power
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from dataclasses import dataclass
from datetime import timedelta

from frequenz.channels import Broadcast, Sender, select, selected_from
from frequenz.channels import Broadcast, LatestValueCache, Sender, select, selected_from
from frequenz.channels.timer import SkipMissedAndDrift, Timer
from frequenz.client.microgrid import ComponentCategory, MeterData

from ..._internal._asyncio import cancel_and_await
from ..._internal._channels import LatestValueCache
from ...microgrid import connection_manager

_logger = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@
from unittest.mock import AsyncMock, MagicMock

import pytest
from frequenz.channels import Sender
from frequenz.channels import LatestValueCache, Sender
from pytest_mock import MockerFixture

from frequenz.sdk import microgrid, timeseries
from frequenz.sdk._internal._channels import LatestValueCache
from frequenz.sdk.actor import ResamplerConfig, power_distributing
from frequenz.sdk.actor.power_distributing import ComponentPoolStatus
from frequenz.sdk.actor.power_distributing._component_pool_status_tracker import (
Expand Down