Skip to content

Commit ba7f058

Browse files
committed
broker: topic: add topic.wait_for(value) convenience method
This convenience method allows you to wait until a topic holds a specified value. Signed-off-by: Leonard Göhrs <[email protected]>
1 parent 5445b24 commit ba7f058

File tree

1 file changed

+13
-0
lines changed

1 file changed

+13
-0
lines changed

src/broker/topic.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,19 @@ impl<E: Serialize + DeserializeOwned + Clone + PartialEq> Topic<E> {
336336

337337
self.modify(|prev| if prev != msg { msg } else { None });
338338
}
339+
340+
/// Wait until the topic is set to the specified value
341+
#[allow(dead_code)]
342+
pub async fn wait_for(self: &Arc<Self>, val: E) {
343+
let (mut stream, sub) = self.clone().subscribe_unbounded();
344+
345+
// Unwrap here to keep the interface simple. The stream could only yield
346+
// None if the sender side is dropped, which will not happen as we hold
347+
// an Arc to self which contains the senders vec.
348+
while stream.next().await.unwrap() != val {}
349+
350+
sub.unsubscribe()
351+
}
339352
}
340353

341354
impl<E: Serialize + DeserializeOwned + Clone + Not + Not<Output = E>> Topic<E> {

0 commit comments

Comments
 (0)