Skip to content

Commit 7a9c301

Browse files
committed
Let BackgroundProcessor drive HTLC forwarding
Previously, we'd require the user to manually call `process_pending_htlc_forwards` as part of `PendingHTLCsForwardable` event handling. Here, we rather move this responsibility to `BackgroundProcessor`, which simplyfies the flow and allows us to implement reasonable forwarding delays on our side rather than delegating to users' implementations. Note this also introduces batching rounds rather than calling `process_pending_htlc_forwards` individually for each `PendingHTLCsForwardable` event, which had been unintuitive anyways, as subsequent `PendingHTLCsForwardable` could lead to overlapping batch intervals, resulting in the shortest timespan 'winning' every time, as `process_pending_htlc_forwards` would of course handle all pending HTLCs at once.
1 parent 597b28b commit 7a9c301

File tree

3 files changed

+99
-4
lines changed

3 files changed

+99
-4
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// This file is Copyright its original authors, visible in version control
2+
// history.
3+
//
4+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7+
// You may not use this file except in accordance with one or both of these
8+
// licenses.
9+
10+
use core::time::Duration;
11+
12+
pub(crate) struct BatchDelay {
13+
next_batch_delay_millis: u16,
14+
}
15+
16+
impl BatchDelay {
17+
pub(crate) fn new() -> Self {
18+
let next_batch_delay_millis = rand_batch_delay_millis();
19+
Self { next_batch_delay_millis }
20+
}
21+
22+
pub(crate) fn get(&self) -> Duration {
23+
Duration::from_millis(self.next_batch_delay_millis as u64)
24+
}
25+
26+
pub(crate) fn next(&mut self) -> Duration {
27+
let next = rand_batch_delay_millis();
28+
self.next_batch_delay_millis = next;
29+
Duration::from_millis(next as u64)
30+
}
31+
}
32+
33+
fn rand_batch_delay_millis() -> u16 {
34+
// TODO: actually randomize the result.
35+
100
36+
}

lightning-background-processor/src/lib.rs

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ extern crate alloc;
2626
extern crate lightning;
2727
extern crate lightning_rapid_gossip_sync;
2828

29+
mod fwd_batch;
30+
31+
use fwd_batch::BatchDelay;
32+
2933
use lightning::chain;
3034
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3135
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
@@ -328,7 +332,7 @@ macro_rules! define_run_body {
328332
$peer_manager: ident, $gossip_sync: ident,
329333
$process_sweeper: expr,
330334
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
331-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
335+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr,
332336
) => { {
333337
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
334338
$channel_manager.get_cm().timer_tick_occurred();
@@ -345,6 +349,9 @@ macro_rules! define_run_body {
345349
let mut have_pruned = false;
346350
let mut have_decayed_scorer = false;
347351

352+
let mut cur_batch_delay = $batch_delay.get();
353+
let mut last_forwards_processing_call = $get_timer(cur_batch_delay);
354+
348355
loop {
349356
$process_channel_manager_events;
350357
$process_chain_monitor_events;
@@ -369,6 +376,21 @@ macro_rules! define_run_body {
369376
break;
370377
}
371378

379+
if $timer_elapsed(&mut last_forwards_processing_call, cur_batch_delay) {
380+
if $channel_manager.get_cm().is_pending_htlc_processing() {
381+
$channel_manager.get_cm().process_pending_htlc_forwards();
382+
}
383+
cur_batch_delay = $batch_delay.next();
384+
last_forwards_processing_call = $get_timer(cur_batch_delay);
385+
}
386+
387+
// Checke whether to exit the loop again, as some time might have passed since we
388+
// checked above.
389+
if $loop_exit_check {
390+
log_trace!($logger, "Terminating background processor.");
391+
break;
392+
}
393+
372394
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
373395
// see `await_start`'s use below.
374396
let mut await_start = None;
@@ -523,12 +545,14 @@ pub(crate) mod futures_util {
523545
C: Future<Output = ()> + Unpin,
524546
D: Future<Output = ()> + Unpin,
525547
E: Future<Output = bool> + Unpin,
548+
F: Future<Output = bool> + Unpin,
526549
> {
527550
pub a: A,
528551
pub b: B,
529552
pub c: C,
530553
pub d: D,
531554
pub e: E,
555+
pub f: F,
532556
}
533557

534558
pub(crate) enum SelectorOutput {
@@ -537,6 +561,7 @@ pub(crate) mod futures_util {
537561
C,
538562
D,
539563
E(bool),
564+
F(bool),
540565
}
541566

542567
impl<
@@ -545,7 +570,8 @@ pub(crate) mod futures_util {
545570
C: Future<Output = ()> + Unpin,
546571
D: Future<Output = ()> + Unpin,
547572
E: Future<Output = bool> + Unpin,
548-
> Future for Selector<A, B, C, D, E>
573+
F: Future<Output = bool> + Unpin,
574+
> Future for Selector<A, B, C, D, E, F>
549575
{
550576
type Output = SelectorOutput;
551577
fn poll(
@@ -581,6 +607,12 @@ pub(crate) mod futures_util {
581607
},
582608
Poll::Pending => {},
583609
}
610+
match Pin::new(&mut self.f).poll(ctx) {
611+
Poll::Ready(res) => {
612+
return Poll::Ready(SelectorOutput::F(res));
613+
},
614+
Poll::Pending => {},
615+
}
584616
Poll::Pending
585617
}
586618
}
@@ -863,6 +895,7 @@ where
863895
event_handler(event).await
864896
})
865897
};
898+
let mut batch_delay = BatchDelay::new();
866899
define_run_body!(
867900
persister,
868901
chain_monitor,
@@ -901,7 +934,12 @@ where
901934
b: chain_monitor.get_update_future(),
902935
c: om_fut,
903936
d: lm_fut,
904-
e: sleeper(if mobile_interruptable_platform {
937+
e: sleeper(if channel_manager.get_cm().is_pending_htlc_processing() {
938+
batch_delay.get()
939+
} else {
940+
Duration::MAX
941+
}),
942+
f: sleeper(if mobile_interruptable_platform {
905943
Duration::from_millis(100)
906944
} else {
907945
FASTEST_TIMER
@@ -912,6 +950,9 @@ where
912950
SelectorOutput::E(exit) => {
913951
should_break = exit;
914952
},
953+
SelectorOutput::F(exit) => {
954+
should_break = exit;
955+
},
915956
}
916957
},
917958
|t| sleeper(t),
@@ -928,6 +969,7 @@ where
928969
},
929970
mobile_interruptable_platform,
930971
fetch_time,
972+
batch_delay,
931973
)
932974
}
933975

@@ -1051,6 +1093,7 @@ impl BackgroundProcessor {
10511093
}
10521094
event_handler.handle_event(event)
10531095
};
1096+
let mut batch_delay = BatchDelay::new();
10541097
define_run_body!(
10551098
persister,
10561099
chain_monitor,
@@ -1094,7 +1137,13 @@ impl BackgroundProcessor {
10941137
&chain_monitor.get_update_future(),
10951138
),
10961139
};
1097-
sleeper.wait_timeout(Duration::from_millis(100));
1140+
let batch_delay = if channel_manager.get_cm().is_pending_htlc_processing() {
1141+
batch_delay.get()
1142+
} else {
1143+
Duration::MAX
1144+
};
1145+
let fastest_timeout = batch_delay.min(Duration::from_millis(100));
1146+
sleeper.wait_timeout(fastest_timeout);
10981147
},
10991148
|_| Instant::now(),
11001149
|time: &Instant, dur| time.elapsed() > dur,
@@ -1107,6 +1156,7 @@ impl BackgroundProcessor {
11071156
.expect("Time should be sometime after 1970"),
11081157
)
11091158
},
1159+
batch_delay,
11101160
)
11111161
});
11121162
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

lightning/src/ln/channelmanager.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6317,6 +6317,15 @@ where
63176317
}
63186318
}
63196319

6320+
/// Returns whether we have pending HTLC forwards that need to be processed via
6321+
/// [`Self::process_pending_htlc_forwards`].
6322+
pub fn is_pending_htlc_processing(&self) -> bool {
6323+
let has_forward_htlcs = !self.forward_htlcs.lock().unwrap().is_empty();
6324+
let has_decode_update_add_htlcs = !self.decode_update_add_htlcs.lock().unwrap().is_empty();
6325+
let has_outbound_needing_abandon = self.pending_outbound_payments.needs_abandon();
6326+
has_forward_htlcs || has_decode_update_add_htlcs || has_outbound_needing_abandon
6327+
}
6328+
63206329
/// Processes HTLCs which are pending waiting on random forward delay.
63216330
///
63226331
/// Should only really ever be called in response to a PendingHTLCsForwardable event.

0 commit comments

Comments
 (0)