@@ -443,9 +443,117 @@ pub(crate) mod futures_util {
443443 pub ( crate ) fn dummy_waker ( ) -> Waker {
444444 unsafe { Waker :: from_raw ( RawWaker :: new ( core:: ptr:: null ( ) , & DUMMY_WAKER_VTABLE ) ) }
445445 }
446+
447+ enum JoinerResult < E , F : Future < Output = Result < ( ) , E > > + Unpin > {
448+ Pending ( Option < F > ) ,
449+ Ready ( Result < ( ) , E > ) ,
450+ }
451+
452+ pub ( crate ) struct Joiner <
453+ E ,
454+ A : Future < Output = Result < ( ) , E > > + Unpin ,
455+ B : Future < Output = Result < ( ) , E > > + Unpin ,
456+ C : Future < Output = Result < ( ) , E > > + Unpin ,
457+ D : Future < Output = Result < ( ) , E > > + Unpin ,
458+ > {
459+ a : JoinerResult < E , A > ,
460+ b : JoinerResult < E , B > ,
461+ c : JoinerResult < E , C > ,
462+ d : JoinerResult < E , D > ,
463+ }
464+
465+ impl <
466+ E ,
467+ A : Future < Output = Result < ( ) , E > > + Unpin ,
468+ B : Future < Output = Result < ( ) , E > > + Unpin ,
469+ C : Future < Output = Result < ( ) , E > > + Unpin ,
470+ D : Future < Output = Result < ( ) , E > > + Unpin ,
471+ > Joiner < E , A , B , C , D >
472+ {
473+ pub ( crate ) fn new ( ) -> Self {
474+ Self {
475+ a : JoinerResult :: Pending ( None ) ,
476+ b : JoinerResult :: Pending ( None ) ,
477+ c : JoinerResult :: Pending ( None ) ,
478+ d : JoinerResult :: Pending ( None ) ,
479+ }
480+ }
481+
482+ pub ( crate ) fn set_a ( & mut self , fut : A ) {
483+ self . a = JoinerResult :: Pending ( Some ( fut) ) ;
484+ }
485+ pub ( crate ) fn set_b ( & mut self , fut : B ) {
486+ self . b = JoinerResult :: Pending ( Some ( fut) ) ;
487+ }
488+ pub ( crate ) fn set_c ( & mut self , fut : C ) {
489+ self . c = JoinerResult :: Pending ( Some ( fut) ) ;
490+ }
491+ pub ( crate ) fn set_d ( & mut self , fut : D ) {
492+ self . d = JoinerResult :: Pending ( Some ( fut) ) ;
493+ }
494+ }
495+
496+ impl <
497+ E ,
498+ A : Future < Output = Result < ( ) , E > > + Unpin ,
499+ B : Future < Output = Result < ( ) , E > > + Unpin ,
500+ C : Future < Output = Result < ( ) , E > > + Unpin ,
501+ D : Future < Output = Result < ( ) , E > > + Unpin ,
502+ > Future for Joiner < E , A , B , C , D >
503+ where
504+ Joiner < E , A , B , C , D > : Unpin ,
505+ {
506+ type Output = [ Result < ( ) , E > ; 4 ] ;
507+ fn poll ( mut self : Pin < & mut Self > , ctx : & mut core:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
508+ let mut all_complete = true ;
509+ macro_rules! handle {
510+ ( $val: ident) => {
511+ match & mut ( self . $val) {
512+ JoinerResult :: Pending ( None ) => {
513+ self . $val = JoinerResult :: Ready ( Ok ( ( ) ) ) ;
514+ } ,
515+ JoinerResult :: <E , _>:: Pending ( Some ( ref mut val) ) => {
516+ match Pin :: new( val) . poll( ctx) {
517+ Poll :: Ready ( res) => {
518+ self . $val = JoinerResult :: Ready ( res) ;
519+ } ,
520+ Poll :: Pending => {
521+ all_complete = false ;
522+ } ,
523+ }
524+ } ,
525+ JoinerResult :: Ready ( _) => { } ,
526+ }
527+ } ;
528+ }
529+ handle ! ( a) ;
530+ handle ! ( b) ;
531+ handle ! ( c) ;
532+ handle ! ( d) ;
533+
534+ if all_complete {
535+ let mut res = [ Ok ( ( ) ) , Ok ( ( ) ) , Ok ( ( ) ) , Ok ( ( ) ) ] ;
536+ if let JoinerResult :: Ready ( ref mut val) = & mut self . a {
537+ core:: mem:: swap ( & mut res[ 0 ] , val) ;
538+ }
539+ if let JoinerResult :: Ready ( ref mut val) = & mut self . b {
540+ core:: mem:: swap ( & mut res[ 1 ] , val) ;
541+ }
542+ if let JoinerResult :: Ready ( ref mut val) = & mut self . c {
543+ core:: mem:: swap ( & mut res[ 2 ] , val) ;
544+ }
545+ if let JoinerResult :: Ready ( ref mut val) = & mut self . d {
546+ core:: mem:: swap ( & mut res[ 3 ] , val) ;
547+ }
548+ Poll :: Ready ( res)
549+ } else {
550+ Poll :: Pending
551+ }
552+ }
553+ }
446554}
447555use core:: task;
448- use futures_util:: { dummy_waker, OptionalSelector , Selector , SelectorOutput } ;
556+ use futures_util:: { dummy_waker, Joiner , OptionalSelector , Selector , SelectorOutput } ;
449557
450558/// Processes background events in a future.
451559///
@@ -812,16 +920,25 @@ where
812920 Some ( true ) => break ,
813921 None => { } ,
814922 }
923+
924+ let mut futures = Joiner :: new ( ) ;
925+
815926 if channel_manager. get_cm ( ) . get_and_clear_needs_persistence ( ) {
816927 log_trace ! ( logger, "Persisting ChannelManager..." ) ;
817- kv_store
818- . write (
819- CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
820- CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
821- CHANNEL_MANAGER_PERSISTENCE_KEY ,
822- & channel_manager. get_cm ( ) . encode ( ) ,
823- )
824- . await ?;
928+
929+ let fut = async {
930+ kv_store
931+ . write (
932+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
933+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
934+ CHANNEL_MANAGER_PERSISTENCE_KEY ,
935+ & channel_manager. get_cm ( ) . encode ( ) ,
936+ )
937+ . await
938+ } ;
939+ // TODO: Once our MSRV is 1.68 we should be able to drop the Box
940+ futures. set_a ( Box :: pin ( fut) ) ;
941+
825942 log_trace ! ( logger, "Done persisting ChannelManager." ) ;
826943 }
827944
@@ -854,17 +971,25 @@ where
854971 log_warn ! ( logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually." ) ;
855972 log_trace ! ( logger, "Persisting network graph." ) ;
856973 }
857- if let Err ( e) = kv_store
858- . write (
859- NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
860- NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
861- NETWORK_GRAPH_PERSISTENCE_KEY ,
862- & network_graph. encode ( ) ,
863- )
864- . await
865- {
866- log_error ! ( logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e) ;
867- }
974+ let fut = async {
975+ if let Err ( e) = kv_store
976+ . write (
977+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
978+ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
979+ NETWORK_GRAPH_PERSISTENCE_KEY ,
980+ & network_graph. encode ( ) ,
981+ )
982+ . await
983+ {
984+ log_error ! ( logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e) ;
985+ }
986+
987+ Ok ( ( ) )
988+ } ;
989+
990+ // TODO: Once our MSRV is 1.68 we should be able to drop the Box
991+ futures. set_b ( Box :: pin ( fut) ) ;
992+
868993 have_pruned = true ;
869994 }
870995 let prune_timer =
@@ -889,21 +1014,28 @@ where
8891014 } else {
8901015 log_trace ! ( logger, "Persisting scorer" ) ;
8911016 }
892- if let Err ( e) = kv_store
893- . write (
894- SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
895- SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
896- SCORER_PERSISTENCE_KEY ,
897- & scorer. encode ( ) ,
898- )
899- . await
900- {
901- log_error ! (
1017+ let fut = async {
1018+ if let Err ( e) = kv_store
1019+ . write (
1020+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
1021+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
1022+ SCORER_PERSISTENCE_KEY ,
1023+ & scorer. encode ( ) ,
1024+ )
1025+ . await
1026+ {
1027+ log_error ! (
9021028 logger,
9031029 "Error: Failed to persist scorer, check your disk and permissions {}" ,
9041030 e
9051031 ) ;
906- }
1032+ }
1033+
1034+ Ok ( ( ) )
1035+ } ;
1036+
1037+ // TODO: Once our MSRV is 1.68 we should be able to drop the Box
1038+ futures. set_c ( Box :: pin ( fut) ) ;
9071039 }
9081040 last_scorer_persist_call = sleeper ( SCORER_PERSIST_TIMER ) ;
9091041 } ,
@@ -914,13 +1046,26 @@ where
9141046 Some ( false ) => {
9151047 log_trace ! ( logger, "Regenerating sweeper spends if necessary" ) ;
9161048 if let Some ( ref sweeper) = sweeper {
917- let _ = sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await ;
1049+ let fut = async {
1050+ let _ = sweeper. regenerate_and_broadcast_spend_if_necessary ( ) . await ;
1051+
1052+ Ok ( ( ) )
1053+ } ;
1054+
1055+ // TODO: Once our MSRV is 1.68 we should be able to drop the Box
1056+ futures. set_d ( Box :: pin ( fut) ) ;
9181057 }
9191058 last_sweeper_call = sleeper ( SWEEPER_TIMER ) ;
9201059 } ,
9211060 Some ( true ) => break ,
9221061 None => { } ,
9231062 }
1063+
1064+ // Run persistence tasks in parallel and exit if any of them returns an error.
1065+ for res in futures. await {
1066+ res?;
1067+ }
1068+
9241069 match check_sleeper ( & mut last_onion_message_handler_call) {
9251070 Some ( false ) => {
9261071 if let Some ( om) = & onion_messenger {
0 commit comments