Skip to content

Commit fa02602

Browse files
committed
Introduce check_sleeper helper
1 parent e15f6a3 commit fa02602

File tree

1 file changed

+109
-158
lines changed
  • lightning-background-processor/src

1 file changed

+109
-158
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 109 additions & 158 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,6 @@ where
667667
D::Target: 'static + ChangeDestinationSource,
668668
K::Target: 'static + KVStore,
669669
{
670-
let mut should_break = false;
671670
let async_event_handler = |event| {
672671
let network_graph = gossip_sync.network_graph();
673672
let event_handler = &event_handler;
@@ -744,24 +743,14 @@ where
744743
// generally, and as a fallback place such blocking only immediately before
745744
// persistence.
746745
peer_manager.as_ref().process_events();
747-
if (|fut: &mut SleepFuture| {
748-
let mut waker = dummy_waker();
749-
let mut ctx = task::Context::from_waker(&mut waker);
750-
match core::pin::Pin::new(fut).poll(&mut ctx) {
751-
task::Poll::Ready(exit) => {
752-
should_break = exit;
753-
true
754-
},
755-
task::Poll::Pending => false,
756-
}
757-
})(&mut last_forwards_processing_call)
758-
{
759-
channel_manager.get_cm().process_pending_htlc_forwards();
760-
cur_batch_delay = batch_delay.next();
761-
last_forwards_processing_call = sleeper(cur_batch_delay);
762-
}
763-
if should_break {
764-
break;
746+
match check_sleeper(&mut last_forwards_processing_call) {
747+
Some(false) => {
748+
channel_manager.get_cm().process_pending_htlc_forwards();
749+
cur_batch_delay = batch_delay.next();
750+
last_forwards_processing_call = sleeper(cur_batch_delay);
751+
},
752+
Some(true) => break,
753+
None => {},
765754
}
766755

767756
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
@@ -799,27 +788,21 @@ where
799788
match fut.await {
800789
SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
801790
SelectorOutput::E(exit) => {
802-
should_break = exit;
791+
if exit {
792+
break;
793+
}
803794
},
804795
}
796+
805797
let await_slow = if mobile_interruptable_platform {
806-
(|fut: &mut SleepFuture| {
807-
let mut waker = dummy_waker();
808-
let mut ctx = task::Context::from_waker(&mut waker);
809-
match core::pin::Pin::new(fut).poll(&mut ctx) {
810-
task::Poll::Ready(exit) => {
811-
should_break = exit;
812-
true
813-
},
814-
task::Poll::Pending => false,
815-
}
816-
})(&mut await_start.unwrap())
798+
match check_sleeper(&mut await_start.unwrap()) {
799+
Some(true) => break,
800+
Some(false) => true,
801+
None => false,
802+
}
817803
} else {
818804
false
819805
};
820-
if should_break {
821-
break;
822-
}
823806
if channel_manager.get_cm().get_and_clear_needs_persistence() {
824807
log_trace!(logger, "Persisting ChannelManager...");
825808
kv_store
@@ -832,39 +815,25 @@ where
832815
.await?;
833816
log_trace!(logger, "Done persisting ChannelManager.");
834817
}
835-
if (|fut: &mut SleepFuture| {
836-
let mut waker = dummy_waker();
837-
let mut ctx = task::Context::from_waker(&mut waker);
838-
match core::pin::Pin::new(fut).poll(&mut ctx) {
839-
task::Poll::Ready(exit) => {
840-
should_break = exit;
841-
true
842-
},
843-
task::Poll::Pending => false,
844-
}
845-
})(&mut last_freshness_call)
846-
{
847-
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
848-
channel_manager.get_cm().timer_tick_occurred();
849-
last_freshness_call = sleeper(FRESHNESS_TIMER);
850-
}
851-
if (|fut: &mut SleepFuture| {
852-
let mut waker = dummy_waker();
853-
let mut ctx = task::Context::from_waker(&mut waker);
854-
match core::pin::Pin::new(fut).poll(&mut ctx) {
855-
task::Poll::Ready(exit) => {
856-
should_break = exit;
857-
true
858-
},
859-
task::Poll::Pending => false,
860-
}
861-
})(&mut last_onion_message_handler_call)
862-
{
863-
if let Some(om) = &onion_messenger {
864-
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
865-
om.get_om().timer_tick_occurred();
866-
}
867-
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
818+
match check_sleeper(&mut last_freshness_call) {
819+
Some(false) => {
820+
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
821+
channel_manager.get_cm().timer_tick_occurred();
822+
last_freshness_call = sleeper(FRESHNESS_TIMER);
823+
},
824+
Some(true) => break,
825+
None => {},
826+
}
827+
match check_sleeper(&mut last_onion_message_handler_call) {
828+
Some(false) => {
829+
if let Some(om) = &onion_messenger {
830+
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
831+
om.get_om().timer_tick_occurred();
832+
}
833+
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
834+
},
835+
Some(true) => break,
836+
None => {},
868837
}
869838
if await_slow {
870839
// On various platforms, we may be starved of CPU cycles for several reasons.
@@ -882,39 +851,31 @@ where
882851
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
883852
peer_manager.as_ref().disconnect_all_peers();
884853
last_ping_call = sleeper(PING_TIMER);
885-
} else if (|fut: &mut SleepFuture| {
886-
let mut waker = dummy_waker();
887-
let mut ctx = task::Context::from_waker(&mut waker);
888-
match core::pin::Pin::new(fut).poll(&mut ctx) {
889-
task::Poll::Ready(exit) => {
890-
should_break = exit;
891-
true
854+
} else {
855+
match check_sleeper(&mut last_ping_call) {
856+
Some(false) => {
857+
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
858+
peer_manager.as_ref().timer_tick_occurred();
859+
last_ping_call = sleeper(PING_TIMER);
892860
},
893-
task::Poll::Pending => false,
861+
Some(true) => break,
862+
_ => {},
894863
}
895-
})(&mut last_ping_call)
896-
{
897-
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
898-
peer_manager.as_ref().timer_tick_occurred();
899-
last_ping_call = sleeper(PING_TIMER);
900864
}
901865

902866
// Note that we want to run a graph prune once not long after startup before
903867
// falling back to our usual hourly prunes. This avoids short-lived clients never
904868
// pruning their network graph. We run once 60 seconds after startup before
905869
// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
906870
// we prune after an initial sync completes.
907-
let prune_timer_elapsed = (|fut: &mut SleepFuture| {
908-
let mut waker = dummy_waker();
909-
let mut ctx = task::Context::from_waker(&mut waker);
910-
match core::pin::Pin::new(fut).poll(&mut ctx) {
911-
task::Poll::Ready(exit) => {
912-
should_break = exit;
913-
true
914-
},
915-
task::Poll::Pending => false,
871+
let prune_timer_elapsed = {
872+
match check_sleeper(&mut last_prune_call) {
873+
Some(false) => true,
874+
Some(true) => break,
875+
None => false,
916876
}
917-
})(&mut last_prune_call);
877+
};
878+
918879
let should_prune = match gossip_sync {
919880
GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
920881
_ => prune_timer_elapsed,
@@ -957,76 +918,55 @@ where
957918
}
958919
have_decayed_scorer = true;
959920
}
960-
if (|fut: &mut SleepFuture| {
961-
let mut waker = dummy_waker();
962-
let mut ctx = task::Context::from_waker(&mut waker);
963-
match core::pin::Pin::new(fut).poll(&mut ctx) {
964-
task::Poll::Ready(exit) => {
965-
should_break = exit;
966-
true
967-
},
968-
task::Poll::Pending => false,
969-
}
970-
})(&mut last_scorer_persist_call)
971-
{
972-
if let Some(ref scorer) = scorer {
973-
if let Some(duration_since_epoch) = fetch_time() {
974-
log_trace!(logger, "Calling time_passed and persisting scorer");
975-
scorer.write_lock().time_passed(duration_since_epoch);
976-
} else {
977-
log_trace!(logger, "Persisting scorer");
921+
match check_sleeper(&mut last_scorer_persist_call) {
922+
Some(false) => {
923+
if let Some(ref scorer) = scorer {
924+
if let Some(duration_since_epoch) = fetch_time() {
925+
log_trace!(logger, "Calling time_passed and persisting scorer");
926+
scorer.write_lock().time_passed(duration_since_epoch);
927+
} else {
928+
log_trace!(logger, "Persisting scorer");
929+
}
930+
if let Err(e) = kv_store
931+
.write(
932+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
933+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
934+
SCORER_PERSISTENCE_KEY,
935+
&scorer.encode(),
936+
)
937+
.await
938+
{
939+
log_error!(
940+
logger,
941+
"Error: Failed to persist scorer, check your disk and permissions {}",
942+
e
943+
);
944+
}
978945
}
979-
if let Err(e) = kv_store
980-
.write(
981-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
982-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
983-
SCORER_PERSISTENCE_KEY,
984-
&scorer.encode(),
985-
)
986-
.await
987-
{
988-
log_error!(
989-
logger,
990-
"Error: Failed to persist scorer, check your disk and permissions {}",
991-
e
992-
);
946+
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
947+
},
948+
Some(true) => break,
949+
None => {},
950+
}
951+
match check_sleeper(&mut last_rebroadcast_call) {
952+
Some(false) => {
953+
log_trace!(logger, "Rebroadcasting monitor's pending claims");
954+
chain_monitor.rebroadcast_pending_claims();
955+
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
956+
},
957+
Some(true) => break,
958+
None => {},
959+
}
960+
match check_sleeper(&mut last_sweeper_call) {
961+
Some(false) => {
962+
log_trace!(logger, "Regenerating sweeper spends if necessary");
963+
if let Some(ref sweeper) = sweeper {
964+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
993965
}
994-
}
995-
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
996-
}
997-
if (|fut: &mut SleepFuture| {
998-
let mut waker = dummy_waker();
999-
let mut ctx = task::Context::from_waker(&mut waker);
1000-
match core::pin::Pin::new(fut).poll(&mut ctx) {
1001-
task::Poll::Ready(exit) => {
1002-
should_break = exit;
1003-
true
1004-
},
1005-
task::Poll::Pending => false,
1006-
}
1007-
})(&mut last_rebroadcast_call)
1008-
{
1009-
log_trace!(logger, "Rebroadcasting monitor's pending claims");
1010-
chain_monitor.rebroadcast_pending_claims();
1011-
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
1012-
}
1013-
if (|fut: &mut SleepFuture| {
1014-
let mut waker = dummy_waker();
1015-
let mut ctx = task::Context::from_waker(&mut waker);
1016-
match core::pin::Pin::new(fut).poll(&mut ctx) {
1017-
task::Poll::Ready(exit) => {
1018-
should_break = exit;
1019-
true
1020-
},
1021-
task::Poll::Pending => false,
1022-
}
1023-
})(&mut last_sweeper_call)
1024-
{
1025-
log_trace!(logger, "Regenerating sweeper spends if necessary");
1026-
if let Some(ref sweeper) = sweeper {
1027-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1028-
}
1029-
last_sweeper_call = sleeper(SWEEPER_TIMER);
966+
last_sweeper_call = sleeper(SWEEPER_TIMER);
967+
},
968+
Some(true) => break,
969+
None => {},
1030970
}
1031971
}
1032972
log_trace!(logger, "Terminating background processor.");
@@ -1065,6 +1005,17 @@ where
10651005
Ok(())
10661006
}
10671007

1008+
fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
1009+
fut: &mut SleepFuture,
1010+
) -> Option<bool> {
1011+
let mut waker = dummy_waker();
1012+
let mut ctx = task::Context::from_waker(&mut waker);
1013+
match core::pin::Pin::new(fut).poll(&mut ctx) {
1014+
task::Poll::Ready(exit) => Some(exit),
1015+
task::Poll::Pending => None,
1016+
}
1017+
}
1018+
10681019
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
10691020
/// synchronous background persistence.
10701021
pub async fn process_events_async_with_kv_store_sync<

0 commit comments

Comments
 (0)