Skip to content

Commit 6c366cf

Browse files
committed
Pass the current time through ScoreUpDate methods
In the coming commits, we'll stop relying on fetching the time during routefetching, preferring to decay score data in the background instead. The first step towards this - passing the current time through into the scorer when updating.
1 parent 6471eb0 commit 6c366cf

File tree

4 files changed

+105
-88
lines changed

4 files changed

+105
-88
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -244,30 +244,30 @@ fn handle_network_graph_update<L: Deref>(
244244
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
245245
/// to persist.
246246
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
247-
scorer: &'a S, event: &Event
247+
scorer: &'a S, event: &Event, duration_since_epoch: Duration,
248248
) -> bool {
249249
match event {
250250
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
251251
let mut score = scorer.write_lock();
252-
score.payment_path_failed(path, *scid);
252+
score.payment_path_failed(path, *scid, duration_since_epoch);
253253
},
254254
Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
255255
// Reached if the destination explicitly failed it back. We treat this as a successful probe
256256
// because the payment made it all the way to the destination with sufficient liquidity.
257257
let mut score = scorer.write_lock();
258-
score.probe_successful(path);
258+
score.probe_successful(path, duration_since_epoch);
259259
},
260260
Event::PaymentPathSuccessful { path, .. } => {
261261
let mut score = scorer.write_lock();
262-
score.payment_path_successful(path);
262+
score.payment_path_successful(path, duration_since_epoch);
263263
},
264264
Event::ProbeSuccessful { path, .. } => {
265265
let mut score = scorer.write_lock();
266-
score.probe_successful(path);
266+
score.probe_successful(path, duration_since_epoch);
267267
},
268268
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
269269
let mut score = scorer.write_lock();
270-
score.probe_failed(path, *scid);
270+
score.probe_failed(path, *scid, duration_since_epoch);
271271
},
272272
_ => return false,
273273
}
@@ -280,7 +280,7 @@ macro_rules! define_run_body {
280280
$channel_manager: ident, $process_channel_manager_events: expr,
281281
$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
282282
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
283-
$timer_elapsed: expr, $check_slow_await: expr
283+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
284284
) => { {
285285
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
286286
$channel_manager.timer_tick_occurred();
@@ -383,11 +383,10 @@ macro_rules! define_run_body {
383383
if should_prune {
384384
// The network graph must not be pruned while rapid sync completion is pending
385385
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
386-
#[cfg(feature = "std")] {
386+
if let Some(duration_since_epoch) = $time_fetch() {
387387
log_trace!($logger, "Pruning and persisting network graph.");
388-
network_graph.remove_stale_channels_and_tracking();
389-
}
390-
#[cfg(not(feature = "std"))] {
388+
network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
389+
} else {
391390
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
392391
log_trace!($logger, "Persisting network graph.");
393392
}
@@ -510,12 +509,16 @@ use core::task;
510509
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
511510
/// are hundreds or thousands of simultaneous process calls running.
512511
///
512+
/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
513+
/// no time is available, some features may be disabled, however the node will still operate fine.
514+
///
513515
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
514516
/// could setup `process_events_async` like this:
515517
/// ```
516518
/// # use lightning::io;
517519
/// # use std::sync::{Arc, RwLock};
518520
/// # use std::sync::atomic::{AtomicBool, Ordering};
521+
/// # use std::time::SystemTime;
519522
/// # use lightning_background_processor::{process_events_async, GossipSync};
520523
/// # struct MyStore {}
521524
/// # impl lightning::util::persist::KVStore for MyStore {
@@ -584,6 +587,7 @@ use core::task;
584587
/// Some(background_scorer),
585588
/// sleeper,
586589
/// mobile_interruptable_platform,
590+
/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
587591
/// )
588592
/// .await
589593
/// .expect("Failed to process events");
@@ -620,11 +624,12 @@ pub async fn process_events_async<
620624
S: 'static + Deref<Target = SC> + Send + Sync,
621625
SC: for<'b> WriteableScore<'b>,
622626
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
623-
Sleeper: Fn(Duration) -> SleepFuture
627+
Sleeper: Fn(Duration) -> SleepFuture,
628+
FetchTime: Fn() -> Option<Duration>,
624629
>(
625630
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
626631
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
627-
sleeper: Sleeper, mobile_interruptable_platform: bool,
632+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
628633
) -> Result<(), lightning::io::Error>
629634
where
630635
UL::Target: 'static + UtxoLookup,
@@ -648,15 +653,18 @@ where
648653
let scorer = &scorer;
649654
let logger = &logger;
650655
let persister = &persister;
656+
let fetch_time = &fetch_time;
651657
async move {
652658
if let Some(network_graph) = network_graph {
653659
handle_network_graph_update(network_graph, &event)
654660
}
655661
if let Some(ref scorer) = scorer {
656-
if update_scorer(scorer, &event) {
657-
log_trace!(logger, "Persisting scorer after update");
658-
if let Err(e) = persister.persist_scorer(&scorer) {
659-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
662+
if let Some(duration_since_epoch) = fetch_time() {
663+
if update_scorer(scorer, &event, duration_since_epoch) {
664+
log_trace!(logger, "Persisting scorer after update");
665+
if let Err(e) = persister.persist_scorer(&scorer) {
666+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
667+
}
660668
}
661669
}
662670
}
@@ -688,7 +696,7 @@ where
688696
task::Poll::Ready(exit) => { should_break = exit; true },
689697
task::Poll::Pending => false,
690698
}
691-
}, mobile_interruptable_platform
699+
}, mobile_interruptable_platform, fetch_time,
692700
)
693701
}
694702

@@ -810,7 +818,10 @@ impl BackgroundProcessor {
810818
handle_network_graph_update(network_graph, &event)
811819
}
812820
if let Some(ref scorer) = scorer {
813-
if update_scorer(scorer, &event) {
821+
use std::time::SystemTime;
822+
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
823+
.expect("Time should be sometime after 1970");
824+
if update_scorer(scorer, &event, duration_since_epoch) {
814825
log_trace!(logger, "Persisting scorer after update");
815826
if let Err(e) = persister.persist_scorer(&scorer) {
816827
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
@@ -829,7 +840,12 @@ impl BackgroundProcessor {
829840
channel_manager.get_event_or_persistence_needed_future(),
830841
chain_monitor.get_update_future()
831842
).wait_timeout(Duration::from_millis(100)); },
832-
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
843+
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
844+
|| {
845+
use std::time::SystemTime;
846+
Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
847+
.expect("Time should be sometime after 1970"))
848+
},
833849
)
834850
});
835851
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1117,7 +1133,7 @@ mod tests {
11171133
}
11181134

11191135
impl ScoreUpdate for TestScorer {
1120-
fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
1136+
fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
11211137
if let Some(expectations) = &mut self.event_expectations {
11221138
match expectations.pop_front().unwrap() {
11231139
TestResult::PaymentFailure { path, short_channel_id } => {
@@ -1137,7 +1153,7 @@ mod tests {
11371153
}
11381154
}
11391155

1140-
fn payment_path_successful(&mut self, actual_path: &Path) {
1156+
fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
11411157
if let Some(expectations) = &mut self.event_expectations {
11421158
match expectations.pop_front().unwrap() {
11431159
TestResult::PaymentFailure { path, .. } => {
@@ -1156,7 +1172,7 @@ mod tests {
11561172
}
11571173
}
11581174

1159-
fn probe_failed(&mut self, actual_path: &Path, _: u64) {
1175+
fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
11601176
if let Some(expectations) = &mut self.event_expectations {
11611177
match expectations.pop_front().unwrap() {
11621178
TestResult::PaymentFailure { path, .. } => {
@@ -1174,7 +1190,7 @@ mod tests {
11741190
}
11751191
}
11761192
}
1177-
fn probe_successful(&mut self, actual_path: &Path) {
1193+
fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
11781194
if let Some(expectations) = &mut self.event_expectations {
11791195
match expectations.pop_front().unwrap() {
11801196
TestResult::PaymentFailure { path, .. } => {
@@ -1469,7 +1485,7 @@ mod tests {
14691485
tokio::time::sleep(dur).await;
14701486
false // Never exit
14711487
})
1472-
}, false,
1488+
}, false, || Some(Duration::ZERO),
14731489
);
14741490
match bp_future.await {
14751491
Ok(_) => panic!("Expected error persisting manager"),
@@ -1699,7 +1715,7 @@ mod tests {
16991715
_ = exit_receiver.changed() => true,
17001716
}
17011717
})
1702-
}, false,
1718+
}, false, || Some(Duration::from_secs(1696300000)),
17031719
);
17041720

17051721
let t1 = tokio::spawn(bp_future);
@@ -1874,7 +1890,7 @@ mod tests {
18741890
_ = exit_receiver.changed() => true,
18751891
}
18761892
})
1877-
}, false,
1893+
}, false, || Some(Duration::ZERO),
18781894
);
18791895
let t1 = tokio::spawn(bp_future);
18801896
let t2 = tokio::spawn(async move {

lightning/src/routing/router.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8160,6 +8160,7 @@ mod tests {
81608160
pub(crate) mod bench_utils {
81618161
use super::*;
81628162
use std::fs::File;
8163+
use std::time::Duration;
81638164

81648165
use bitcoin::hashes::Hash;
81658166
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
@@ -8308,10 +8309,10 @@ pub(crate) mod bench_utils {
83088309
if let Ok(route) = route_res {
83098310
for path in route.paths {
83108311
if seed & 0x80 == 0 {
8311-
scorer.payment_path_successful(&path);
8312+
scorer.payment_path_successful(&path, Duration::ZERO);
83128313
} else {
83138314
let short_channel_id = path.hops[path.hops.len() / 2].short_channel_id;
8314-
scorer.payment_path_failed(&path, short_channel_id);
8315+
scorer.payment_path_failed(&path, short_channel_id, Duration::ZERO);
83158316
}
83168317
seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0;
83178318
}

0 commit comments

Comments
 (0)