@@ -273,9 +273,9 @@ macro_rules! define_run_body {
273273	( 
274274		$persister:  ident,  $chain_monitor:  ident,  $process_chain_monitor_events:  expr, 
275275		$channel_manager:  ident,  $process_channel_manager_events:  expr, 
276- 		$peer_manager:  ident,  $gossip_sync :  ident ,  $logger :  ident ,  $scorer :  ident, 
277- 		$loop_exit_check :  expr ,  $await :   expr,  $get_timer :  expr,  $timer_elapsed :  expr, 
278- 		$check_slow_await:  expr
276+ 		$peer_manager:  ident,  $process_onion_message_handler_events :  expr ,  $gossip_sync :  ident, 
277+ 		$logger :  ident ,  $scorer :  ident ,  $loop_exit_check :   expr,  $await :  expr,  $get_timer :  expr, 
278+ 		$timer_elapsed :  expr ,  $ check_slow_await:  expr
279279	)  => {  { 
280280		log_trace!( $logger,  "Calling ChannelManager's timer_tick_occurred on startup" ) ; 
281281		$channel_manager. timer_tick_occurred( ) ; 
@@ -292,6 +292,7 @@ macro_rules! define_run_body {
292292		loop  { 
293293			$process_channel_manager_events; 
294294			$process_chain_monitor_events; 
295+ 			$process_onion_message_handler_events; 
295296
296297			// Note that the PeerManager::process_events may block on ChannelManager's locks, 
297298			// hence it comes last here. When the ChannelManager finishes whatever it's doing, 
@@ -655,7 +656,8 @@ where
655656		persister,  chain_monitor, 
656657		chain_monitor. process_pending_events_async( async_event_handler) . await , 
657658		channel_manager,  channel_manager. process_pending_events_async( async_event_handler) . await , 
658- 		peer_manager,  gossip_sync,  logger,  scorer,  should_break,  { 
659+ 		peer_manager,  process_onion_message_handler_events_async( & peer_manager,  async_event_handler) . await , 
660+ 		gossip_sync,  logger,  scorer,  should_break,  { 
659661			let  fut = Selector  { 
660662				a:  channel_manager. get_event_or_persistence_needed_future( ) , 
661663				b:  chain_monitor. get_update_future( ) , 
@@ -679,6 +681,25 @@ where
679681	) 
680682} 
681683
684+ #[ cfg( feature = "futures" ) ]  
685+ async  fn  process_onion_message_handler_events_async < 
686+ 	EventHandlerFuture :  core:: future:: Future < Output  = ( ) > , 
687+ 	EventHandler :  Fn ( Event )  -> EventHandlerFuture , 
688+ 	PM :  ' static  + Deref  + Send  + Sync , 
689+ > ( 
690+ 	peer_manager :  & PM ,  handler :  EventHandler 
691+ ) 
692+ where 
693+ 	PM :: Target :  APeerManager  + Send  + Sync , 
694+ { 
695+ 	let  events = core:: cell:: RefCell :: new ( Vec :: new ( ) ) ; 
696+ 	peer_manager. onion_message_handler ( ) . process_pending_events ( & |e| events. borrow_mut ( ) . push ( e) ) ; 
697+ 	for  event in  events. into_inner ( )  { 
698+ 		handler ( event) . await 
699+ 	} 
700+ } 
701+ 
702+ 
682703#[ cfg( feature = "std" ) ]  
683704impl  BackgroundProcessor  { 
684705	/// Start a background thread that takes care of responsibilities enumerated in the [top-level 
@@ -788,7 +809,9 @@ impl BackgroundProcessor {
788809			define_run_body ! ( 
789810				persister,  chain_monitor,  chain_monitor. process_pending_events( & event_handler) , 
790811				channel_manager,  channel_manager. process_pending_events( & event_handler) , 
791- 				peer_manager,  gossip_sync,  logger,  scorer,  stop_thread. load( Ordering :: Acquire ) , 
812+ 				peer_manager, 
813+ 				peer_manager. onion_message_handler( ) . process_pending_events( & event_handler) , 
814+ 				gossip_sync,  logger,  scorer,  stop_thread. load( Ordering :: Acquire ) , 
792815				{  Sleeper :: from_two_futures( 
793816					channel_manager. get_event_or_persistence_needed_future( ) , 
794817					chain_monitor. get_update_future( ) 
0 commit comments