@@ -64,6 +64,9 @@ use std::time::Instant;
6464#[ cfg( not( feature = "std" ) ) ]
6565use alloc:: boxed:: Box ;
6666
67+ #[ cfg( feature = "std" ) ]
68+ use std:: marker:: PhantomData ;
69+
6770/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
6871/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
6972/// responsibilities are:
@@ -1135,6 +1138,193 @@ impl BackgroundProcessor {
11351138 None => Ok ( ( ) ) ,
11361139 }
11371140 }
1141+
1142+ /// Creates a new [`BackgroundProcessorBuilder`] to construct a [`BackgroundProcessor`] with optional components.
1143+ pub fn builder <
1144+ ' a ,
1145+ UL : ' static + Deref + Send + Sync ,
1146+ CF : ' static + Deref + Send + Sync ,
1147+ T : ' static + Deref + Send + Sync ,
1148+ F : ' static + Deref + Send + Sync ,
1149+ G : ' static + Deref < Target = NetworkGraph < L > > + Send + Sync ,
1150+ L : ' static + Deref + Send + Sync ,
1151+ P : ' static + Deref + Send + Sync ,
1152+ EH : ' static + EventHandler + Send ,
1153+ PS : ' static + Deref + Send ,
1154+ M : ' static
1155+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P > >
1156+ + Send
1157+ + Sync ,
1158+ CM : ' static + Deref + Send + Sync ,
1159+ OM : ' static + Deref + Send + Sync ,
1160+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > + Send + Sync ,
1161+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > + Send ,
1162+ PM : ' static + Deref + Send + Sync ,
1163+ S : ' static + Deref < Target = SC > + Send + Sync ,
1164+ SC : for < ' b > WriteableScore < ' b > ,
1165+ > (
1166+ persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
1167+ gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L ,
1168+ ) -> BackgroundProcessorBuilder < ' a , UL , CF , T , F , G , L , P , EH , PS , M , CM , OM , PGS , RGS , PM , S , SC >
1169+ where
1170+ UL :: Target : ' static + UtxoLookup ,
1171+ CF :: Target : ' static + chain:: Filter ,
1172+ T :: Target : ' static + BroadcasterInterface ,
1173+ F :: Target : ' static + FeeEstimator ,
1174+ L :: Target : ' static + Logger ,
1175+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1176+ PS :: Target : ' static + Persister < ' a , CM , L , S > ,
1177+ CM :: Target : AChannelManager + Send + Sync ,
1178+ OM :: Target : AOnionMessenger + Send + Sync ,
1179+ PM :: Target : APeerManager + Send + Sync ,
1180+ {
1181+ BackgroundProcessorBuilder :: new (
1182+ persister,
1183+ event_handler,
1184+ chain_monitor,
1185+ channel_manager,
1186+ gossip_sync,
1187+ peer_manager,
1188+ logger,
1189+ )
1190+ }
1191+ }
1192+
1193+ /// A builder for constructing a [`BackgroundProcessor`] with optional components.
1194+ ///
1195+ /// This builder provides a flexible and type-safe way to construct a [`BackgroundProcessor`]
1196+ /// with optional components like `onion_messenger` and `scorer`. It helps avoid specifying
1197+ /// concrete types for components that aren't being used.
1198+ ///
1199+ /// Use [`BackgroundProcessor::builder`] to create a new builder instance.
1200+ #[ cfg( feature = "std" ) ]
1201+ pub struct BackgroundProcessorBuilder <
1202+ ' a ,
1203+ UL : ' static + Deref + Send + Sync ,
1204+ CF : ' static + Deref + Send + Sync ,
1205+ T : ' static + Deref + Send + Sync ,
1206+ F : ' static + Deref + Send + Sync ,
1207+ G : ' static + Deref < Target = NetworkGraph < L > > + Send + Sync ,
1208+ L : ' static + Deref + Send + Sync ,
1209+ P : ' static + Deref + Send + Sync ,
1210+ EH : ' static + EventHandler + Send ,
1211+ PS : ' static + Deref + Send ,
1212+ M : ' static
1213+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P > >
1214+ + Send
1215+ + Sync ,
1216+ CM : ' static + Deref + Send + Sync ,
1217+ OM : ' static + Deref + Send + Sync ,
1218+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > + Send + Sync ,
1219+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > + Send ,
1220+ PM : ' static + Deref + Send + Sync ,
1221+ S : ' static + Deref < Target = SC > + Send + Sync ,
1222+ SC : for < ' b > WriteableScore < ' b > ,
1223+ > where
1224+ UL :: Target : ' static + UtxoLookup ,
1225+ CF :: Target : ' static + chain:: Filter ,
1226+ T :: Target : ' static + BroadcasterInterface ,
1227+ F :: Target : ' static + FeeEstimator ,
1228+ L :: Target : ' static + Logger ,
1229+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1230+ PS :: Target : ' static + Persister < ' a , CM , L , S > ,
1231+ CM :: Target : AChannelManager + Send + Sync ,
1232+ OM :: Target : AOnionMessenger + Send + Sync ,
1233+ PM :: Target : APeerManager + Send + Sync ,
1234+ {
1235+ persister : PS ,
1236+ event_handler : EH ,
1237+ chain_monitor : M ,
1238+ channel_manager : CM ,
1239+ onion_messenger : Option < OM > ,
1240+ gossip_sync : GossipSync < PGS , RGS , G , UL , L > ,
1241+ peer_manager : PM ,
1242+ logger : L ,
1243+ scorer : Option < S > ,
1244+ _phantom : PhantomData < ( & ' a ( ) , CF , T , F , P ) > ,
1245+ }
1246+
1247+ #[ cfg( feature = "std" ) ]
1248+ impl <
1249+ ' a ,
1250+ UL : ' static + Deref + Send + Sync ,
1251+ CF : ' static + Deref + Send + Sync ,
1252+ T : ' static + Deref + Send + Sync ,
1253+ F : ' static + Deref + Send + Sync ,
1254+ G : ' static + Deref < Target = NetworkGraph < L > > + Send + Sync ,
1255+ L : ' static + Deref + Send + Sync ,
1256+ P : ' static + Deref + Send + Sync ,
1257+ EH : ' static + EventHandler + Send ,
1258+ PS : ' static + Deref + Send ,
1259+ M : ' static
1260+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P > >
1261+ + Send
1262+ + Sync ,
1263+ CM : ' static + Deref + Send + Sync ,
1264+ OM : ' static + Deref + Send + Sync ,
1265+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > + Send + Sync ,
1266+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > + Send ,
1267+ PM : ' static + Deref + Send + Sync ,
1268+ S : ' static + Deref < Target = SC > + Send + Sync ,
1269+ SC : for < ' b > WriteableScore < ' b > ,
1270+ > BackgroundProcessorBuilder < ' a , UL , CF , T , F , G , L , P , EH , PS , M , CM , OM , PGS , RGS , PM , S , SC >
1271+ where
1272+ UL :: Target : ' static + UtxoLookup ,
1273+ CF :: Target : ' static + chain:: Filter ,
1274+ T :: Target : ' static + BroadcasterInterface ,
1275+ F :: Target : ' static + FeeEstimator ,
1276+ L :: Target : ' static + Logger ,
1277+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1278+ PS :: Target : ' static + Persister < ' a , CM , L , S > ,
1279+ CM :: Target : AChannelManager + Send + Sync ,
1280+ OM :: Target : AOnionMessenger + Send + Sync ,
1281+ PM :: Target : APeerManager + Send + Sync ,
1282+ {
1283+ /// Creates a new builder instance. This is an internal method - use [`BackgroundProcessor::builder`] instead.
1284+ pub fn new (
1285+ persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
1286+ gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L ,
1287+ ) -> Self {
1288+ Self {
1289+ persister,
1290+ event_handler,
1291+ chain_monitor,
1292+ channel_manager,
1293+ onion_messenger : None ,
1294+ gossip_sync,
1295+ peer_manager,
1296+ logger,
1297+ scorer : None ,
1298+ _phantom : PhantomData ,
1299+ }
1300+ }
1301+
1302+ /// Sets the optional onion messenger component.
1303+ pub fn with_onion_messenger ( & mut self , onion_messenger : OM ) -> & mut Self {
1304+ self . onion_messenger = Some ( onion_messenger) ;
1305+ self
1306+ }
1307+
1308+ /// Sets the optional scorer component.
1309+ pub fn with_scorer ( & mut self , scorer : S ) -> & mut Self {
1310+ self . scorer = Some ( scorer) ;
1311+ self
1312+ }
1313+
1314+ /// Builds and starts the [`BackgroundProcessor`].
1315+ pub fn build ( self ) -> BackgroundProcessor {
1316+ BackgroundProcessor :: start (
1317+ self . persister ,
1318+ self . event_handler ,
1319+ self . chain_monitor ,
1320+ self . channel_manager ,
1321+ self . onion_messenger ,
1322+ self . gossip_sync ,
1323+ self . peer_manager ,
1324+ self . logger ,
1325+ self . scorer ,
1326+ )
1327+ }
11381328}
11391329
11401330#[ cfg( feature = "std" ) ]
@@ -2861,4 +3051,103 @@ mod tests {
28613051 r1. unwrap ( ) . unwrap ( ) ;
28623052 r2. unwrap ( )
28633053 }
3054+
3055+ #[ test]
3056+ fn test_background_processor_builder ( ) {
3057+ // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
3058+ // updates. Also test that when new updates are available, the manager signals that it needs
3059+ // re-persistence and is successfully re-persisted.
3060+ let ( persist_dir, nodes) = create_nodes ( 2 , "test_background_processor_builder" ) ;
3061+
3062+ // Go through the channel creation process so that each node has something to persist. Since
3063+ // open_channel consumes events, it must complete before starting BackgroundProcessor to
3064+ // avoid a race with processing events.
3065+ let tx = open_channel ! ( nodes[ 0 ] , nodes[ 1 ] , 100000 ) ;
3066+
3067+ // Initiate the background processors to watch each node.
3068+ let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
3069+ let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
3070+ let event_handler = |_: _ | Ok ( ( ) ) ;
3071+ let bg_processor = BackgroundProcessor :: builder (
3072+ persister,
3073+ event_handler,
3074+ nodes[ 0 ] . chain_monitor . clone ( ) ,
3075+ nodes[ 0 ] . node . clone ( ) ,
3076+ nodes[ 0 ] . p2p_gossip_sync ( ) ,
3077+ nodes[ 0 ] . peer_manager . clone ( ) ,
3078+ nodes[ 0 ] . logger . clone ( ) ,
3079+ )
3080+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3081+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
3082+ . build ( ) ;
3083+
3084+ macro_rules! check_persisted_data {
3085+ ( $node: expr, $filepath: expr) => {
3086+ let mut expected_bytes = Vec :: new( ) ;
3087+ loop {
3088+ expected_bytes. clear( ) ;
3089+ match $node. write( & mut expected_bytes) {
3090+ Ok ( ( ) ) => match std:: fs:: read( $filepath) {
3091+ Ok ( bytes) => {
3092+ if bytes == expected_bytes {
3093+ break ;
3094+ } else {
3095+ continue ;
3096+ }
3097+ } ,
3098+ Err ( _) => continue ,
3099+ } ,
3100+ Err ( e) => panic!( "Unexpected error: {}" , e) ,
3101+ }
3102+ }
3103+ } ;
3104+ }
3105+
3106+ // Check that the initial data is persisted as expected
3107+ let filepath =
3108+ get_full_filepath ( format ! ( "{}_persister_0" , & persist_dir) , "manager" . to_string ( ) ) ;
3109+ check_persisted_data ! ( nodes[ 0 ] . node, filepath. clone( ) ) ;
3110+
3111+ loop {
3112+ if !nodes[ 0 ] . node . get_event_or_persist_condvar_value ( ) {
3113+ break ;
3114+ }
3115+ }
3116+
3117+ // Force-close the channel.
3118+ let error_message = "Channel force-closed" ;
3119+ nodes[ 0 ]
3120+ . node
3121+ . force_close_broadcasting_latest_txn (
3122+ & ChannelId :: v1_from_funding_outpoint ( OutPoint {
3123+ txid : tx. compute_txid ( ) ,
3124+ index : 0 ,
3125+ } ) ,
3126+ & nodes[ 1 ] . node . get_our_node_id ( ) ,
3127+ error_message. to_string ( ) ,
3128+ )
3129+ . unwrap ( ) ;
3130+
3131+ // Check that the force-close updates are persisted
3132+ check_persisted_data ! ( nodes[ 0 ] . node, manager_path. clone( ) ) ;
3133+ loop {
3134+ if !nodes[ 0 ] . node . get_event_or_persist_condvar_value ( ) {
3135+ break ;
3136+ }
3137+ }
3138+
3139+ // Check network graph is persisted
3140+ let filepath =
3141+ get_full_filepath ( format ! ( "{}_persister_0" , & persist_dir) , "network_graph" . to_string ( ) ) ;
3142+ check_persisted_data ! ( nodes[ 0 ] . network_graph, filepath) ;
3143+
3144+ // Check scorer is persisted
3145+ let filepath =
3146+ get_full_filepath ( format ! ( "{}_persister_0" , & persist_dir) , "scorer" . to_string ( ) ) ;
3147+ check_persisted_data ! ( nodes[ 0 ] . scorer, filepath) ;
3148+
3149+ if !std:: thread:: panicking ( ) {
3150+ bg_processor. stop ( ) . unwrap ( ) ;
3151+ }
3152+ }
28643153}
0 commit comments