1313//!
1414//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
1515
16+ use crate :: sync:: Mutex ;
1617use alloc:: sync:: Arc ;
1718use core:: mem;
18- use crate :: sync:: Mutex ;
1919
2020#[ allow( unused_imports) ]
2121use crate :: prelude:: * ;
@@ -26,9 +26,8 @@ use crate::sync::Condvar;
2626use std:: time:: Duration ;
2727
2828use core:: future:: Future as StdFuture ;
29- use core:: task:: { Context , Poll } ;
3029use core:: pin:: Pin ;
31-
30+ use core :: task :: { Context , Poll } ;
3231
3332/// Used to signal to one of many waiters that the condition they're waiting on has happened.
3433pub ( crate ) struct Notifier {
@@ -37,9 +36,7 @@ pub(crate) struct Notifier {
3736
3837impl Notifier {
3938 pub ( crate ) fn new ( ) -> Self {
40- Self {
41- notify_pending : Mutex :: new ( ( false , None ) ) ,
42- }
39+ Self { notify_pending : Mutex :: new ( ( false , None ) ) }
4340 }
4441
4542 /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters.
@@ -198,7 +195,9 @@ impl Future {
198195 if state. complete {
199196 state. callbacks_made = true ;
200197 true
201- } else { false }
198+ } else {
199+ false
200+ }
202201 }
203202}
204203
@@ -251,11 +250,8 @@ impl Sleeper {
251250 // Note that this is the common case - a ChannelManager, a ChainMonitor, and an
252251 // OnionMessenger.
253252 pub fn from_three_futures ( fut_a : & Future , fut_b : & Future , fut_c : & Future ) -> Self {
254- let notifiers = vec ! [
255- Arc :: clone( & fut_a. state) ,
256- Arc :: clone( & fut_b. state) ,
257- Arc :: clone( & fut_c. state)
258- ] ;
253+ let notifiers =
254+ vec ! [ Arc :: clone( & fut_a. state) , Arc :: clone( & fut_b. state) , Arc :: clone( & fut_c. state) ] ;
259255 Self { notifiers }
260256 }
261257 /// Constructs a new sleeper on many futures, allowing blocking on all at once.
@@ -289,8 +285,11 @@ impl Sleeper {
289285 /// Wait until one of the [`Future`]s registered with this [`Sleeper`] has completed.
290286 pub fn wait ( & self ) {
291287 let ( cv, notified_fut_mtx) = self . setup_wait ( ) ;
292- let notified_fut = cv. wait_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , |fut_opt| fut_opt. is_none ( ) )
293- . unwrap ( ) . take ( ) . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ;
288+ let notified_fut = cv
289+ . wait_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , |fut_opt| fut_opt. is_none ( ) )
290+ . unwrap ( )
291+ . take ( )
292+ . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ;
294293 notified_fut. lock ( ) . unwrap ( ) . callbacks_made = true ;
295294 }
296295
@@ -300,10 +299,13 @@ impl Sleeper {
300299 pub fn wait_timeout ( & self , max_wait : Duration ) -> bool {
301300 let ( cv, notified_fut_mtx) = self . setup_wait ( ) ;
302301 let notified_fut =
303- match cv. wait_timeout_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , max_wait, |fut_opt| fut_opt. is_none ( ) ) {
302+ match cv. wait_timeout_while ( notified_fut_mtx. lock ( ) . unwrap ( ) , max_wait, |fut_opt| {
303+ fut_opt. is_none ( )
304+ } ) {
304305 Ok ( ( _, e) ) if e. timed_out ( ) => return false ,
305- Ok ( ( mut notified_fut, _) ) =>
306- notified_fut. take ( ) . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ,
306+ Ok ( ( mut notified_fut, _) ) => notified_fut
307+ . take ( )
308+ . expect ( "CV wait shouldn't have returned until the notifying future was set" ) ,
307309 Err ( _) => panic ! ( "Previous panic while a lock was held led to a lock panic" ) ,
308310 } ;
309311 notified_fut. lock ( ) . unwrap ( ) . callbacks_made = true ;
@@ -314,8 +316,8 @@ impl Sleeper {
314316#[ cfg( test) ]
315317mod tests {
316318 use super :: * ;
317- use core:: sync:: atomic:: { AtomicBool , Ordering } ;
318319 use core:: future:: Future as FutureTrait ;
320+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
319321 use core:: task:: { RawWaker , RawWakerVTable } ;
320322
321323 #[ test]
@@ -328,7 +330,9 @@ mod tests {
328330
329331 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
330332 let callback_ref = Arc :: clone ( & callback) ;
331- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
333+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
334+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
335+ } ) ) ;
332336 assert ! ( callback. load( Ordering :: SeqCst ) ) ;
333337 }
334338
@@ -343,15 +347,19 @@ mod tests {
343347 // a second `notify`.
344348 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
345349 let callback_ref = Arc :: clone ( & callback) ;
346- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
350+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
351+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
352+ } ) ) ;
347353 assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
348354
349355 notifier. notify ( ) ;
350356 assert ! ( callback. load( Ordering :: SeqCst ) ) ;
351357
352358 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
353359 let callback_ref = Arc :: clone ( & callback) ;
354- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
360+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
361+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
362+ } ) ) ;
355363 assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
356364
357365 notifier. notify ( ) ;
@@ -365,12 +373,16 @@ mod tests {
365373
366374 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
367375 let callback_ref = Arc :: clone ( & callback) ;
368- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
376+ future. register_callback ( Box :: new ( move || {
377+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
378+ } ) ) ;
369379 assert ! ( callback. load( Ordering :: SeqCst ) ) ;
370380
371381 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
372382 let callback_ref = Arc :: clone ( & callback) ;
373- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
383+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
384+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
385+ } ) ) ;
374386 assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
375387 }
376388
@@ -384,12 +396,16 @@ mod tests {
384396
385397 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
386398 let callback_ref = Arc :: clone ( & callback) ;
387- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
399+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
400+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
401+ } ) ) ;
388402 assert ! ( callback. load( Ordering :: SeqCst ) ) ;
389403
390404 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
391405 let callback_ref = Arc :: clone ( & callback) ;
392- notifier. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
406+ notifier. get_future ( ) . register_callback ( Box :: new ( move || {
407+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
408+ } ) ) ;
393409 assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
394410
395411 notifier. notify ( ) ;
@@ -407,12 +423,10 @@ mod tests {
407423
408424 let exit_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
409425 let exit_thread_clone = exit_thread. clone ( ) ;
410- thread:: spawn ( move || {
411- loop {
412- thread_notifier. notify ( ) ;
413- if exit_thread_clone. load ( Ordering :: SeqCst ) {
414- break
415- }
426+ thread:: spawn ( move || loop {
427+ thread_notifier. notify ( ) ;
428+ if exit_thread_clone. load ( Ordering :: SeqCst ) {
429+ break ;
416430 }
417431 } ) ;
418432
@@ -423,7 +437,7 @@ mod tests {
423437 // available.
424438 loop {
425439 if persistence_notifier. get_future ( ) . wait_timeout ( Duration :: from_millis ( 100 ) ) {
426- break
440+ break ;
427441 }
428442 }
429443
@@ -433,7 +447,7 @@ mod tests {
433447 // are available.
434448 loop {
435449 if !persistence_notifier. get_future ( ) . wait_timeout ( Duration :: from_millis ( 100 ) ) {
436- break
450+ break ;
437451 }
438452 }
439453 }
@@ -493,7 +507,9 @@ mod tests {
493507 } ;
494508 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
495509 let callback_ref = Arc :: clone ( & callback) ;
496- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
510+ future. register_callback ( Box :: new ( move || {
511+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
512+ } ) ) ;
497513
498514 assert ! ( !callback. load( Ordering :: SeqCst ) ) ;
499515 complete_future ( & future. state ) ;
@@ -518,7 +534,9 @@ mod tests {
518534
519535 let callback = Arc :: new ( AtomicBool :: new ( false ) ) ;
520536 let callback_ref = Arc :: clone ( & callback) ;
521- future. register_callback ( Box :: new ( move || assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
537+ future. register_callback ( Box :: new ( move || {
538+ assert ! ( !callback_ref. fetch_or( true , Ordering :: SeqCst ) )
539+ } ) ) ;
522540
523541 assert ! ( callback. load( Ordering :: SeqCst ) ) ;
524542 assert ! ( future. state. lock( ) . unwrap( ) . callbacks. is_empty( ) ) ;
@@ -529,17 +547,27 @@ mod tests {
529547 // compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
530548 // waker, which we do here with a trivial Arc<AtomicBool> data element to track woke-ness.
531549 const WAKER_V_TABLE : RawWakerVTable = RawWakerVTable :: new ( waker_clone, wake, wake_by_ref, drop) ;
532- unsafe fn wake_by_ref ( ptr : * const ( ) ) { let p = ptr as * const Arc < AtomicBool > ; assert ! ( !( * p) . fetch_or( true , Ordering :: SeqCst ) ) ; }
533- unsafe fn drop ( ptr : * const ( ) ) { let p = ptr as * mut Arc < AtomicBool > ; let _freed = Box :: from_raw ( p) ; }
534- unsafe fn wake ( ptr : * const ( ) ) { wake_by_ref ( ptr) ; drop ( ptr) ; }
550+ unsafe fn wake_by_ref ( ptr : * const ( ) ) {
551+ let p = ptr as * const Arc < AtomicBool > ;
552+ assert ! ( !( * p) . fetch_or( true , Ordering :: SeqCst ) ) ;
553+ }
554+ unsafe fn drop ( ptr : * const ( ) ) {
555+ let p = ptr as * mut Arc < AtomicBool > ;
556+ let _freed = Box :: from_raw ( p) ;
557+ }
558+ unsafe fn wake ( ptr : * const ( ) ) {
559+ wake_by_ref ( ptr) ;
560+ drop ( ptr) ;
561+ }
535562 unsafe fn waker_clone ( ptr : * const ( ) ) -> RawWaker {
536563 let p = ptr as * const Arc < AtomicBool > ;
537564 RawWaker :: new ( Box :: into_raw ( Box :: new ( Arc :: clone ( & * p) ) ) as * const ( ) , & WAKER_V_TABLE )
538565 }
539566
540567 fn create_waker ( ) -> ( Arc < AtomicBool > , Waker ) {
541568 let a = Arc :: new ( AtomicBool :: new ( false ) ) ;
542- let waker = unsafe { Waker :: from_raw ( waker_clone ( ( & a as * const Arc < AtomicBool > ) as * const ( ) ) ) } ;
569+ let waker =
570+ unsafe { Waker :: from_raw ( waker_clone ( ( & a as * const Arc < AtomicBool > ) as * const ( ) ) ) } ;
543571 ( a, waker)
544572 }
545573
@@ -563,14 +591,20 @@ mod tests {
563591 assert ! ( !woken. load( Ordering :: SeqCst ) ) ;
564592
565593 let ( second_woken, second_waker) = create_waker ( ) ;
566- assert_eq ! ( Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) , Poll :: Pending ) ;
594+ assert_eq ! (
595+ Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) ,
596+ Poll :: Pending
597+ ) ;
567598 assert ! ( !second_woken. load( Ordering :: SeqCst ) ) ;
568599
569600 complete_future ( & future. state ) ;
570601 assert ! ( woken. load( Ordering :: SeqCst ) ) ;
571602 assert ! ( second_woken. load( Ordering :: SeqCst ) ) ;
572603 assert_eq ! ( Pin :: new( & mut future) . poll( & mut Context :: from_waker( & waker) ) , Poll :: Ready ( ( ) ) ) ;
573- assert_eq ! ( Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) , Poll :: Ready ( ( ) ) ) ;
604+ assert_eq ! (
605+ Pin :: new( & mut second_future) . poll( & mut Context :: from_waker( & second_waker) ) ,
606+ Poll :: Ready ( ( ) )
607+ ) ;
574608 }
575609
576610 #[ test]
@@ -713,8 +747,12 @@ mod tests {
713747 let callback_b = Arc :: new ( AtomicBool :: new ( false ) ) ;
714748 let callback_a_ref = Arc :: clone ( & callback_a) ;
715749 let callback_b_ref = Arc :: clone ( & callback_b) ;
716- notifier_a. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_a_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
717- notifier_b. get_future ( ) . register_callback ( Box :: new ( move || assert ! ( !callback_b_ref. fetch_or( true , Ordering :: SeqCst ) ) ) ) ;
750+ notifier_a. get_future ( ) . register_callback ( Box :: new ( move || {
751+ assert ! ( !callback_a_ref. fetch_or( true , Ordering :: SeqCst ) )
752+ } ) ) ;
753+ notifier_b. get_future ( ) . register_callback ( Box :: new ( move || {
754+ assert ! ( !callback_b_ref. fetch_or( true , Ordering :: SeqCst ) )
755+ } ) ) ;
718756 assert ! ( callback_a. load( Ordering :: SeqCst ) ^ callback_b. load( Ordering :: SeqCst ) ) ;
719757
720758 // If we now notify both notifiers again, the other callback will fire, completing the
@@ -739,14 +777,23 @@ mod tests {
739777
740778 // Test that simply polling a future twice doesn't result in two pending `Waker`s.
741779 let mut future_a = notifier. get_future ( ) ;
742- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
780+ assert_eq ! (
781+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
782+ Poll :: Pending
783+ ) ;
743784 assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
744- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
785+ assert_eq ! (
786+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
787+ Poll :: Pending
788+ ) ;
745789 assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
746790
747791 // If we poll a second future, however, that will store a second `Waker`.
748792 let mut future_b = notifier. get_future ( ) ;
749- assert_eq ! ( Pin :: new( & mut future_b) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
793+ assert_eq ! (
794+ Pin :: new( & mut future_b) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
795+ Poll :: Pending
796+ ) ;
750797 assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 2 ) ;
751798
752799 // but when we drop the `Future`s, the pending Wakers will also be dropped.
@@ -757,13 +804,22 @@ mod tests {
757804
758805 // Further, after polling a future twice, if the notifier is woken all Wakers are dropped.
759806 let mut future_a = notifier. get_future ( ) ;
760- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
807+ assert_eq ! (
808+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
809+ Poll :: Pending
810+ ) ;
761811 assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
762- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
812+ assert_eq ! (
813+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
814+ Poll :: Pending
815+ ) ;
763816 assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
764817 notifier. notify ( ) ;
765818 assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
766- assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Ready ( ( ) ) ) ;
819+ assert_eq ! (
820+ Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) ,
821+ Poll :: Ready ( ( ) )
822+ ) ;
767823 assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
768824 }
769825}
0 commit comments