@@ -40,7 +40,9 @@ use lightning::sign::ChangeDestinationSourceSync;
4040use lightning:: sign:: EntropySource ;
4141use lightning:: sign:: OutputSpender ;
4242use lightning:: util:: logger:: Logger ;
43- use lightning:: util:: persist:: { KVStoreSync , PersisterSync } ;
43+ #[ cfg( feature = "std" ) ]
44+ use lightning:: util:: persist:: PersisterSync ;
45+ use lightning:: util:: persist:: { KVStoreSync , Persister } ;
4446use lightning:: util:: sweep:: OutputSweeper ;
4547#[ cfg( feature = "std" ) ]
4648use lightning:: util:: sweep:: OutputSweeperSync ;
@@ -50,7 +52,9 @@ use lightning_rapid_gossip_sync::RapidGossipSync;
5052
5153use lightning_liquidity:: ALiquidityManager ;
5254
55+ use core:: future:: Future ;
5356use core:: ops:: Deref ;
57+ use core:: pin:: Pin ;
5458use core:: time:: Duration ;
5559
5660#[ cfg( feature = "std" ) ]
@@ -311,6 +315,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311315 true
312316}
313317
318+ macro_rules! maybe_await {
319+ ( true , $e: expr) => {
320+ $e. await
321+ } ;
322+ ( false , $e: expr) => {
323+ $e
324+ } ;
325+ }
326+
314327macro_rules! define_run_body {
315328 (
316329 $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +332,7 @@ macro_rules! define_run_body {
319332 $peer_manager: ident, $gossip_sync: ident,
320333 $process_sweeper: expr,
321334 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322- $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
335+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async : tt ,
323336 ) => { {
324337 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
325338 $channel_manager. get_cm( ) . timer_tick_occurred( ) ;
@@ -375,7 +388,7 @@ macro_rules! define_run_body {
375388
376389 if $channel_manager. get_cm( ) . get_and_clear_needs_persistence( ) {
377390 log_trace!( $logger, "Persisting ChannelManager..." ) ;
378- $ persister. persist_manager( & $channel_manager) ?;
391+ maybe_await! ( $async , $ persister. persist_manager( & $channel_manager) ) ?;
379392 log_trace!( $logger, "Done persisting ChannelManager." ) ;
380393 }
381394 if $timer_elapsed( & mut last_freshness_call, FRESHNESS_TIMER ) {
@@ -436,7 +449,7 @@ macro_rules! define_run_body {
436449 log_trace!( $logger, "Persisting network graph." ) ;
437450 }
438451
439- if let Err ( e) = $ persister. persist_graph( network_graph) {
452+ if let Err ( e) = maybe_await! ( $async , $ persister. persist_graph( network_graph) ) {
440453 log_error!( $logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e)
441454 }
442455
@@ -464,7 +477,7 @@ macro_rules! define_run_body {
464477 } else {
465478 log_trace!( $logger, "Persisting scorer" ) ;
466479 }
467- if let Err ( e) = $ persister. persist_scorer( & scorer) {
480+ if let Err ( e) = maybe_await! ( $async , $ persister. persist_scorer( & scorer) ) {
468481 log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
469482 }
470483 }
@@ -487,16 +500,16 @@ macro_rules! define_run_body {
487500 // After we exit, ensure we persist the ChannelManager one final time - this avoids
488501 // some races where users quit while channel updates were in-flight, with
489502 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490- $ persister. persist_manager( & $channel_manager) ?;
503+ maybe_await! ( $async , $ persister. persist_manager( & $channel_manager) ) ?;
491504
492505 // Persist Scorer on exit
493506 if let Some ( ref scorer) = $scorer {
494- $ persister. persist_scorer( & scorer) ?;
507+ maybe_await! ( $async , $ persister. persist_scorer( & scorer) ) ?;
495508 }
496509
497510 // Persist NetworkGraph on exit
498511 if let Some ( network_graph) = $gossip_sync. network_graph( ) {
499- $ persister. persist_graph( network_graph) ?;
512+ maybe_await! ( $async , $ persister. persist_graph( network_graph) ) ?;
500513 }
501514
502515 Ok ( ( ) )
@@ -643,22 +656,30 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
643656/// ```
644657/// # use lightning::io;
645658/// # use lightning::events::ReplayEvent;
646- /// # use lightning::util::sweep::OutputSweeper;
647659/// # use std::sync::{Arc, RwLock};
648660/// # use std::sync::atomic::{AtomicBool, Ordering};
649661/// # use std::time::SystemTime;
650662/// # use lightning_background_processor::{process_events_async, GossipSync};
663+ /// # use core::future::Future;
664+ /// # use core::pin::Pin;
651665/// # struct Logger {}
652666/// # impl lightning::util::logger::Logger for Logger {
653667/// # fn log(&self, _record: lightning::util::logger::Record) {}
654668/// # }
655- /// # struct Store {}
656- /// # impl lightning::util::persist::KVStore for Store {
669+ /// # struct StoreSync {}
670+ /// # impl lightning::util::persist::KVStoreSync for StoreSync {
657671/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
658672/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
659673/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
660674/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
661675/// # }
676+ /// # struct Store {}
677+ /// # impl lightning::util::persist::KVStore for Store {
678+ /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
679+ /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
680+ /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
681+ /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
682+ /// # }
662683/// # struct EventHandler {}
663684/// # impl EventHandler {
664685/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -669,22 +690,22 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
669690/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
670691/// # fn disconnect_socket(&mut self) {}
671692/// # }
672- /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store >, Arc<lightning::sign::KeysManager>>;
693+ /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync >, Arc<lightning::sign::KeysManager>>;
673694/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
674695/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
675696/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
676697/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
677698/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
678699/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
679- /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, Store>;
680- /// #
700+ /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
701+ /// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
702+ ///
681703/// # struct Node<
682704/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
683705/// # F: lightning::chain::Filter + Send + Sync + 'static,
684706/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685707/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686708/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
688709/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689710/// # > {
690711/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +718,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697718/// # persister: Arc<Store>,
698719/// # logger: Arc<Logger>,
699720/// # scorer: Arc<Scorer>,
700- /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O> >>,
721+ /// # sweeper: Arc<OutputSweeper<B, D, FE, F, O >>,
701722/// # }
702723/// #
703724/// # async fn setup_background_processing<
@@ -706,10 +727,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706727/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707728/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708729/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
710730/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711- /// # >(node: Node<B, F, FE, UL, D, K, O>) {
712- /// let background_persister = Arc::clone(&node.persister);
731+ /// # >(node: Node<B, F, FE, UL, D, O>) {
732+ /// let background_persister = Arc::new(Arc:: clone(&node.persister) );
713733/// let background_event_handler = Arc::clone(&node.event_handler);
714734/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715735/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -814,7 +834,7 @@ where
814834 F :: Target : ' static + FeeEstimator ,
815835 L :: Target : ' static + Logger ,
816836 P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
817- PS :: Target : ' static + PersisterSync < ' a , CM , L , S > ,
837+ PS :: Target : ' static + Persister < ' a , CM , L , S > ,
818838 ES :: Target : ' static + EntropySource ,
819839 CM :: Target : AChannelManager ,
820840 OM :: Target : AOnionMessenger ,
@@ -841,7 +861,7 @@ where
841861 if let Some ( duration_since_epoch) = fetch_time ( ) {
842862 if update_scorer ( scorer, & event, duration_since_epoch) {
843863 log_trace ! ( logger, "Persisting scorer after update" ) ;
844- if let Err ( e) = persister. persist_scorer ( & * scorer) {
864+ if let Err ( e) = persister. persist_scorer ( & * scorer) . await {
845865 log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e) ;
846866 // We opt not to abort early on persistence failure here as persisting
847867 // the scorer is non-critical and we still hope that it will have
@@ -919,7 +939,134 @@ where
919939 } ,
920940 mobile_interruptable_platform,
921941 fetch_time,
942+ true ,
943+ )
944+ }
945+
946+ /// A wrapper that turns a synchronous [`PersisterSync`] into an async [`Persister`].
947+ struct PersisterSyncWrapper < ' a , PS , CM , L , S > {
948+ inner : Arc < PS > ,
949+ _phantom : std:: marker:: PhantomData < ( & ' a ( ) , CM , L , S ) > ,
950+ }
951+
952+ impl < ' a , PS , CM , L , S > Deref for PersisterSyncWrapper < ' _ , PS , CM , L , S > {
953+ type Target = Self ;
954+ fn deref ( & self ) -> & Self {
955+ self
956+ }
957+ }
958+
959+ impl < ' a , PS , CM , L , S > PersisterSyncWrapper < ' a , PS , CM , L , S > {
960+ /// Constructs a new [`PersisterSyncWrapper`] from the given sync persister.
961+ pub fn new ( inner : PS ) -> Self {
962+ Self { inner : Arc :: new ( inner) , _phantom : std:: marker:: PhantomData }
963+ }
964+ }
965+
966+ impl < ' a , PS : Deref , CM : Deref + ' static , L : Deref + ' static , S : Deref + ' static >
967+ Persister < ' a , CM , L , S > for PersisterSyncWrapper < ' a , PS , CM , L , S >
968+ where
969+ PS :: Target : PersisterSync < ' a , CM , L , S > ,
970+ CM :: Target : AChannelManager ,
971+ L :: Target : Logger ,
972+ S :: Target : WriteableScore < ' a > ,
973+ {
974+ fn persist_manager (
975+ & self , channel_manager : & CM ,
976+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
977+ let res = self . inner . persist_manager ( & channel_manager) ;
978+ Box :: pin ( async move { res } )
979+ }
980+
981+ fn persist_graph (
982+ & self , network_graph : & NetworkGraph < L > ,
983+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
984+ let res = self . inner . persist_graph ( & network_graph) ;
985+ Box :: pin ( async move { res } )
986+ }
987+
988+ fn persist_scorer (
989+ & self , scorer : & S ,
990+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + ' static + Send > > {
991+ let res = self . inner . persist_scorer ( & scorer) ;
992+ Box :: pin ( async move { res } )
993+ }
994+ }
995+
996+ /// Async events processor the is based on [`process_events_async`] but allows for [`PersisterSync`] to be used for
997+ /// synchronous background persistence.
998+ pub async fn process_events_partial_async <
999+ UL : ' static + Deref ,
1000+ CF : ' static + Deref ,
1001+ T : ' static + Deref ,
1002+ F : ' static + Deref ,
1003+ G : ' static + Deref < Target = NetworkGraph < L > > ,
1004+ L : ' static + Deref + Send + Sync ,
1005+ P : ' static + Deref ,
1006+ EventHandlerFuture : core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
1007+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
1008+ PS : ' static + Deref + Send + Sync ,
1009+ ES : ' static + Deref + Send ,
1010+ M : ' static
1011+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P , ES > >
1012+ + Send
1013+ + Sync ,
1014+ CM : ' static + Deref + Send + Sync ,
1015+ OM : ' static + Deref ,
1016+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > ,
1017+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
1018+ PM : ' static + Deref ,
1019+ LM : ' static + Deref ,
1020+ D : ' static + Deref ,
1021+ O : ' static + Deref ,
1022+ K : ' static + Deref ,
1023+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > ,
1024+ S : ' static + Deref < Target = SC > + Send + Sync ,
1025+ SC : for < ' b > WriteableScore < ' b > ,
1026+ SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
1027+ Sleeper : Fn ( Duration ) -> SleepFuture ,
1028+ FetchTime : Fn ( ) -> Option < Duration > ,
1029+ > (
1030+ persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
1031+ onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
1032+ liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
1033+ sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
1034+ ) -> Result < ( ) , lightning:: io:: Error >
1035+ where
1036+ UL :: Target : ' static + UtxoLookup ,
1037+ CF :: Target : ' static + chain:: Filter ,
1038+ T :: Target : ' static + BroadcasterInterface ,
1039+ F :: Target : ' static + FeeEstimator ,
1040+ L :: Target : ' static + Logger ,
1041+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1042+ PS :: Target : ' static + PersisterSync < ' static , CM , L , S > ,
1043+ ES :: Target : ' static + EntropySource ,
1044+ CM :: Target : AChannelManager ,
1045+ OM :: Target : AOnionMessenger ,
1046+ PM :: Target : APeerManager ,
1047+ LM :: Target : ALiquidityManager ,
1048+ O :: Target : ' static + OutputSpender ,
1049+ D :: Target : ' static + ChangeDestinationSource ,
1050+ K :: Target : ' static + KVStoreSync ,
1051+ {
1052+ let persister = PersisterSyncWrapper :: < ' static , PS , CM , L , S > :: new ( persister) ;
1053+ process_events_async (
1054+ persister,
1055+ event_handler,
1056+ chain_monitor,
1057+ channel_manager,
1058+ onion_messenger,
1059+ gossip_sync,
1060+ peer_manager,
1061+ liquidity_manager,
1062+ sweeper,
1063+ logger,
1064+ scorer,
1065+ sleeper,
1066+ mobile_interruptable_platform,
1067+ fetch_time,
9221068 )
1069+ . await
9231070}
9241071
9251072#[ cfg( feature = "std" ) ]
@@ -1098,6 +1245,7 @@ impl BackgroundProcessor {
10981245 . expect( "Time should be sometime after 1970" ) ,
10991246 )
11001247 } ,
1248+ false ,
11011249 )
11021250 } ) ;
11031251 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
@@ -2111,7 +2259,7 @@ mod tests {
21112259 PersisterSync :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ,
21122260 ) ;
21132261
2114- let bp_future = super :: process_events_async (
2262+ let bp_future = super :: process_events_partial_async (
21152263 persister,
21162264 |_: _ | async { Ok ( ( ) ) } ,
21172265 Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
@@ -2622,7 +2770,7 @@ mod tests {
26222770 Arc :: new ( PersisterSync :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
26232771
26242772 let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2625- let bp_future = super :: process_events_async (
2773+ let bp_future = super :: process_events_partial_async (
26262774 persister,
26272775 |_: _ | async { Ok ( ( ) ) } ,
26282776 Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
@@ -2839,7 +2987,7 @@ mod tests {
28392987
28402988 let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
28412989
2842- let bp_future = super :: process_events_async (
2990+ let bp_future = super :: process_events_partial_async (
28432991 persister,
28442992 event_handler,
28452993 Arc :: clone ( & nodes[ 0 ] . chain_monitor ) ,
0 commit comments