@@ -88,10 +88,19 @@ impl Notifier {
8888 /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
8989 pub ( crate ) fn notify ( & self ) {
9090 let mut lock = self . notify_pending . lock ( ) . unwrap ( ) ;
91- lock . 0 = true ;
91+ let mut future_probably_generated_calls = false ;
9292 if let Some ( future_state) = lock. 1 . take ( ) {
93- future_state. lock ( ) . unwrap ( ) . complete ( ) ;
93+ future_probably_generated_calls |= future_state. lock ( ) . unwrap ( ) . complete ( ) ;
94+ future_probably_generated_calls |= Arc :: strong_count ( & future_state) > 1 ;
95+ }
96+ if future_probably_generated_calls {
97+ // If a future made some callbacks or has not yet been drop'd (i.e. the state has more
98+ // than the one reference we hold), assume the user was notified and skip setting the
99+ // notification-required flag. This will not cause the `wait` functions above to return
100+ // and avoid any future `Future`s starting in a completed state.
101+ return ;
94102 }
103+ lock. 0 = true ;
95104 mem:: drop ( lock) ;
96105 self . condvar . notify_all ( ) ;
97106 }
@@ -147,11 +156,14 @@ pub(crate) struct FutureState {
147156}
148157
149158impl FutureState {
150- fn complete ( & mut self ) {
159+ fn complete ( & mut self ) -> bool {
160+ let mut made_calls = false ;
151161 for callback in self . callbacks . drain ( ..) {
152162 callback. call ( ) ;
163+ made_calls = true ;
153164 }
154165 self . complete = true ;
166+ made_calls
155167 }
156168}
157169
@@ -231,6 +243,48 @@ mod tests {
231243 assert ! ( callback. load( Ordering :: SeqCst ) ) ;
232244 }
233245
246+ #[ test]
247+ fn notifier_future_completes_wake ( ) {
248+ // Previously, if we were only using the `Future` interface to learn when a `Notifier` has
249+ // been notified, we'd never mark the notifier as not-awaiting-notify. This caused the
250+ // `lightning-background-processor` to persist in a tight loop.
251+ let notifier = Notifier :: new ( ) ;
252+
253+ // First check the simple case, ensuring if we get notified a new future isn't woken until
254+ // a second `notify`.
255+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
256+ let callback_ref = Arc :: clone ( & callback) ;
257+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
258+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
259+
260+ notifier. notify ( ) ;
261+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
262+
263+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
264+ let callback_ref = Arc :: clone ( & callback) ;
265+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
266+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
267+
268+ notifier. notify ( ) ;
269+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
270+
271+ // Then check the case where the future is fetched before the notification, but a callback
272+ // is only registered after the `notify`, ensuring that it is still sufficient to ensure we
273+ // don't get an instant-wake when we get a new future.
274+ let future = notifier. get_future ( ) ;
275+ notifier. notify ( ) ;
276+
277+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
278+ let callback_ref = Arc :: clone ( & callback) ;
279+ future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
280+ assert ! ( callback. load( Ordering :: SeqCst ) ) ;
281+
282+ let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
283+ let callback_ref = Arc :: clone ( & callback) ;
284+ notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
285+ assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
286+ }
287+
234288 #[ cfg( feature = "std" ) ]
235289 #[ test]
236290 fn test_wait_timeout ( ) {
0 commit comments