@@ -23,7 +23,7 @@ use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
2323use lightning:: ln:: channelmanager:: { AChannelManager , ChainParameters } ;
2424use lightning:: ln:: features:: { InitFeatures , NodeFeatures } ;
2525use lightning:: ln:: msgs:: { ErrorAction , LightningError } ;
26- use lightning:: ln:: peer_handler:: { APeerManager , CustomMessageHandler } ;
26+ use lightning:: ln:: peer_handler:: CustomMessageHandler ;
2727use lightning:: ln:: wire:: CustomMessageReader ;
2828use lightning:: sign:: EntropySource ;
2929use lightning:: util:: logger:: Level ;
@@ -61,12 +61,11 @@ pub struct LiquidityClientConfig {
6161
6262/// The main interface into LSP functionality.
6363///
64- /// Should be used as a [`CustomMessageHandler`] for your
65- /// [`PeerManager`]'s [`MessageHandler`].
64+ /// Should be used as a [`CustomMessageHandler`] for your [`PeerManager`]'s [`MessageHandler`].
6665///
67- /// Should provide a reference to your [`PeerManager`] by calling
68- /// [`LiquidityManager::set_peer_manager `] post construction. This allows the [`LiquidityManager`] to
69- /// wake the [`PeerManager`] when there are pending messages to be sent.
66+ /// Users should provide a callback to process queued messages via
67+ /// [`LiquidityManager::set_process_msgs_callback `] post construction. This allows the
68+ /// [`LiquidityManager`] to wake the [`PeerManager`] when there are pending messages to be sent.
7069///
7170/// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events`] in order to surface
7271/// [`Event`]'s that likely need to be handled.
@@ -78,40 +77,33 @@ pub struct LiquidityClientConfig {
7877/// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler
7978/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
8079/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
81- pub struct LiquidityManager <
82- ES : Deref + Clone ,
83- CM : Deref + Clone ,
84- PM : Deref + Clone ,
85- C : Deref + Clone ,
86- > where
80+ pub struct LiquidityManager < ES : Deref + Clone , CM : Deref + Clone , C : Deref + Clone >
81+ where
8782 ES :: Target : EntropySource ,
8883 CM :: Target : AChannelManager ,
89- PM :: Target : APeerManager ,
9084 C :: Target : Filter ,
9185{
92- pending_messages : Arc < DefaultMessageQueue < PM > > ,
86+ pending_messages : Arc < DefaultMessageQueue > ,
9387 pending_events : Arc < EventQueue > ,
9488 request_id_to_method_map : Mutex < HashMap < String , String > > ,
95- lsps0_client_handler : LSPS0ClientHandler < ES , Arc < DefaultMessageQueue < PM > > > ,
96- lsps0_service_handler : Option < LSPS0ServiceHandler < Arc < DefaultMessageQueue < PM > > > > ,
89+ lsps0_client_handler : LSPS0ClientHandler < ES , Arc < DefaultMessageQueue > > ,
90+ lsps0_service_handler : Option < LSPS0ServiceHandler < Arc < DefaultMessageQueue > > > ,
9791 #[ cfg( lsps1) ]
98- lsps1_service_handler : Option < LSPS1ServiceHandler < ES , CM , Arc < DefaultMessageQueue < PM > > , C > > ,
92+ lsps1_service_handler : Option < LSPS1ServiceHandler < ES , CM , Arc < DefaultMessageQueue > , C > > ,
9993 #[ cfg( lsps1) ]
100- lsps1_client_handler : Option < LSPS1ClientHandler < ES , CM , Arc < DefaultMessageQueue < PM > > , C > > ,
101- lsps2_service_handler : Option < LSPS2ServiceHandler < CM , Arc < DefaultMessageQueue < PM > > > > ,
102- lsps2_client_handler : Option < LSPS2ClientHandler < ES , Arc < DefaultMessageQueue < PM > > > > ,
94+ lsps1_client_handler : Option < LSPS1ClientHandler < ES , CM , Arc < DefaultMessageQueue > , C > > ,
95+ lsps2_service_handler : Option < LSPS2ServiceHandler < CM , Arc < DefaultMessageQueue > > > ,
96+ lsps2_client_handler : Option < LSPS2ClientHandler < ES , Arc < DefaultMessageQueue > > > ,
10397 service_config : Option < LiquidityServiceConfig > ,
10498 _client_config : Option < LiquidityClientConfig > ,
10599 best_block : Option < RwLock < BestBlock > > ,
106100 _chain_source : Option < C > ,
107101}
108102
109- impl < ES : Deref + Clone , CM : Deref + Clone , PM : Deref + Clone , C : Deref + Clone >
110- LiquidityManager < ES , CM , PM , C >
103+ impl < ES : Deref + Clone , CM : Deref + Clone , C : Deref + Clone > LiquidityManager < ES , CM , C >
111104where
112105 ES :: Target : EntropySource ,
113106 CM :: Target : AChannelManager ,
114- PM :: Target : APeerManager ,
115107 C :: Target : Filter ,
116108{
117109 /// Constructor for the [`LiquidityManager`].
@@ -208,47 +200,104 @@ where {
208200 }
209201
210202 /// Returns a reference to the LSPS0 client-side handler.
211- pub fn lsps0_client_handler ( & self ) -> & LSPS0ClientHandler < ES , Arc < DefaultMessageQueue < PM > > > {
203+ pub fn lsps0_client_handler ( & self ) -> & LSPS0ClientHandler < ES , Arc < DefaultMessageQueue > > {
212204 & self . lsps0_client_handler
213205 }
214206
215207 /// Returns a reference to the LSPS0 server-side handler.
216- pub fn lsps0_service_handler (
217- & self ,
218- ) -> Option < & LSPS0ServiceHandler < Arc < DefaultMessageQueue < PM > > > > {
208+ pub fn lsps0_service_handler ( & self ) -> Option < & LSPS0ServiceHandler < Arc < DefaultMessageQueue > > > {
219209 self . lsps0_service_handler . as_ref ( )
220210 }
221211
222212 /// Returns a reference to the LSPS1 client-side handler.
223213 #[ cfg( lsps1) ]
224214 pub fn lsps1_client_handler (
225215 & self ,
226- ) -> Option < & LSPS1ClientHandler < ES , CM , Arc < DefaultMessageQueue < PM > > , C > > {
216+ ) -> Option < & LSPS1ClientHandler < ES , CM , Arc < DefaultMessageQueue > , C > > {
227217 self . lsps1_client_handler . as_ref ( )
228218 }
229219
230220 /// Returns a reference to the LSPS1 server-side handler.
231221 #[ cfg( lsps1) ]
232222 pub fn lsps1_service_handler (
233223 & self ,
234- ) -> Option < & LSPS1ServiceHandler < ES , CM , Arc < DefaultMessageQueue < PM > > , C > > {
224+ ) -> Option < & LSPS1ServiceHandler < ES , CM , Arc < DefaultMessageQueue > , C > > {
235225 self . lsps1_service_handler . as_ref ( )
236226 }
237227
238228 /// Returns a reference to the LSPS2 client-side handler.
239229 pub fn lsps2_client_handler (
240230 & self ,
241- ) -> Option < & LSPS2ClientHandler < ES , Arc < DefaultMessageQueue < PM > > > > {
231+ ) -> Option < & LSPS2ClientHandler < ES , Arc < DefaultMessageQueue > > > {
242232 self . lsps2_client_handler . as_ref ( )
243233 }
244234
245235 /// Returns a reference to the LSPS2 server-side handler.
246236 pub fn lsps2_service_handler (
247237 & self ,
248- ) -> Option < & LSPS2ServiceHandler < CM , Arc < DefaultMessageQueue < PM > > > > {
238+ ) -> Option < & LSPS2ServiceHandler < CM , Arc < DefaultMessageQueue > > > {
249239 self . lsps2_service_handler . as_ref ( )
250240 }
251241
242+ /// Allows to set a callback that will be called after new messages are pushed to the message
243+ /// queue.
244+ ///
245+ /// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the
246+ /// message queue. For example:
247+ ///
248+ /// ```
249+ /// # use lightning::io;
250+ /// # use lightning_liquidity::LiquidityManager;
251+ /// # use std::sync::{Arc, RwLock};
252+ /// # use std::sync::atomic::{AtomicBool, Ordering};
253+ /// # use std::time::SystemTime;
254+ /// # struct MyStore {}
255+ /// # impl lightning::util::persist::KVStore for MyStore {
256+ /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
257+ /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
258+ /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
259+ /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
260+ /// # }
261+ /// # struct MyEntropySource {}
262+ /// # impl lightning::sign::EntropySource for MyEntropySource {
263+ /// # fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] }
264+ /// # }
265+ /// # struct MyEventHandler {}
266+ /// # impl MyEventHandler {
267+ /// # async fn handle_event(&self, _: lightning::events::Event) {}
268+ /// # }
269+ /// # #[derive(Eq, PartialEq, Clone, Hash)]
270+ /// # struct MySocketDescriptor {}
271+ /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
272+ /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
273+ /// # fn disconnect_socket(&mut self) {}
274+ /// # }
275+ /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
276+ /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
277+ /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
278+ /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
279+ /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
280+ /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
281+ /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
282+ /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
283+ /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
284+ /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
285+ /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
286+ /// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
287+ /// # type MyLiquidityManager = LiquidityManager<Arc<MyEntropySource>, Arc<MyChannelManager>, Arc<MyFilter>>;
288+ /// # fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_logger: Arc<MyLogger>, my_peer_manager: Arc<MyPeerManager>, my_liquidity_manager: Arc<MyLiquidityManager>) {
289+ /// let process_msgs_pm = Arc::clone(&my_peer_manager);
290+ /// let process_msgs_callback = move || process_msgs_pm.process_events();
291+ ///
292+ /// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback);
293+ /// # }
294+ /// ```
295+ ///
296+ /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
297+ pub fn set_process_msgs_callback ( & self , callback : impl Fn ( ) + Send + Sync + ' static ) {
298+ self . pending_messages . set_process_msgs_callback ( callback)
299+ }
300+
252301 /// Blocks the current thread until next event is ready and returns it.
253302 ///
254303 /// Typically you would spawn a thread or task that calls this in a loop.
@@ -271,20 +320,6 @@ where {
271320 self . pending_events . get_and_clear_pending_events ( )
272321 }
273322
274- /// Set a [`PeerManager`] reference for all configured message handlers.
275- ///
276- /// This allows the message handlers to wake the [`PeerManager`] by calling
277- /// [`PeerManager::process_events`] after enqueing messages to be sent.
278- ///
279- /// Without this the messages will be sent based on whatever polling interval
280- /// your background processor uses.
281- ///
282- /// [`PeerManager`]: lightning::ln::peer_handler::PeerManager
283- /// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
284- pub fn set_peer_manager ( & self , peer_manager : PM ) {
285- self . pending_messages . set_peer_manager ( peer_manager) ;
286- }
287-
288323 fn handle_lsps_message (
289324 & self , msg : LSPSMessage , sender_node_id : & PublicKey ,
290325 ) -> Result < ( ) , lightning:: ln:: msgs:: LightningError > {
@@ -348,12 +383,11 @@ where {
348383 }
349384}
350385
351- impl < ES : Deref + Clone + Clone , CM : Deref + Clone , PM : Deref + Clone , C : Deref + Clone >
352- CustomMessageReader for LiquidityManager < ES , CM , PM , C >
386+ impl < ES : Deref + Clone + Clone , CM : Deref + Clone , C : Deref + Clone > CustomMessageReader
387+ for LiquidityManager < ES , CM , C >
353388where
354389 ES :: Target : EntropySource ,
355390 CM :: Target : AChannelManager ,
356- PM :: Target : APeerManager ,
357391 C :: Target : Filter ,
358392{
359393 type CustomMessage = RawLSPSMessage ;
@@ -368,12 +402,11 @@ where
368402 }
369403}
370404
371- impl < ES : Deref + Clone , CM : Deref + Clone , PM : Deref + Clone , C : Deref + Clone > CustomMessageHandler
372- for LiquidityManager < ES , CM , PM , C >
405+ impl < ES : Deref + Clone , CM : Deref + Clone , C : Deref + Clone > CustomMessageHandler
406+ for LiquidityManager < ES , CM , C >
373407where
374408 ES :: Target : EntropySource ,
375409 CM :: Target : AChannelManager ,
376- PM :: Target : APeerManager ,
377410 C :: Target : Filter ,
378411{
379412 fn handle_custom_message (
@@ -431,12 +464,10 @@ where
431464 }
432465}
433466
434- impl < ES : Deref + Clone , CM : Deref + Clone , PM : Deref + Clone , C : Deref + Clone > Listen
435- for LiquidityManager < ES , CM , PM , C >
467+ impl < ES : Deref + Clone , CM : Deref + Clone , C : Deref + Clone > Listen for LiquidityManager < ES , CM , C >
436468where
437469 ES :: Target : EntropySource ,
438470 CM :: Target : AChannelManager ,
439- PM :: Target : APeerManager ,
440471 C :: Target : Filter ,
441472{
442473 fn filtered_block_connected (
@@ -472,12 +503,10 @@ where
472503 }
473504}
474505
475- impl < ES : Deref + Clone , CM : Deref + Clone , PM : Deref + Clone , C : Deref + Clone > Confirm
476- for LiquidityManager < ES , CM , PM , C >
506+ impl < ES : Deref + Clone , CM : Deref + Clone , C : Deref + Clone > Confirm for LiquidityManager < ES , CM , C >
477507where
478508 ES :: Target : EntropySource ,
479509 CM :: Target : AChannelManager ,
480- PM :: Target : APeerManager ,
481510 C :: Target : Filter ,
482511{
483512 fn transactions_confirmed (
0 commit comments