Skip to content

Commit 22cfb54

Browse files
committed
Remove Persister
In preparation for the addition of an async KVStore, we here remove the Persister pseudo-wrapper. The wrapper is thin, would need to be duplicated for async, and KVStore isn't fully abstracted anyway anymore because the sweeper takes it directly.
1 parent 1d507f2 commit 22cfb54

File tree

2 files changed

+74
-86
lines changed

2 files changed

+74
-86
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 74 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ extern crate alloc;
1717
extern crate lightning;
1818
extern crate lightning_rapid_gossip_sync;
1919

20+
use crate::lightning::util::ser::Writeable;
2021
use lightning::chain;
2122
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2223
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
@@ -40,7 +41,13 @@ use lightning::sign::ChangeDestinationSourceSync;
4041
use lightning::sign::EntropySource;
4142
use lightning::sign::OutputSpender;
4243
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStore, Persister};
44+
use lightning::util::persist::{
45+
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
46+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
47+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
48+
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
49+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
50+
};
4451
use lightning::util::sweep::OutputSweeper;
4552
#[cfg(feature = "std")]
4653
use lightning::util::sweep::OutputSweeperSync;
@@ -313,7 +320,8 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
313320

314321
macro_rules! define_run_body {
315322
(
316-
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
323+
$kv_store: ident,
324+
$chain_monitor: ident, $process_chain_monitor_events: expr,
317325
$channel_manager: ident, $process_channel_manager_events: expr,
318326
$onion_messenger: ident, $process_onion_message_handler_events: expr,
319327
$peer_manager: ident, $gossip_sync: ident,
@@ -375,7 +383,12 @@ macro_rules! define_run_body {
375383

376384
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377385
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
386+
$kv_store.write(
387+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
388+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
389+
CHANNEL_MANAGER_PERSISTENCE_KEY,
390+
&$channel_manager.get_cm().encode(),
391+
)?;
379392
log_trace!($logger, "Done persisting ChannelManager.");
380393
}
381394
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +449,12 @@ macro_rules! define_run_body {
436449
log_trace!($logger, "Persisting network graph.");
437450
}
438451

439-
if let Err(e) = $persister.persist_graph(network_graph) {
452+
if let Err(e) = $kv_store.write(
453+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
454+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
455+
NETWORK_GRAPH_PERSISTENCE_KEY,
456+
&network_graph.encode(),
457+
) {
440458
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441459
}
442460

@@ -464,7 +482,12 @@ macro_rules! define_run_body {
464482
} else {
465483
log_trace!($logger, "Persisting scorer");
466484
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
485+
if let Err(e) = $kv_store.write(
486+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
487+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
488+
SCORER_PERSISTENCE_KEY,
489+
&scorer.encode(),
490+
) {
468491
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469492
}
470493
}
@@ -487,16 +510,31 @@ macro_rules! define_run_body {
487510
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488511
// some races where users quit while channel updates were in-flight, with
489512
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
513+
$kv_store.write(
514+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
515+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
516+
CHANNEL_MANAGER_PERSISTENCE_KEY,
517+
&$channel_manager.get_cm().encode(),
518+
)?;
491519

492520
// Persist Scorer on exit
493521
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
522+
$kv_store.write(
523+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
524+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
525+
SCORER_PERSISTENCE_KEY,
526+
&scorer.encode(),
527+
)?;
495528
}
496529

497530
// Persist NetworkGraph on exit
498531
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
532+
$kv_store.write(
533+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
534+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
535+
NETWORK_GRAPH_PERSISTENCE_KEY,
536+
&network_graph.encode(),
537+
)?;
500538
}
501539

502540
Ok(())
@@ -684,7 +722,6 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
684722
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685723
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686724
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
688725
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689726
/// # > {
690727
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +734,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697734
/// # persister: Arc<Store>,
698735
/// # logger: Arc<Logger>,
699736
/// # scorer: Arc<Scorer>,
700-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
737+
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>>,
701738
/// # }
702739
/// #
703740
/// # async fn setup_background_processing<
@@ -706,9 +743,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706743
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707744
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708745
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
710746
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711-
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
747+
/// # >(node: Node<B, F, FE, UL, D, O>) {
712748
/// let background_persister = Arc::clone(&node.persister);
713749
/// let background_event_handler = Arc::clone(&node.event_handler);
714750
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -780,7 +816,6 @@ pub async fn process_events_async<
780816
P: 'static + Deref,
781817
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
782818
EventHandler: Fn(Event) -> EventHandlerFuture,
783-
PS: 'static + Deref + Send,
784819
ES: 'static + Deref + Send,
785820
M: 'static
786821
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
@@ -802,7 +837,7 @@ pub async fn process_events_async<
802837
Sleeper: Fn(Duration) -> SleepFuture,
803838
FetchTime: Fn() -> Option<Duration>,
804839
>(
805-
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
840+
kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
806841
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
807842
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
808843
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
@@ -814,7 +849,6 @@ where
814849
F::Target: 'static + FeeEstimator,
815850
L::Target: 'static + Logger,
816851
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817-
PS::Target: 'static + Persister<'a, CM, L, S>,
818852
ES::Target: 'static + EntropySource,
819853
CM::Target: AChannelManager,
820854
OM::Target: AOnionMessenger,
@@ -830,7 +864,7 @@ where
830864
let event_handler = &event_handler;
831865
let scorer = &scorer;
832866
let logger = &logger;
833-
let persister = &persister;
867+
let kv_store = &kv_store;
834868
let fetch_time = &fetch_time;
835869
// We should be able to drop the Box once our MSRV is 1.68
836870
Box::pin(async move {
@@ -841,7 +875,12 @@ where
841875
if let Some(duration_since_epoch) = fetch_time() {
842876
if update_scorer(scorer, &event, duration_since_epoch) {
843877
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
878+
if let Err(e) = kv_store.write(
879+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
880+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
881+
SCORER_PERSISTENCE_KEY,
882+
&scorer.encode(),
883+
) {
845884
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846885
// We opt not to abort early on persistence failure here as persisting
847886
// the scorer is non-critical and we still hope that it will have
@@ -855,7 +894,7 @@ where
855894
})
856895
};
857896
define_run_body!(
858-
persister,
897+
kv_store,
859898
chain_monitor,
860899
chain_monitor.process_pending_events_async(async_event_handler).await,
861900
channel_manager,
@@ -978,7 +1017,6 @@ impl BackgroundProcessor {
9781017
L: 'static + Deref + Send,
9791018
P: 'static + Deref,
9801019
EH: 'static + EventHandler + Send,
981-
PS: 'static + Deref + Send,
9821020
ES: 'static + Deref + Send,
9831021
M: 'static
9841022
+ Deref<
@@ -996,10 +1034,10 @@ impl BackgroundProcessor {
9961034
SC: for<'b> WriteableScore<'b>,
9971035
D: 'static + Deref,
9981036
O: 'static + Deref,
999-
K: 'static + Deref,
1037+
K: 'static + Deref + Send,
10001038
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
10011039
>(
1002-
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
1040+
kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
10031041
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
10041042
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
10051043
) -> Self
@@ -1010,7 +1048,6 @@ impl BackgroundProcessor {
10101048
F::Target: 'static + FeeEstimator,
10111049
L::Target: 'static + Logger,
10121050
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1013-
PS::Target: 'static + Persister<'a, CM, L, S>,
10141051
ES::Target: 'static + EntropySource,
10151052
CM::Target: AChannelManager,
10161053
OM::Target: AOnionMessenger,
@@ -1035,15 +1072,20 @@ impl BackgroundProcessor {
10351072
.expect("Time should be sometime after 1970");
10361073
if update_scorer(scorer, &event, duration_since_epoch) {
10371074
log_trace!(logger, "Persisting scorer after update");
1038-
if let Err(e) = persister.persist_scorer(&scorer) {
1075+
if let Err(e) = kv_store.write(
1076+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1077+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1078+
SCORER_PERSISTENCE_KEY,
1079+
&scorer.encode(),
1080+
) {
10391081
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
10401082
}
10411083
}
10421084
}
10431085
event_handler.handle_event(event)
10441086
};
10451087
define_run_body!(
1046-
persister,
1088+
kv_store,
10471089
chain_monitor,
10481090
chain_monitor.process_pending_events(&event_handler),
10491091
channel_manager,
@@ -1256,7 +1298,7 @@ mod tests {
12561298
Arc<test_utils::TestBroadcaster>,
12571299
Arc<test_utils::TestFeeEstimator>,
12581300
Arc<test_utils::TestLogger>,
1259-
Arc<FilesystemStore>,
1301+
Arc<Persister>,
12601302
Arc<KeysManager>,
12611303
>;
12621304

@@ -1314,7 +1356,7 @@ mod tests {
13141356
>,
13151357
liquidity_manager: Arc<LM>,
13161358
chain_monitor: Arc<ChainMonitor>,
1317-
kv_store: Arc<FilesystemStore>,
1359+
kv_store: Arc<Persister>,
13181360
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
13191361
network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
13201362
logger: Arc<test_utils::TestLogger>,
@@ -1326,7 +1368,7 @@ mod tests {
13261368
Arc<TestWallet>,
13271369
Arc<test_utils::TestFeeEstimator>,
13281370
Arc<test_utils::TestChainSource>,
1329-
Arc<FilesystemStore>,
1371+
Arc<Persister>,
13301372
Arc<test_utils::TestLogger>,
13311373
Arc<KeysManager>,
13321374
>,
@@ -1418,6 +1460,10 @@ mod tests {
14181460
fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
14191461
Self { scorer_error: Some((error, message)), ..self }
14201462
}
1463+
1464+
pub fn get_data_dir(&self) -> PathBuf {
1465+
self.kv_store.get_data_dir()
1466+
}
14211467
}
14221468

14231469
impl KVStore for Persister {
@@ -1662,7 +1708,7 @@ mod tests {
16621708
));
16631709
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
16641710
let kv_store =
1665-
Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
1711+
Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
16661712
let now = Duration::from_secs(genesis_block.header.time as u64);
16671713
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
16681714
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(

lightning/src/util/persist.rs

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2424
use crate::chain::chainmonitor::Persist;
2525
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
2626
use crate::chain::transaction::OutPoint;
27-
use crate::ln::channelmanager::AChannelManager;
2827
use crate::ln::types::ChannelId;
29-
use crate::routing::gossip::NetworkGraph;
30-
use crate::routing::scoring::WriteableScore;
3128
use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider};
3229
use crate::util::logger::Logger;
3330
use crate::util::ser::{Readable, ReadableArgs, Writeable};
@@ -199,61 +196,6 @@ pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
199196
Ok(())
200197
}
201198

202-
/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
203-
///
204-
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
205-
pub trait Persister<'a, CM: Deref, L: Deref, S: Deref>
206-
where
207-
CM::Target: 'static + AChannelManager,
208-
L::Target: 'static + Logger,
209-
S::Target: WriteableScore<'a>,
210-
{
211-
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
212-
///
213-
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
214-
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>;
215-
216-
/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
217-
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error>;
218-
219-
/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
220-
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
221-
}
222-
223-
impl<'a, A: KVStore + ?Sized, CM: Deref, L: Deref, S: Deref> Persister<'a, CM, L, S> for A
224-
where
225-
CM::Target: 'static + AChannelManager,
226-
L::Target: 'static + Logger,
227-
S::Target: WriteableScore<'a>,
228-
{
229-
fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> {
230-
self.write(
231-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
232-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
233-
CHANNEL_MANAGER_PERSISTENCE_KEY,
234-
&channel_manager.get_cm().encode(),
235-
)
236-
}
237-
238-
fn persist_graph(&self, network_graph: &NetworkGraph<L>) -> Result<(), io::Error> {
239-
self.write(
240-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
241-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
242-
NETWORK_GRAPH_PERSISTENCE_KEY,
243-
&network_graph.encode(),
244-
)
245-
}
246-
247-
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
248-
self.write(
249-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
250-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
251-
SCORER_PERSISTENCE_KEY,
252-
&scorer.encode(),
253-
)
254-
}
255-
}
256-
257199
impl<ChannelSigner: EcdsaChannelSigner, K: KVStore + ?Sized> Persist<ChannelSigner> for K {
258200
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
259201
// down once these start returning failure.

0 commit comments

Comments
 (0)