@@ -273,9 +273,9 @@ macro_rules! define_run_body {
273273 (
274274 $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
275275 $channel_manager: ident, $process_channel_manager_events: expr,
276- $peer_manager: ident, $gossip_sync : ident , $logger : ident , $scorer : ident,
277- $loop_exit_check : expr , $await : expr, $get_timer : expr, $timer_elapsed : expr,
278- $check_slow_await: expr
276+ $peer_manager: ident, $process_onion_message_handler_events : expr , $gossip_sync : ident,
277+ $logger : ident , $scorer : ident , $loop_exit_check : expr, $await : expr, $get_timer : expr,
278+ $timer_elapsed : expr , $ check_slow_await: expr
279279 ) => { {
280280 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
281281 $channel_manager. timer_tick_occurred( ) ;
@@ -292,6 +292,7 @@ macro_rules! define_run_body {
292292 loop {
293293 $process_channel_manager_events;
294294 $process_chain_monitor_events;
295+ $process_onion_message_handler_events;
295296
296297 // Note that the PeerManager::process_events may block on ChannelManager's locks,
297298 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -655,7 +656,8 @@ where
655656 persister, chain_monitor,
656657 chain_monitor. process_pending_events_async( async_event_handler) . await ,
657658 channel_manager, channel_manager. process_pending_events_async( async_event_handler) . await ,
658- peer_manager, gossip_sync, logger, scorer, should_break, {
659+ peer_manager, process_onion_message_handler_events_async( & peer_manager, async_event_handler) . await ,
660+ gossip_sync, logger, scorer, should_break, {
659661 let fut = Selector {
660662 a: channel_manager. get_event_or_persistence_needed_future( ) ,
661663 b: chain_monitor. get_update_future( ) ,
@@ -679,6 +681,25 @@ where
679681 )
680682}
681683
684+ #[ cfg( feature = "futures" ) ]
685+ async fn process_onion_message_handler_events_async <
686+ EventHandlerFuture : core:: future:: Future < Output = ( ) > ,
687+ EventHandler : Fn ( Event ) -> EventHandlerFuture ,
688+ PM : ' static + Deref + Send + Sync ,
689+ > (
690+ peer_manager : & PM , handler : EventHandler
691+ )
692+ where
693+ PM :: Target : APeerManager + Send + Sync ,
694+ {
695+ let events = core:: cell:: RefCell :: new ( Vec :: new ( ) ) ;
696+ peer_manager. onion_message_handler ( ) . process_pending_events ( & |e| events. borrow_mut ( ) . push ( e) ) ;
697+ for event in events. into_inner ( ) {
698+ handler ( event) . await
699+ }
700+ }
701+
702+
682703#[ cfg( feature = "std" ) ]
683704impl BackgroundProcessor {
684705 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -788,7 +809,9 @@ impl BackgroundProcessor {
788809 define_run_body ! (
789810 persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
790811 channel_manager, channel_manager. process_pending_events( & event_handler) ,
791- peer_manager, gossip_sync, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
812+ peer_manager,
813+ peer_manager. onion_message_handler( ) . process_pending_events( & event_handler) ,
814+ gossip_sync, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
792815 { Sleeper :: from_two_futures(
793816 channel_manager. get_event_or_persistence_needed_future( ) ,
794817 chain_monitor. get_update_future( )
0 commit comments