Skip to content

Commit cf742b7

Browse files
committed
Introduce check_sleeper helper
1 parent df1e30c commit cf742b7

File tree

1 file changed

+137
-158
lines changed
  • lightning-background-processor/src

1 file changed

+137
-158
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 137 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,6 @@ where
667667
D::Target: 'static + ChangeDestinationSource,
668668
K::Target: 'static + KVStore,
669669
{
670-
let mut should_break = false;
671670
let async_event_handler = |event| {
672671
let network_graph = gossip_sync.network_graph();
673672
let event_handler = &event_handler;
@@ -722,35 +721,36 @@ where
722721
let mut cur_batch_delay = batch_delay.get();
723722
let mut last_forwards_processing_call = sleeper(cur_batch_delay);
724723
loop {
724+
// Handle channel manager events.
725725
channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
726+
727+
// Handle chain monitor events.
726728
chain_monitor.process_pending_events_async(async_event_handler).await;
729+
730+
// Handle onion messenger events.
727731
if let Some(om) = &onion_messenger {
728732
om.get_om().process_pending_events_async(async_event_handler).await
729733
}
734+
735+
// Handle peer manager events.
730736
peer_manager.as_ref().process_events();
731-
if (|fut: &mut SleepFuture| {
732-
let mut waker = dummy_waker();
733-
let mut ctx = task::Context::from_waker(&mut waker);
734-
match core::pin::Pin::new(fut).poll(&mut ctx) {
735-
task::Poll::Ready(exit) => {
736-
should_break = exit;
737-
true
738-
},
739-
task::Poll::Pending => false,
740-
}
741-
})(&mut last_forwards_processing_call)
742-
{
743-
channel_manager.get_cm().process_pending_htlc_forwards();
744-
cur_batch_delay = batch_delay.next();
745-
last_forwards_processing_call = sleeper(cur_batch_delay);
746-
}
747-
if should_break {
748-
break;
737+
match check_sleeper(&mut last_forwards_processing_call) {
738+
Some(false) => {
739+
channel_manager.get_cm().process_pending_htlc_forwards();
740+
cur_batch_delay = batch_delay.next();
741+
last_forwards_processing_call = sleeper(cur_batch_delay);
742+
},
743+
Some(true) => break,
744+
None => {},
749745
}
746+
747+
// Start mobile interruptable platform check timer.
750748
let mut await_start = None;
751749
if mobile_interruptable_platform {
752750
await_start = Some(sleeper(Duration::from_secs(1)));
753751
}
752+
753+
// Wait for events or timeouts.
754754
let om_fut = if let Some(om) = onion_messenger.as_ref() {
755755
let fut = om.get_om().get_update_future();
756756
OptionalSelector { optional_future: Some(fut) }
@@ -780,27 +780,24 @@ where
780780
match fut.await {
781781
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
782782
SelectorOutput::E(exit) => {
783-
should_break = exit;
783+
if exit {
784+
break;
785+
}
784786
},
785787
}
788+
789+
// Check to see if we were interrupted on a mobile platform.
786790
let await_slow = if mobile_interruptable_platform {
787-
(|fut: &mut SleepFuture| {
788-
let mut waker = dummy_waker();
789-
let mut ctx = task::Context::from_waker(&mut waker);
790-
match core::pin::Pin::new(fut).poll(&mut ctx) {
791-
task::Poll::Ready(exit) => {
792-
should_break = exit;
793-
true
794-
},
795-
task::Poll::Pending => false,
796-
}
797-
})(&mut await_start.unwrap())
791+
match check_sleeper(&mut await_start.unwrap()) {
792+
Some(true) => break,
793+
Some(false) => true,
794+
None => false,
795+
}
798796
} else {
799797
false
800798
};
801-
if should_break {
802-
break;
803-
}
799+
800+
// Persist channel manager.
804801
if channel_manager.get_cm().get_and_clear_needs_persistence() {
805802
log_trace!(logger, "Persisting ChannelManager...");
806803
kv_store
@@ -813,71 +810,57 @@ where
813810
.await?;
814811
log_trace!(logger, "Done persisting ChannelManager.");
815812
}
816-
if (|fut: &mut SleepFuture| {
817-
let mut waker = dummy_waker();
818-
let mut ctx = task::Context::from_waker(&mut waker);
819-
match core::pin::Pin::new(fut).poll(&mut ctx) {
820-
task::Poll::Ready(exit) => {
821-
should_break = exit;
822-
true
823-
},
824-
task::Poll::Pending => false,
825-
}
826-
})(&mut last_freshness_call)
827-
{
828-
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
829-
channel_manager.get_cm().timer_tick_occurred();
830-
last_freshness_call = sleeper(FRESHNESS_TIMER);
831-
}
832-
if (|fut: &mut SleepFuture| {
833-
let mut waker = dummy_waker();
834-
let mut ctx = task::Context::from_waker(&mut waker);
835-
match core::pin::Pin::new(fut).poll(&mut ctx) {
836-
task::Poll::Ready(exit) => {
837-
should_break = exit;
838-
true
839-
},
840-
task::Poll::Pending => false,
841-
}
842-
})(&mut last_onion_message_handler_call)
843-
{
844-
if let Some(om) = &onion_messenger {
845-
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
846-
om.get_om().timer_tick_occurred();
847-
}
848-
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
813+
814+
// Channel manager timer tick.
815+
match check_sleeper(&mut last_freshness_call) {
816+
Some(false) => {
817+
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
818+
channel_manager.get_cm().timer_tick_occurred();
819+
last_freshness_call = sleeper(FRESHNESS_TIMER);
820+
},
821+
Some(true) => break,
822+
None => {},
823+
}
824+
825+
// Onion messenger timer tick.
826+
match check_sleeper(&mut last_onion_message_handler_call) {
827+
Some(false) => {
828+
if let Some(om) = &onion_messenger {
829+
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
830+
om.get_om().timer_tick_occurred();
831+
}
832+
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
833+
},
834+
Some(true) => break,
835+
None => {},
849836
}
837+
838+
// Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers.
850839
if await_slow {
851840
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
852841
peer_manager.as_ref().disconnect_all_peers();
853842
last_ping_call = sleeper(PING_TIMER);
854-
} else if (|fut: &mut SleepFuture| {
855-
let mut waker = dummy_waker();
856-
let mut ctx = task::Context::from_waker(&mut waker);
857-
match core::pin::Pin::new(fut).poll(&mut ctx) {
858-
task::Poll::Ready(exit) => {
859-
should_break = exit;
860-
true
843+
} else {
844+
match check_sleeper(&mut last_ping_call) {
845+
Some(false) => {
846+
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
847+
peer_manager.as_ref().timer_tick_occurred();
848+
last_ping_call = sleeper(PING_TIMER);
861849
},
862-
task::Poll::Pending => false,
850+
Some(true) => break,
851+
_ => {},
863852
}
864-
})(&mut last_ping_call)
865-
{
866-
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
867-
peer_manager.as_ref().timer_tick_occurred();
868-
last_ping_call = sleeper(PING_TIMER);
869853
}
870-
let prune_timer_elapsed = (|fut: &mut SleepFuture| {
871-
let mut waker = dummy_waker();
872-
let mut ctx = task::Context::from_waker(&mut waker);
873-
match core::pin::Pin::new(fut).poll(&mut ctx) {
874-
task::Poll::Ready(exit) => {
875-
should_break = exit;
876-
true
877-
},
878-
task::Poll::Pending => false,
854+
855+
// Prune and persist the network graph if necessary.
856+
let prune_timer_elapsed = {
857+
match check_sleeper(&mut last_prune_call) {
858+
Some(false) => true,
859+
Some(true) => break,
860+
None => false,
879861
}
880-
})(&mut last_prune_call);
862+
};
863+
881864
let should_prune = match gossip_sync {
882865
GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
883866
_ => prune_timer_elapsed,
@@ -910,6 +893,8 @@ where
910893
if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
911894
last_prune_call = sleeper(prune_timer);
912895
}
896+
897+
// Decay and persist the scorer if necessary.
913898
if !have_decayed_scorer {
914899
if let Some(ref scorer) = scorer {
915900
if let Some(duration_since_epoch) = fetch_time() {
@@ -919,76 +904,59 @@ where
919904
}
920905
have_decayed_scorer = true;
921906
}
922-
if (|fut: &mut SleepFuture| {
923-
let mut waker = dummy_waker();
924-
let mut ctx = task::Context::from_waker(&mut waker);
925-
match core::pin::Pin::new(fut).poll(&mut ctx) {
926-
task::Poll::Ready(exit) => {
927-
should_break = exit;
928-
true
929-
},
930-
task::Poll::Pending => false,
931-
}
932-
})(&mut last_scorer_persist_call)
933-
{
934-
if let Some(ref scorer) = scorer {
935-
if let Some(duration_since_epoch) = fetch_time() {
936-
log_trace!(logger, "Calling time_passed and persisting scorer");
937-
scorer.write_lock().time_passed(duration_since_epoch);
938-
} else {
939-
log_trace!(logger, "Persisting scorer");
907+
match check_sleeper(&mut last_scorer_persist_call) {
908+
Some(false) => {
909+
if let Some(ref scorer) = scorer {
910+
if let Some(duration_since_epoch) = fetch_time() {
911+
log_trace!(logger, "Calling time_passed and persisting scorer");
912+
scorer.write_lock().time_passed(duration_since_epoch);
913+
} else {
914+
log_trace!(logger, "Persisting scorer");
915+
}
916+
if let Err(e) = kv_store
917+
.write(
918+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
919+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
920+
SCORER_PERSISTENCE_KEY,
921+
&scorer.encode(),
922+
)
923+
.await
924+
{
925+
log_error!(
926+
logger,
927+
"Error: Failed to persist scorer, check your disk and permissions {}",
928+
e
929+
);
930+
}
940931
}
941-
if let Err(e) = kv_store
942-
.write(
943-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
944-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
945-
SCORER_PERSISTENCE_KEY,
946-
&scorer.encode(),
947-
)
948-
.await
949-
{
950-
log_error!(
951-
logger,
952-
"Error: Failed to persist scorer, check your disk and permissions {}",
953-
e
954-
);
932+
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
933+
},
934+
Some(true) => break,
935+
None => {},
936+
}
937+
938+
// Rebroadcast pending claims.
939+
match check_sleeper(&mut last_rebroadcast_call) {
940+
Some(false) => {
941+
log_trace!(logger, "Rebroadcasting monitor's pending claims");
942+
chain_monitor.rebroadcast_pending_claims();
943+
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
944+
},
945+
Some(true) => break,
946+
None => {},
947+
}
948+
949+
// Sweeper regeneration and broadcast.
950+
match check_sleeper(&mut last_sweeper_call) {
951+
Some(false) => {
952+
log_trace!(logger, "Regenerating sweeper spends if necessary");
953+
if let Some(ref sweeper) = sweeper {
954+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
955955
}
956-
}
957-
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
958-
}
959-
if (|fut: &mut SleepFuture| {
960-
let mut waker = dummy_waker();
961-
let mut ctx = task::Context::from_waker(&mut waker);
962-
match core::pin::Pin::new(fut).poll(&mut ctx) {
963-
task::Poll::Ready(exit) => {
964-
should_break = exit;
965-
true
966-
},
967-
task::Poll::Pending => false,
968-
}
969-
})(&mut last_rebroadcast_call)
970-
{
971-
log_trace!(logger, "Rebroadcasting monitor's pending claims");
972-
chain_monitor.rebroadcast_pending_claims();
973-
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
974-
}
975-
if (|fut: &mut SleepFuture| {
976-
let mut waker = dummy_waker();
977-
let mut ctx = task::Context::from_waker(&mut waker);
978-
match core::pin::Pin::new(fut).poll(&mut ctx) {
979-
task::Poll::Ready(exit) => {
980-
should_break = exit;
981-
true
982-
},
983-
task::Poll::Pending => false,
984-
}
985-
})(&mut last_sweeper_call)
986-
{
987-
log_trace!(logger, "Regenerating sweeper spends if necessary");
988-
if let Some(ref sweeper) = sweeper {
989-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
990-
}
991-
last_sweeper_call = sleeper(SWEEPER_TIMER);
956+
last_sweeper_call = sleeper(SWEEPER_TIMER);
957+
},
958+
Some(true) => break,
959+
None => {},
992960
}
993961
}
994962
log_trace!(logger, "Terminating background processor.");
@@ -1024,6 +992,17 @@ where
1024992
Ok(())
1025993
}
1026994

995+
fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
996+
fut: &mut SleepFuture,
997+
) -> Option<bool> {
998+
let mut waker = dummy_waker();
999+
let mut ctx = task::Context::from_waker(&mut waker);
1000+
match core::pin::Pin::new(fut).poll(&mut ctx) {
1001+
task::Poll::Ready(exit) => Some(exit),
1002+
task::Poll::Pending => None,
1003+
}
1004+
}
1005+
10271006
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
10281007
/// synchronous background persistence.
10291008
pub async fn process_events_async_with_kv_store_sync<

0 commit comments

Comments
 (0)