Skip to content

Commit 9bbe38c

Browse files
committed
Add LSPS5ServiceHandler persistence
We add simple `persist` call to `LSPS5ServiceHandler` that sequentially persist all the peer states under a key that encodes their node id.
1 parent d229c51 commit 9bbe38c

File tree

3 files changed

+60
-1
lines changed

3 files changed

+60
-1
lines changed

lightning-liquidity/src/lsps5/service.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ use crate::lsps5::msgs::{
1717
SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
1818
};
1919
use crate::message_queue::MessageQueue;
20+
use crate::persist::{
21+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
22+
};
2023
use crate::prelude::*;
2124
use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
2225
use crate::utils::time::TimeProvider;
@@ -28,10 +31,15 @@ use lightning::ln::channelmanager::AChannelManager;
2831
use lightning::ln::msgs::{ErrorAction, LightningError};
2932
use lightning::sign::NodeSigner;
3033
use lightning::util::logger::Level;
34+
use lightning::util::persist::KVStore;
35+
use lightning::util::ser::Writeable;
3136

37+
use core::future::Future;
3238
use core::ops::Deref;
39+
use core::pin::Pin;
3340
use core::time::Duration;
3441

42+
use alloc::boxed::Box;
3543
use alloc::string::String;
3644
use alloc::vec::Vec;
3745

@@ -131,6 +139,7 @@ where
131139
time_provider: TP,
132140
channel_manager: CM,
133141
node_signer: NS,
142+
kv_store: Arc<dyn KVStore + Send + Sync>,
134143
last_pruning: Mutex<Option<LSPSDateTime>>,
135144
}
136145

@@ -143,7 +152,8 @@ where
143152
/// Constructs a `LSPS5ServiceHandler` using the given time provider.
144153
pub(crate) fn new_with_time_provider(
145154
event_queue: Arc<EventQueue>, pending_messages: Arc<MessageQueue>, channel_manager: CM,
146-
node_signer: NS, config: LSPS5ServiceConfig, time_provider: TP,
155+
kv_store: Arc<dyn KVStore + Send + Sync>, node_signer: NS, config: LSPS5ServiceConfig,
156+
time_provider: TP,
147157
) -> Self {
148158
assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
149159
Self {
@@ -154,6 +164,7 @@ where
154164
time_provider,
155165
channel_manager,
156166
node_signer,
167+
kv_store,
157168
last_pruning: Mutex::new(None),
158169
}
159170
}
@@ -186,6 +197,44 @@ where
186197
}
187198
}
188199

200+
fn persist_peer_state(
201+
&self, counterparty_node_id: PublicKey, peer_state: &PeerState,
202+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
203+
let key = counterparty_node_id.to_string();
204+
let encoded = peer_state.encode();
205+
self.kv_store.write(
206+
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
207+
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
208+
&key,
209+
encoded,
210+
)
211+
}
212+
213+
pub(crate) fn persist(
214+
&self,
215+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
216+
let outer_state_lock = self.per_peer_state.read().unwrap();
217+
let mut futures = Vec::new();
218+
for (counterparty_node_id, peer_state) in outer_state_lock.iter() {
219+
let fut = self.persist_peer_state(*counterparty_node_id, peer_state);
220+
futures.push(fut);
221+
}
222+
223+
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
224+
// introduce some batching to upper-bound the number of requests inflight at any given
225+
// time.
226+
Box::pin(async move {
227+
let mut ret = Ok(());
228+
for fut in futures {
229+
let res = fut.await;
230+
if res.is_err() {
231+
ret = res;
232+
}
233+
}
234+
ret
235+
})
236+
}
237+
189238
fn check_prune_stale_webhooks<'a>(
190239
&self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
191240
) {

lightning-liquidity/src/manager.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ where
384384
Arc::clone(&pending_events),
385385
Arc::clone(&pending_messages),
386386
channel_manager.clone(),
387+
Arc::clone(&kv_store),
387388
node_signer,
388389
config.clone(),
389390
time_provider,
@@ -577,6 +578,10 @@ where
577578
futures.push(lsps2_service_handler.persist());
578579
}
579580

581+
if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
582+
futures.push(lsps5_service_handler.persist());
583+
}
584+
580585
// TODO: We should eventually persist in parallel.
581586
Box::pin(async move {
582587
let mut ret = Ok(());

lightning-liquidity/src/persist.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,8 @@ pub const LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = "liquidity";
1818
///
1919
/// [`LSPS2ServiceHandler`]: crate::lsps2::service::LSPS2ServiceHandler
2020
pub const LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps2_service";
21+
22+
/// The secondary namespace under which the [`LSPS5ServiceHandler`] data will be persisted.
23+
///
24+
/// [`LSPS5ServiceHandler`]: crate::lsps5::service::LSPS5ServiceHandler
25+
pub const LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE: &str = "lsps5_service";

0 commit comments

Comments
 (0)