Skip to content

Commit aeaf1eb

Browse files
committed
Extend check_sleeper to always get a new sleeper when ready
This makes it impossible to forget to reinitialize the sleeper and poll a future that is already ready.
1 parent 664511b commit aeaf1eb

File tree

1 file changed

+32
-26
lines changed
  • lightning-background-processor/src

1 file changed

+32
-26
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -829,8 +829,7 @@ where
829829
let mut have_pruned = false;
830830
let mut have_decayed_scorer = false;
831831

832-
let mut cur_batch_delay = batch_delay.get();
833-
let mut last_forwards_processing_call = sleeper(cur_batch_delay);
832+
let mut last_forwards_processing_call = sleeper(batch_delay.get());
834833

835834
loop {
836835
channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
@@ -851,11 +850,11 @@ where
851850
// generally, and as a fallback place such blocking only immediately before
852851
// persistence.
853852
peer_manager.as_ref().process_events();
854-
match check_sleeper(&mut last_forwards_processing_call) {
853+
match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
854+
sleeper(batch_delay.next())
855+
}) {
855856
Some(false) => {
856857
channel_manager.get_cm().process_pending_htlc_forwards();
857-
cur_batch_delay = batch_delay.next();
858-
last_forwards_processing_call = sleeper(cur_batch_delay);
859858
},
860859
Some(true) => break,
861860
None => {},
@@ -903,19 +902,20 @@ where
903902
}
904903

905904
let await_slow = if mobile_interruptable_platform {
906-
match check_sleeper(&mut await_start.unwrap()) {
905+
// Specify a zero new sleeper timeout because we won't use the new sleeper. It is re-initialized in the next
906+
// loop iteration.
907+
match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
907908
Some(true) => break,
908909
Some(false) => true,
909910
None => false,
910911
}
911912
} else {
912913
false
913914
};
914-
match check_sleeper(&mut last_freshness_call) {
915+
match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
915916
Some(false) => {
916917
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
917918
channel_manager.get_cm().timer_tick_occurred();
918-
last_freshness_call = sleeper(FRESHNESS_TIMER);
919919
},
920920
Some(true) => break,
921921
None => {},
@@ -947,8 +947,13 @@ where
947947
// pruning their network graph. We run once 60 seconds after startup before
948948
// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
949949
// we prune after an initial sync completes.
950+
let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
951+
NETWORK_PRUNE_TIMER
952+
} else {
953+
FIRST_NETWORK_PRUNE_TIMER
954+
};
950955
let prune_timer_elapsed = {
951-
match check_sleeper(&mut last_prune_call) {
956+
match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
952957
Some(false) => true,
953958
Some(true) => break,
954959
None => false,
@@ -992,9 +997,6 @@ where
992997

993998
have_pruned = true;
994999
}
995-
let prune_timer =
996-
if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
997-
last_prune_call = sleeper(prune_timer);
9981000
}
9991001
if !have_decayed_scorer {
10001002
if let Some(ref scorer) = scorer {
@@ -1005,7 +1007,9 @@ where
10051007
}
10061008
have_decayed_scorer = true;
10071009
}
1008-
match check_sleeper(&mut last_scorer_persist_call) {
1010+
match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
1011+
sleeper(SCORER_PERSIST_TIMER)
1012+
}) {
10091013
Some(false) => {
10101014
if let Some(ref scorer) = scorer {
10111015
if let Some(duration_since_epoch) = fetch_time() {
@@ -1037,12 +1041,11 @@ where
10371041
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
10381042
futures.set_c(Box::pin(fut));
10391043
}
1040-
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
10411044
},
10421045
Some(true) => break,
10431046
None => {},
10441047
}
1045-
match check_sleeper(&mut last_sweeper_call) {
1048+
match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
10461049
Some(false) => {
10471050
log_trace!(logger, "Regenerating sweeper spends if necessary");
10481051
if let Some(ref sweeper) = sweeper {
@@ -1055,7 +1058,6 @@ where
10551058
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
10561059
futures.set_d(Box::pin(fut));
10571060
}
1058-
last_sweeper_call = sleeper(SWEEPER_TIMER);
10591061
},
10601062
Some(true) => break,
10611063
None => {},
@@ -1066,13 +1068,14 @@ where
10661068
res?;
10671069
}
10681070

1069-
match check_sleeper(&mut last_onion_message_handler_call) {
1071+
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
1072+
sleeper(ONION_MESSAGE_HANDLER_TIMER)
1073+
}) {
10701074
Some(false) => {
10711075
if let Some(om) = &onion_messenger {
10721076
log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
10731077
om.get_om().timer_tick_occurred();
10741078
}
1075-
last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
10761079
},
10771080
Some(true) => break,
10781081
None => {},
@@ -1096,23 +1099,21 @@ where
10961099
peer_manager.as_ref().disconnect_all_peers();
10971100
last_ping_call = sleeper(PING_TIMER);
10981101
} else {
1099-
match check_sleeper(&mut last_ping_call) {
1102+
match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
11001103
Some(false) => {
11011104
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
11021105
peer_manager.as_ref().timer_tick_occurred();
1103-
last_ping_call = sleeper(PING_TIMER);
11041106
},
11051107
Some(true) => break,
11061108
_ => {},
11071109
}
11081110
}
11091111

11101112
// Rebroadcast pending claims.
1111-
match check_sleeper(&mut last_rebroadcast_call) {
1113+
match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
11121114
Some(false) => {
11131115
log_trace!(logger, "Rebroadcasting monitor's pending claims");
11141116
chain_monitor.rebroadcast_pending_claims();
1115-
last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
11161117
},
11171118
Some(true) => break,
11181119
None => {},
@@ -1154,13 +1155,18 @@ where
11541155
Ok(())
11551156
}
11561157

1157-
fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
1158-
fut: &mut SleepFuture,
1158+
fn check_and_reset_sleeper<
1159+
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1160+
>(
1161+
fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
11591162
) -> Option<bool> {
11601163
let mut waker = dummy_waker();
11611164
let mut ctx = task::Context::from_waker(&mut waker);
1162-
match core::pin::Pin::new(fut).poll(&mut ctx) {
1163-
task::Poll::Ready(exit) => Some(exit),
1165+
match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
1166+
task::Poll::Ready(exit) => {
1167+
*fut = new_sleeper();
1168+
Some(exit)
1169+
},
11641170
task::Poll::Pending => None,
11651171
}
11661172
}

0 commit comments

Comments
 (0)