Skip to content

Commit f3feec9

Browse files
committed
Add LSPS2ServiceHandler persistence
We add simple `persist` call to `LSPS2ServiceHandler` that sequentially persist all the peer states under a key that encodes their node id.
1 parent e034a82 commit f3feec9

File tree

4 files changed

+109
-8
lines changed

4 files changed

+109
-8
lines changed

lightning-liquidity/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub mod lsps2;
6565
pub mod lsps5;
6666
mod manager;
6767
pub mod message_queue;
68+
pub mod persist;
6869
#[allow(dead_code)]
6970
#[allow(unused_imports)]
7071
mod sync;
@@ -73,6 +74,6 @@ mod tests;
7374
pub mod utils;
7475

7576
pub use manager::{
76-
ALiquidityManager, ALiquidityManagerSync, LiquidityClientConfig, LiquidityManager, LiquidityManagerSync,
77-
LiquidityServiceConfig,
77+
ALiquidityManager, ALiquidityManagerSync, LiquidityClientConfig, LiquidityManager,
78+
LiquidityManagerSync, LiquidityServiceConfig,
7879
};

lightning-liquidity/src/lsps2/service.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99

1010
//! Contains the main bLIP-52 / LSPS2 server-side object, [`LSPS2ServiceHandler`].
1111
12+
use alloc::boxed::Box;
1213
use alloc::string::{String, ToString};
1314
use alloc::vec::Vec;
15+
use lightning::util::persist::KVStore;
1416

1517
use core::cmp::Ordering as CmpOrdering;
18+
use core::future::Future;
1619
use core::ops::Deref;
20+
use core::pin::Pin;
1721
use core::sync::atomic::{AtomicUsize, Ordering};
1822

1923
use crate::events::EventQueue;
@@ -28,6 +32,9 @@ use crate::lsps2::utils::{
2832
compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
2933
};
3034
use crate::message_queue::{MessageQueue, MessageQueueNotifierGuard};
35+
use crate::persist::{
36+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
37+
};
3138
use crate::prelude::hash_map::Entry;
3239
use crate::prelude::{new_hash_map, HashMap};
3340
use crate::sync::{Arc, Mutex, MutexGuard, RwLock};
@@ -38,6 +45,7 @@ use lightning::ln::msgs::{ErrorAction, LightningError};
3845
use lightning::ln::types::ChannelId;
3946
use lightning::util::errors::APIError;
4047
use lightning::util::logger::Level;
48+
use lightning::util::ser::Writeable;
4149
use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
4250

4351
use lightning_types::payment::PaymentHash;
@@ -569,6 +577,7 @@ where
569577
CM::Target: AChannelManager,
570578
{
571579
channel_manager: CM,
580+
kv_store: Arc<dyn KVStore + Send + Sync>,
572581
pending_messages: Arc<MessageQueue>,
573582
pending_events: Arc<EventQueue>,
574583
per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
@@ -585,7 +594,7 @@ where
585594
/// Constructs a `LSPS2ServiceHandler`.
586595
pub(crate) fn new(
587596
pending_messages: Arc<MessageQueue>, pending_events: Arc<EventQueue>, channel_manager: CM,
588-
config: LSPS2ServiceConfig,
597+
kv_store: Arc<dyn KVStore + Send + Sync>, config: LSPS2ServiceConfig,
589598
) -> Self {
590599
Self {
591600
pending_messages,
@@ -595,6 +604,7 @@ where
595604
peer_by_channel_id: RwLock::new(new_hash_map()),
596605
total_pending_requests: AtomicUsize::new(0),
597606
channel_manager,
607+
kv_store,
598608
config,
599609
}
600610
}
@@ -1442,6 +1452,45 @@ where
14421452
);
14431453
}
14441454

1455+
fn persist_peer_state(
1456+
&self, counterparty_node_id: PublicKey, peer_state: &PeerState,
1457+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
1458+
let key = counterparty_node_id.to_string();
1459+
let encoded = peer_state.encode();
1460+
self.kv_store.write(
1461+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1462+
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
1463+
&key,
1464+
encoded,
1465+
)
1466+
}
1467+
1468+
pub(crate) fn persist(
1469+
&self,
1470+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
1471+
let outer_state_lock = self.per_peer_state.read().unwrap();
1472+
let mut futures = Vec::new();
1473+
for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
1474+
let peer_state_lock = inner_state_lock.lock().unwrap();
1475+
let fut = self.persist_peer_state(*counterparty_node_id, &*peer_state_lock);
1476+
futures.push(fut);
1477+
}
1478+
1479+
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
1480+
// introduce some batching to upper-bound the number of requests inflight at any given
1481+
// time.
1482+
Box::pin(async move {
1483+
let mut ret = Ok(());
1484+
for fut in futures {
1485+
let res = fut.await;
1486+
if res.is_err() {
1487+
ret = res;
1488+
}
1489+
}
1490+
ret
1491+
})
1492+
}
1493+
14451494
pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
14461495
let mut outer_state_lock = self.per_peer_state.write().unwrap();
14471496
let is_prunable =

lightning-liquidity/src/manager.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// You may not use this file except in accordance with one or both of these
88
// licenses.
99

10+
use alloc::boxed::Box;
1011
use alloc::string::ToString;
1112
use alloc::vec::Vec;
1213

@@ -53,7 +54,9 @@ use lightning_types::features::{InitFeatures, NodeFeatures};
5354

5455
use bitcoin::secp256k1::PublicKey;
5556

57+
use core::future::Future as StdFuture;
5658
use core::ops::Deref;
59+
use core::pin::Pin;
5760

5861
const LSPS_FEATURE_BIT: usize = 729;
5962

@@ -173,7 +176,9 @@ pub trait ALiquidityManagerSync {
173176
type TP: Deref<Target = Self::TimeProvider> + Clone;
174177
/// Returns the inner async [`LiquidityManager`] for testing purposes.
175178
#[cfg(any(test, feature = "_test_utils"))]
176-
fn get_lm_async(&self) -> Arc<LiquidityManager<Self::ES, Self::NS, Self::CM, Self::C, Self::TP>>;
179+
fn get_lm_async(
180+
&self,
181+
) -> Arc<LiquidityManager<Self::ES, Self::NS, Self::CM, Self::C, Self::TP>>;
177182
/// Returns a reference to the actual [`LiquidityManager`] object.
178183
fn get_lm(&self) -> &LiquidityManagerSync<Self::ES, Self::NS, Self::CM, Self::C, Self::TP>;
179184
}
@@ -204,7 +209,9 @@ where
204209
type TP = TP;
205210
/// Returns the inner async [`LiquidityManager`] for testing purposes.
206211
#[cfg(any(test, feature = "_test_utils"))]
207-
fn get_lm_async(&self) -> Arc<LiquidityManager<Self::ES, Self::NS, Self::CM, Self::C, Self::TP>> {
212+
fn get_lm_async(
213+
&self,
214+
) -> Arc<LiquidityManager<Self::ES, Self::NS, Self::CM, Self::C, Self::TP>> {
208215
Arc::clone(&self.inner)
209216
}
210217
fn get_lm(&self) -> &LiquidityManagerSync<ES, NS, CM, C, TP> {
@@ -262,7 +269,6 @@ pub struct LiquidityManager<
262269
_client_config: Option<LiquidityClientConfig>,
263270
best_block: RwLock<Option<BestBlock>>,
264271
_chain_source: Option<C>,
265-
kv_store: Arc<dyn KVStore + Send + Sync>,
266272
}
267273

268274
#[cfg(feature = "time")]
@@ -349,6 +355,7 @@ where
349355
Arc::clone(&pending_messages),
350356
Arc::clone(&pending_events),
351357
channel_manager.clone(),
358+
Arc::clone(&kv_store),
352359
config.clone(),
353360
)
354361
})
@@ -444,7 +451,6 @@ where
444451
_client_config: client_config,
445452
best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
446453
_chain_source: chain_source,
447-
kv_store,
448454
}
449455
}
450456

@@ -483,7 +489,7 @@ where
483489

484490
/// Returns a reference to the LSPS2 server-side handler.
485491
///
486-
/// The returned handler allows to initiate the LSPS2 service-side flow.
492+
/// The returned hendler allows to initiate the LSPS2 service-side flow.
487493
pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM>> {
488494
self.lsps2_service_handler.as_ref()
489495
}
@@ -559,6 +565,31 @@ where
559565
self.pending_events.get_and_clear_pending_events()
560566
}
561567

568+
/// Persists the state of the service handlers towards the given [`KVStore`] implementation.
569+
///
570+
/// This will be regularly called by LDK's background processor if necessary and only needs to
571+
/// be called manually if it's not utilized.
572+
pub fn persist(
573+
&self,
574+
) -> Pin<Box<dyn StdFuture<Output = Result<(), lightning::io::Error>> + Send>> {
575+
let mut futures = Vec::new();
576+
if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
577+
futures.push(lsps2_service_handler.persist());
578+
}
579+
580+
// TODO: We should eventually persist in parallel.
581+
Box::pin(async move {
582+
let mut ret = Ok(());
583+
for fut in futures {
584+
let res = fut.await;
585+
if res.is_err() {
586+
ret = res;
587+
}
588+
}
589+
ret
590+
})
591+
}
592+
562593
fn handle_lsps_message(
563594
&self, msg: LSPSMessage, sender_node_id: &PublicKey,
564595
) -> Result<(), lightning::ln::msgs::LightningError> {

lightning-liquidity/src/persist.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// This file is Copyright its original authors, visible in version control
2+
// history.
3+
//
4+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7+
// You may not use this file except in accordance with one or both of these
8+
// licenses.
9+
10+
//! Types and utils for persistence.
11+
12+
/// The primary namespace under which the [`LiquidityManager`] will be persisted.
13+
///
14+
/// [`LiquidityManager`]: crate::LiquidityManager
15+
pub const LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "liquidity";
16+
17+
/// The secondary namespace under which the [`LSPS2ServiceHandler`] data will be persisted.
18+
///
19+
/// [`LSPS2ServiceHandler`]: crate::lsps2::service::LSPS2ServiceHandler
20+
pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";

0 commit comments

Comments
 (0)