Skip to content

Commit 33dcfc8

Browse files
tnulldomZippilli
authored andcommitted
Use FilesystemStore in BackgroundProcessor
We switch our BP over to use `FilesystemStore`, which also gives us test coverage and ensures the compatibility.
1 parent 2e076d2 commit 33dcfc8

File tree

3 files changed

+26
-2
lines changed

3 files changed

+26
-2
lines changed

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ lightning-rapid-gossip-sync = { version = "0.0.116", path = "../lightning-rapid-
2929
tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
3030
lightning = { version = "0.0.116", path = "../lightning", features = ["_test_utils"] }
3131
lightning-invoice = { version = "0.24.0", path = "../lightning-invoice" }
32-
lightning-persister = { version = "0.0.116", path = "../lightning-persister" }
32+
lightning-storage = { version = "0.0.116", path = "../lightning-storage" }

lightning-background-processor/src/lib.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,17 @@ use core::task;
507507
/// # fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
508508
/// # fn remove(&self, namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
509509
/// # fn list(&self, namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
510+
/// # use lightning::io;
511+
/// # use std::sync::{Arc, Mutex};
512+
/// # use std::sync::atomic::{AtomicBool, Ordering};
513+
/// # use lightning_background_processor::{process_events_async, GossipSync};
514+
/// # struct MyStore {}
515+
/// # impl lightning::util::persist::KVStore for MyStore {
516+
/// # type Reader = io::Cursor<Vec<u8>>;
517+
/// # fn read(&self, namespace: &str, key: &str) -> io::Result<Self::Reader> { Ok(io::Cursor::new(Vec::new())) }
518+
/// # fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
519+
/// # fn remove(&self, namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
520+
/// # fn list(&self, namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
510521
/// # }
511522
/// # struct MyEventHandler {}
512523
/// # impl MyEventHandler {
@@ -868,7 +879,7 @@ mod tests {
868879
use lightning::util::ser::Writeable;
869880
use lightning::util::test_utils;
870881
use lightning::util::persist::{KVStore, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY};
871-
use lightning_persister::fs_store::{FilesystemStore, FilesystemReader};
882+
use lightning_storage::fs_store::{FilesystemStore, FilesystemReader};
872883
use std::collections::VecDeque;
873884
use std::{fs, env};
874885
use std::path::PathBuf;

lightning/src/util/persist.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ pub trait Persister<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F:
100100
}
101101

102102

103+
impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for A
104+
103105
impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for A
104106
where M::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
105107
T::Target: 'static + BroadcasterInterface,
@@ -110,29 +112,38 @@ impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Der
110112
R::Target: 'static + Router,
111113
L::Target: 'static + Logger,
112114
{
115+
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
113116
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
114117
fn persist_manager(&self, channel_manager: &ChannelManager<M, T, ES, NS, SP, F, R, L>) -> Result<(), io::Error> {
115118
self.write(CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, &channel_manager.encode())
119+
self.write(CHANNEL_MANAGER_PERSISTENCE_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, &channel_manager.encode())
116120
}
117121

122+
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
118123
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
119124
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
120125
self.write(NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, &network_graph.encode())
126+
self.write(NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, &network_graph.encode())
121127
}
122128

129+
/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
123130
/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
124131
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
125132
self.write(SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY, &scorer.encode())
133+
self.write(SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY, &scorer.encode())
126134
}
127135
}
128136

137+
impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
129138
impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
130139
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
131140
// down once these start returning failure.
132141
// A PermanentFailure implies we should probably just shut down the node since we're
133142
// force-closing channels without even broadcasting!
134143

135144
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
145+
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
146+
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
136147
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
137148
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
138149
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
@@ -141,6 +152,8 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSign
141152
}
142153

143154
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
155+
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
156+
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
144157
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
145158
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
146159
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,

0 commit comments

Comments
 (0)