Skip to content
This repository was archived by the owner on Jan 6, 2025. It is now read-only.

Commit 64570d2

Browse files
committed
Drop APeerManager dependency and replace with process_msgs_callback
Previously, `LiquidityManager` (or at least its internal message queue) was holding a reference to `PeerManager`/`impl APeerManager` in order to be able to trigger message processing after new messages were enqueued. However, this had two major drawbacks: firstly, it introduced an ugly (and hard to resolve) cycle in the type definitions as `PeerManager` depends on `CustomMessageHandler`, which in this case is `LiquidityManager`, itself depending on said `PeerManager` reference. Secondly, it introduced the complex `PeerManager` LDK object to the LSPS implementation which is otherwise not that depedent on LDK's internal types. To resolve these issues, we heere replace the dependency on `PeerManager`/`impl APeerManager` with a simple generic callback that will be called every time new messages are enqueued.
1 parent 8cd4a4b commit 64570d2

File tree

2 files changed

+100
-84
lines changed

2 files changed

+100
-84
lines changed

src/manager.rs

Lines changed: 86 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
2323
use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
2424
use lightning::ln::features::{InitFeatures, NodeFeatures};
2525
use lightning::ln::msgs::{ErrorAction, LightningError};
26-
use lightning::ln::peer_handler::{APeerManager, CustomMessageHandler};
26+
use lightning::ln::peer_handler::CustomMessageHandler;
2727
use lightning::ln::wire::CustomMessageReader;
2828
use lightning::sign::EntropySource;
2929
use lightning::util::logger::Level;
@@ -61,12 +61,11 @@ pub struct LiquidityClientConfig {
6161

6262
/// The main interface into LSP functionality.
6363
///
64-
/// Should be used as a [`CustomMessageHandler`] for your
65-
/// [`PeerManager`]'s [`MessageHandler`].
64+
/// Should be used as a [`CustomMessageHandler`] for your [`PeerManager`]'s [`MessageHandler`].
6665
///
67-
/// Should provide a reference to your [`PeerManager`] by calling
68-
/// [`LiquidityManager::set_peer_manager`] post construction. This allows the [`LiquidityManager`] to
69-
/// wake the [`PeerManager`] when there are pending messages to be sent.
66+
/// Users should provide a callback to process queued messages via
67+
/// [`LiquidityManager::set_process_msgs_callback`] post construction. This allows the
68+
/// [`LiquidityManager`] to wake the [`PeerManager`] when there are pending messages to be sent.
7069
///
7170
/// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events`] in order to surface
7271
/// [`Event`]'s that likely need to be handled.
@@ -78,40 +77,33 @@ pub struct LiquidityClientConfig {
7877
/// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler
7978
/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
8079
/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
81-
pub struct LiquidityManager<
82-
ES: Deref + Clone,
83-
CM: Deref + Clone,
84-
PM: Deref + Clone,
85-
C: Deref + Clone,
86-
> where
80+
pub struct LiquidityManager<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone>
81+
where
8782
ES::Target: EntropySource,
8883
CM::Target: AChannelManager,
89-
PM::Target: APeerManager,
9084
C::Target: Filter,
9185
{
92-
pending_messages: Arc<DefaultMessageQueue<PM>>,
86+
pending_messages: Arc<DefaultMessageQueue>,
9387
pending_events: Arc<EventQueue>,
9488
request_id_to_method_map: Mutex<HashMap<String, String>>,
95-
lsps0_client_handler: LSPS0ClientHandler<ES, Arc<DefaultMessageQueue<PM>>>,
96-
lsps0_service_handler: Option<LSPS0ServiceHandler<Arc<DefaultMessageQueue<PM>>>>,
89+
lsps0_client_handler: LSPS0ClientHandler<ES, Arc<DefaultMessageQueue>>,
90+
lsps0_service_handler: Option<LSPS0ServiceHandler<Arc<DefaultMessageQueue>>>,
9791
#[cfg(lsps1)]
98-
lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>>,
92+
lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue>, C>>,
9993
#[cfg(lsps1)]
100-
lsps1_client_handler: Option<LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>>,
101-
lsps2_service_handler: Option<LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue<PM>>>>,
102-
lsps2_client_handler: Option<LSPS2ClientHandler<ES, Arc<DefaultMessageQueue<PM>>>>,
94+
lsps1_client_handler: Option<LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue>, C>>,
95+
lsps2_service_handler: Option<LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue>>>,
96+
lsps2_client_handler: Option<LSPS2ClientHandler<ES, Arc<DefaultMessageQueue>>>,
10397
service_config: Option<LiquidityServiceConfig>,
10498
_client_config: Option<LiquidityClientConfig>,
10599
best_block: Option<RwLock<BestBlock>>,
106100
_chain_source: Option<C>,
107101
}
108102

109-
impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone>
110-
LiquidityManager<ES, CM, PM, C>
103+
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> LiquidityManager<ES, CM, C>
111104
where
112105
ES::Target: EntropySource,
113106
CM::Target: AChannelManager,
114-
PM::Target: APeerManager,
115107
C::Target: Filter,
116108
{
117109
/// Constructor for the [`LiquidityManager`].
@@ -208,47 +200,104 @@ where {
208200
}
209201

210202
/// Returns a reference to the LSPS0 client-side handler.
211-
pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, Arc<DefaultMessageQueue<PM>>> {
203+
pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, Arc<DefaultMessageQueue>> {
212204
&self.lsps0_client_handler
213205
}
214206

215207
/// Returns a reference to the LSPS0 server-side handler.
216-
pub fn lsps0_service_handler(
217-
&self,
218-
) -> Option<&LSPS0ServiceHandler<Arc<DefaultMessageQueue<PM>>>> {
208+
pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler<Arc<DefaultMessageQueue>>> {
219209
self.lsps0_service_handler.as_ref()
220210
}
221211

222212
/// Returns a reference to the LSPS1 client-side handler.
223213
#[cfg(lsps1)]
224214
pub fn lsps1_client_handler(
225215
&self,
226-
) -> Option<&LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>> {
216+
) -> Option<&LSPS1ClientHandler<ES, CM, Arc<DefaultMessageQueue>, C>> {
227217
self.lsps1_client_handler.as_ref()
228218
}
229219

230220
/// Returns a reference to the LSPS1 server-side handler.
231221
#[cfg(lsps1)]
232222
pub fn lsps1_service_handler(
233223
&self,
234-
) -> Option<&LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue<PM>>, C>> {
224+
) -> Option<&LSPS1ServiceHandler<ES, CM, Arc<DefaultMessageQueue>, C>> {
235225
self.lsps1_service_handler.as_ref()
236226
}
237227

238228
/// Returns a reference to the LSPS2 client-side handler.
239229
pub fn lsps2_client_handler(
240230
&self,
241-
) -> Option<&LSPS2ClientHandler<ES, Arc<DefaultMessageQueue<PM>>>> {
231+
) -> Option<&LSPS2ClientHandler<ES, Arc<DefaultMessageQueue>>> {
242232
self.lsps2_client_handler.as_ref()
243233
}
244234

245235
/// Returns a reference to the LSPS2 server-side handler.
246236
pub fn lsps2_service_handler(
247237
&self,
248-
) -> Option<&LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue<PM>>>> {
238+
) -> Option<&LSPS2ServiceHandler<CM, Arc<DefaultMessageQueue>>> {
249239
self.lsps2_service_handler.as_ref()
250240
}
251241

242+
/// Allows to set a callback that will be called after new messages are pushed to the message
243+
/// queue.
244+
///
245+
/// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the
246+
/// message queue. For example:
247+
///
248+
/// ```
249+
/// # use lightning::io;
250+
/// # use lightning_liquidity::LiquidityManager;
251+
/// # use std::sync::{Arc, RwLock};
252+
/// # use std::sync::atomic::{AtomicBool, Ordering};
253+
/// # use std::time::SystemTime;
254+
/// # struct MyStore {}
255+
/// # impl lightning::util::persist::KVStore for MyStore {
256+
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
257+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
258+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
259+
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
260+
/// # }
261+
/// # struct MyEntropySource {}
262+
/// # impl lightning::sign::EntropySource for MyEntropySource {
263+
/// # fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] }
264+
/// # }
265+
/// # struct MyEventHandler {}
266+
/// # impl MyEventHandler {
267+
/// # async fn handle_event(&self, _: lightning::events::Event) {}
268+
/// # }
269+
/// # #[derive(Eq, PartialEq, Clone, Hash)]
270+
/// # struct MySocketDescriptor {}
271+
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
272+
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
273+
/// # fn disconnect_socket(&mut self) {}
274+
/// # }
275+
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
276+
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
277+
/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
278+
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
279+
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
280+
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
281+
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
282+
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
283+
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
284+
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
285+
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
286+
/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
287+
/// # type MyLiquidityManager = LiquidityManager<Arc<MyEntropySource>, Arc<MyChannelManager>, Arc<MyFilter>>;
288+
/// # fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_logger: Arc<MyLogger>, my_peer_manager: Arc<MyPeerManager>, my_liquidity_manager: Arc<MyLiquidityManager>) {
289+
/// let process_msgs_pm = Arc::clone(&my_peer_manager);
290+
/// let process_msgs_callback = move || process_msgs_pm.process_events();
291+
///
292+
/// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback);
293+
/// # }
294+
/// ```
295+
///
296+
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
297+
pub fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
298+
self.pending_messages.set_process_msgs_callback(callback)
299+
}
300+
252301
/// Blocks the current thread until next event is ready and returns it.
253302
///
254303
/// Typically you would spawn a thread or task that calls this in a loop.
@@ -271,20 +320,6 @@ where {
271320
self.pending_events.get_and_clear_pending_events()
272321
}
273322

274-
/// Set a [`PeerManager`] reference for all configured message handlers.
275-
///
276-
/// This allows the message handlers to wake the [`PeerManager`] by calling
277-
/// [`PeerManager::process_events`] after enqueing messages to be sent.
278-
///
279-
/// Without this the messages will be sent based on whatever polling interval
280-
/// your background processor uses.
281-
///
282-
/// [`PeerManager`]: lightning::ln::peer_handler::PeerManager
283-
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
284-
pub fn set_peer_manager(&self, peer_manager: PM) {
285-
self.pending_messages.set_peer_manager(peer_manager);
286-
}
287-
288323
fn handle_lsps_message(
289324
&self, msg: LSPSMessage, sender_node_id: &PublicKey,
290325
) -> Result<(), lightning::ln::msgs::LightningError> {
@@ -348,12 +383,11 @@ where {
348383
}
349384
}
350385

351-
impl<ES: Deref + Clone + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone>
352-
CustomMessageReader for LiquidityManager<ES, CM, PM, C>
386+
impl<ES: Deref + Clone + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageReader
387+
for LiquidityManager<ES, CM, C>
353388
where
354389
ES::Target: EntropySource,
355390
CM::Target: AChannelManager,
356-
PM::Target: APeerManager,
357391
C::Target: Filter,
358392
{
359393
type CustomMessage = RawLSPSMessage;
@@ -368,12 +402,11 @@ where
368402
}
369403
}
370404

371-
impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone> CustomMessageHandler
372-
for LiquidityManager<ES, CM, PM, C>
405+
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageHandler
406+
for LiquidityManager<ES, CM, C>
373407
where
374408
ES::Target: EntropySource,
375409
CM::Target: AChannelManager,
376-
PM::Target: APeerManager,
377410
C::Target: Filter,
378411
{
379412
fn handle_custom_message(
@@ -431,12 +464,10 @@ where
431464
}
432465
}
433466

434-
impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone> Listen
435-
for LiquidityManager<ES, CM, PM, C>
467+
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Listen for LiquidityManager<ES, CM, C>
436468
where
437469
ES::Target: EntropySource,
438470
CM::Target: AChannelManager,
439-
PM::Target: APeerManager,
440471
C::Target: Filter,
441472
{
442473
fn filtered_block_connected(
@@ -472,12 +503,10 @@ where
472503
}
473504
}
474505

475-
impl<ES: Deref + Clone, CM: Deref + Clone, PM: Deref + Clone, C: Deref + Clone> Confirm
476-
for LiquidityManager<ES, CM, PM, C>
506+
impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Confirm for LiquidityManager<ES, CM, C>
477507
where
478508
ES::Target: EntropySource,
479509
CM::Target: AChannelManager,
480-
PM::Target: APeerManager,
481510
C::Target: Filter,
482511
{
483512
fn transactions_confirmed(

src/message_queue.rs

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
//! Holds types and traits used to implement message queues for [`LSPSMessage`]s.
22
33
use crate::lsps0::msgs::LSPSMessage;
4-
use crate::prelude::{Vec, VecDeque};
5-
use crate::sync::Mutex;
6-
7-
use lightning::ln::peer_handler::APeerManager;
4+
use crate::prelude::{Box, Vec, VecDeque};
5+
use crate::sync::{Mutex, RwLock};
86

97
use bitcoin::secp256k1::PublicKey;
108

11-
use core::ops::Deref;
12-
139
/// Represents a simple message queue that the LSPS message handlers use to send messages to a given counterparty.
1410
pub trait MessageQueue {
1511
/// Enqueues an LSPS message to be sent to the counterparty with the given node id.
@@ -22,45 +18,36 @@ pub trait MessageQueue {
2218
/// The default [`MessageQueue`] Implementation used by [`LiquidityManager`].
2319
///
2420
/// [`LiquidityManager`]: crate::LiquidityManager
25-
pub struct DefaultMessageQueue<PM: Deref>
26-
where
27-
PM::Target: APeerManager,
28-
{
21+
pub struct DefaultMessageQueue {
2922
queue: Mutex<VecDeque<(PublicKey, LSPSMessage)>>,
30-
peer_manager: Mutex<Option<PM>>,
23+
process_msgs_callback: RwLock<Option<Box<dyn Fn() + Send + Sync + 'static>>>,
3124
}
3225

33-
impl<PM: Deref> DefaultMessageQueue<PM>
34-
where
35-
PM::Target: APeerManager,
36-
{
26+
impl DefaultMessageQueue {
3727
pub(crate) fn new() -> Self {
3828
let queue = Mutex::new(VecDeque::new());
39-
let peer_manager = Mutex::new(None);
40-
Self { queue, peer_manager }
29+
let process_msgs_callback = RwLock::new(None);
30+
Self { queue, process_msgs_callback }
4131
}
4232

43-
pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
44-
self.queue.lock().unwrap().drain(..).collect()
33+
pub(crate) fn set_process_msgs_callback(&self, callback: impl Fn() + Send + Sync + 'static) {
34+
*self.process_msgs_callback.write().unwrap() = Some(Box::new(callback));
4535
}
4636

47-
pub(crate) fn set_peer_manager(&self, peer_manager: PM) {
48-
*self.peer_manager.lock().unwrap() = Some(peer_manager);
37+
pub(crate) fn get_and_clear_pending_msgs(&self) -> Vec<(PublicKey, LSPSMessage)> {
38+
self.queue.lock().unwrap().drain(..).collect()
4939
}
5040
}
5141

52-
impl<PM: Deref> MessageQueue for DefaultMessageQueue<PM>
53-
where
54-
PM::Target: APeerManager,
55-
{
42+
impl MessageQueue for DefaultMessageQueue {
5643
fn enqueue(&self, counterparty_node_id: &PublicKey, msg: LSPSMessage) {
5744
{
5845
let mut queue = self.queue.lock().unwrap();
5946
queue.push_back((*counterparty_node_id, msg));
6047
}
6148

62-
if let Some(peer_manager) = self.peer_manager.lock().unwrap().as_ref() {
63-
peer_manager.as_ref().process_events();
49+
if let Some(process_msgs_callback) = self.process_msgs_callback.read().unwrap().as_ref() {
50+
(process_msgs_callback)()
6451
}
6552
}
6653
}

0 commit comments

Comments
 (0)