66
77from fastcs .attributes .attribute import Attribute
88from fastcs .attributes .attribute_io_ref import AttributeIORefT
9+ from fastcs .attributes .util import AttrValuePredicate , PredicateEvent
910from fastcs .datatypes import DataType , DType_T
1011from fastcs .logging import bind_logger
1112
@@ -39,6 +40,8 @@ def __init__(
3940 """Callback to update the value of the attribute with an IO to the source"""
4041 self ._on_update_callbacks : list [AttrOnUpdateCallback [DType_T ]] | None = None
4142 """Callbacks to publish changes to the value of the attribute"""
43+ self ._on_update_events : set [PredicateEvent [DType_T ]] = set ()
44+ """Events to set when the value satisifies some predicate"""
4245
4346 def get (self ) -> DType_T :
4447 """Get the cached value of the attribute."""
@@ -67,6 +70,10 @@ async def update(self, value: Any) -> None:
6770
6871 self ._value = self ._datatype .validate (value )
6972
73+ self ._on_update_events -= {
74+ e for e in self ._on_update_events if e .set (self ._value )
75+ }
76+
7077 if self ._on_update_callbacks is not None :
7178 try :
7279 await asyncio .gather (
@@ -78,6 +85,16 @@ async def update(self, value: Any) -> None:
7885 )
7986 raise
8087
88+ def _register_update_event (self , event : PredicateEvent ):
89+ """Register an event to be set when the value satisfies a predicate
90+
91+ Args:
92+ predicate: The predicate to filter event set by
93+ event: The event to set
94+
95+ """
96+ self ._on_update_events .add (event )
97+
8198 def add_on_update_callback (self , callback : AttrOnUpdateCallback [DType_T ]) -> None :
8299 """Add a callback to be called when the value of the attribute is updated
83100
@@ -115,3 +132,62 @@ async def update_attribute():
115132 raise
116133
117134 return update_attribute
135+
136+ async def wait_for_predicate (
137+ self , predicate : AttrValuePredicate [DType_T ], * , timeout : float
138+ ):
139+ """Wait for the predicate to be satisfied when called with the current value
140+
141+ Args:
142+ predicate: The predicate to test - a callable that takes the attribute
143+ value and returns True if the event should be set
144+ timeout: The timeout in seconds
145+
146+ """
147+ if predicate (self ._value ):
148+ self .log_event (
149+ "Predicate already satisfied" , predicate = predicate , attribute = self
150+ )
151+ return
152+
153+ self ._register_update_event (update_event := PredicateEvent (predicate ))
154+
155+ self .log_event ("Waiting for predicate" , predicate = predicate , attribute = self )
156+ try :
157+ await asyncio .wait_for (update_event .wait (), timeout )
158+ except TimeoutError :
159+ self ._on_update_events .remove (update_event )
160+ raise TimeoutError (
161+ f"Timeout waiting for predicate { predicate } . "
162+ f"Current value: { self ._value } "
163+ ) from None
164+
165+ self .log_event ("Predicate satisfied" , predicate = predicate , attribute = self )
166+
167+ async def wait_for_value (self , value : DType_T , * , timeout : float ):
168+ """Wait for value to equal the required value
169+
170+ Args:
171+ value: The value to wait for
172+ timeout: The timeout in seconds
173+
174+ Raises:
175+ TimeoutError: If the attribute does not reach the required value within the
176+ timeout
177+
178+ """
179+ if self ._value == value :
180+ self .log_event ("Value already equal" , value = value , attribute = self )
181+ return
182+
183+ def predicate (v : DType_T ) -> bool :
184+ return v == value
185+
186+ try :
187+ await self .wait_for_predicate (predicate , timeout = timeout )
188+ except TimeoutError :
189+ raise TimeoutError (
190+ f"Timeout waiting for value { value } . Current value: { self ._value } "
191+ ) from None
192+
193+ self .log_event ("Value equal" , value = value , attribute = self )
0 commit comments