|
4 | 4 | """General purpose classes for use with channels.""" |
5 | 5 |
|
6 | 6 | import abc |
7 | | -import asyncio |
8 | 7 | import typing |
9 | 8 |
|
10 | 9 | from frequenz.channels import Receiver |
11 | 10 |
|
12 | | -from ._asyncio import cancel_and_await |
13 | | - |
14 | 11 | T_co = typing.TypeVar("T_co", covariant=True) |
15 | 12 | U_co = typing.TypeVar("U_co", covariant=True) |
16 | 13 |
|
@@ -58,82 +55,3 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: |
58 | 55 | A receiver instance. |
59 | 56 | """ |
60 | 57 | return self._mapping_function(self._fetcher.new_receiver(limit=limit)) |
61 | | - |
62 | | - |
63 | | -class _Sentinel: |
64 | | - """A sentinel to denote that no value has been received yet.""" |
65 | | - |
66 | | - def __str__(self) -> str: |
67 | | - """Return a string representation of this sentinel.""" |
68 | | - return "<no value received yet>" |
69 | | - |
70 | | - |
71 | | -class LatestValueCache(typing.Generic[T_co]): |
72 | | - """A cache that stores the latest value in a receiver.""" |
73 | | - |
74 | | - def __init__( |
75 | | - self, receiver: Receiver[T_co], *, unique_id: str | None = None |
76 | | - ) -> None: |
77 | | - """Create a new cache. |
78 | | -
|
79 | | - Args: |
80 | | - receiver: The receiver to cache. |
81 | | - unique_id: A string to help uniquely identify this instance. If not |
82 | | - provided, a unique identifier will be generated from the object's |
83 | | - [`id()`][]. It is used mostly for debugging purposes. |
84 | | - """ |
85 | | - self._receiver = receiver |
86 | | - self._unique_id: str = hex(id(self)) if unique_id is None else unique_id |
87 | | - self._latest_value: T_co | _Sentinel = _Sentinel() |
88 | | - self._task = asyncio.create_task( |
89 | | - self._run(), name=f"LatestValueCache«{self._unique_id}»" |
90 | | - ) |
91 | | - |
92 | | - @property |
93 | | - def unique_id(self) -> str: |
94 | | - """The unique identifier of this instance.""" |
95 | | - return self._unique_id |
96 | | - |
97 | | - def get(self) -> T_co: |
98 | | - """Return the latest value that has been received. |
99 | | -
|
100 | | - This raises a `ValueError` if no value has been received yet. Use `has_value` to |
101 | | - check whether a value has been received yet, before trying to access the value, |
102 | | - to avoid the exception. |
103 | | -
|
104 | | - Returns: |
105 | | - The latest value that has been received. |
106 | | -
|
107 | | - Raises: |
108 | | - ValueError: If no value has been received yet. |
109 | | - """ |
110 | | - if isinstance(self._latest_value, _Sentinel): |
111 | | - raise ValueError("No value has been received yet.") |
112 | | - return self._latest_value |
113 | | - |
114 | | - def has_value(self) -> bool: |
115 | | - """Check whether a value has been received yet. |
116 | | -
|
117 | | - Returns: |
118 | | - `True` if a value has been received, `False` otherwise. |
119 | | - """ |
120 | | - return not isinstance(self._latest_value, _Sentinel) |
121 | | - |
122 | | - async def _run(self) -> None: |
123 | | - async for value in self._receiver: |
124 | | - self._latest_value = value |
125 | | - |
126 | | - async def stop(self) -> None: |
127 | | - """Stop the cache.""" |
128 | | - await cancel_and_await(self._task) |
129 | | - |
130 | | - def __repr__(self) -> str: |
131 | | - """Return a string representation of this cache.""" |
132 | | - return ( |
133 | | - f"<LatestValueCache latest_value={self._latest_value!r}, " |
134 | | - f"receiver={self._receiver!r}, unique_id={self._unique_id!r}>" |
135 | | - ) |
136 | | - |
137 | | - def __str__(self) -> str: |
138 | | - """Return the last value seen by this cache.""" |
139 | | - return str(self._latest_value) |
0 commit comments