Skip to content

Commit 04425f9

Browse files
committed
Pass the current time through ScoreUpDate methods
In the coming commits, we'll stop relying on fetching the time during routefetching, preferring to decay score data in the background instead. The first step towards this - passing the current time through into the scorer when updating.
1 parent de41ecd commit 04425f9

File tree

4 files changed

+105
-88
lines changed

4 files changed

+105
-88
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -245,30 +245,30 @@ fn handle_network_graph_update<L: Deref>(
245245
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
246246
/// to persist.
247247
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
248-
scorer: &'a S, event: &Event
248+
scorer: &'a S, event: &Event, duration_since_epoch: Duration,
249249
) -> bool {
250250
match event {
251251
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
252252
let mut score = scorer.write_lock();
253-
score.payment_path_failed(path, *scid);
253+
score.payment_path_failed(path, *scid, duration_since_epoch);
254254
},
255255
Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
256256
// Reached if the destination explicitly failed it back. We treat this as a successful probe
257257
// because the payment made it all the way to the destination with sufficient liquidity.
258258
let mut score = scorer.write_lock();
259-
score.probe_successful(path);
259+
score.probe_successful(path, duration_since_epoch);
260260
},
261261
Event::PaymentPathSuccessful { path, .. } => {
262262
let mut score = scorer.write_lock();
263-
score.payment_path_successful(path);
263+
score.payment_path_successful(path, duration_since_epoch);
264264
},
265265
Event::ProbeSuccessful { path, .. } => {
266266
let mut score = scorer.write_lock();
267-
score.probe_successful(path);
267+
score.probe_successful(path, duration_since_epoch);
268268
},
269269
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
270270
let mut score = scorer.write_lock();
271-
score.probe_failed(path, *scid);
271+
score.probe_failed(path, *scid, duration_since_epoch);
272272
},
273273
_ => return false,
274274
}
@@ -281,7 +281,7 @@ macro_rules! define_run_body {
281281
$channel_manager: ident, $process_channel_manager_events: expr,
282282
$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
283283
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
284-
$timer_elapsed: expr, $check_slow_await: expr
284+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
285285
) => { {
286286
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
287287
$channel_manager.timer_tick_occurred();
@@ -384,11 +384,10 @@ macro_rules! define_run_body {
384384
if should_prune {
385385
// The network graph must not be pruned while rapid sync completion is pending
386386
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
387-
#[cfg(feature = "std")] {
387+
if let Some(duration_since_epoch) = $time_fetch() {
388388
log_trace!($logger, "Pruning and persisting network graph.");
389-
network_graph.remove_stale_channels_and_tracking();
390-
}
391-
#[cfg(not(feature = "std"))] {
389+
network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
390+
} else {
392391
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
393392
log_trace!($logger, "Persisting network graph.");
394393
}
@@ -511,12 +510,16 @@ use core::task;
511510
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
512511
/// are hundreds or thousands of simultaneous process calls running.
513512
///
513+
/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
514+
/// no time is available, some features may be disabled, however the node will still operate fine.
515+
///
514516
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
515517
/// could setup `process_events_async` like this:
516518
/// ```
517519
/// # use lightning::io;
518520
/// # use std::sync::{Arc, RwLock};
519521
/// # use std::sync::atomic::{AtomicBool, Ordering};
522+
/// # use std::time::SystemTime;
520523
/// # use lightning_background_processor::{process_events_async, GossipSync};
521524
/// # struct MyStore {}
522525
/// # impl lightning::util::persist::KVStore for MyStore {
@@ -585,6 +588,7 @@ use core::task;
585588
/// Some(background_scorer),
586589
/// sleeper,
587590
/// mobile_interruptable_platform,
591+
/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
588592
/// )
589593
/// .await
590594
/// .expect("Failed to process events");
@@ -621,11 +625,12 @@ pub async fn process_events_async<
621625
S: 'static + Deref<Target = SC> + Send + Sync,
622626
SC: for<'b> WriteableScore<'b>,
623627
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
624-
Sleeper: Fn(Duration) -> SleepFuture
628+
Sleeper: Fn(Duration) -> SleepFuture,
629+
FetchTime: Fn() -> Option<Duration>,
625630
>(
626631
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
627632
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
628-
sleeper: Sleeper, mobile_interruptable_platform: bool,
633+
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
629634
) -> Result<(), lightning::io::Error>
630635
where
631636
UL::Target: 'static + UtxoLookup,
@@ -649,15 +654,18 @@ where
649654
let scorer = &scorer;
650655
let logger = &logger;
651656
let persister = &persister;
657+
let fetch_time = &fetch_time;
652658
async move {
653659
if let Some(network_graph) = network_graph {
654660
handle_network_graph_update(network_graph, &event)
655661
}
656662
if let Some(ref scorer) = scorer {
657-
if update_scorer(scorer, &event) {
658-
log_trace!(logger, "Persisting scorer after update");
659-
if let Err(e) = persister.persist_scorer(&scorer) {
660-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
663+
if let Some(duration_since_epoch) = fetch_time() {
664+
if update_scorer(scorer, &event, duration_since_epoch) {
665+
log_trace!(logger, "Persisting scorer after update");
666+
if let Err(e) = persister.persist_scorer(&scorer) {
667+
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
668+
}
661669
}
662670
}
663671
}
@@ -689,7 +697,7 @@ where
689697
task::Poll::Ready(exit) => { should_break = exit; true },
690698
task::Poll::Pending => false,
691699
}
692-
}, mobile_interruptable_platform
700+
}, mobile_interruptable_platform, fetch_time,
693701
)
694702
}
695703

@@ -811,7 +819,10 @@ impl BackgroundProcessor {
811819
handle_network_graph_update(network_graph, &event)
812820
}
813821
if let Some(ref scorer) = scorer {
814-
if update_scorer(scorer, &event) {
822+
use std::time::SystemTime;
823+
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
824+
.expect("Time should be sometime after 1970");
825+
if update_scorer(scorer, &event, duration_since_epoch) {
815826
log_trace!(logger, "Persisting scorer after update");
816827
if let Err(e) = persister.persist_scorer(&scorer) {
817828
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
@@ -830,7 +841,12 @@ impl BackgroundProcessor {
830841
channel_manager.get_event_or_persistence_needed_future(),
831842
chain_monitor.get_update_future()
832843
).wait_timeout(Duration::from_millis(100)); },
833-
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
844+
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
845+
|| {
846+
use std::time::SystemTime;
847+
Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
848+
.expect("Time should be sometime after 1970"))
849+
},
834850
)
835851
});
836852
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1118,7 +1134,7 @@ mod tests {
11181134
}
11191135

11201136
impl ScoreUpdate for TestScorer {
1121-
fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
1137+
fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
11221138
if let Some(expectations) = &mut self.event_expectations {
11231139
match expectations.pop_front().unwrap() {
11241140
TestResult::PaymentFailure { path, short_channel_id } => {
@@ -1138,7 +1154,7 @@ mod tests {
11381154
}
11391155
}
11401156

1141-
fn payment_path_successful(&mut self, actual_path: &Path) {
1157+
fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
11421158
if let Some(expectations) = &mut self.event_expectations {
11431159
match expectations.pop_front().unwrap() {
11441160
TestResult::PaymentFailure { path, .. } => {
@@ -1157,7 +1173,7 @@ mod tests {
11571173
}
11581174
}
11591175

1160-
fn probe_failed(&mut self, actual_path: &Path, _: u64) {
1176+
fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
11611177
if let Some(expectations) = &mut self.event_expectations {
11621178
match expectations.pop_front().unwrap() {
11631179
TestResult::PaymentFailure { path, .. } => {
@@ -1175,7 +1191,7 @@ mod tests {
11751191
}
11761192
}
11771193
}
1178-
fn probe_successful(&mut self, actual_path: &Path) {
1194+
fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
11791195
if let Some(expectations) = &mut self.event_expectations {
11801196
match expectations.pop_front().unwrap() {
11811197
TestResult::PaymentFailure { path, .. } => {
@@ -1470,7 +1486,7 @@ mod tests {
14701486
tokio::time::sleep(dur).await;
14711487
false // Never exit
14721488
})
1473-
}, false,
1489+
}, false, || Some(Duration::ZERO),
14741490
);
14751491
match bp_future.await {
14761492
Ok(_) => panic!("Expected error persisting manager"),
@@ -1700,7 +1716,7 @@ mod tests {
17001716
_ = exit_receiver.changed() => true,
17011717
}
17021718
})
1703-
}, false,
1719+
}, false, || Some(Duration::from_secs(1696300000)),
17041720
);
17051721

17061722
let t1 = tokio::spawn(bp_future);
@@ -1875,7 +1891,7 @@ mod tests {
18751891
_ = exit_receiver.changed() => true,
18761892
}
18771893
})
1878-
}, false,
1894+
}, false, || Some(Duration::ZERO),
18791895
);
18801896
let t1 = tokio::spawn(bp_future);
18811897
let t2 = tokio::spawn(async move {

lightning/src/routing/router.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8160,6 +8160,7 @@ mod tests {
81608160
pub(crate) mod bench_utils {
81618161
use super::*;
81628162
use std::fs::File;
8163+
use std::time::Duration;
81638164

81648165
use bitcoin::hashes::Hash;
81658166
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
@@ -8308,10 +8309,10 @@ pub(crate) mod bench_utils {
83088309
if let Ok(route) = route_res {
83098310
for path in route.paths {
83108311
if seed & 0x80 == 0 {
8311-
scorer.payment_path_successful(&path);
8312+
scorer.payment_path_successful(&path, Duration::ZERO);
83128313
} else {
83138314
let short_channel_id = path.hops[path.hops.len() / 2].short_channel_id;
8314-
scorer.payment_path_failed(&path, short_channel_id);
8315+
scorer.payment_path_failed(&path, short_channel_id, Duration::ZERO);
83158316
}
83168317
seed = seed.overflowing_mul(6364136223846793005).0.overflowing_add(1).0;
83178318
}

0 commit comments

Comments
 (0)