@@ -38,6 +38,8 @@ use lightning::routing::router::Router;
3838use lightning:: routing:: scoring:: { Score , WriteableScore } ;
3939use lightning:: util:: logger:: Logger ;
4040use lightning:: util:: persist:: Persister ;
41+ #[ cfg( feature = "std" ) ]
42+ use lightning:: util:: wakers:: Sleeper ;
4143use lightning_rapid_gossip_sync:: RapidGossipSync ;
4244
4345use core:: ops:: Deref ;
@@ -114,6 +116,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
114116#[ cfg( test) ]
115117const FIRST_NETWORK_PRUNE_TIMER : u64 = 1 ;
116118
119+ #[ cfg( feature = "futures" ) ]
120+ /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
121+ const fn min_u64 ( a : u64 , b : u64 ) -> u64 { if a < b { a } else { b } }
122+ #[ cfg( feature = "futures" ) ]
123+ const FASTEST_TIMER : u64 = min_u64 ( min_u64 ( FRESHNESS_TIMER , PING_TIMER ) ,
124+ min_u64 ( SCORER_PERSIST_TIMER , FIRST_NETWORK_PRUNE_TIMER ) ) ;
125+
117126/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
118127pub enum GossipSync <
119128 P : Deref < Target = P2PGossipSync < G , U , L > > ,
@@ -256,7 +265,8 @@ macro_rules! define_run_body {
256265 ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
257266 $channel_manager: ident, $process_channel_manager_events: expr,
258267 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
259- $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
268+ $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
269+ $check_slow_await: expr)
260270 => { {
261271 log_trace!( $logger, "Calling ChannelManager's timer_tick_occurred on startup" ) ;
262272 $channel_manager. timer_tick_occurred( ) ;
@@ -286,9 +296,10 @@ macro_rules! define_run_body {
286296
287297 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
288298 // see `await_start`'s use below.
289- let mut await_start = $get_timer( 1 ) ;
299+ let mut await_start = None ;
300+ if $check_slow_await { await_start = Some ( $get_timer( 1 ) ) ; }
290301 let updates_available = $await;
291- let await_slow = $ timer_elapsed( & mut await_start, 1 ) ;
302+ let await_slow = if $check_slow_await { $ timer_elapsed( & mut await_start. unwrap ( ) , 1 ) } else { false } ;
292303
293304 if updates_available {
294305 log_trace!( $logger, "Persisting ChannelManager..." ) ;
@@ -388,23 +399,32 @@ pub(crate) mod futures_util {
388399 use core:: task:: { Poll , Waker , RawWaker , RawWakerVTable } ;
389400 use core:: pin:: Pin ;
390401 use core:: marker:: Unpin ;
391- pub ( crate ) struct Selector < A : Future < Output =( ) > + Unpin , B : Future < Output =bool > + Unpin > {
402+ pub ( crate ) struct Selector <
403+ A : Future < Output =( ) > + Unpin , B : Future < Output =( ) > + Unpin , C : Future < Output =bool > + Unpin
404+ > {
392405 pub a : A ,
393406 pub b : B ,
407+ pub c : C ,
394408 }
395409 pub ( crate ) enum SelectorOutput {
396- A , B ( bool ) ,
410+ A , B , C ( bool ) ,
397411 }
398412
399- impl < A : Future < Output =( ) > + Unpin , B : Future < Output =bool > + Unpin > Future for Selector < A , B > {
413+ impl <
414+ A : Future < Output =( ) > + Unpin , B : Future < Output =( ) > + Unpin , C : Future < Output =bool > + Unpin
415+ > Future for Selector < A , B , C > {
400416 type Output = SelectorOutput ;
401417 fn poll ( mut self : Pin < & mut Self > , ctx : & mut core:: task:: Context < ' _ > ) -> Poll < SelectorOutput > {
402418 match Pin :: new ( & mut self . a ) . poll ( ctx) {
403419 Poll :: Ready ( ( ) ) => { return Poll :: Ready ( SelectorOutput :: A ) ; } ,
404420 Poll :: Pending => { } ,
405421 }
406422 match Pin :: new ( & mut self . b ) . poll ( ctx) {
407- Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: B ( res) ) ; } ,
423+ Poll :: Ready ( ( ) ) => { return Poll :: Ready ( SelectorOutput :: B ) ; } ,
424+ Poll :: Pending => { } ,
425+ }
426+ match Pin :: new ( & mut self . c ) . poll ( ctx) {
427+ Poll :: Ready ( res) => { return Poll :: Ready ( SelectorOutput :: C ( res) ) ; } ,
408428 Poll :: Pending => { } ,
409429 }
410430 Poll :: Pending
@@ -438,6 +458,11 @@ use core::task;
438458/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
439459/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
440460/// manually instead.
461+ ///
462+ /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
463+ /// mobile device, where we may need to check for interruption of the application regularly. If you
464+ /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
465+ /// are hundreds or thousands of simultaneous process calls running.
441466#[ cfg( feature = "futures" ) ]
442467pub async fn process_events_async <
443468 ' a ,
@@ -473,7 +498,7 @@ pub async fn process_events_async<
473498> (
474499 persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
475500 gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM , logger : L , scorer : Option < S > ,
476- sleeper : Sleeper ,
501+ sleeper : Sleeper , mobile_interruptable_platform : bool ,
477502) -> Result < ( ) , lightning:: io:: Error >
478503where
479504 UL :: Target : ' static + UtxoLookup ,
@@ -514,11 +539,13 @@ where
514539 gossip_sync, peer_manager, logger, scorer, should_break, {
515540 let fut = Selector {
516541 a: channel_manager. get_persistable_update_future( ) ,
517- b: sleeper( Duration :: from_millis( 100 ) ) ,
542+ b: chain_monitor. get_update_future( ) ,
543+ c: sleeper( if mobile_interruptable_platform { Duration :: from_millis( 100 ) } else { Duration :: from_secs( FASTEST_TIMER ) } ) ,
518544 } ;
519545 match fut. await {
520546 SelectorOutput :: A => true ,
521- SelectorOutput :: B ( exit) => {
547+ SelectorOutput :: B => false ,
548+ SelectorOutput :: C ( exit) => {
522549 should_break = exit;
523550 false
524551 }
@@ -528,7 +555,7 @@ where
528555 let mut waker = dummy_waker( ) ;
529556 let mut ctx = task:: Context :: from_waker( & mut waker) ;
530557 core:: pin:: Pin :: new( fut) . poll( & mut ctx) . is_ready( )
531- } )
558+ } , mobile_interruptable_platform )
532559}
533560
534561#[ cfg( feature = "std" ) ]
@@ -643,8 +670,11 @@ impl BackgroundProcessor {
643670 define_run_body ! ( persister, chain_monitor, chain_monitor. process_pending_events( & event_handler) ,
644671 channel_manager, channel_manager. process_pending_events( & event_handler) ,
645672 gossip_sync, peer_manager, logger, scorer, stop_thread. load( Ordering :: Acquire ) ,
646- channel_manager. await_persistable_update_timeout( Duration :: from_millis( 100 ) ) ,
647- |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur)
673+ Sleeper :: from_two_futures(
674+ channel_manager. get_persistable_update_future( ) ,
675+ chain_monitor. get_update_future( )
676+ ) . wait_timeout( Duration :: from_millis( 100 ) ) ,
677+ |_| Instant :: now( ) , |time: & Instant , dur| time. elapsed( ) . as_secs( ) > dur, false )
648678 } ) ;
649679 Self { stop_thread : stop_thread_clone, thread_handle : Some ( handle) }
650680 }
0 commit comments