Skip to content

Commit c966148

Browse files
committed
Add async KVStore
1 parent a5d3546 commit c966148

File tree

3 files changed

+535
-151
lines changed

3 files changed

+535
-151
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 145 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ use lightning::sign::EntropySource;
4242
use lightning::sign::OutputSpender;
4343
use lightning::util::logger::Logger;
4444
use lightning::util::persist::{
45-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
46-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
47-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
48-
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
49-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
45+
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
46+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
47+
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
48+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
49+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
5050
};
5151
use lightning::util::sweep::OutputSweeper;
5252
#[cfg(feature = "std")]
5353
use lightning::util::sweep::OutputSweeperSync;
54+
use lightning::util::sweep::OutputSweeperSyncKVStore;
5455
#[cfg(feature = "std")]
5556
use lightning::util::wakers::Sleeper;
5657
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -318,6 +319,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
318319
true
319320
}
320321

322+
macro_rules! maybe_await {
323+
(true, $e:expr) => {
324+
$e.await
325+
};
326+
(false, $e:expr) => {
327+
$e
328+
};
329+
}
330+
321331
macro_rules! define_run_body {
322332
(
323333
$kv_store: ident,
@@ -327,7 +337,7 @@ macro_rules! define_run_body {
327337
$peer_manager: ident, $gossip_sync: ident,
328338
$process_sweeper: expr,
329339
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
330-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
340+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async_persist: tt,
331341
) => { {
332342
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
333343
$channel_manager.get_cm().timer_tick_occurred();
@@ -383,12 +393,12 @@ macro_rules! define_run_body {
383393

384394
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
385395
log_trace!($logger, "Persisting ChannelManager...");
386-
$kv_store.write(
396+
maybe_await!($async_persist, $kv_store.write(
387397
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
388398
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
389399
CHANNEL_MANAGER_PERSISTENCE_KEY,
390400
&$channel_manager.get_cm().encode(),
391-
)?;
401+
))?;
392402
log_trace!($logger, "Done persisting ChannelManager.");
393403
}
394404
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -449,12 +459,12 @@ macro_rules! define_run_body {
449459
log_trace!($logger, "Persisting network graph.");
450460
}
451461

452-
if let Err(e) = $kv_store.write(
462+
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
453463
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
454464
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
455465
NETWORK_GRAPH_PERSISTENCE_KEY,
456466
&network_graph.encode(),
457-
) {
467+
)) {
458468
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
459469
}
460470

@@ -482,12 +492,12 @@ macro_rules! define_run_body {
482492
} else {
483493
log_trace!($logger, "Persisting scorer");
484494
}
485-
if let Err(e) = $kv_store.write(
495+
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
486496
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
487497
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
488498
SCORER_PERSISTENCE_KEY,
489499
&scorer.encode(),
490-
) {
500+
)) {
491501
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
492502
}
493503
}
@@ -510,31 +520,31 @@ macro_rules! define_run_body {
510520
// After we exit, ensure we persist the ChannelManager one final time - this avoids
511521
// some races where users quit while channel updates were in-flight, with
512522
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
513-
$kv_store.write(
523+
maybe_await!($async_persist, $kv_store.write(
514524
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
515525
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
516526
CHANNEL_MANAGER_PERSISTENCE_KEY,
517527
&$channel_manager.get_cm().encode(),
518-
)?;
528+
))?;
519529

520530
// Persist Scorer on exit
521531
if let Some(ref scorer) = $scorer {
522-
$kv_store.write(
532+
maybe_await!($async_persist, $kv_store.write(
523533
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
524534
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
525535
SCORER_PERSISTENCE_KEY,
526536
&scorer.encode(),
527-
)?;
537+
))?;
528538
}
529539

530540
// Persist NetworkGraph on exit
531541
if let Some(network_graph) = $gossip_sync.network_graph() {
532-
$kv_store.write(
542+
maybe_await!($async_persist, $kv_store.write(
533543
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
534544
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
535545
NETWORK_GRAPH_PERSISTENCE_KEY,
536546
&network_graph.encode(),
537-
)?;
547+
))?;
538548
}
539549

540550
Ok(())
@@ -681,11 +691,12 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
681691
/// ```
682692
/// # use lightning::io;
683693
/// # use lightning::events::ReplayEvent;
684-
/// # use lightning::util::sweep::OutputSweeper;
685694
/// # use std::sync::{Arc, RwLock};
686695
/// # use std::sync::atomic::{AtomicBool, Ordering};
687696
/// # use std::time::SystemTime;
688-
/// # use lightning_background_processor::{process_events_async, GossipSync};
697+
/// # use lightning_background_processor::{process_events_full_async, GossipSync};
698+
/// # use core::future::Future;
699+
/// # use core::pin::Pin;
689700
/// # struct Logger {}
690701
/// # impl lightning::util::logger::Logger for Logger {
691702
/// # fn log(&self, _record: lightning::util::logger::Record) {}
@@ -697,6 +708,13 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697708
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
698709
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
699710
/// # }
711+
/// # struct Store {}
712+
/// # impl lightning::util::persist::KVStore for Store {
713+
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
714+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
715+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
716+
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
717+
/// # }
700718
/// # struct EventHandler {}
701719
/// # impl EventHandler {
702720
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -715,7 +733,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
715733
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
716734
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
717735
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
718-
/// #
736+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
737+
///
719738
/// # struct Node<
720739
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
721740
/// # F: lightning::chain::Filter + Send + Sync + 'static,
@@ -731,10 +750,10 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
731750
/// # liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
732751
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
733752
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
734-
/// # persister: Arc<StoreSync>,
753+
/// # persister: Arc<Store>,
735754
/// # logger: Arc<Logger>,
736755
/// # scorer: Arc<Scorer>,
737-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>>,
756+
/// # sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
738757
/// # }
739758
/// #
740759
/// # async fn setup_background_processing<
@@ -780,7 +799,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
780799
doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
781800
)]
782801
#[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")]
783-
/// process_events_async(
802+
/// process_events_full_async(
784803
/// background_persister,
785804
/// |e| background_event_handler.handle_event(e),
786805
/// background_chain_mon,
@@ -805,7 +824,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
805824
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")]
806825
/// # }
807826
///```
808-
pub async fn process_events_async<
827+
pub async fn process_events_full_async<
809828
'a,
810829
UL: 'static + Deref,
811830
CF: 'static + Deref,
@@ -856,7 +875,7 @@ where
856875
LM::Target: ALiquidityManager,
857876
O::Target: 'static + OutputSpender,
858877
D::Target: 'static + ChangeDestinationSource,
859-
K::Target: 'static + KVStoreSync,
878+
K::Target: 'static + KVStore,
860879
{
861880
let mut should_break = false;
862881
let async_event_handler = |event| {
@@ -875,12 +894,15 @@ where
875894
if let Some(duration_since_epoch) = fetch_time() {
876895
if update_scorer(scorer, &event, duration_since_epoch) {
877896
log_trace!(logger, "Persisting scorer after update");
878-
if let Err(e) = kv_store.write(
879-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
880-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
881-
SCORER_PERSISTENCE_KEY,
882-
&scorer.encode(),
883-
) {
897+
if let Err(e) = kv_store
898+
.write(
899+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
900+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
901+
SCORER_PERSISTENCE_KEY,
902+
&scorer.encode(),
903+
)
904+
.await
905+
{
884906
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
885907
// We opt not to abort early on persistence failure here as persisting
886908
// the scorer is non-critical and we still hope that it will have
@@ -958,7 +980,83 @@ where
958980
},
959981
mobile_interruptable_platform,
960982
fetch_time,
983+
true,
984+
)
985+
}
986+
987+
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
988+
/// synchronous background persistence.
989+
pub async fn process_events_async<
990+
UL: 'static + Deref,
991+
CF: 'static + Deref,
992+
T: 'static + Deref,
993+
F: 'static + Deref,
994+
G: 'static + Deref<Target = NetworkGraph<L>>,
995+
L: 'static + Deref + Send + Sync,
996+
P: 'static + Deref,
997+
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
998+
EventHandler: Fn(Event) -> EventHandlerFuture,
999+
ES: 'static + Deref + Send,
1000+
M: 'static
1001+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
1002+
+ Send
1003+
+ Sync,
1004+
CM: 'static + Deref + Send + Sync,
1005+
OM: 'static + Deref,
1006+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
1007+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
1008+
PM: 'static + Deref,
1009+
LM: 'static + Deref,
1010+
D: 'static + Deref,
1011+
O: 'static + Deref,
1012+
K: 'static + Deref,
1013+
OS: 'static + Deref<Target = OutputSweeperSyncKVStore<T, D, F, CF, K, L, O>>,
1014+
S: 'static + Deref<Target = SC> + Send + Sync,
1015+
SC: for<'b> WriteableScore<'b>,
1016+
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1017+
Sleeper: Fn(Duration) -> SleepFuture,
1018+
FetchTime: Fn() -> Option<Duration>,
1019+
>(
1020+
kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1021+
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1022+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1023+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1024+
) -> Result<(), lightning::io::Error>
1025+
where
1026+
UL::Target: 'static + UtxoLookup,
1027+
CF::Target: 'static + chain::Filter,
1028+
T::Target: 'static + BroadcasterInterface,
1029+
F::Target: 'static + FeeEstimator,
1030+
L::Target: 'static + Logger,
1031+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1032+
ES::Target: 'static + EntropySource,
1033+
CM::Target: AChannelManager,
1034+
OM::Target: AOnionMessenger,
1035+
PM::Target: APeerManager,
1036+
LM::Target: ALiquidityManager,
1037+
O::Target: 'static + OutputSpender,
1038+
D::Target: 'static + ChangeDestinationSource,
1039+
K::Target: 'static + KVStoreSync,
1040+
{
1041+
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store));
1042+
let sweeper = sweeper.map(|s| s.sweeper_async());
1043+
process_events_full_async(
1044+
kv_store,
1045+
event_handler,
1046+
chain_monitor,
1047+
channel_manager,
1048+
onion_messenger,
1049+
gossip_sync,
1050+
peer_manager,
1051+
liquidity_manager,
1052+
sweeper,
1053+
logger,
1054+
scorer,
1055+
sleeper,
1056+
mobile_interruptable_platform,
1057+
fetch_time,
9611058
)
1059+
.await
9621060
}
9631061

9641062
#[cfg(feature = "std")]
@@ -1140,6 +1238,7 @@ impl BackgroundProcessor {
11401238
.expect("Time should be sometime after 1970"),
11411239
)
11421240
},
1241+
false,
11431242
)
11441243
});
11451244
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1228,7 +1327,7 @@ mod tests {
12281327
use lightning::types::payment::PaymentHash;
12291328
use lightning::util::config::UserConfig;
12301329
use lightning::util::persist::{
1231-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
1330+
KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
12321331
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
12331332
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
12341333
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -2153,12 +2252,13 @@ mod tests {
21532252
open_channel!(nodes[0], nodes[1], 100000);
21542253

21552254
let data_dir = nodes[0].kv_store.get_data_dir();
2156-
let persister = Arc::new(
2255+
let kv_store_sync = Arc::new(
21572256
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
21582257
);
2258+
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
21592259

2160-
let bp_future = super::process_events_async(
2161-
persister,
2260+
let bp_future = super::process_events_full_async(
2261+
kv_store,
21622262
|_: _| async { Ok(()) },
21632263
Arc::clone(&nodes[0].chain_monitor),
21642264
Arc::clone(&nodes[0].node),
@@ -2661,11 +2761,13 @@ mod tests {
26612761
let (_, nodes) =
26622762
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
26632763
let data_dir = nodes[0].kv_store.get_data_dir();
2664-
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
2764+
let kv_store_sync =
2765+
Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
2766+
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
26652767

26662768
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2667-
let bp_future = super::process_events_async(
2668-
persister,
2769+
let bp_future = super::process_events_full_async(
2770+
kv_store,
26692771
|_: _| async { Ok(()) },
26702772
Arc::clone(&nodes[0].chain_monitor),
26712773
Arc::clone(&nodes[0].node),
@@ -2877,12 +2979,13 @@ mod tests {
28772979

28782980
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
28792981
let data_dir = nodes[0].kv_store.get_data_dir();
2880-
let persister = Arc::new(Persister::new(data_dir));
2982+
let kv_store_sync = Arc::new(Persister::new(data_dir));
2983+
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
28812984

28822985
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
28832986

2884-
let bp_future = super::process_events_async(
2885-
persister,
2987+
let bp_future = super::process_events_full_async(
2988+
kv_store,
28862989
event_handler,
28872990
Arc::clone(&nodes[0].chain_monitor),
28882991
Arc::clone(&nodes[0].node),

0 commit comments

Comments
 (0)