|
6 | 6 |
|
7 | 7 | from fastcs.attributes.attribute import Attribute |
8 | 8 | from fastcs.attributes.attribute_io_ref import AttributeIORefT |
| 9 | +from fastcs.attributes.util import AttrValuePredicate, PredicateEvent |
9 | 10 | from fastcs.datatypes import DataType, DType_T |
10 | 11 | from fastcs.logging import bind_logger |
11 | 12 |
|
@@ -39,6 +40,8 @@ def __init__( |
39 | 40 | """Callback to update the value of the attribute with an IO to the source""" |
40 | 41 | self._on_update_callbacks: list[AttrOnUpdateCallback[DType_T]] | None = None |
41 | 42 | """Callbacks to publish changes to the value of the attribute""" |
| 43 | + self._on_update_events: set[PredicateEvent[DType_T]] = set() |
| 44 | + """Events to set when the value satisifies some predicate""" |
42 | 45 |
|
43 | 46 | def get(self) -> DType_T: |
44 | 47 | """Get the cached value of the attribute.""" |
@@ -67,6 +70,10 @@ async def update(self, value: Any) -> None: |
67 | 70 |
|
68 | 71 | self._value = self._datatype.validate(value) |
69 | 72 |
|
| 73 | + self._on_update_events -= { |
| 74 | + e for e in self._on_update_events if e.set(self._value) |
| 75 | + } |
| 76 | + |
70 | 77 | if self._on_update_callbacks is not None: |
71 | 78 | try: |
72 | 79 | await asyncio.gather( |
@@ -115,3 +122,62 @@ async def update_attribute(): |
115 | 122 | raise |
116 | 123 |
|
117 | 124 | return update_attribute |
| 125 | + |
| 126 | + async def wait_for_predicate( |
| 127 | + self, predicate: AttrValuePredicate[DType_T], *, timeout: float |
| 128 | + ): |
| 129 | + """Wait for the predicate to be satisfied when called with the current value |
| 130 | +
|
| 131 | + Args: |
| 132 | + predicate: The predicate to test - a callable that takes the attribute |
| 133 | + value and returns True if the event should be set |
| 134 | + timeout: The timeout in seconds |
| 135 | +
|
| 136 | + """ |
| 137 | + if predicate(self._value): |
| 138 | + self.log_event( |
| 139 | + "Predicate already satisfied", predicate=predicate, attribute=self |
| 140 | + ) |
| 141 | + return |
| 142 | + |
| 143 | + self._on_update_events.add(update_event := PredicateEvent(predicate)) |
| 144 | + |
| 145 | + self.log_event("Waiting for predicate", predicate=predicate, attribute=self) |
| 146 | + try: |
| 147 | + await asyncio.wait_for(update_event.wait(), timeout) |
| 148 | + except TimeoutError: |
| 149 | + self._on_update_events.remove(update_event) |
| 150 | + raise TimeoutError( |
| 151 | + f"Timeout waiting for predicate {predicate}. " |
| 152 | + f"Current value: {self._value}" |
| 153 | + ) from None |
| 154 | + |
| 155 | + self.log_event("Predicate satisfied", predicate=predicate, attribute=self) |
| 156 | + |
| 157 | + async def wait_for_value(self, value: DType_T, *, timeout: float): |
| 158 | + """Wait for value to equal the required value |
| 159 | +
|
| 160 | + Args: |
| 161 | + value: The value to wait for |
| 162 | + timeout: The timeout in seconds |
| 163 | +
|
| 164 | + Raises: |
| 165 | + TimeoutError: If the attribute does not reach the required value within the |
| 166 | + timeout |
| 167 | +
|
| 168 | + """ |
| 169 | + if self._value == value: |
| 170 | + self.log_event("Value already equal", value=value, attribute=self) |
| 171 | + return |
| 172 | + |
| 173 | + def predicate(v: DType_T) -> bool: |
| 174 | + return v == value |
| 175 | + |
| 176 | + try: |
| 177 | + await self.wait_for_predicate(predicate, timeout=timeout) |
| 178 | + except TimeoutError: |
| 179 | + raise TimeoutError( |
| 180 | + f"Timeout waiting for value {value}. Current value: {self._value}" |
| 181 | + ) from None |
| 182 | + |
| 183 | + self.log_event("Value equal", value=value, attribute=self) |
0 commit comments