@@ -69,6 +69,8 @@ use lightning::util::wakers::Sleeper;
6969use lightning_rapid_gossip_sync:: RapidGossipSync ;
7070
7171use lightning_liquidity:: ALiquidityManager ;
72+ #[ cfg( feature = "std" ) ]
73+ use lightning_liquidity:: ALiquidityManagerSync ;
7274
7375use core:: ops:: Deref ;
7476use core:: time:: Duration ;
@@ -424,6 +426,31 @@ pub const NO_LIQUIDITY_MANAGER: Option<
424426 CM = & DynChannelManager ,
425427 Filter = dyn chain:: Filter ,
426428 C = & dyn chain:: Filter ,
429+ KVStore = dyn lightning:: util:: persist:: KVStore ,
430+ K = & dyn lightning:: util:: persist:: KVStore ,
431+ TimeProvider = dyn lightning_liquidity:: utils:: time:: TimeProvider ,
432+ TP = & dyn lightning_liquidity:: utils:: time:: TimeProvider ,
433+ > + Send
434+ + Sync ,
435+ > ,
436+ > = None ;
437+
438+ /// When initializing a background processor without a liquidity manager, this can be used to avoid
439+ /// specifying a concrete `LiquidityManagerSync` type.
440+ #[ cfg( all( not( c_bindings) , feature = "std" ) ) ]
441+ pub const NO_LIQUIDITY_MANAGER_SYNC : Option <
442+ Arc <
443+ dyn ALiquidityManagerSync <
444+ EntropySource = dyn EntropySource ,
445+ ES = & dyn EntropySource ,
446+ NodeSigner = dyn lightning:: sign:: NodeSigner ,
447+ NS = & dyn lightning:: sign:: NodeSigner ,
448+ AChannelManager = DynChannelManager ,
449+ CM = & DynChannelManager ,
450+ Filter = dyn chain:: Filter ,
451+ C = & dyn chain:: Filter ,
452+ KVStoreSync = dyn lightning:: util:: persist:: KVStoreSync ,
453+ KS = & dyn lightning:: util:: persist:: KVStoreSync ,
427454 TimeProvider = dyn lightning_liquidity:: utils:: time:: TimeProvider ,
428455 TP = & dyn lightning_liquidity:: utils:: time:: TimeProvider ,
429456 > + Send
@@ -544,45 +571,49 @@ pub(crate) mod futures_util {
544571 unsafe { Waker :: from_raw ( RawWaker :: new ( core:: ptr:: null ( ) , & DUMMY_WAKER_VTABLE ) ) }
545572 }
546573
547- enum JoinerResult < E , F : Future < Output = Result < ( ) , E > > + Unpin > {
574+ enum JoinerResult < ERR , F : Future < Output = Result < ( ) , ERR > > + Unpin > {
548575 Pending ( Option < F > ) ,
549- Ready ( Result < ( ) , E > ) ,
576+ Ready ( Result < ( ) , ERR > ) ,
550577 }
551578
552579 pub ( crate ) struct Joiner <
553- E ,
554- A : Future < Output = Result < ( ) , E > > + Unpin ,
555- B : Future < Output = Result < ( ) , E > > + Unpin ,
556- C : Future < Output = Result < ( ) , E > > + Unpin ,
557- D : Future < Output = Result < ( ) , E > > + Unpin ,
580+ ERR ,
581+ A : Future < Output = Result < ( ) , ERR > > + Unpin ,
582+ B : Future < Output = Result < ( ) , ERR > > + Unpin ,
583+ C : Future < Output = Result < ( ) , ERR > > + Unpin ,
584+ D : Future < Output = Result < ( ) , ERR > > + Unpin ,
585+ E : Future < Output = Result < ( ) , ERR > > + Unpin ,
558586 > {
559- a : JoinerResult < E , A > ,
560- b : JoinerResult < E , B > ,
561- c : JoinerResult < E , C > ,
562- d : JoinerResult < E , D > ,
587+ a : JoinerResult < ERR , A > ,
588+ b : JoinerResult < ERR , B > ,
589+ c : JoinerResult < ERR , C > ,
590+ d : JoinerResult < ERR , D > ,
591+ e : JoinerResult < ERR , E > ,
563592 }
564593
565594 impl <
566- E ,
567- A : Future < Output = Result < ( ) , E > > + Unpin ,
568- B : Future < Output = Result < ( ) , E > > + Unpin ,
569- C : Future < Output = Result < ( ) , E > > + Unpin ,
570- D : Future < Output = Result < ( ) , E > > + Unpin ,
571- > Joiner < E , A , B , C , D >
595+ ERR ,
596+ A : Future < Output = Result < ( ) , ERR > > + Unpin ,
597+ B : Future < Output = Result < ( ) , ERR > > + Unpin ,
598+ C : Future < Output = Result < ( ) , ERR > > + Unpin ,
599+ D : Future < Output = Result < ( ) , ERR > > + Unpin ,
600+ E : Future < Output = Result < ( ) , ERR > > + Unpin ,
601+ > Joiner < ERR , A , B , C , D , E >
572602 {
573603 pub ( crate ) fn new ( ) -> Self {
574604 Self {
575605 a : JoinerResult :: Pending ( None ) ,
576606 b : JoinerResult :: Pending ( None ) ,
577607 c : JoinerResult :: Pending ( None ) ,
578608 d : JoinerResult :: Pending ( None ) ,
609+ e : JoinerResult :: Pending ( None ) ,
579610 }
580611 }
581612
582613 pub ( crate ) fn set_a ( & mut self , fut : A ) {
583614 self . a = JoinerResult :: Pending ( Some ( fut) ) ;
584615 }
585- pub ( crate ) fn set_a_res ( & mut self , res : Result < ( ) , E > ) {
616+ pub ( crate ) fn set_a_res ( & mut self , res : Result < ( ) , ERR > ) {
586617 self . a = JoinerResult :: Ready ( res) ;
587618 }
588619 pub ( crate ) fn set_b ( & mut self , fut : B ) {
@@ -594,19 +625,23 @@ pub(crate) mod futures_util {
594625 pub ( crate ) fn set_d ( & mut self , fut : D ) {
595626 self . d = JoinerResult :: Pending ( Some ( fut) ) ;
596627 }
628+ pub ( crate ) fn set_e ( & mut self , fut : E ) {
629+ self . e = JoinerResult :: Pending ( Some ( fut) ) ;
630+ }
597631 }
598632
599633 impl <
600- E ,
601- A : Future < Output = Result < ( ) , E > > + Unpin ,
602- B : Future < Output = Result < ( ) , E > > + Unpin ,
603- C : Future < Output = Result < ( ) , E > > + Unpin ,
604- D : Future < Output = Result < ( ) , E > > + Unpin ,
605- > Future for Joiner < E , A , B , C , D >
634+ ERR ,
635+ A : Future < Output = Result < ( ) , ERR > > + Unpin ,
636+ B : Future < Output = Result < ( ) , ERR > > + Unpin ,
637+ C : Future < Output = Result < ( ) , ERR > > + Unpin ,
638+ D : Future < Output = Result < ( ) , ERR > > + Unpin ,
639+ E : Future < Output = Result < ( ) , ERR > > + Unpin ,
640+ > Future for Joiner < ERR , A , B , C , D , E >
606641 where
607- Joiner < E , A , B , C , D > : Unpin ,
642+ Joiner < ERR , A , B , C , D , E > : Unpin ,
608643 {
609- type Output = [ Result < ( ) , E > ; 4 ] ;
644+ type Output = [ Result < ( ) , ERR > ; 5 ] ;
610645 fn poll ( mut self : Pin < & mut Self > , ctx : & mut core:: task:: Context < ' _ > ) -> Poll < Self :: Output > {
611646 let mut all_complete = true ;
612647 macro_rules! handle {
@@ -615,7 +650,7 @@ pub(crate) mod futures_util {
615650 JoinerResult :: Pending ( None ) => {
616651 self . $val = JoinerResult :: Ready ( Ok ( ( ) ) ) ;
617652 } ,
618- JoinerResult :: <E , _>:: Pending ( Some ( ref mut val) ) => {
653+ JoinerResult :: <ERR , _>:: Pending ( Some ( ref mut val) ) => {
619654 match Pin :: new( val) . poll( ctx) {
620655 Poll :: Ready ( res) => {
621656 self . $val = JoinerResult :: Ready ( res) ;
@@ -633,9 +668,10 @@ pub(crate) mod futures_util {
633668 handle ! ( b) ;
634669 handle ! ( c) ;
635670 handle ! ( d) ;
671+ handle ! ( e) ;
636672
637673 if all_complete {
638- let mut res = [ Ok ( ( ) ) , Ok ( ( ) ) , Ok ( ( ) ) , Ok ( ( ) ) ] ;
674+ let mut res = [ Ok ( ( ) ) , Ok ( ( ) ) , Ok ( ( ) ) , Ok ( ( ) ) , Ok ( ( ) ) ] ;
639675 if let JoinerResult :: Ready ( ref mut val) = & mut self . a {
640676 core:: mem:: swap ( & mut res[ 0 ] , val) ;
641677 }
@@ -648,6 +684,9 @@ pub(crate) mod futures_util {
648684 if let JoinerResult :: Ready ( ref mut val) = & mut self . d {
649685 core:: mem:: swap ( & mut res[ 3 ] , val) ;
650686 }
687+ if let JoinerResult :: Ready ( ref mut val) = & mut self . e {
688+ core:: mem:: swap ( & mut res[ 4 ] , val) ;
689+ }
651690 Poll :: Ready ( res)
652691 } else {
653692 Poll :: Pending
@@ -731,7 +770,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
731770/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
732771/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
733772/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
734- /// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<DefaultTimeProvider>>;
773+ /// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<Store>, Arc< DefaultTimeProvider>>;
735774/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
736775/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
737776/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
@@ -976,7 +1015,7 @@ where
9761015 OptionalSelector { optional_future : None }
9771016 } ;
9781017 let lm_fut = if let Some ( lm) = liquidity_manager. as_ref ( ) {
979- let fut = lm. get_lm ( ) . get_pending_msgs_future ( ) ;
1018+ let fut = lm. get_lm ( ) . get_pending_msgs_or_needs_persist_future ( ) ;
9801019 OptionalSelector { optional_future : Some ( fut) }
9811020 } else {
9821021 OptionalSelector { optional_future : None }
@@ -1179,6 +1218,17 @@ where
11791218 None => { } ,
11801219 }
11811220
1221+ if let Some ( liquidity_manager) = liquidity_manager. as_ref ( ) {
1222+ log_trace ! ( logger, "Persisting LiquidityManager..." ) ;
1223+ let fut = async {
1224+ liquidity_manager. get_lm ( ) . persist ( ) . await . map_err ( |e| {
1225+ log_error ! ( logger, "Persisting LiquidityManager failed: {}" , e) ;
1226+ e
1227+ } )
1228+ } ;
1229+ futures. set_e ( Box :: pin ( fut) ) ;
1230+ }
1231+
11821232 // Run persistence tasks in parallel and exit if any of them returns an error.
11831233 for res in futures. await {
11841234 res?;
@@ -1450,7 +1500,7 @@ impl BackgroundProcessor {
14501500 CM :: Target : AChannelManager ,
14511501 OM :: Target : AOnionMessenger ,
14521502 PM :: Target : APeerManager ,
1453- LM :: Target : ALiquidityManager ,
1503+ LM :: Target : ALiquidityManagerSync ,
14541504 D :: Target : ChangeDestinationSourceSync ,
14551505 O :: Target : ' static + OutputSpender ,
14561506 K :: Target : ' static + KVStoreSync ,
@@ -1535,7 +1585,7 @@ impl BackgroundProcessor {
15351585 & channel_manager. get_cm ( ) . get_event_or_persistence_needed_future ( ) ,
15361586 & chain_monitor. get_update_future ( ) ,
15371587 & om. get_om ( ) . get_update_future ( ) ,
1538- & lm. get_lm ( ) . get_pending_msgs_future ( ) ,
1588+ & lm. get_lm ( ) . get_pending_msgs_or_needs_persist_future ( ) ,
15391589 ) ,
15401590 ( Some ( om) , None ) => Sleeper :: from_three_futures (
15411591 & channel_manager. get_cm ( ) . get_event_or_persistence_needed_future ( ) ,
@@ -1545,7 +1595,7 @@ impl BackgroundProcessor {
15451595 ( None , Some ( lm) ) => Sleeper :: from_three_futures (
15461596 & channel_manager. get_cm ( ) . get_event_or_persistence_needed_future ( ) ,
15471597 & chain_monitor. get_update_future ( ) ,
1548- & lm. get_lm ( ) . get_pending_msgs_future ( ) ,
1598+ & lm. get_lm ( ) . get_pending_msgs_or_needs_persist_future ( ) ,
15491599 ) ,
15501600 ( None , None ) => Sleeper :: from_two_futures (
15511601 & channel_manager. get_cm ( ) . get_event_or_persistence_needed_future ( ) ,
@@ -1579,6 +1629,13 @@ impl BackgroundProcessor {
15791629 log_trace ! ( logger, "Done persisting ChannelManager." ) ;
15801630 }
15811631
1632+ if let Some ( liquidity_manager) = liquidity_manager. as_ref ( ) {
1633+ log_trace ! ( logger, "Persisting LiquidityManager..." ) ;
1634+ let _ = liquidity_manager. get_lm ( ) . persist ( ) . map_err ( |e| {
1635+ log_error ! ( logger, "Persisting LiquidityManager failed: {}" , e) ;
1636+ } ) ;
1637+ }
1638+
15821639 // Note that we want to run a graph prune once not long after startup before
15831640 // falling back to our usual hourly prunes. This avoids short-lived clients never
15841641 // pruning their network graph. We run once 60 seconds after startup before
@@ -1793,7 +1850,7 @@ mod tests {
17931850 use lightning:: util:: test_utils;
17941851 use lightning:: { get_event, get_event_msg} ;
17951852 use lightning_liquidity:: utils:: time:: DefaultTimeProvider ;
1796- use lightning_liquidity:: LiquidityManager ;
1853+ use lightning_liquidity:: { ALiquidityManagerSync , LiquidityManagerSync } ;
17971854 use lightning_persister:: fs_store:: FilesystemStore ;
17981855 use lightning_rapid_gossip_sync:: RapidGossipSync ;
17991856 use std:: collections:: VecDeque ;
@@ -1890,11 +1947,12 @@ mod tests {
18901947 IgnoringMessageHandler ,
18911948 > ;
18921949
1893- type LM = LiquidityManager <
1950+ type LM = LiquidityManagerSync <
18941951 Arc < KeysManager > ,
18951952 Arc < KeysManager > ,
18961953 Arc < ChannelManager > ,
18971954 Arc < dyn Filter + Sync + Send > ,
1955+ Arc < Persister > ,
18981956 Arc < DefaultTimeProvider > ,
18991957 > ;
19001958
@@ -2342,15 +2400,19 @@ mod tests {
23422400 Arc :: clone ( & logger) ,
23432401 Arc :: clone ( & keys_manager) ,
23442402 ) ) ;
2345- let liquidity_manager = Arc :: new ( LiquidityManager :: new (
2346- Arc :: clone ( & keys_manager) ,
2347- Arc :: clone ( & keys_manager) ,
2348- Arc :: clone ( & manager) ,
2349- None ,
2350- None ,
2351- None ,
2352- None ,
2353- ) ) ;
2403+ let liquidity_manager = Arc :: new (
2404+ LiquidityManagerSync :: new (
2405+ Arc :: clone ( & keys_manager) ,
2406+ Arc :: clone ( & keys_manager) ,
2407+ Arc :: clone ( & manager) ,
2408+ None ,
2409+ None ,
2410+ Arc :: clone ( & kv_store) ,
2411+ None ,
2412+ None ,
2413+ )
2414+ . unwrap ( ) ,
2415+ ) ;
23542416 let node = Node {
23552417 node : manager,
23562418 p2p_gossip_sync,
@@ -2727,7 +2789,7 @@ mod tests {
27272789 Some ( Arc :: clone ( & nodes[ 0 ] . messenger ) ) ,
27282790 nodes[ 0 ] . rapid_gossip_sync ( ) ,
27292791 Arc :: clone ( & nodes[ 0 ] . peer_manager ) ,
2730- Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2792+ Some ( nodes[ 0 ] . liquidity_manager . get_lm_async ( ) ) ,
27312793 Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
27322794 Arc :: clone ( & nodes[ 0 ] . logger ) ,
27332795 Some ( Arc :: clone ( & nodes[ 0 ] . scorer ) ) ,
@@ -3236,7 +3298,7 @@ mod tests {
32363298 Some ( Arc :: clone ( & nodes[ 0 ] . messenger ) ) ,
32373299 nodes[ 0 ] . rapid_gossip_sync ( ) ,
32383300 Arc :: clone ( & nodes[ 0 ] . peer_manager ) ,
3239- Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
3301+ Some ( nodes[ 0 ] . liquidity_manager . get_lm_async ( ) ) ,
32403302 Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
32413303 Arc :: clone ( & nodes[ 0 ] . logger ) ,
32423304 Some ( Arc :: clone ( & nodes[ 0 ] . scorer ) ) ,
@@ -3451,7 +3513,7 @@ mod tests {
34513513 Some ( Arc :: clone ( & nodes[ 0 ] . messenger ) ) ,
34523514 nodes[ 0 ] . no_gossip_sync ( ) ,
34533515 Arc :: clone ( & nodes[ 0 ] . peer_manager ) ,
3454- Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
3516+ Some ( nodes[ 0 ] . liquidity_manager . get_lm_async ( ) ) ,
34553517 Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
34563518 Arc :: clone ( & nodes[ 0 ] . logger ) ,
34573519 Some ( Arc :: clone ( & nodes[ 0 ] . scorer ) ) ,
@@ -3500,7 +3562,7 @@ mod tests {
35003562 crate :: NO_ONION_MESSENGER ,
35013563 nodes[ 0 ] . no_gossip_sync ( ) ,
35023564 Arc :: clone ( & nodes[ 0 ] . peer_manager ) ,
3503- crate :: NO_LIQUIDITY_MANAGER ,
3565+ crate :: NO_LIQUIDITY_MANAGER_SYNC ,
35043566 Some ( Arc :: clone ( & nodes[ 0 ] . sweeper ) ) ,
35053567 Arc :: clone ( & nodes[ 0 ] . logger ) ,
35063568 Some ( Arc :: clone ( & nodes[ 0 ] . scorer ) ) ,
0 commit comments