Skip to content

Commit dadf08f

Browse files
committed
Let BackgroundProcessor drive HTLC forwarding
Previously, we'd require the user to manually call `process_pending_htlc_forwards` as part of `PendingHTLCsForwardable` event handling. Here, we rather move this responsibility to `BackgroundProcessor`, which simplyfies the flow and allows us to implement reasonable forwarding delays on our side rather than delegating to users' implementations. Note this also introduces batching rounds rather than calling `process_pending_htlc_forwards` individually for each `PendingHTLCsForwardable` event, which had been unintuitive anyways, as subsequent `PendingHTLCsForwardable` could lead to overlapping batch intervals, resulting in the shortest timespan 'winning' every time, as `process_pending_htlc_forwards` would of course handle all pending HTLCs at once.
1 parent cb5537e commit dadf08f

File tree

4 files changed

+114
-4
lines changed

4 files changed

+114
-4
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// This file is Copyright its original authors, visible in version control
2+
// history.
3+
//
4+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7+
// You may not use this file except in accordance with one or both of these
8+
// licenses.
9+
10+
use core::time::Duration;
11+
12+
pub(crate) struct BatchDelay {
13+
next_batch_delay_millis: u16,
14+
}
15+
16+
impl BatchDelay {
17+
pub(crate) fn new() -> Self {
18+
let next_batch_delay_millis = rand_batch_delay_millis();
19+
Self { next_batch_delay_millis }
20+
}
21+
22+
pub(crate) fn get(&self) -> Duration {
23+
Duration::from_millis(self.next_batch_delay_millis as u64)
24+
}
25+
26+
pub(crate) fn next(&mut self) -> Duration {
27+
let next = rand_batch_delay_millis();
28+
self.next_batch_delay_millis = next;
29+
Duration::from_millis(next as u64)
30+
}
31+
}
32+
33+
fn rand_batch_delay_millis() -> u16 {
34+
// TODO: actually randomize the result.
35+
100
36+
}

lightning-background-processor/src/lib.rs

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ extern crate alloc;
2626
extern crate lightning;
2727
extern crate lightning_rapid_gossip_sync;
2828

29+
mod fwd_batch;
30+
31+
use fwd_batch::BatchDelay;
32+
2933
use lightning::chain;
3034
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3135
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
@@ -328,7 +332,7 @@ macro_rules! define_run_body {
328332
$peer_manager: ident, $gossip_sync: ident,
329333
$process_sweeper: expr,
330334
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
331-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
335+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr,
332336
) => { {
333337
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
334338
$channel_manager.get_cm().timer_tick_occurred();
@@ -345,6 +349,9 @@ macro_rules! define_run_body {
345349
let mut have_pruned = false;
346350
let mut have_decayed_scorer = false;
347351

352+
let mut cur_batch_delay = $batch_delay.get();
353+
let mut last_forwards_processing_call = $get_timer(cur_batch_delay);
354+
348355
loop {
349356
$process_channel_manager_events;
350357
$process_chain_monitor_events;
@@ -369,6 +376,19 @@ macro_rules! define_run_body {
369376
break;
370377
}
371378

379+
if $timer_elapsed(&mut last_forwards_processing_call, cur_batch_delay) {
380+
$channel_manager.get_cm().process_pending_htlc_forwards();
381+
cur_batch_delay = $batch_delay.next();
382+
last_forwards_processing_call = $get_timer(cur_batch_delay);
383+
}
384+
385+
// Checke whether to exit the loop again, as some time might have passed since we
386+
// checked above.
387+
if $loop_exit_check {
388+
log_trace!($logger, "Terminating background processor.");
389+
break;
390+
}
391+
372392
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
373393
// see `await_start`'s use below.
374394
let mut await_start = None;
@@ -523,12 +543,14 @@ pub(crate) mod futures_util {
523543
C: Future<Output = ()> + Unpin,
524544
D: Future<Output = ()> + Unpin,
525545
E: Future<Output = bool> + Unpin,
546+
F: Future<Output = bool> + Unpin,
526547
> {
527548
pub a: A,
528549
pub b: B,
529550
pub c: C,
530551
pub d: D,
531552
pub e: E,
553+
pub f: F,
532554
}
533555

534556
pub(crate) enum SelectorOutput {
@@ -537,6 +559,7 @@ pub(crate) mod futures_util {
537559
C,
538560
D,
539561
E(bool),
562+
F(bool),
540563
}
541564

542565
impl<
@@ -545,7 +568,8 @@ pub(crate) mod futures_util {
545568
C: Future<Output = ()> + Unpin,
546569
D: Future<Output = ()> + Unpin,
547570
E: Future<Output = bool> + Unpin,
548-
> Future for Selector<A, B, C, D, E>
571+
F: Future<Output = bool> + Unpin,
572+
> Future for Selector<A, B, C, D, E, F>
549573
{
550574
type Output = SelectorOutput;
551575
fn poll(
@@ -581,6 +605,12 @@ pub(crate) mod futures_util {
581605
},
582606
Poll::Pending => {},
583607
}
608+
match Pin::new(&mut self.f).poll(ctx) {
609+
Poll::Ready(res) => {
610+
return Poll::Ready(SelectorOutput::F(res));
611+
},
612+
Poll::Pending => {},
613+
}
584614
Poll::Pending
585615
}
586616
}
@@ -863,6 +893,7 @@ where
863893
event_handler(event).await
864894
})
865895
};
896+
let mut batch_delay = BatchDelay::new();
866897
define_run_body!(
867898
persister,
868899
chain_monitor,
@@ -901,7 +932,12 @@ where
901932
b: chain_monitor.get_update_future(),
902933
c: om_fut,
903934
d: lm_fut,
904-
e: sleeper(if mobile_interruptable_platform {
935+
e: sleeper(if channel_manager.get_cm().needs_pending_htlc_processing() {
936+
batch_delay.get()
937+
} else {
938+
Duration::MAX
939+
}),
940+
f: sleeper(if mobile_interruptable_platform {
905941
Duration::from_millis(100)
906942
} else {
907943
FASTEST_TIMER
@@ -912,6 +948,9 @@ where
912948
SelectorOutput::E(exit) => {
913949
should_break = exit;
914950
},
951+
SelectorOutput::F(exit) => {
952+
should_break = exit;
953+
},
915954
}
916955
},
917956
|t| sleeper(t),
@@ -928,6 +967,7 @@ where
928967
},
929968
mobile_interruptable_platform,
930969
fetch_time,
970+
batch_delay,
931971
)
932972
}
933973

@@ -1051,6 +1091,7 @@ impl BackgroundProcessor {
10511091
}
10521092
event_handler.handle_event(event)
10531093
};
1094+
let mut batch_delay = BatchDelay::new();
10541095
define_run_body!(
10551096
persister,
10561097
chain_monitor,
@@ -1094,7 +1135,13 @@ impl BackgroundProcessor {
10941135
&chain_monitor.get_update_future(),
10951136
),
10961137
};
1097-
sleeper.wait_timeout(Duration::from_millis(100));
1138+
let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
1139+
batch_delay.get()
1140+
} else {
1141+
Duration::MAX
1142+
};
1143+
let fastest_timeout = batch_delay.min(Duration::from_millis(100));
1144+
sleeper.wait_timeout(fastest_timeout);
10981145
},
10991146
|_| Instant::now(),
11001147
|time: &Instant, dur| time.elapsed() > dur,
@@ -1107,6 +1154,7 @@ impl BackgroundProcessor {
11071154
.expect("Time should be sometime after 1970"),
11081155
)
11091156
},
1157+
batch_delay,
11101158
)
11111159
});
11121160
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

lightning/src/ln/channelmanager.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6318,6 +6318,21 @@ where
63186318
}
63196319
}
63206320

6321+
/// Returns whether we have pending HTLC forwards that need to be processed via
6322+
/// [`Self::process_pending_htlc_forwards`].
6323+
pub fn needs_pending_htlc_processing(&self) -> bool {
6324+
if !self.forward_htlcs.lock().unwrap().is_empty() {
6325+
return true;
6326+
}
6327+
if !self.decode_update_add_htlcs.lock().unwrap().is_empty() {
6328+
return true;
6329+
}
6330+
if self.pending_outbound_payments.needs_abandon_or_retry() {
6331+
return true;
6332+
}
6333+
false
6334+
}
6335+
63216336
/// Processes HTLCs which are pending waiting on random forward delay.
63226337
///
63236338
/// Should only really ever be called in response to a PendingHTLCsForwardable event.

lightning/src/ln/outbound_payment.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,6 +1328,17 @@ impl OutboundPayments {
13281328
})
13291329
}
13301330

1331+
pub(super) fn needs_abandon_or_retry(&self) -> bool {
1332+
let outbounds = self.pending_outbound_payments.lock().unwrap();
1333+
outbounds.iter().any(|(_, pmt)| {
1334+
pmt.is_auto_retryable_now()
1335+
|| !pmt.is_auto_retryable_now()
1336+
&& pmt.remaining_parts() == 0
1337+
&& !pmt.is_fulfilled()
1338+
&& !pmt.is_awaiting_invoice()
1339+
})
1340+
}
1341+
13311342
#[rustfmt::skip]
13321343
fn find_initial_route<R: Deref, NS: Deref, IH, L: Deref>(
13331344
&self, payment_id: PaymentId, payment_hash: PaymentHash, recipient_onion: &RecipientOnionFields,

0 commit comments

Comments
 (0)