|  | 
| 4 | 4 | """General purpose classes for use with channels.""" | 
| 5 | 5 | 
 | 
| 6 | 6 | import abc | 
| 7 |  | -import asyncio | 
| 8 | 7 | import typing | 
| 9 | 8 | 
 | 
| 10 | 9 | from frequenz.channels import Receiver | 
| 11 | 10 | 
 | 
| 12 |  | -from ._asyncio import cancel_and_await | 
| 13 |  | - | 
| 14 | 11 | T_co = typing.TypeVar("T_co", covariant=True) | 
| 15 | 12 | U_co = typing.TypeVar("U_co", covariant=True) | 
| 16 | 13 | 
 | 
| @@ -58,82 +55,3 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: | 
| 58 | 55 |             A receiver instance. | 
| 59 | 56 |         """ | 
| 60 | 57 |         return self._mapping_function(self._fetcher.new_receiver(limit=limit)) | 
| 61 |  | - | 
| 62 |  | - | 
| 63 |  | -class _Sentinel: | 
| 64 |  | -    """A sentinel to denote that no value has been received yet.""" | 
| 65 |  | - | 
| 66 |  | -    def __str__(self) -> str: | 
| 67 |  | -        """Return a string representation of this sentinel.""" | 
| 68 |  | -        return "<no value received yet>" | 
| 69 |  | - | 
| 70 |  | - | 
| 71 |  | -class LatestValueCache(typing.Generic[T_co]): | 
| 72 |  | -    """A cache that stores the latest value in a receiver.""" | 
| 73 |  | - | 
| 74 |  | -    def __init__( | 
| 75 |  | -        self, receiver: Receiver[T_co], *, unique_id: str | None = None | 
| 76 |  | -    ) -> None: | 
| 77 |  | -        """Create a new cache. | 
| 78 |  | -
 | 
| 79 |  | -        Args: | 
| 80 |  | -            receiver: The receiver to cache. | 
| 81 |  | -            unique_id: A string to help uniquely identify this instance. If not | 
| 82 |  | -                provided, a unique identifier will be generated from the object's | 
| 83 |  | -                [`id()`][]. It is used mostly for debugging purposes. | 
| 84 |  | -        """ | 
| 85 |  | -        self._receiver = receiver | 
| 86 |  | -        self._unique_id: str = hex(id(self)) if unique_id is None else unique_id | 
| 87 |  | -        self._latest_value: T_co | _Sentinel = _Sentinel() | 
| 88 |  | -        self._task = asyncio.create_task( | 
| 89 |  | -            self._run(), name=f"LatestValueCache«{self._unique_id}»" | 
| 90 |  | -        ) | 
| 91 |  | - | 
| 92 |  | -    @property | 
| 93 |  | -    def unique_id(self) -> str: | 
| 94 |  | -        """The unique identifier of this instance.""" | 
| 95 |  | -        return self._unique_id | 
| 96 |  | - | 
| 97 |  | -    def get(self) -> T_co: | 
| 98 |  | -        """Return the latest value that has been received. | 
| 99 |  | -
 | 
| 100 |  | -        This raises a `ValueError` if no value has been received yet. Use `has_value` to | 
| 101 |  | -        check whether a value has been received yet, before trying to access the value, | 
| 102 |  | -        to avoid the exception. | 
| 103 |  | -
 | 
| 104 |  | -        Returns: | 
| 105 |  | -            The latest value that has been received. | 
| 106 |  | -
 | 
| 107 |  | -        Raises: | 
| 108 |  | -            ValueError: If no value has been received yet. | 
| 109 |  | -        """ | 
| 110 |  | -        if isinstance(self._latest_value, _Sentinel): | 
| 111 |  | -            raise ValueError("No value has been received yet.") | 
| 112 |  | -        return self._latest_value | 
| 113 |  | - | 
| 114 |  | -    def has_value(self) -> bool: | 
| 115 |  | -        """Check whether a value has been received yet. | 
| 116 |  | -
 | 
| 117 |  | -        Returns: | 
| 118 |  | -            `True` if a value has been received, `False` otherwise. | 
| 119 |  | -        """ | 
| 120 |  | -        return not isinstance(self._latest_value, _Sentinel) | 
| 121 |  | - | 
| 122 |  | -    async def _run(self) -> None: | 
| 123 |  | -        async for value in self._receiver: | 
| 124 |  | -            self._latest_value = value | 
| 125 |  | - | 
| 126 |  | -    async def stop(self) -> None: | 
| 127 |  | -        """Stop the cache.""" | 
| 128 |  | -        await cancel_and_await(self._task) | 
| 129 |  | - | 
| 130 |  | -    def __repr__(self) -> str: | 
| 131 |  | -        """Return a string representation of this cache.""" | 
| 132 |  | -        return ( | 
| 133 |  | -            f"<LatestValueCache latest_value={self._latest_value!r}, " | 
| 134 |  | -            f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>" | 
| 135 |  | -        ) | 
| 136 |  | - | 
| 137 |  | -    def __str__(self) -> str: | 
| 138 |  | -        """Return the last value seen by this cache.""" | 
| 139 |  | -        return str(self._latest_value) | 
0 commit comments