@@ -826,11 +826,11 @@ where
826826{
827827 let mut should_break = false ;
828828 let async_event_handler = |event| {
829- let network_graph = gossip_sync. network_graph ( ) ;
830- let event_handler = & event_handler;
831- let scorer = & scorer;
832- let logger = & logger;
833- let persister = & persister;
829+ let network_graph = config . gossip_sync . network_graph ( ) ;
830+ let event_handler = & config . event_handler ;
831+ let scorer = & config . scorer ;
832+ let logger = & config . logger ;
833+ let persister = & config . persister ;
834834 let fetch_time = & fetch_time;
835835 // We should be able to drop the Box once our MSRV is 1.68
836836 Box :: pin ( async move {
@@ -855,13 +855,13 @@ where
855855 } )
856856 } ;
857857 define_run_body ! (
858- persister,
859- chain_monitor,
860- chain_monitor. process_pending_events_async( async_event_handler) . await ,
861- channel_manager,
862- channel_manager. get_cm( ) . process_pending_events_async( async_event_handler) . await ,
863- onion_messenger,
864- if let Some ( om) = & onion_messenger {
858+ config . persister,
859+ config . chain_monitor,
860+ config . chain_monitor. process_pending_events_async( async_event_handler) . await ,
861+ config . channel_manager,
862+ config . channel_manager. get_cm( ) . process_pending_events_async( async_event_handler) . await ,
863+ config . onion_messenger,
864+ if let Some ( om) = & config . onion_messenger {
865865 om. get_om( ) . process_pending_events_async( async_event_handler) . await
866866 } ,
867867 peer_manager,
@@ -875,7 +875,7 @@ where
875875 scorer,
876876 should_break,
877877 {
878- let om_fut = if let Some ( om) = onion_messenger. as_ref( ) {
878+ let om_fut = if let Some ( om) = config . onion_messenger. as_ref( ) {
879879 let fut = om. get_om( ) . get_update_future( ) ;
880880 OptionalSelector { optional_future: Some ( fut) }
881881 } else {
@@ -888,8 +888,8 @@ where
888888 OptionalSelector { optional_future: None }
889889 } ;
890890 let fut = Selector {
891- a: channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
892- b: chain_monitor. get_update_future( ) ,
891+ a: config . channel_manager. get_cm( ) . get_event_or_persistence_needed_future( ) ,
892+ b: config . chain_monitor. get_update_future( ) ,
893893 c: om_fut,
894894 d: lm_fut,
895895 e: sleeper( if mobile_interruptable_platform {
@@ -1099,6 +1099,98 @@ impl BackgroundProcessor {
10991099 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
11001100 }
11011101
1102+ /// Creates a new [`BackgroundProcessor`] from a [`BackgroundProcessorConfig`].
1103+ /// This provides a more structured approach to configuration. The processor will start processing events immediately upon creation.
1104+ ///
1105+ /// This method is functionally equivalent to [`BackgroundProcessor::start`], but takes a configuration
1106+ /// object instead of individual parameters.
1107+ ///
1108+ /// # Example
1109+ /// ```
1110+ /// # use lightning_background_processor::*;
1111+ /// let config = BackgroundProcessorBuilder::new(
1112+ /// persister,
1113+ /// event_handler,
1114+ /// chain_monitor,
1115+ /// channel_manager,
1116+ /// gossip_sync,
1117+ /// peer_manager,
1118+ /// logger
1119+ /// )
1120+ /// .with_onion_messenger(messenger)
1121+ /// .with_scorer(scorer)
1122+ /// .build();
1123+ /// let bg_processor = BackgroundProcessor::from_config(config);
1124+ /// ```
1125+ pub fn from_config <
1126+ ' a ,
1127+ UL : ' static + Deref + Send + Sync ,
1128+ CF : ' static + Deref + Send + Sync ,
1129+ T : ' static + Deref + Send + Sync ,
1130+ F : ' static + Deref + Send + Sync ,
1131+ G : ' static + Deref < Target = NetworkGraph < L > > + Send + Sync ,
1132+ L : ' static + Deref + Send + Sync ,
1133+ P : ' static + Deref + Send + Sync ,
1134+ EH : ' static + EventHandler + Send ,
1135+ PS : ' static + Deref + Send ,
1136+ M : ' static
1137+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P > >
1138+ + Send
1139+ + Sync ,
1140+ CM : ' static + Deref + Send + Sync ,
1141+ OM : ' static + Deref + Send + Sync ,
1142+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > + Send + Sync ,
1143+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > + Send ,
1144+ PM : ' static + Deref + Send + Sync ,
1145+ S : ' static + Deref < Target = SC > + Send + Sync ,
1146+ SC : for < ' b > WriteableScore < ' b > ,
1147+ > (
1148+ config : BackgroundProcessorConfig <
1149+ ' a ,
1150+ UL ,
1151+ CF ,
1152+ T ,
1153+ F ,
1154+ G ,
1155+ L ,
1156+ P ,
1157+ EH ,
1158+ PS ,
1159+ M ,
1160+ CM ,
1161+ OM ,
1162+ PGS ,
1163+ RGS ,
1164+ PM ,
1165+ S ,
1166+ SC ,
1167+ > ,
1168+ ) -> Self
1169+ where
1170+ UL :: Target : ' static + UtxoLookup ,
1171+ CF :: Target : ' static + chain:: Filter ,
1172+ T :: Target : ' static + BroadcasterInterface ,
1173+ F :: Target : ' static + FeeEstimator ,
1174+ L :: Target : ' static + Logger ,
1175+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1176+ PS :: Target : ' static + Persister < ' a , CM , L , S > ,
1177+ CM :: Target : AChannelManager + Send + Sync ,
1178+ OM :: Target : AOnionMessenger + Send + Sync ,
1179+ PM :: Target : APeerManager + Send + Sync ,
1180+ {
1181+ Self :: start (
1182+ config. persister ,
1183+ config. event_handler ,
1184+ config. chain_monitor ,
1185+ config. channel_manager ,
1186+ config. onion_messenger ,
1187+ config. gossip_sync ,
1188+ config. peer_manager ,
1189+ config. logger ,
1190+ config. scorer ,
1191+ )
1192+ }
1193+
11021194 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
11031195 /// [`ChannelManager`].
11041196 ///
@@ -1138,7 +1230,102 @@ impl BackgroundProcessor {
11381230 None => Ok ( ( ) ) ,
11391231 }
11401232 }
1233+ }
11411234
1235+ /// Configuration for synchronous [`BackgroundProcessor`]
1236+ #[ cfg_attr( feature = "futures" , doc = "and asynchronous [`process_events_async`]" ) ]
1237+ /// event processing.
1238+ ///
1239+ /// This configuration holds all components needed for background processing,
1240+ /// including required components (like the channel manager and peer manager) and optional
1241+ /// components (like the onion messenger and scorer).
1242+ ///
1243+ /// The configuration can be constructed using [`BackgroundProcessorBuilder`], which provides
1244+ /// a convenient builder pattern for setting up both required and optional components.
1245+ ///
1246+ /// This same configuration can be used for
1247+ #[ cfg_attr(
1248+ not( feature = "futures" ) ,
1249+ doc = "creating a [`BackgroundProcessor`] via [`BackgroundProcessor::from_config`]."
1250+ ) ]
1251+ #[ cfg_attr(
1252+ feature = "futures" ,
1253+ doc = "both:
1254+ /// * Creating a [`BackgroundProcessor`] via [`BackgroundProcessor::from_config`]
1255+ /// * Running the async variant of the background processor via [`process_events_async`]"
1256+ ) ]
1257+ ///
1258+ /// # Example
1259+ /// ```
1260+ /// # use lightning_background_processor::*;
1261+ /// let config = BackgroundProcessorBuilder::new(
1262+ /// persister,
1263+ /// event_handler,
1264+ /// chain_monitor,
1265+ /// channel_manager,
1266+ /// gossip_sync,
1267+ /// peer_manager,
1268+ /// logger
1269+ /// )
1270+ /// .with_onion_messenger(messenger) // Optional
1271+ /// .with_scorer(scorer) // Optional
1272+ /// .build();
1273+ ///
1274+ /// // Use with BackgroundProcessor
1275+ /// let processor = BackgroundProcessor::from_config(config);
1276+ ///
1277+ #[ cfg_attr(
1278+ feature = "futures" ,
1279+ doc = "
1280+ /// // Or use with async processing
1281+ /// process_events_async(config, sleeper, mobile_interruptable_platform, fetch_time).await?;"
1282+ ) ]
1283+ /// ```
1284+ #[ cfg( feature = "std" ) ]
1285+ pub struct BackgroundProcessorConfig <
1286+ ' a ,
1287+ UL : ' static + Deref + Send + Sync ,
1288+ CF : ' static + Deref + Send + Sync ,
1289+ T : ' static + Deref + Send + Sync ,
1290+ F : ' static + Deref + Send + Sync ,
1291+ G : ' static + Deref < Target = NetworkGraph < L > > + Send + Sync ,
1292+ L : ' static + Deref + Send + Sync ,
1293+ P : ' static + Deref + Send + Sync ,
1294+ EH : ' static + EventHandler + Send ,
1295+ PS : ' static + Deref + Send ,
1296+ M : ' static
1297+ + Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P > >
1298+ + Send
1299+ + Sync ,
1300+ CM : ' static + Deref + Send + Sync ,
1301+ OM : ' static + Deref + Send + Sync ,
1302+ PGS : ' static + Deref < Target = P2PGossipSync < G , UL , L > > + Send + Sync ,
1303+ RGS : ' static + Deref < Target = RapidGossipSync < G , L > > + Send ,
1304+ PM : ' static + Deref + Send + Sync ,
1305+ S : ' static + Deref < Target = SC > + Send + Sync ,
1306+ SC : for < ' b > WriteableScore < ' b > ,
1307+ > where
1308+ UL :: Target : ' static + UtxoLookup ,
1309+ CF :: Target : ' static + chain:: Filter ,
1310+ T :: Target : ' static + BroadcasterInterface ,
1311+ F :: Target : ' static + FeeEstimator ,
1312+ L :: Target : ' static + Logger ,
1313+ P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1314+ PS :: Target : ' static + Persister < ' a , CM , L , S > ,
1315+ CM :: Target : AChannelManager + Send + Sync ,
1316+ OM :: Target : AOnionMessenger + Send + Sync ,
1317+ PM :: Target : APeerManager + Send + Sync ,
1318+ {
1319+ persister : PS ,
1320+ event_handler : EH ,
1321+ chain_monitor : M ,
1322+ channel_manager : CM ,
1323+ onion_messenger : Option < OM > ,
1324+ gossip_sync : GossipSync < PGS , RGS , G , UL , L > ,
1325+ peer_manager : PM ,
1326+ logger : L ,
1327+ scorer : Option < S > ,
1328+ _phantom : PhantomData < ( & ' a ( ) , CF , T , F , P ) > ,
11421329}
11431330
11441331/// A builder for constructing a [`BackgroundProcessor`] with optional components.
@@ -1260,19 +1447,23 @@ where
12601447 self
12611448 }
12621449
1263- /// Builds and starts the [`BackgroundProcessor`].
1264- pub fn build ( self ) -> BackgroundProcessor {
1265- BackgroundProcessor :: start (
1266- self . persister ,
1267- self . event_handler ,
1268- self . chain_monitor ,
1269- self . channel_manager ,
1270- self . onion_messenger ,
1271- self . gossip_sync ,
1272- self . peer_manager ,
1273- self . logger ,
1274- self . scorer ,
1275- )
1450+ /// Builds and returns a [`BackgroundProcessorConfig`] object.
1451+ pub fn build (
1452+ self ,
1453+ ) -> BackgroundProcessorConfig < ' a , UL , CF , T , F , G , L , P , EH , PS , M , CM , OM , PGS , RGS , PM , S , SC >
1454+ {
1455+ BackgroundProcessorConfig {
1456+ persister : self . persister ,
1457+ event_handler : self . event_handler ,
1458+ chain_monitor : self . chain_monitor ,
1459+ channel_manager : self . channel_manager ,
1460+ onion_messenger : self . onion_messenger ,
1461+ gossip_sync : self . gossip_sync ,
1462+ peer_manager : self . peer_manager ,
1463+ logger : self . logger ,
1464+ scorer : self . scorer ,
1465+ _phantom : PhantomData ,
1466+ }
12761467 }
12771468}
12781469
@@ -2237,18 +2428,23 @@ mod tests {
22372428 Persister :: new ( data_dir) . with_manager_error ( std:: io:: ErrorKind :: Other , "test" ) ,
22382429 ) ;
22392430
2240- let bp_future = super :: process_events_async (
2431+ let config = BackgroundProcessorBuilder :: new (
22412432 persister,
22422433 |_: _ | async { Ok ( ( ) ) } ,
22432434 nodes[ 0 ] . chain_monitor . clone ( ) ,
22442435 nodes[ 0 ] . node . clone ( ) ,
2245- Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
22462436 nodes[ 0 ] . rapid_gossip_sync ( ) ,
22472437 nodes[ 0 ] . peer_manager . clone ( ) ,
22482438 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
22492439 Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
22502440 nodes[ 0 ] . logger . clone ( ) ,
2251- Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
2441+ )
2442+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2443+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
2444+ . build ( ) ;
2445+
2446+ let bp_future = super :: process_events_async (
2447+ config,
22522448 move |dur : Duration | {
22532449 Box :: pin ( async move {
22542450 tokio:: time:: sleep ( dur) . await ;
@@ -2743,19 +2939,24 @@ mod tests {
27432939 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
27442940 let persister = Arc :: new ( Persister :: new ( data_dir) . with_graph_persistence_notifier ( sender) ) ;
27452941
2746- let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2747- let bp_future = super :: process_events_async (
2942+ let config = BackgroundProcessorBuilder :: new (
27482943 persister,
27492944 |_: _ | async { Ok ( ( ) ) } ,
27502945 nodes[ 0 ] . chain_monitor . clone ( ) ,
27512946 nodes[ 0 ] . node . clone ( ) ,
2752- Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
27532947 nodes[ 0 ] . rapid_gossip_sync ( ) ,
27542948 nodes[ 0 ] . peer_manager . clone ( ) ,
27552949 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
27562950 Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
27572951 nodes[ 0 ] . logger . clone ( ) ,
2758- Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
2952+ )
2953+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
2954+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
2955+ . build ( ) ;
2956+
2957+ let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
2958+ let bp_future = super :: process_events_async (
2959+ config,
27592960 move |dur : Duration | {
27602961 let mut exit_receiver = exit_receiver. clone ( ) ;
27612962 Box :: pin ( async move {
@@ -2959,18 +3160,23 @@ mod tests {
29593160
29603161 let ( exit_sender, exit_receiver) = tokio:: sync:: watch:: channel ( ( ) ) ;
29613162
2962- let bp_future = super :: process_events_async (
3163+ let config = BackgroundProcessorBuilder :: new (
29633164 persister,
29643165 event_handler,
29653166 nodes[ 0 ] . chain_monitor . clone ( ) ,
29663167 nodes[ 0 ] . node . clone ( ) ,
2967- Some ( nodes[ 0 ] . messenger . clone ( ) ) ,
29683168 nodes[ 0 ] . no_gossip_sync ( ) ,
29693169 nodes[ 0 ] . peer_manager . clone ( ) ,
29703170 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
29713171 Some ( nodes[ 0 ] . sweeper . sweeper_async ( ) ) ,
29723172 nodes[ 0 ] . logger . clone ( ) ,
2973- Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
3173+ )
3174+ . with_onion_messenger ( nodes[ 0 ] . messenger . clone ( ) )
3175+ . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
3176+ . build ( ) ;
3177+
3178+ let bp_future = super :: process_events_async (
3179+ config,
29743180 move |dur : Duration | {
29753181 let mut exit_receiver = exit_receiver. clone ( ) ;
29763182 Box :: pin ( async move {
@@ -3017,7 +3223,7 @@ mod tests {
30173223 let data_dir = nodes[ 0 ] . kv_store . get_data_dir ( ) ;
30183224 let persister = Arc :: new ( Persister :: new ( data_dir) ) ;
30193225 let event_handler = |_: _ | Ok ( ( ) ) ;
3020- let bg_processor = BackgroundProcessorBuilder :: new (
3226+ let config = BackgroundProcessorBuilder :: new (
30213227 persister,
30223228 event_handler,
30233229 nodes[ 0 ] . chain_monitor . clone ( ) ,
@@ -3030,6 +3236,8 @@ mod tests {
30303236 . with_scorer ( nodes[ 0 ] . scorer . clone ( ) )
30313237 . build ( ) ;
30323238
3239+ let bg_processor = BackgroundProcessor :: from_config ( config) ;
3240+
30333241 macro_rules! check_persisted_data {
30343242 ( $node: expr, $filepath: expr) => {
30353243 let mut expected_bytes = Vec :: new( ) ;
0 commit comments