Skip to content

Commit 9b1d570

Browse files
committed
Pass the current time through ScoreUpDate methods
In the coming commits, we'll stop relying on fetching the time during routefetching, preferring to decay score data in the background instead. The first step towards this - passing the current time through into the scorer when updating.
1 parent 6efb2e8 commit 9b1d570

File tree

4 files changed

+105
-88
lines changed

4 files changed

+105
-88
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -239,30 +239,30 @@ fn handle_network_graph_update<L: Deref>(
239239
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
240240
/// to persist.
241241
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
242-
scorer: &'a S, event: &Event
242+
scorer: &'a S, event: &Event, duration_since_epoch: Duration,
243243
) -> bool {
244244
match event {
245245
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
246246
let mut score = scorer.write_lock();
247-
score.payment_path_failed(path, *scid);
247+
score.payment_path_failed(path, *scid, duration_since_epoch);
248248
},
249249
Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
250250
// Reached if the destination explicitly failed it back. We treat this as a successful probe
251251
// because the payment made it all the way to the destination with sufficient liquidity.
252252
let mut score = scorer.write_lock();
253-
score.probe_successful(path);
253+
score.probe_successful(path, duration_since_epoch);
254254
},
255255
Event::PaymentPathSuccessful { path, .. } => {
256256
let mut score = scorer.write_lock();
257-
score.payment_path_successful(path);
257+
score.payment_path_successful(path, duration_since_epoch);
258258
},
259259
Event::ProbeSuccessful { path, .. } => {
260260
let mut score = scorer.write_lock();
261-
score.probe_successful(path);
261+
score.probe_successful(path, duration_since_epoch);
262262
},
263263
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
264264
let mut score = scorer.write_lock();
265-
score.probe_failed(path, *scid);
265+
score.probe_failed(path, *scid, duration_since_epoch);
266266
},
267267
_ => return false,
268268
}
@@ -274,7 +274,7 @@ macro_rules! define_run_body {
274274
$channel_manager: ident, $process_channel_manager_events: expr,
275275
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
276276
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
277-
$check_slow_await: expr)
277+
$check_slow_await: expr, $time_fetch: expr)
278278
=> { {
279279
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
280280
$channel_manager.timer_tick_occurred();
@@ -370,11 +370,10 @@ macro_rules! define_run_body {
370370
if should_prune {
371371
// The network graph must not be pruned while rapid sync completion is pending
372372
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
373-
#[cfg(feature = "std")] {
373+
if let Some(duration_since_epoch) = $time_fetch() {
374374
log_trace!($logger, "Pruning and persisting network graph.");
375-
network_graph.remove_stale_channels_and_tracking();
376-
}
377-
#[cfg(not(feature = "std"))] {
375+
network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
376+
} else {
378377
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
379378
log_trace!($logger, "Persisting network graph.");
380379
}
@@ -497,12 +496,16 @@ use core::task;
497496
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
498497
/// are hundreds or thousands of simultaneous process calls running.
499498
///
499+
/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
500+
/// no time is available, some features may be disabled, however the node will still operate fine.
501+
///
500502
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
501503
/// could setup `process_events_async` like this:
502504
/// ```
503505
/// # use lightning::io;
504506
/// # use std::sync::{Arc, RwLock};
505507
/// # use std::sync::atomic::{AtomicBool, Ordering};
508+
/// # use std::time::SystemTime;
506509
/// # use lightning_background_processor::{process_events_async, GossipSync};
507510
/// # struct MyStore {}
508511
/// # impl lightning::util::persist::KVStore for MyStore {
@@ -571,6 +574,7 @@ use core::task;
571574
/// Some(background_scorer),
572575
/// sleeper,
573576
/// mobile_interruptable_platform,
577+
/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
574578
/// )
575579
/// .await
576580
/// .expect("Failed to process events");
@@ -608,11 +612,12 @@ pub async fn process_events_async<
608612
S: 'static + Deref<Target = SC> + Send + Sync,
609613
SC: for<'b> WriteableScore<'b>,
610614
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
611-
Sleeper: Fn(Duration) -> SleepFuture
615+
Sleeper: Fn(Duration) -> SleepFuture,
616+
FetchTime: Fn() -> Option<Duration>,
612617
>(
613618
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
614619
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
615-
sleeper: Sleeper, mobile_interruptable_platform: bool,
620+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
616621
) -> Result<(), lightning::io::Error>
617622
where
618623
UL::Target: 'static + UtxoLookup,
@@ -635,15 +640,18 @@ where
635640
let scorer = &scorer;
636641
let logger = &logger;
637642
let persister = &persister;
643+
let fetch_time = &fetch_time;
638644
async move {
639645
if let Some(network_graph) = network_graph {
640646
handle_network_graph_update(network_graph, &event)
641647
}
642648
if let Some(ref scorer) = scorer {
643-
if update_scorer(scorer, &event) {
644-
log_trace!(logger, "Persisting scorer after update");
645-
if let Err(e) = persister.persist_scorer(&scorer) {
646-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
649+
if let Some(duration_since_epoch) = fetch_time() {
650+
if update_scorer(scorer, &event, duration_since_epoch) {
651+
log_trace!(logger, "Persisting scorer after update");
652+
if let Err(e) = persister.persist_scorer(&scorer) {
653+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
654+
}
647655
}
648656
}
649657
}
@@ -673,7 +681,7 @@ where
673681
task::Poll::Ready(exit) => { should_break = exit; true },
674682
task::Poll::Pending => false,
675683
}
676-
}, mobile_interruptable_platform)
684+
}, mobile_interruptable_platform, fetch_time)
677685
}
678686

679687
#[cfg(feature = "std")]
@@ -773,7 +781,10 @@ impl BackgroundProcessor {
773781
handle_network_graph_update(network_graph, &event)
774782
}
775783
if let Some(ref scorer) = scorer {
776-
if update_scorer(scorer, &event) {
784+
use std::time::SystemTime;
785+
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
786+
.expect("Time should be sometime after 1970");
787+
if update_scorer(scorer, &event, duration_since_epoch) {
777788
log_trace!(logger, "Persisting scorer after update");
778789
if let Err(e) = persister.persist_scorer(&scorer) {
779790
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
@@ -789,7 +800,12 @@ impl BackgroundProcessor {
789800
channel_manager.get_event_or_persistence_needed_future(),
790801
chain_monitor.get_update_future()
791802
).wait_timeout(Duration::from_millis(100)); },
792-
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
803+
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
804+
|| {
805+
use std::time::SystemTime;
806+
Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
807+
.expect("Time should be sometime after 1970"))
808+
})
793809
});
794810
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
795811
}
@@ -1076,7 +1092,7 @@ mod tests {
10761092
}
10771093

10781094
impl ScoreUpdate for TestScorer {
1079-
fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
1095+
fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
10801096
if let Some(expectations) = &mut self.event_expectations {
10811097
match expectations.pop_front().unwrap() {
10821098
TestResult::PaymentFailure { path, short_channel_id } => {
@@ -1096,7 +1112,7 @@ mod tests {
10961112
}
10971113
}
10981114

1099-
fn payment_path_successful(&mut self, actual_path: &Path) {
1115+
fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
11001116
if let Some(expectations) = &mut self.event_expectations {
11011117
match expectations.pop_front().unwrap() {
11021118
TestResult::PaymentFailure { path, .. } => {
@@ -1115,7 +1131,7 @@ mod tests {
11151131
}
11161132
}
11171133

1118-
fn probe_failed(&mut self, actual_path: &Path, _: u64) {
1134+
fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
11191135
if let Some(expectations) = &mut self.event_expectations {
11201136
match expectations.pop_front().unwrap() {
11211137
TestResult::PaymentFailure { path, .. } => {
@@ -1133,7 +1149,7 @@ mod tests {
11331149
}
11341150
}
11351151
}
1136-
fn probe_successful(&mut self, actual_path: &Path) {
1152+
fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
11371153
if let Some(expectations) = &mut self.event_expectations {
11381154
match expectations.pop_front().unwrap() {
11391155
TestResult::PaymentFailure { path, .. } => {
@@ -1424,7 +1440,7 @@ mod tests {
14241440
tokio::time::sleep(dur).await;
14251441
false // Never exit
14261442
})
1427-
}, false,
1443+
}, false, || Some(Duration::ZERO),
14281444
);
14291445
match bp_future.await {
14301446
Ok(_) => panic!("Expected error persisting manager"),
@@ -1654,7 +1670,7 @@ mod tests {
16541670
_ = exit_receiver.changed() => true,
16551671
}
16561672
})
1657-
}, false,
1673+
}, false, || Some(Duration::from_secs(1696300000)),
16581674
);
16591675

16601676
let t1 = tokio::spawn(bp_future);
@@ -1829,7 +1845,7 @@ mod tests {
18291845
_ = exit_receiver.changed() => true,
18301846
}
18311847
})
1832-
}, false,
1848+
}, false, || Some(Duration::ZERO),
18331849
);
18341850
let t1 = tokio::spawn(bp_future);
18351851
let t2 = tokio::spawn(async move {

lightning/src/routing/router.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7919,6 +7919,7 @@ mod tests {
79197919
pub(crate) mod bench_utils {
79207920
use super::*;
79217921
use std::fs::File;
7922+
use std::time::Duration;
79227923

79237924
use bitcoin::hashes::Hash;
79247925
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
@@ -8067,10 +8068,10 @@ pub(crate) mod bench_utils {
80678068
if let Ok(route) = route_res {
80688069
for path in route.paths {
80698070
if seed & 0x80 == 0 {
8070-
scorer.payment_path_successful(&path);
8071+
scorer.payment_path_successful(&path, Duration::ZERO);
80718072
} else {
80728073
let short_channel_id = path.hops[path.hops.len() / 2].short_channel_id;
8073-
scorer.payment_path_failed(&path, short_channel_id);
8074+
scorer.payment_path_failed(&path, short_channel_id, Duration::ZERO);
80748075
}
80758076
seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0;
80768077
}

0 commit comments

Comments
 (0)