Skip to content

Commit 0b9e09e

Browse files
committed
Introduce next_event_async allowing to poll event queue
We implement a way to asynchronously poll the queue for new events, providing an async alternative to `wait_next_event`.
1 parent 741e706 commit 0b9e09e

File tree

2 files changed

+51
-5
lines changed

2 files changed

+51
-5
lines changed

src/event.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
2626
use bitcoin::blockdata::locktime::absolute::LockTime;
2727
use bitcoin::secp256k1::PublicKey;
2828
use bitcoin::OutPoint;
29+
use core::future::Future;
30+
use core::task::{Poll, Waker};
2931
use rand::{thread_rng, Rng};
3032
use std::collections::VecDeque;
3133
use std::ops::Deref;
@@ -125,7 +127,8 @@ pub struct EventQueue<K: KVStore + Sync + Send, L: Deref>
125127
where
126128
L::Target: Logger,
127129
{
128-
queue: Mutex<VecDeque<Event>>,
130+
queue: Arc<Mutex<VecDeque<Event>>>,
131+
waker: Arc<Mutex<Option<Waker>>>,
129132
notifier: Condvar,
130133
kv_store: Arc<K>,
131134
logger: L,
@@ -136,9 +139,10 @@ where
136139
L::Target: Logger,
137140
{
138141
pub(crate) fn new(kv_store: Arc<K>, logger: L) -> Self {
139-
let queue: Mutex<VecDeque<Event>> = Mutex::new(VecDeque::new());
142+
let queue = Arc::new(Mutex::new(VecDeque::new()));
143+
let waker = Arc::new(Mutex::new(None));
140144
let notifier = Condvar::new();
141-
Self { queue, notifier, kv_store, logger }
145+
Self { queue, waker, notifier, kv_store, logger }
142146
}
143147

144148
pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
@@ -149,6 +153,10 @@ where
149153
}
150154

151155
self.notifier.notify_one();
156+
157+
if let Some(waker) = self.waker.lock().unwrap().take() {
158+
waker.wake();
159+
}
152160
Ok(())
153161
}
154162

@@ -157,6 +165,10 @@ where
157165
locked_queue.front().map(|e| e.clone())
158166
}
159167

168+
pub(crate) async fn next_event_async(&self) -> Event {
169+
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
170+
}
171+
160172
pub(crate) fn wait_next_event(&self) -> Event {
161173
let locked_queue =
162174
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
@@ -170,6 +182,10 @@ where
170182
self.persist_queue(&locked_queue)?;
171183
}
172184
self.notifier.notify_one();
185+
186+
if let Some(waker) = self.waker.lock().unwrap().take() {
187+
waker.wake();
188+
}
173189
Ok(())
174190
}
175191

@@ -207,9 +223,10 @@ where
207223
) -> Result<Self, lightning::ln::msgs::DecodeError> {
208224
let (kv_store, logger) = args;
209225
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
210-
let queue: Mutex<VecDeque<Event>> = Mutex::new(read_queue.0);
226+
let queue = Arc::new(Mutex::new(read_queue.0));
227+
let waker = Arc::new(Mutex::new(None));
211228
let notifier = Condvar::new();
212-
Ok(Self { queue, notifier, kv_store, logger })
229+
Ok(Self { queue, waker, notifier, kv_store, logger })
213230
}
214231
}
215232

@@ -240,6 +257,26 @@ impl Writeable for EventQueueSerWrapper<'_> {
240257
}
241258
}
242259

260+
struct EventFuture {
261+
event_queue: Arc<Mutex<VecDeque<Event>>>,
262+
waker: Arc<Mutex<Option<Waker>>>,
263+
}
264+
265+
impl Future for EventFuture {
266+
type Output = Event;
267+
268+
fn poll(
269+
self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
270+
) -> core::task::Poll<Self::Output> {
271+
if let Some(event) = self.event_queue.lock().unwrap().front() {
272+
Poll::Ready(event.clone())
273+
} else {
274+
*self.waker.lock().unwrap() = Some(cx.waker().clone());
275+
Poll::Pending
276+
}
277+
}
278+
}
279+
243280
pub(crate) struct EventHandler<K: KVStore + Sync + Send, L: Deref>
244281
where
245282
L::Target: Logger,

src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,15 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
811811
self.event_queue.next_event()
812812
}
813813

814+
/// Returns the next event in the event queue.
815+
///
816+
/// Will asynchronously poll the event queue until the next event is ready.
817+
///
818+
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
819+
pub async fn next_event_async(&self) -> Event {
820+
self.event_queue.next_event_async().await
821+
}
822+
814823
/// Returns the next event in the event queue.
815824
///
816825
/// Will block the current thread until the next event is available.

0 commit comments

Comments
 (0)