Skip to content

Commit 9d71d3a

Browse files
authored
Merge pull request #449 from joostjager/import-scores
Periodical external pathfinding scores merge
2 parents dd51908 + 52705d3 commit 9d71d3a

File tree

7 files changed

+258
-14
lines changed

7 files changed

+258
-14
lines changed

bindings/ldk_node.udl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ interface Builder {
8181
void set_chain_source_bitcoind_rest(string rest_host, u16 rest_port, string rpc_host, u16 rpc_port, string rpc_user, string rpc_password);
8282
void set_gossip_source_p2p();
8383
void set_gossip_source_rgs(string rgs_server_url);
84+
void set_pathfinding_scores_source(string url);
8485
void set_liquidity_source_lsps1(PublicKey node_id, SocketAddress address, string? token);
8586
void set_liquidity_source_lsps2(PublicKey node_id, SocketAddress address, string? token);
8687
void set_storage_dir_path(string storage_dir_path);
@@ -330,6 +331,7 @@ dictionary NodeStatus {
330331
u64? latest_onchain_wallet_sync_timestamp;
331332
u64? latest_fee_rate_cache_update_timestamp;
332333
u64? latest_rgs_snapshot_timestamp;
334+
u64? latest_pathfinding_scores_sync_timestamp;
333335
u64? latest_node_announcement_broadcast_timestamp;
334336
u32? latest_channel_monitor_archival_height;
335337
};

src/builder.rs

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ use lightning::io::Cursor;
2424
use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs};
2525
use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress};
2626
use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler};
27+
use lightning::log_trace;
2728
use lightning::routing::gossip::NodeAlias;
2829
use lightning::routing::router::DefaultRouter;
2930
use lightning::routing::scoring::{
30-
ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters,
31+
CombinedScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
32+
ProbabilisticScoringFeeParameters,
3133
};
3234
use lightning::sign::{EntropySource, NodeSigner};
3335
use lightning::util::persist::{
@@ -50,7 +52,9 @@ use crate::event::EventQueue;
5052
use crate::fee_estimator::OnchainFeeEstimator;
5153
use crate::gossip::GossipSource;
5254
use crate::io::sqlite_store::SqliteStore;
53-
use crate::io::utils::{read_node_metrics, write_node_metrics};
55+
use crate::io::utils::{
56+
read_external_pathfinding_scores_from_cache, read_node_metrics, write_node_metrics,
57+
};
5458
use crate::io::vss_store::VssStore;
5559
use crate::io::{
5660
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -110,6 +114,11 @@ enum GossipSourceConfig {
110114
RapidGossipSync(String),
111115
}
112116

117+
#[derive(Debug, Clone)]
118+
struct PathfindingScoresSyncConfig {
119+
url: String,
120+
}
121+
113122
#[derive(Debug, Clone, Default)]
114123
struct LiquiditySourceConfig {
115124
// Act as an LSPS1 client connecting to the given service.
@@ -243,6 +252,7 @@ pub struct NodeBuilder {
243252
log_writer_config: Option<LogWriterConfig>,
244253
async_payments_role: Option<AsyncPaymentsRole>,
245254
runtime_handle: Option<tokio::runtime::Handle>,
255+
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
246256
}
247257

248258
impl NodeBuilder {
@@ -260,6 +270,7 @@ impl NodeBuilder {
260270
let liquidity_source_config = None;
261271
let log_writer_config = None;
262272
let runtime_handle = None;
273+
let pathfinding_scores_sync_config = None;
263274
Self {
264275
config,
265276
entropy_source_config,
@@ -269,6 +280,7 @@ impl NodeBuilder {
269280
log_writer_config,
270281
runtime_handle,
271282
async_payments_role: None,
283+
pathfinding_scores_sync_config,
272284
}
273285
}
274286

@@ -411,6 +423,14 @@ impl NodeBuilder {
411423
self
412424
}
413425

426+
/// Configures the [`Node`] instance to source its external scores from the given URL.
427+
///
428+
/// The external scores are merged into the local scoring system to improve routing.
429+
pub fn set_pathfinding_scores_source(&mut self, url: String) -> &mut Self {
430+
self.pathfinding_scores_sync_config = Some(PathfindingScoresSyncConfig { url });
431+
self
432+
}
433+
414434
/// Configures the [`Node`] instance to source inbound liquidity from the given
415435
/// [bLIP-51 / LSPS1] service.
416436
///
@@ -718,6 +738,7 @@ impl NodeBuilder {
718738
self.chain_data_source_config.as_ref(),
719739
self.gossip_source_config.as_ref(),
720740
self.liquidity_source_config.as_ref(),
741+
self.pathfinding_scores_sync_config.as_ref(),
721742
self.async_payments_role,
722743
seed_bytes,
723744
runtime,
@@ -751,6 +772,7 @@ impl NodeBuilder {
751772
self.chain_data_source_config.as_ref(),
752773
self.gossip_source_config.as_ref(),
753774
self.liquidity_source_config.as_ref(),
775+
self.pathfinding_scores_sync_config.as_ref(),
754776
self.async_payments_role,
755777
seed_bytes,
756778
runtime,
@@ -910,6 +932,13 @@ impl ArcedNodeBuilder {
910932
self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url);
911933
}
912934

935+
/// Configures the [`Node`] instance to source its external scores from the given URL.
936+
///
937+
/// The external scores are merged into the local scoring system to improve routing.
938+
pub fn set_pathfinding_scores_source(&self, url: String) {
939+
self.inner.write().unwrap().set_pathfinding_scores_source(url);
940+
}
941+
913942
/// Configures the [`Node`] instance to source inbound liquidity from the given
914943
/// [bLIP-51 / LSPS1] service.
915944
///
@@ -1110,6 +1139,7 @@ fn build_with_store_internal(
11101139
config: Arc<Config>, chain_data_source_config: Option<&ChainDataSourceConfig>,
11111140
gossip_source_config: Option<&GossipSourceConfig>,
11121141
liquidity_source_config: Option<&LiquiditySourceConfig>,
1142+
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
11131143
async_payments_role: Option<AsyncPaymentsRole>, seed_bytes: [u8; 64], runtime: Arc<Runtime>,
11141144
logger: Arc<Logger>, kv_store: Arc<DynStore>,
11151145
) -> Result<Node, BuildError> {
@@ -1365,26 +1395,38 @@ fn build_with_store_internal(
13651395
},
13661396
};
13671397

1368-
let scorer = match io::utils::read_scorer(
1398+
let local_scorer = match io::utils::read_scorer(
13691399
Arc::clone(&kv_store),
13701400
Arc::clone(&network_graph),
13711401
Arc::clone(&logger),
13721402
) {
1373-
Ok(scorer) => Arc::new(Mutex::new(scorer)),
1403+
Ok(scorer) => scorer,
13741404
Err(e) => {
13751405
if e.kind() == std::io::ErrorKind::NotFound {
13761406
let params = ProbabilisticScoringDecayParameters::default();
1377-
Arc::new(Mutex::new(ProbabilisticScorer::new(
1378-
params,
1379-
Arc::clone(&network_graph),
1380-
Arc::clone(&logger),
1381-
)))
1407+
ProbabilisticScorer::new(params, Arc::clone(&network_graph), Arc::clone(&logger))
13821408
} else {
13831409
return Err(BuildError::ReadFailed);
13841410
}
13851411
},
13861412
};
13871413

1414+
let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer)));
1415+
1416+
// Restore external pathfinding scores from cache if possible.
1417+
match read_external_pathfinding_scores_from_cache(Arc::clone(&kv_store), Arc::clone(&logger)) {
1418+
Ok(external_scores) => {
1419+
scorer.lock().unwrap().merge(external_scores, cur_time);
1420+
log_trace!(logger, "External scores from cache merged successfully");
1421+
},
1422+
Err(e) => {
1423+
if e.kind() != std::io::ErrorKind::NotFound {
1424+
log_error!(logger, "Error while reading external scores from cache: {}", e);
1425+
return Err(BuildError::ReadFailed);
1426+
}
1427+
},
1428+
}
1429+
13881430
let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
13891431
let router = Arc::new(DefaultRouter::new(
13901432
Arc::clone(&network_graph),
@@ -1716,6 +1758,8 @@ fn build_with_store_internal(
17161758
let (background_processor_stop_sender, _) = tokio::sync::watch::channel(());
17171759
let is_running = Arc::new(RwLock::new(false));
17181760

1761+
let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());
1762+
17191763
Ok(Node {
17201764
runtime,
17211765
stop_sender,
@@ -1734,6 +1778,7 @@ fn build_with_store_internal(
17341778
keys_manager,
17351779
network_graph,
17361780
gossip_source,
1781+
pathfinding_scores_sync_url,
17371782
liquidity_source,
17381783
kv_store,
17391784
logger,

src/config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ pub(crate) const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(60);
6363
// The time in-between RGS sync attempts.
6464
pub(crate) const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
6565

66+
// The time in-between external scores sync attempts.
67+
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
68+
6669
// The time in-between node announcement broadcast attempts.
6770
pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
6871

@@ -93,6 +96,9 @@ pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5;
9396
/// The length in bytes of our wallets' keys seed.
9497
pub const WALLET_KEYS_SEED_LEN: usize = 64;
9598

99+
// The timeout after which we abort a external scores sync operation.
100+
pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5;
101+
96102
#[derive(Debug, Clone)]
97103
/// Represents the configuration of an [`Node`] instance.
98104
///

src/io/utils.rs

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ use bitcoin::Network;
2222
use lightning::io::Cursor;
2323
use lightning::ln::msgs::DecodeError;
2424
use lightning::routing::gossip::NetworkGraph;
25-
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringDecayParameters};
25+
use lightning::routing::scoring::{
26+
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
27+
};
2628
use lightning::util::persist::{
27-
KVStoreSync, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
29+
KVStore, KVStoreSync, KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN,
2830
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
2931
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY,
3032
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -48,6 +50,8 @@ use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
4850
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
4951
use crate::{Error, EventQueue, NodeMetrics, PaymentDetails};
5052

53+
pub const EXTERNAL_PATHFINDING_SCORES_CACHE_KEY: &str = "external_pathfinding_scores_cache";
54+
5155
/// Generates a random [BIP 39] mnemonic.
5256
///
5357
/// The result may be used to initialize the [`Node`] entropy, i.e., can be given to
@@ -164,6 +168,53 @@ where
164168
})
165169
}
166170

171+
/// Read previously persisted external pathfinding scores from the cache.
172+
pub(crate) fn read_external_pathfinding_scores_from_cache<L: Deref>(
173+
kv_store: Arc<DynStore>, logger: L,
174+
) -> Result<ChannelLiquidities, std::io::Error>
175+
where
176+
L::Target: LdkLogger,
177+
{
178+
let mut reader = Cursor::new(KVStoreSync::read(
179+
&*kv_store,
180+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
181+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
182+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
183+
)?);
184+
ChannelLiquidities::read(&mut reader).map_err(|e| {
185+
log_error!(logger, "Failed to deserialize scorer: {}", e);
186+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer")
187+
})
188+
}
189+
190+
/// Persist external pathfinding scores to the cache.
191+
pub(crate) async fn write_external_pathfinding_scores_to_cache<L: Deref>(
192+
kv_store: Arc<DynStore>, data: &ChannelLiquidities, logger: L,
193+
) -> Result<(), Error>
194+
where
195+
L::Target: LdkLogger,
196+
{
197+
KVStore::write(
198+
&*kv_store,
199+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
200+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
201+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
202+
data.encode(),
203+
)
204+
.await
205+
.map_err(|e| {
206+
log_error!(
207+
logger,
208+
"Writing data to key {}/{}/{} failed due to: {}",
209+
NODE_METRICS_PRIMARY_NAMESPACE,
210+
NODE_METRICS_SECONDARY_NAMESPACE,
211+
EXTERNAL_PATHFINDING_SCORES_CACHE_KEY,
212+
e
213+
);
214+
Error::PersistenceFailed
215+
})
216+
}
217+
167218
/// Read previously persisted events from the store.
168219
pub(crate) fn read_event_queue<L: Deref + Clone>(
169220
kv_store: Arc<DynStore>, logger: L,

src/lib.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ mod message_handler;
9797
pub mod payment;
9898
mod peer_store;
9999
mod runtime;
100+
mod scoring;
100101
mod tx_broadcaster;
101102
mod types;
102103
mod wallet;
@@ -106,6 +107,7 @@ use std::net::ToSocketAddrs;
106107
use std::sync::{Arc, Mutex, RwLock};
107108
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
108109

110+
use crate::scoring::setup_background_pathfinding_scores_sync;
109111
pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance};
110112
use bitcoin::secp256k1::PublicKey;
111113
#[cfg(feature = "uniffi")]
@@ -156,6 +158,7 @@ use types::{
156158
pub use types::{
157159
ChannelDetails, CustomTlvRecord, DynStore, PeerDetails, SyncAndAsyncKVStore, UserChannelId,
158160
};
161+
159162
pub use {
160163
bip39, bitcoin, lightning, lightning_invoice, lightning_liquidity, lightning_types, tokio,
161164
vss_client,
@@ -185,6 +188,7 @@ pub struct Node {
185188
keys_manager: Arc<KeysManager>,
186189
network_graph: Arc<Graph>,
187190
gossip_source: Arc<GossipSource>,
191+
pathfinding_scores_sync_url: Option<String>,
188192
liquidity_source: Option<Arc<LiquiditySource<Arc<Logger>>>>,
189193
kv_store: Arc<DynStore>,
190194
logger: Arc<Logger>,
@@ -261,7 +265,6 @@ impl Node {
261265
return;
262266
}
263267
_ = interval.tick() => {
264-
let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
265268
let now = Instant::now();
266269
match gossip_source.update_rgs_snapshot().await {
267270
Ok(updated_timestamp) => {
@@ -293,6 +296,18 @@ impl Node {
293296
});
294297
}
295298

299+
if let Some(pathfinding_scores_sync_url) = self.pathfinding_scores_sync_url.as_ref() {
300+
setup_background_pathfinding_scores_sync(
301+
pathfinding_scores_sync_url.clone(),
302+
Arc::clone(&self.scorer),
303+
Arc::clone(&self.node_metrics),
304+
Arc::clone(&self.kv_store),
305+
Arc::clone(&self.logger),
306+
Arc::clone(&self.runtime),
307+
self.stop_sender.subscribe(),
308+
);
309+
}
310+
296311
if let Some(listening_addresses) = &self.config.listening_addresses {
297312
// Setup networking
298313
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
@@ -694,6 +709,8 @@ impl Node {
694709
locked_node_metrics.latest_fee_rate_cache_update_timestamp;
695710
let latest_rgs_snapshot_timestamp =
696711
locked_node_metrics.latest_rgs_snapshot_timestamp.map(|val| val as u64);
712+
let latest_pathfinding_scores_sync_timestamp =
713+
locked_node_metrics.latest_pathfinding_scores_sync_timestamp;
697714
let latest_node_announcement_broadcast_timestamp =
698715
locked_node_metrics.latest_node_announcement_broadcast_timestamp;
699716
let latest_channel_monitor_archival_height =
@@ -706,6 +723,7 @@ impl Node {
706723
latest_onchain_wallet_sync_timestamp,
707724
latest_fee_rate_cache_update_timestamp,
708725
latest_rgs_snapshot_timestamp,
726+
latest_pathfinding_scores_sync_timestamp,
709727
latest_node_announcement_broadcast_timestamp,
710728
latest_channel_monitor_archival_height,
711729
}
@@ -1533,6 +1551,8 @@ pub struct NodeStatus {
15331551
///
15341552
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated yet.
15351553
pub latest_rgs_snapshot_timestamp: Option<u64>,
1554+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully merged external scores.
1555+
pub latest_pathfinding_scores_sync_timestamp: Option<u64>,
15361556
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
15371557
/// announcement.
15381558
///
@@ -1551,6 +1571,7 @@ pub(crate) struct NodeMetrics {
15511571
latest_onchain_wallet_sync_timestamp: Option<u64>,
15521572
latest_fee_rate_cache_update_timestamp: Option<u64>,
15531573
latest_rgs_snapshot_timestamp: Option<u32>,
1574+
latest_pathfinding_scores_sync_timestamp: Option<u64>,
15541575
latest_node_announcement_broadcast_timestamp: Option<u64>,
15551576
latest_channel_monitor_archival_height: Option<u32>,
15561577
}
@@ -1562,6 +1583,7 @@ impl Default for NodeMetrics {
15621583
latest_onchain_wallet_sync_timestamp: None,
15631584
latest_fee_rate_cache_update_timestamp: None,
15641585
latest_rgs_snapshot_timestamp: None,
1586+
latest_pathfinding_scores_sync_timestamp: None,
15651587
latest_node_announcement_broadcast_timestamp: None,
15661588
latest_channel_monitor_archival_height: None,
15671589
}
@@ -1570,6 +1592,7 @@ impl Default for NodeMetrics {
15701592

15711593
impl_writeable_tlv_based!(NodeMetrics, {
15721594
(0, latest_lightning_wallet_sync_timestamp, option),
1595+
(1, latest_pathfinding_scores_sync_timestamp, option),
15731596
(2, latest_onchain_wallet_sync_timestamp, option),
15741597
(4, latest_fee_rate_cache_update_timestamp, option),
15751598
(6, latest_rgs_snapshot_timestamp, option),

0 commit comments

Comments
 (0)