Skip to content

Commit 2246af0

Browse files
committed
Implement the ability to block on multiple futures at once
In the next commits we'll be adding a second notify pipeline - from the `ChainMonitor` back to the background processor. This will cause the `background-processor` to need to await multiple wakers at once, which we cannot do in the current scheme without taking on a full async runtime. Building a multi-future waiter isn't so bad, and notably will allow us to remove the existing sleep pipeline entirely, reducing the complexity of our wakers implementation by only having one notify path to consider.
1 parent 77fbddf commit 2246af0

File tree

2 files changed

+156
-73
lines changed

2 files changed

+156
-73
lines changed

lightning/src/sync/debug_sync.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,13 @@ impl Condvar {
3737
Condvar { inner: StdCondvar::new() }
3838
}
3939

40-
pub fn wait<'a, T>(&'a self, guard: MutexGuard<'a, T>) -> LockResult<MutexGuard<'a, T>> {
41-
let mutex: &'a Mutex<T> = guard.mutex;
42-
self.inner.wait(guard.into_inner()).map(|lock| MutexGuard { mutex, lock }).map_err(|_| ())
43-
}
44-
4540
pub fn wait_while<'a, T, F: FnMut(&mut T) -> bool>(&'a self, guard: MutexGuard<'a, T>, condition: F)
4641
-> LockResult<MutexGuard<'a, T>> {
4742
let mutex: &'a Mutex<T> = guard.mutex;
4843
self.inner.wait_while(guard.into_inner(), condition).map(|lock| MutexGuard { mutex, lock })
4944
.map_err(|_| ())
5045
}
5146

52-
#[allow(unused)]
53-
pub fn wait_timeout<'a, T>(&'a self, guard: MutexGuard<'a, T>, dur: Duration) -> LockResult<(MutexGuard<'a, T>, ())> {
54-
let mutex = guard.mutex;
55-
self.inner.wait_timeout(guard.into_inner(), dur).map(|(lock, _)| (MutexGuard { mutex, lock }, ())).map_err(|_| ())
56-
}
57-
5847
#[allow(unused)]
5948
pub fn wait_timeout_while<'a, T, F: FnMut(&mut T) -> bool>(&'a self, guard: MutexGuard<'a, T>, dur: Duration, condition: F)
6049
-> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {

lightning/src/util/wakers.rs

Lines changed: 156 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -30,75 +30,22 @@ use core::pin::Pin;
3030
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
3131
pub(crate) struct Notifier {
3232
notify_pending: Mutex<(bool, Option<Arc<Mutex<FutureState>>>)>,
33-
condvar: Condvar,
34-
}
35-
36-
macro_rules! check_woken {
37-
($guard: expr, $retval: expr) => { {
38-
if $guard.0 {
39-
$guard.0 = false;
40-
if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) {
41-
// If we're about to return as woken, and the future state is marked complete, wipe
42-
// the future state and let the next future wait until we get a new notify.
43-
$guard.1.take();
44-
}
45-
return $retval;
46-
}
47-
} }
4833
}
4934

5035
impl Notifier {
5136
pub(crate) fn new() -> Self {
5237
Self {
5338
notify_pending: Mutex::new((false, None)),
54-
condvar: Condvar::new(),
55-
}
56-
}
57-
58-
fn propagate_future_state_to_notify_flag(&self) -> MutexGuard<(bool, Option<Arc<Mutex<FutureState>>>)> {
59-
let mut lock = self.notify_pending.lock().unwrap();
60-
if let Some(existing_state) = &lock.1 {
61-
if existing_state.lock().unwrap().callbacks_made {
62-
// If the existing `FutureState` has completed and actually made callbacks,
63-
// consider the notification flag to have been cleared and reset the future state.
64-
lock.1.take();
65-
lock.0 = false;
66-
}
6739
}
68-
lock
6940
}
7041

7142
pub(crate) fn wait(&self) {
72-
loop {
73-
let mut guard = self.propagate_future_state_to_notify_flag();
74-
check_woken!(guard, ());
75-
guard = self.condvar.wait(guard).unwrap();
76-
check_woken!(guard, ());
77-
}
43+
Sleeper::from_single_future(self.get_future()).wait();
7844
}
7945

8046
#[cfg(any(test, feature = "std"))]
8147
pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool {
82-
let current_time = Instant::now();
83-
loop {
84-
let mut guard = self.propagate_future_state_to_notify_flag();
85-
check_woken!(guard, true);
86-
guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0;
87-
check_woken!(guard, true);
88-
// Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the
89-
// desired wait time has actually passed, and if not then restart the loop with a reduced wait
90-
// time. Note that this logic can be highly simplified through the use of
91-
// `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to
92-
// 1.42.0.
93-
let elapsed = current_time.elapsed();
94-
if elapsed >= max_wait {
95-
return false;
96-
}
97-
match max_wait.checked_sub(elapsed) {
98-
None => return false,
99-
Some(_) => continue
100-
}
101-
}
48+
Sleeper::from_single_future(self.get_future()).wait_timeout(max_wait)
10249
}
10350

10451
/// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
@@ -111,13 +58,19 @@ impl Notifier {
11158
}
11259
}
11360
lock.0 = true;
114-
mem::drop(lock);
115-
self.condvar.notify_all();
11661
}
11762

11863
/// Gets a [`Future`] that will get woken up with any waiters
11964
pub(crate) fn get_future(&self) -> Future {
120-
let mut lock = self.propagate_future_state_to_notify_flag();
65+
let mut lock = self.notify_pending.lock().unwrap();
66+
if let Some(existing_state) = &lock.1 {
67+
if existing_state.lock().unwrap().callbacks_made {
68+
// If the existing `FutureState` has completed and actually made callbacks,
69+
// consider the notification flag to have been cleared and reset the future state.
70+
lock.1.take();
71+
lock.0 = false;
72+
}
73+
}
12174
if let Some(existing_state) = &lock.1 {
12275
Future { state: Arc::clone(&existing_state) }
12376
} else {
@@ -182,6 +135,9 @@ impl FutureState {
182135
}
183136

184137
/// A simple future which can complete once, and calls some callback(s) when it does so.
138+
///
139+
/// Clones can be made and all futures cloned from the same source will complete at the same time.
140+
#[derive(Clone)]
185141
pub struct Future {
186142
state: Arc<Mutex<FutureState>>,
187143
}
@@ -236,6 +192,85 @@ impl<'a> StdFuture for Future {
236192
}
237193
}
238194

195+
/// A struct which can be used to select across many [`Future`]s at once without relying on a full
196+
/// async context.
197+
pub struct Sleeper {
198+
notifiers: Vec<Arc<Mutex<FutureState>>>,
199+
}
200+
201+
impl Sleeper {
202+
/// Constructs a new sleeper from one future, allowing blocking on it.
203+
pub fn from_single_future(future: Future) -> Self {
204+
Self { notifiers: vec![future.state] }
205+
}
206+
/// Constructs a new sleeper from two futures, allowing blocking on both at once.
207+
// Note that this is the common case - a ChannelManager and ChainMonitor.
208+
pub fn from_two_futures(fut_a: Future, fut_b: Future) -> Self {
209+
Self { notifiers: vec![fut_a.state, fut_b.state] }
210+
}
211+
/// Constructs a new sleeper on many futures, allowing blocking on all at once.
212+
pub fn new(futures: Vec<Future>) -> Self {
213+
Self { notifiers: futures.into_iter().map(|f| f.state).collect() }
214+
}
215+
/// Prepares to go into a wait loop body, creating a condition variable which we can block on
216+
/// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
217+
/// condition variable being woken.
218+
fn setup_wait(&self) -> (Arc<Condvar>, Arc<Mutex<Option<Arc<Mutex<FutureState>>>>>) {
219+
let cv = Arc::new(Condvar::new());
220+
let notified_fut_mtx = Arc::new(Mutex::new(None));
221+
{
222+
for notifier_mtx in self.notifiers.iter() {
223+
let cv_ref = Arc::clone(&cv);
224+
let notified_ref = Arc::clone(&notified_fut_mtx);
225+
let notifier_ref = Arc::clone(&notifier_mtx);
226+
let mut notifier = notifier_mtx.lock().unwrap();
227+
if notifier.complete {
228+
*notified_fut_mtx.lock().unwrap() = Some(notifier_ref);
229+
break;
230+
}
231+
notifier.callbacks.push((false, Box::new(move || {
232+
*notified_ref.lock().unwrap() = Some(Arc::clone(&notifier_ref));
233+
cv_ref.notify_all();
234+
})));
235+
}
236+
}
237+
(cv, notified_fut_mtx)
238+
}
239+
240+
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
241+
pub fn wait(&self) {
242+
let (cv, notified_fut_mtx) = self.setup_wait();
243+
let notified_fut = loop {
244+
let mut notified_fut_lck = cv.wait(notified_fut_mtx.lock().unwrap()).unwrap();
245+
if let Some(notified_fut) = notified_fut_lck.take() {
246+
break notified_fut;
247+
}
248+
};
249+
notified_fut.lock().unwrap().callbacks_made = true;
250+
}
251+
252+
/// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed or the
253+
/// given amount of time has elapsed. Returns true if a [`Future`] completed, false if the time
254+
/// elapsed.
255+
#[cfg(any(test, feature = "std"))]
256+
pub fn wait_timeout(&self, max_wait: Duration) -> bool {
257+
let start_time = Instant::now();
258+
let (cv, notified_fut_mtx) = self.setup_wait();
259+
let notified_fut = loop {
260+
let wait_duration = max_wait.saturating_sub(start_time.elapsed());
261+
if wait_duration == Duration::from_secs(0) { return false; }
262+
match cv.wait_timeout(notified_fut_mtx.lock().unwrap(), wait_duration) {
263+
Ok((mut notified_fut, _)) if notified_fut.is_some() =>
264+
break notified_fut.take().unwrap(),
265+
Ok((notified_fut_lck, _)) => continue,
266+
Err(_) => panic!("Previous panic while a lock was held led to a lock panic"),
267+
};
268+
};
269+
notified_fut.lock().unwrap().callbacks_made = true;
270+
true
271+
}
272+
}
273+
239274
#[cfg(test)]
240275
mod tests {
241276
use super::*;
@@ -334,10 +369,7 @@ mod tests {
334369
let exit_thread_clone = exit_thread.clone();
335370
thread::spawn(move || {
336371
loop {
337-
let mut lock = thread_notifier.notify_pending.lock().unwrap();
338-
lock.0 = true;
339-
thread_notifier.condvar.notify_all();
340-
372+
thread_notifier.notify();
341373
if exit_thread_clone.load(Ordering::SeqCst) {
342374
break
343375
}
@@ -539,4 +571,66 @@ mod tests {
539571
assert!(woken.load(Ordering::SeqCst));
540572
assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(()));
541573
}
574+
575+
#[test]
576+
fn test_multi_future_sleep() {
577+
// Tests the `Sleeper` with multiple futures.
578+
let notifier_a = Notifier::new();
579+
let notifier_b = Notifier::new();
580+
581+
// Set both notifiers as woken without sleeping yet.
582+
notifier_a.notify();
583+
notifier_b.notify();
584+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
585+
586+
// One future has woken us up, but the other should still have a pending notification.
587+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
588+
589+
// However once we've slept twice, we should no longer have any pending notifications
590+
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
591+
.wait_timeout(Duration::from_millis(10)));
592+
593+
// Test ordering somewhat more.
594+
notifier_a.notify();
595+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
596+
}
597+
598+
#[test]
599+
#[cfg(feature = "std")]
600+
fn sleeper_with_pending_callbacks() {
601+
// This is similar to the above `test_multi_future_sleep` test, but in addition registers
602+
// "normal" callbacks which will cause the futures to assume notification has occurred,
603+
// rather than waiting for a woken sleeper.
604+
let notifier_a = Notifier::new();
605+
let notifier_b = Notifier::new();
606+
607+
// Set both notifiers as woken without sleeping yet.
608+
notifier_a.notify();
609+
notifier_b.notify();
610+
611+
// After sleeping one future (not guaranteed which one, however) will have its notification
612+
// bit cleared.
613+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
614+
615+
// By registering a callback on the futures for both notifiers, one will complete
616+
// immediately, but one will remain tied to the notifier, and will complete once the
617+
// notifier is next woken, which will be considered the completion of the notification.
618+
let callback_a = Arc::new(AtomicBool::new(false));
619+
let callback_b = Arc::new(AtomicBool::new(false));
620+
let callback_a_ref = Arc::clone(&callback_a);
621+
let callback_b_ref = Arc::clone(&callback_b);
622+
notifier_a.get_future().register_callback(Box::new(move || assert!(!callback_a_ref.fetch_or(true, Ordering::SeqCst))));
623+
notifier_b.get_future().register_callback(Box::new(move || assert!(!callback_b_ref.fetch_or(true, Ordering::SeqCst))));
624+
assert!(callback_a.load(Ordering::SeqCst) ^ callback_b.load(Ordering::SeqCst));
625+
626+
// If we now notify both notifiers again, the other callback will fire, completing the
627+
// notification, and we'll be back to one pending notification.
628+
notifier_a.notify();
629+
notifier_b.notify();
630+
631+
assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst));
632+
Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future()).wait();
633+
assert!(!Sleeper::from_two_futures(notifier_a.get_future(), notifier_b.get_future())
634+
.wait_timeout(Duration::from_millis(10)));
635+
}
542636
}

0 commit comments

Comments
 (0)