@@ -42,6 +42,8 @@ use lightning::util::persist::Persister;
4242use lightning:: util:: wakers:: Sleeper ;
4343use lightning_rapid_gossip_sync:: RapidGossipSync ;
4444
45+ use lightning_liquidity:: ALiquidityManager ;
46+
4547use core:: ops:: Deref ;
4648use core:: time:: Duration ;
4749
@@ -492,27 +494,31 @@ pub(crate) mod futures_util {
492494 A : Future < Output = ( ) > + Unpin ,
493495 B : Future < Output = ( ) > + Unpin ,
494496 C : Future < Output = ( ) > + Unpin ,
495- D : Future < Output = bool > + Unpin ,
497+ D : Future < Output = ( ) > + Unpin ,
498+ E : Future < Output = bool > + Unpin ,
496499 > {
497500 pub a : A ,
498501 pub b : B ,
499502 pub c : C ,
500503 pub d : D ,
504+ pub e : E ,
501505 }
502506
503507 pub ( crate ) enum SelectorOutput {
504508 A ,
505509 B ,
506510 C ,
507- D ( bool ) ,
511+ D ,
512+ E ( bool ) ,
508513 }
509514
510515 impl <
511516 A : Future < Output = ( ) > + Unpin ,
512517 B : Future < Output = ( ) > + Unpin ,
513518 C : Future < Output = ( ) > + Unpin ,
514- D : Future < Output = bool > + Unpin ,
515- > Future for Selector < A , B , C , D >
519+ D : Future < Output = ( ) > + Unpin ,
520+ E : Future < Output = bool > + Unpin ,
521+ > Future for Selector < A , B , C , D , E >
516522 {
517523 type Output = SelectorOutput ;
518524 fn poll (
@@ -537,8 +543,14 @@ pub(crate) mod futures_util {
537543 Poll :: Pending => { } ,
538544 }
539545 match Pin :: new ( & mut self . d ) . poll ( ctx) {
546+ Poll :: Ready ( ( ) ) => {
547+ return Poll :: Ready ( SelectorOutput :: D ) ;
548+ } ,
549+ Poll :: Pending => { } ,
550+ }
551+ match Pin :: new ( & mut self . e ) . poll ( ctx) {
540552 Poll :: Ready ( res) => {
541- return Poll :: Ready ( SelectorOutput :: D ( res) ) ;
553+ return Poll :: Ready ( SelectorOutput :: E ( res) ) ;
542554 } ,
543555 Poll :: Pending => { } ,
544556 }
@@ -648,6 +660,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
648660/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
649661/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
650662/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
663+ /// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
651664/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
652665/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
653666/// #
@@ -661,6 +674,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
661674/// # event_handler: Arc<EventHandler>,
662675/// # channel_manager: Arc<ChannelManager<B, F, FE>>,
663676/// # onion_messenger: Arc<OnionMessenger<B, F, FE>>,
677+ /// # liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
664678/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
665679/// # gossip_sync: Arc<P2PGossipSync<UL>>,
666680/// # persister: Arc<Store>,
@@ -681,6 +695,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
681695/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
682696/// let background_peer_man = Arc::clone(&node.peer_manager);
683697/// let background_onion_messenger = Arc::clone(&node.onion_messenger);
698+ /// let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
684699/// let background_logger = Arc::clone(&node.logger);
685700/// let background_scorer = Arc::clone(&node.scorer);
686701///
@@ -708,6 +723,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
708723/// Some(background_onion_messenger),
709724/// background_gossip_sync,
710725/// background_peer_man,
726+ /// Some(background_liquidity_manager),
711727/// background_logger,
712728/// Some(background_scorer),
713729/// sleeper,
@@ -745,6 +761,7 @@ pub async fn process_events_async<
745761 PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > + Send + Sync ,
746762 RGS : ' static + Deref < Target = RapidGossipSync < G , L > > + Send ,
747763 PM : ' static + Deref + Send + Sync ,
764+ LM : ' static + Deref + Send + Sync ,
748765 S : ' static + Deref < Target = SC > + Send + Sync ,
749766 SC : for < ' b > WriteableScore < ' b > ,
750767 SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
@@ -753,8 +770,8 @@ pub async fn process_events_async<
753770> (
754771 persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
755772 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
756- logger : L , scorer : Option < S > , sleeper : Sleeper , mobile_interruptable_platform : bool ,
757- fetch_time : FetchTime ,
773+ liquidity_manager : Option < LM > , logger : L , scorer : Option < S > , sleeper : Sleeper ,
774+ mobile_interruptable_platform : bool , fetch_time : FetchTime ,
758775) -> Result < ( ) , lightning:: io:: Error >
759776where
760777 UL :: Target : ' static + UtxoLookup ,
@@ -767,6 +784,7 @@ where
767784 CM :: Target : AChannelManager + Send + Sync ,
768785 OM :: Target : AOnionMessenger + Send + Sync ,
769786 PM :: Target : APeerManager + Send + Sync ,
787+ LM :: Target : ALiquidityManager + Send + Sync ,
770788{
771789 let mut should_break = false ;
772790 let async_event_handler = |event| {
@@ -820,19 +838,26 @@ where
820838 } else {
821839 OptionalSelector { optional_future: None }
822840 } ;
841+ let lm_fut = if let Some ( lm) = liquidity_manager. as_ref( ) {
842+ let fut = lm. get_lm( ) . get_pending_msgs_future( ) ;
843+ OptionalSelector { optional_future: Some ( fut) }
844+ } else {
845+ OptionalSelector { optional_future: None }
846+ } ;
823847 let fut = Selector {
824848 a: channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
825849 b: chain_monitor. get_update_future( ) ,
826850 c: om_fut,
827- d: sleeper( if mobile_interruptable_platform {
851+ d: lm_fut,
852+ e: sleeper( if mobile_interruptable_platform {
828853 Duration :: from_millis( 100 )
829854 } else {
830855 Duration :: from_secs( FASTEST_TIMER )
831856 } ) ,
832857 } ;
833858 match fut. await {
834- SelectorOutput :: A | SelectorOutput :: B | SelectorOutput :: C => { } ,
835- SelectorOutput :: D ( exit) => {
859+ SelectorOutput :: A | SelectorOutput :: B | SelectorOutput :: C | SelectorOutput :: D => { } ,
860+ SelectorOutput :: E ( exit) => {
836861 should_break = exit;
837862 } ,
838863 }
@@ -920,12 +945,13 @@ impl BackgroundProcessor {
920945 PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > + Send + Sync ,
921946 RGS : ' static + Deref < Target = RapidGossipSync < G , L > > + Send ,
922947 PM : ' static + Deref + Send + Sync ,
948+ LM : ' static + Deref + Send + Sync ,
923949 S : ' static + Deref < Target = SC > + Send + Sync ,
924950 SC : for < ' b > WriteableScore < ' b > ,
925951 > (
926952 persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
927953 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
928- logger : L , scorer : Option < S > ,
954+ liquidity_manager : Option < LM > , logger : L , scorer : Option < S > ,
929955 ) -> Self
930956 where
931957 UL :: Target : ' static + UtxoLookup ,
@@ -938,6 +964,7 @@ impl BackgroundProcessor {
938964 CM :: Target : AChannelManager + Send + Sync ,
939965 OM :: Target : AOnionMessenger + Send + Sync ,
940966 PM :: Target : APeerManager + Send + Sync ,
967+ LM :: Target : ALiquidityManager + Send + Sync ,
941968 {
942969 let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
943970 let stop_thread_clone = stop_thread. clone ( ) ;
@@ -977,17 +1004,27 @@ impl BackgroundProcessor {
9771004 scorer,
9781005 stop_thread. load( Ordering :: Acquire ) ,
9791006 {
980- let sleeper = if let Some ( om ) = onion_messenger . as_ref( ) {
981- Sleeper :: from_three_futures (
1007+ let sleeper = match ( onion_messenger . as_ref ( ) , liquidity_manager . as_ref( ) ) {
1008+ ( Some ( om ) , Some ( lm ) ) => Sleeper :: from_four_futures (
9821009 & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
9831010 & chain_monitor. get_update_future( ) ,
9841011 & om. get_om( ) . get_update_future( ) ,
985- )
986- } else {
987- Sleeper :: from_two_futures (
1012+ & lm . get_lm ( ) . get_pending_msgs_future ( ) ,
1013+ ) ,
1014+ ( Some ( om ) , None ) => Sleeper :: from_three_futures (
9881015 & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
9891016 & chain_monitor. get_update_future( ) ,
990- )
1017+ & om. get_om( ) . get_update_future( ) ,
1018+ ) ,
1019+ ( None , Some ( lm) ) => Sleeper :: from_three_futures(
1020+ & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
1021+ & chain_monitor. get_update_future( ) ,
1022+ & lm. get_lm( ) . get_pending_msgs_future( ) ,
1023+ ) ,
1024+ ( None , None ) => Sleeper :: from_two_futures(
1025+ & channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
1026+ & chain_monitor. get_update_future( ) ,
1027+ ) ,
9911028 } ;
9921029 sleeper. wait_timeout( Duration :: from_millis( 100 ) ) ;
9931030 } ,
@@ -1102,6 +1139,7 @@ mod tests {
11021139 use lightning:: util:: sweep:: { OutputSpendStatus , OutputSweeper , PRUNE_DELAY_BLOCKS } ;
11031140 use lightning:: util:: test_utils;
11041141 use lightning:: { get_event, get_event_msg} ;
1142+ use lightning_liquidity:: LiquidityManager ;
11051143 use lightning_persister:: fs_store:: FilesystemStore ;
11061144 use lightning_rapid_gossip_sync:: RapidGossipSync ;
11071145 use std:: collections:: VecDeque ;
@@ -1196,6 +1234,9 @@ mod tests {
11961234 IgnoringMessageHandler ,
11971235 > ;
11981236
1237+ type LM =
1238+ LiquidityManager < Arc < KeysManager > , Arc < ChannelManager > , Arc < dyn Filter + Sync + Send > > ;
1239+
11991240 struct Node {
12001241 node : Arc < ChannelManager > ,
12011242 messenger : Arc < OM > ,
@@ -1212,6 +1253,7 @@ mod tests {
12121253 Arc < KeysManager > ,
12131254 > ,
12141255 > ,
1256+ liquidity_manager : Arc < LM > ,
12151257 chain_monitor : Arc < ChainMonitor > ,
12161258 kv_store : Arc < FilesystemStore > ,
12171259 tx_broadcaster : Arc < test_utils:: TestBroadcaster > ,
@@ -1631,11 +1673,20 @@ mod tests {
16311673 logger. clone ( ) ,
16321674 keys_manager. clone ( ) ,
16331675 ) ) ;
1676+ let liquidity_manager = Arc :: new ( LiquidityManager :: new (
1677+ Arc :: clone ( & keys_manager) ,
1678+ Arc :: clone ( & manager) ,
1679+ None ,
1680+ None ,
1681+ None ,
1682+ None ,
1683+ ) ) ;
16341684 let node = Node {
16351685 node : manager,
16361686 p2p_gossip_sync,
16371687 rapid_gossip_sync,
16381688 peer_manager,
1689+ liquidity_manager,
16391690 chain_monitor,
16401691 kv_store,
16411692 tx_broadcaster,
@@ -1833,6 +1884,7 @@ mod tests {
18331884 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
18341885 nodes[ 0 ] . p2p_gossip_sync ( ) ,
18351886 nodes[ 0 ] . peer_manager . clone ( ) ,
1887+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
18361888 nodes[ 0 ] . logger . clone ( ) ,
18371889 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
18381890 ) ;
@@ -1926,6 +1978,7 @@ mod tests {
19261978 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
19271979 nodes[ 0 ] . no_gossip_sync ( ) ,
19281980 nodes[ 0 ] . peer_manager . clone ( ) ,
1981+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
19291982 nodes[ 0 ] . logger . clone ( ) ,
19301983 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
19311984 ) ;
@@ -1968,6 +2021,7 @@ mod tests {
19682021 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
19692022 nodes[ 0 ] . no_gossip_sync ( ) ,
19702023 nodes[ 0 ] . peer_manager . clone ( ) ,
2024+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
19712025 nodes[ 0 ] . logger . clone ( ) ,
19722026 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
19732027 ) ;
@@ -2000,6 +2054,7 @@ mod tests {
20002054 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
20012055 nodes[ 0 ] . rapid_gossip_sync ( ) ,
20022056 nodes[ 0 ] . peer_manager . clone ( ) ,
2057+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
20032058 nodes[ 0 ] . logger . clone ( ) ,
20042059 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20052060 move |dur : Duration | {
@@ -2036,6 +2091,7 @@ mod tests {
20362091 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
20372092 nodes[ 0 ] . p2p_gossip_sync ( ) ,
20382093 nodes[ 0 ] . peer_manager . clone ( ) ,
2094+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
20392095 nodes[ 0 ] . logger . clone ( ) ,
20402096 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20412097 ) ;
@@ -2065,6 +2121,7 @@ mod tests {
20652121 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
20662122 nodes[ 0 ] . no_gossip_sync ( ) ,
20672123 nodes[ 0 ] . peer_manager . clone ( ) ,
2124+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
20682125 nodes[ 0 ] . logger . clone ( ) ,
20692126 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20702127 ) ;
@@ -2111,6 +2168,7 @@ mod tests {
21112168 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
21122169 nodes[ 0 ] . no_gossip_sync ( ) ,
21132170 nodes[ 0 ] . peer_manager . clone ( ) ,
2171+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
21142172 nodes[ 0 ] . logger . clone ( ) ,
21152173 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21162174 ) ;
@@ -2173,6 +2231,7 @@ mod tests {
21732231 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
21742232 nodes[ 0 ] . no_gossip_sync ( ) ,
21752233 nodes[ 0 ] . peer_manager . clone ( ) ,
2234+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
21762235 nodes[ 0 ] . logger . clone ( ) ,
21772236 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21782237 ) ;
@@ -2324,6 +2383,7 @@ mod tests {
23242383 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
23252384 nodes[ 0 ] . no_gossip_sync ( ) ,
23262385 nodes[ 0 ] . peer_manager . clone ( ) ,
2386+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
23272387 nodes[ 0 ] . logger . clone ( ) ,
23282388 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
23292389 ) ;
@@ -2353,6 +2413,7 @@ mod tests {
23532413 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
23542414 nodes[ 0 ] . no_gossip_sync ( ) ,
23552415 nodes[ 0 ] . peer_manager . clone ( ) ,
2416+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
23562417 nodes[ 0 ] . logger . clone ( ) ,
23572418 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
23582419 ) ;
@@ -2448,6 +2509,7 @@ mod tests {
24482509 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
24492510 nodes[ 0 ] . rapid_gossip_sync ( ) ,
24502511 nodes[ 0 ] . peer_manager . clone ( ) ,
2512+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
24512513 nodes[ 0 ] . logger . clone ( ) ,
24522514 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
24532515 ) ;
@@ -2480,6 +2542,7 @@ mod tests {
24802542 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
24812543 nodes[ 0 ] . rapid_gossip_sync ( ) ,
24822544 nodes[ 0 ] . peer_manager . clone ( ) ,
2545+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
24832546 nodes[ 0 ] . logger . clone ( ) ,
24842547 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
24852548 move |dur : Duration | {
@@ -2638,6 +2701,7 @@ mod tests {
26382701 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
26392702 nodes[ 0 ] . no_gossip_sync ( ) ,
26402703 nodes[ 0 ] . peer_manager . clone ( ) ,
2704+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
26412705 nodes[ 0 ] . logger . clone ( ) ,
26422706 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
26432707 ) ;
@@ -2688,6 +2752,7 @@ mod tests {
26882752 Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
26892753 nodes[ 0 ] . no_gossip_sync ( ) ,
26902754 nodes[ 0 ] . peer_manager . clone ( ) ,
2755+ Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
26912756 nodes[ 0 ] . logger . clone ( ) ,
26922757 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
26932758 move |dur : Duration | {
0 commit comments