Skip to content

Commit d4bea6a

Browse files
committed
Convert process_events_async to take an asynchronous Persister
Also provide a wrapper to allow a sync kvstore to be used.
1 parent a1d9e79 commit d4bea6a

File tree

2 files changed

+268
-55
lines changed

2 files changed

+268
-55
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 167 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStoreSync, PersisterSync};
43+
use lightning::util::persist::KVStoreSync;
44+
use lightning::util::persist::Persister;
45+
use lightning::util::persist::PersisterSync;
4446
use lightning::util::sweep::OutputSweeper;
4547
#[cfg(feature = "std")]
4648
use lightning::util::sweep::OutputSweeperSync;
@@ -50,7 +52,9 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
5052

5153
use lightning_liquidity::ALiquidityManager;
5254

55+
use core::future::Future;
5356
use core::ops::Deref;
57+
use core::pin::Pin;
5458
use core::time::Duration;
5559

5660
#[cfg(feature = "std")]
@@ -311,6 +315,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311315
true
312316
}
313317

318+
macro_rules! maybe_await {
319+
(true, $e:expr) => {
320+
$e.await
321+
};
322+
(false, $e:expr) => {
323+
$e
324+
};
325+
}
326+
314327
macro_rules! define_run_body {
315328
(
316329
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +332,7 @@ macro_rules! define_run_body {
319332
$peer_manager: ident, $gossip_sync: ident,
320333
$process_sweeper: expr,
321334
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
335+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async_persist: tt,
323336
) => { {
324337
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
325338
$channel_manager.get_cm().timer_tick_occurred();
@@ -375,7 +388,7 @@ macro_rules! define_run_body {
375388

376389
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377390
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
391+
maybe_await!($async_persist, $persister.persist_manager(&$channel_manager))?;
379392
log_trace!($logger, "Done persisting ChannelManager.");
380393
}
381394
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +449,7 @@ macro_rules! define_run_body {
436449
log_trace!($logger, "Persisting network graph.");
437450
}
438451

439-
if let Err(e) = $persister.persist_graph(network_graph) {
452+
if let Err(e) = maybe_await!($async_persist, $persister.persist_graph(network_graph)) {
440453
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441454
}
442455

@@ -464,7 +477,7 @@ macro_rules! define_run_body {
464477
} else {
465478
log_trace!($logger, "Persisting scorer");
466479
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
480+
if let Err(e) = maybe_await!($async_persist, $persister.persist_scorer(&scorer)) {
468481
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469482
}
470483
}
@@ -487,16 +500,16 @@ macro_rules! define_run_body {
487500
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488501
// some races where users quit while channel updates were in-flight, with
489502
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
503+
maybe_await!($async_persist, $persister.persist_manager(&$channel_manager))?;
491504

492505
// Persist Scorer on exit
493506
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
507+
maybe_await!($async_persist, $persister.persist_scorer(&scorer))?;
495508
}
496509

497510
// Persist NetworkGraph on exit
498511
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
512+
maybe_await!($async_persist, $persister.persist_graph(network_graph))?;
500513
}
501514

502515
Ok(())
@@ -643,11 +656,12 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
643656
/// ```
644657
/// # use lightning::io;
645658
/// # use lightning::events::ReplayEvent;
646-
/// # use lightning::util::sweep::OutputSweeper;
647659
/// # use std::sync::{Arc, RwLock};
648660
/// # use std::sync::atomic::{AtomicBool, Ordering};
649661
/// # use std::time::SystemTime;
650-
/// # use lightning_background_processor::{process_events_async, GossipSync};
662+
/// # use lightning_background_processor::{process_events_full_async, GossipSync};
663+
/// # use core::future::Future;
664+
/// # use core::pin::Pin;
651665
/// # struct Logger {}
652666
/// # impl lightning::util::logger::Logger for Logger {
653667
/// # fn log(&self, _record: lightning::util::logger::Record) {}
@@ -659,6 +673,13 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
659673
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
660674
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
661675
/// # }
676+
/// # struct Store {}
677+
/// # impl lightning::util::persist::KVStore for Store {
678+
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
679+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
680+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
681+
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
682+
/// # }
662683
/// # struct EventHandler {}
663684
/// # impl EventHandler {
664685
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -677,14 +698,14 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
677698
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
678699
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
679700
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
680-
/// #
701+
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
702+
///
681703
/// # struct Node<
682704
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
683705
/// # F: lightning::chain::Filter + Send + Sync + 'static,
684706
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685707
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686708
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687-
/// # K: lightning::util::persist::KVStoreSync + Send + Sync + 'static,
688709
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689710
/// # > {
690711
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -694,10 +715,10 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
694715
/// # liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
695716
/// # chain_monitor: Arc<ChainMonitor<B, F, FE>>,
696717
/// # gossip_sync: Arc<P2PGossipSync<UL>>,
697-
/// # persister: Arc<StoreSync>,
718+
/// # persister: Arc<Store>,
698719
/// # logger: Arc<Logger>,
699720
/// # scorer: Arc<Scorer>,
700-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
721+
/// # sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
701722
/// # }
702723
/// #
703724
/// # async fn setup_background_processing<
@@ -706,10 +727,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706727
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707728
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708729
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709-
/// # K: lightning::util::persist::KVStoreSync + Send + Sync + 'static,
710730
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711-
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
712-
/// let background_persister = Arc::clone(&node.persister);
731+
/// # >(node: Node<B, F, FE, UL, D, O>) {
732+
/// let background_persister = Arc::new(Arc::clone(&node.persister));
713733
/// let background_event_handler = Arc::clone(&node.event_handler);
714734
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715735
/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -744,7 +764,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
744764
doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
745765
)]
746766
#[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")]
747-
/// process_events_async(
767+
/// process_events_full_async(
748768
/// background_persister,
749769
/// |e| background_event_handler.handle_event(e),
750770
/// background_chain_mon,
@@ -769,7 +789,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
769789
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")]
770790
/// # }
771791
///```
772-
pub async fn process_events_async<
792+
pub async fn process_events_full_async<
773793
'a,
774794
UL: 'static + Deref,
775795
CF: 'static + Deref,
@@ -814,7 +834,7 @@ where
814834
F::Target: 'static + FeeEstimator,
815835
L::Target: 'static + Logger,
816836
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817-
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
837+
PS::Target: 'static + Persister<'a, CM, L, S>,
818838
ES::Target: 'static + EntropySource,
819839
CM::Target: AChannelManager,
820840
OM::Target: AOnionMessenger,
@@ -841,7 +861,7 @@ where
841861
if let Some(duration_since_epoch) = fetch_time() {
842862
if update_scorer(scorer, &event, duration_since_epoch) {
843863
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
864+
if let Err(e) = persister.persist_scorer(&*scorer).await {
845865
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846866
// We opt not to abort early on persistence failure here as persisting
847867
// the scorer is non-critical and we still hope that it will have
@@ -919,9 +939,134 @@ where
919939
},
920940
mobile_interruptable_platform,
921941
fetch_time,
942+
true,
922943
)
923944
}
924945

946+
struct PersisterSyncWrapper<'a, PS, CM, L, S> {
947+
inner: PS,
948+
_phantom: core::marker::PhantomData<(&'a (), CM, L, S)>,
949+
}
950+
951+
impl<'a, PS, CM, L, S> Deref for PersisterSyncWrapper<'_, PS, CM, L, S> {
952+
type Target = Self;
953+
fn deref(&self) -> &Self {
954+
self
955+
}
956+
}
957+
958+
impl<'a, PS, CM, L, S> PersisterSyncWrapper<'a, PS, CM, L, S> {
959+
pub fn new(inner: PS) -> Self {
960+
Self { inner, _phantom: core::marker::PhantomData }
961+
}
962+
}
963+
964+
impl<'a, PS: Deref, CM: Deref + 'static, L: Deref + 'static, S: Deref + 'static>
965+
Persister<'a, CM, L, S> for PersisterSyncWrapper<'a, PS, CM, L, S>
966+
where
967+
PS::Target: PersisterSync<'a, CM, L, S>,
968+
CM::Target: AChannelManager,
969+
L::Target: Logger,
970+
S::Target: WriteableScore<'a>,
971+
{
972+
fn persist_manager(
973+
&self, channel_manager: &CM,
974+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
975+
let res = self.inner.persist_manager(&channel_manager);
976+
Box::pin(async move { res })
977+
}
978+
979+
fn persist_graph(
980+
&self, network_graph: &NetworkGraph<L>,
981+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
982+
let res = self.inner.persist_graph(&network_graph);
983+
Box::pin(async move { res })
984+
}
985+
986+
fn persist_scorer(
987+
&self, scorer: &S,
988+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
989+
let res = self.inner.persist_scorer(&scorer);
990+
Box::pin(async move { res })
991+
}
992+
}
993+
994+
/// Async events processor that is based on [`process_events_async`] but allows for [`PersisterSync`] to be used for
995+
/// synchronous background persistence.
996+
pub async fn process_events_async<
997+
UL: 'static + Deref,
998+
CF: 'static + Deref,
999+
T: 'static + Deref,
1000+
F: 'static + Deref,
1001+
G: 'static + Deref<Target = NetworkGraph<L>>,
1002+
L: 'static + Deref + Send + Sync,
1003+
P: 'static + Deref,
1004+
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1005+
EventHandler: Fn(Event) -> EventHandlerFuture,
1006+
PS: 'static + Deref + Send + Sync,
1007+
ES: 'static + Deref + Send,
1008+
M: 'static
1009+
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
1010+
+ Send
1011+
+ Sync,
1012+
CM: 'static + Deref + Send + Sync,
1013+
OM: 'static + Deref,
1014+
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
1015+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
1016+
PM: 'static + Deref,
1017+
LM: 'static + Deref,
1018+
D: 'static + Deref,
1019+
O: 'static + Deref,
1020+
K: 'static + Deref,
1021+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
1022+
S: 'static + Deref<Target = SC> + Send + Sync,
1023+
SC: for<'b> WriteableScore<'b>,
1024+
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1025+
Sleeper: Fn(Duration) -> SleepFuture,
1026+
FetchTime: Fn() -> Option<Duration>,
1027+
>(
1028+
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1029+
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1030+
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1031+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1032+
) -> Result<(), lightning::io::Error>
1033+
where
1034+
UL::Target: 'static + UtxoLookup,
1035+
CF::Target: 'static + chain::Filter,
1036+
T::Target: 'static + BroadcasterInterface,
1037+
F::Target: 'static + FeeEstimator,
1038+
L::Target: 'static + Logger,
1039+
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1040+
PS::Target: 'static + PersisterSync<'static, CM, L, S>,
1041+
ES::Target: 'static + EntropySource,
1042+
CM::Target: AChannelManager,
1043+
OM::Target: AOnionMessenger,
1044+
PM::Target: APeerManager,
1045+
LM::Target: ALiquidityManager,
1046+
O::Target: 'static + OutputSpender,
1047+
D::Target: 'static + ChangeDestinationSource,
1048+
K::Target: 'static + KVStoreSync,
1049+
{
1050+
let persister = PersisterSyncWrapper::<'static, PS, CM, L, S>::new(persister);
1051+
process_events_full_async(
1052+
persister,
1053+
event_handler,
1054+
chain_monitor,
1055+
channel_manager,
1056+
onion_messenger,
1057+
gossip_sync,
1058+
peer_manager,
1059+
liquidity_manager,
1060+
sweeper,
1061+
logger,
1062+
scorer,
1063+
sleeper,
1064+
mobile_interruptable_platform,
1065+
fetch_time,
1066+
)
1067+
.await
1068+
}
1069+
9251070
#[cfg(feature = "std")]
9261071
impl BackgroundProcessor {
9271072
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -1098,6 +1243,7 @@ impl BackgroundProcessor {
10981243
.expect("Time should be sometime after 1970"),
10991244
)
11001245
},
1246+
false,
11011247
)
11021248
});
11031249
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

0 commit comments

Comments
 (0)