@@ -478,6 +478,7 @@ where L::Target: Logger {
478478 channel_liquidities : ChannelLiquidities ,
479479}
480480/// Container for live and historical liquidity bounds for each channel.
481+ #[ derive( Clone ) ]
481482pub struct ChannelLiquidities ( HashMap < u64 , ChannelLiquidity > ) ;
482483
483484impl ChannelLiquidities {
@@ -884,6 +885,7 @@ impl ProbabilisticScoringDecayParameters {
884885/// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity
885886/// offset fields gives the opposite direction.
886887#[ repr( C ) ] // Force the fields in memory to be in the order we specify
888+ #[ derive( Clone ) ]
887889struct ChannelLiquidity {
888890 /// Lower channel liquidity bound in terms of an offset from zero.
889891 min_liquidity_offset_msat : u64 ,
@@ -1154,6 +1156,15 @@ impl ChannelLiquidity {
11541156 }
11551157 }
11561158
1159+ fn merge ( & mut self , other : & Self ) {
1160+ // Take average for min/max liquidity offsets.
1161+ self . min_liquidity_offset_msat = ( self . min_liquidity_offset_msat + other. min_liquidity_offset_msat ) / 2 ;
1162+ self . max_liquidity_offset_msat = ( self . max_liquidity_offset_msat + other. max_liquidity_offset_msat ) / 2 ;
1163+
1164+ // Merge historical liquidity data.
1165+ self . liquidity_history . merge ( & other. liquidity_history ) ;
1166+ }
1167+
11571168 /// Returns a view of the channel liquidity directed from `source` to `target` assuming
11581169 /// `capacity_msat`.
11591170 fn as_directed (
@@ -1687,6 +1698,99 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
16871698 }
16881699}
16891700
1701+ /// A probabilistic scorer that combines local and external information to score channels. This scorer is
1702+ /// shadow-tracking local only scores, so that it becomes possible to cleanly merge external scores when they become
1703+ /// available.
1704+ ///
1705+ /// This is useful for nodes that have a limited local view of the network and need to augment their view with scores
1706+ /// from an external source to improve payment reliability. The external source may use something like background
1707+ /// probing to gather a more complete view of the network. Merging reduces the likelihood of losing unique local data on
1708+ /// particular channels.
1709+ ///
1710+ /// Note that only the locally acquired data is persisted. After a restart, the external scores will be lost and must be
1711+ /// resupplied.
1712+ pub struct CombinedScorer < G : Deref < Target = NetworkGraph < L > > , L : Deref > where L :: Target : Logger {
1713+ local_only_scorer : ProbabilisticScorer < G , L > ,
1714+ scorer : ProbabilisticScorer < G , L > ,
1715+ }
1716+
1717+ impl < G : Deref < Target = NetworkGraph < L > > + Clone , L : Deref + Clone > CombinedScorer < G , L > where L :: Target : Logger {
1718+ /// Create a new combined scorer with the given local scorer.
1719+ pub fn new ( local_scorer : ProbabilisticScorer < G , L > ) -> Self {
1720+ let decay_params = local_scorer. decay_params ;
1721+ let network_graph = local_scorer. network_graph . clone ( ) ;
1722+ let logger = local_scorer. logger . clone ( ) ;
1723+ let mut scorer = ProbabilisticScorer :: new ( decay_params, network_graph, logger) ;
1724+
1725+ scorer. channel_liquidities = local_scorer. channel_liquidities . clone ( ) ;
1726+
1727+ Self {
1728+ local_only_scorer : local_scorer,
1729+ scorer : scorer,
1730+ }
1731+ }
1732+
1733+ /// Merge external channel liquidity information into the scorer.
1734+ pub fn merge ( & mut self , mut external_scores : ChannelLiquidities , duration_since_epoch : Duration ) {
1735+ // Decay both sets of scores to make them comparable and mergeable.
1736+ self . local_only_scorer . time_passed ( duration_since_epoch) ;
1737+ external_scores. time_passed ( duration_since_epoch, self . local_only_scorer . decay_params ) ;
1738+
1739+ let local_scores = & self . local_only_scorer . channel_liquidities ;
1740+
1741+ // For each channel, merge the external liquidity information with the isolated local liquidity information.
1742+ for ( scid, mut liquidity) in external_scores. 0 {
1743+ if let Some ( local_liquidity) = local_scores. get ( & scid) {
1744+ liquidity. merge ( local_liquidity) ;
1745+ }
1746+ self . scorer . channel_liquidities . insert ( scid, liquidity) ;
1747+ }
1748+ }
1749+ }
1750+
1751+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > ScoreLookUp for CombinedScorer < G , L > where L :: Target : Logger {
1752+ type ScoreParams = ProbabilisticScoringFeeParameters ;
1753+
1754+ fn channel_penalty_msat (
1755+ & self , candidate : & CandidateRouteHop , usage : ChannelUsage , score_params : & ProbabilisticScoringFeeParameters
1756+ ) -> u64 {
1757+ self . scorer . channel_penalty_msat ( candidate, usage, score_params)
1758+ }
1759+ }
1760+
1761+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > ScoreUpdate for CombinedScorer < G , L > where L :: Target : Logger {
1762+ fn payment_path_failed ( & mut self , path : & Path , short_channel_id : u64 , duration_since_epoch : Duration ) {
1763+ self . local_only_scorer . payment_path_failed ( path, short_channel_id, duration_since_epoch) ;
1764+ self . scorer . payment_path_failed ( path, short_channel_id, duration_since_epoch) ;
1765+ }
1766+
1767+ fn payment_path_successful ( & mut self , path : & Path , duration_since_epoch : Duration ) {
1768+ self . local_only_scorer . payment_path_successful ( path, duration_since_epoch) ;
1769+ self . scorer . payment_path_successful ( path, duration_since_epoch) ;
1770+ }
1771+
1772+ fn probe_failed ( & mut self , path : & Path , short_channel_id : u64 , duration_since_epoch : Duration ) {
1773+ self . local_only_scorer . probe_failed ( path, short_channel_id, duration_since_epoch) ;
1774+ self . scorer . probe_failed ( path, short_channel_id, duration_since_epoch) ;
1775+ }
1776+
1777+ fn probe_successful ( & mut self , path : & Path , duration_since_epoch : Duration ) {
1778+ self . local_only_scorer . probe_successful ( path, duration_since_epoch) ;
1779+ self . scorer . probe_successful ( path, duration_since_epoch) ;
1780+ }
1781+
1782+ fn time_passed ( & mut self , duration_since_epoch : Duration ) {
1783+ self . local_only_scorer . time_passed ( duration_since_epoch) ;
1784+ self . scorer . time_passed ( duration_since_epoch) ;
1785+ }
1786+ }
1787+
1788+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Writeable for CombinedScorer < G , L > where L :: Target : Logger {
1789+ fn write < W : crate :: util:: ser:: Writer > ( & self , writer : & mut W ) -> Result < ( ) , crate :: io:: Error > {
1790+ self . local_only_scorer . write ( writer)
1791+ }
1792+ }
1793+
16901794#[ cfg( c_bindings) ]
16911795impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Score for ProbabilisticScorer < G , L >
16921796where L :: Target : Logger { }
@@ -1866,6 +1970,13 @@ mod bucketed_history {
18661970 self . buckets [ bucket] = self . buckets [ bucket] . saturating_add ( BUCKET_FIXED_POINT_ONE ) ;
18671971 }
18681972 }
1973+
1974+ /// Returns the average of the buckets between the two trackers.
1975+ pub ( crate ) fn merge ( & mut self , other : & Self ) -> ( ) {
1976+ for ( bucket, other_bucket) in self . buckets . iter_mut ( ) . zip ( other. buckets . iter ( ) ) {
1977+ * bucket = ( ( * bucket as u32 + * other_bucket as u32 ) / 2 ) as u16 ;
1978+ }
1979+ }
18691980 }
18701981
18711982 impl_writeable_tlv_based ! ( HistoricalBucketRangeTracker , { ( 0 , buckets, required) } ) ;
@@ -1962,6 +2073,13 @@ mod bucketed_history {
19622073 -> DirectedHistoricalLiquidityTracker < & ' a mut HistoricalLiquidityTracker > {
19632074 DirectedHistoricalLiquidityTracker { source_less_than_target, tracker : self }
19642075 }
2076+
2077+ /// Merges the historical liquidity data from another tracker into this one.
2078+ pub fn merge ( & mut self , other : & Self ) {
2079+ self . min_liquidity_offset_history . merge ( & other. min_liquidity_offset_history ) ;
2080+ self . max_liquidity_offset_history . merge ( & other. max_liquidity_offset_history ) ;
2081+ self . recalculate_valid_point_count ( ) ;
2082+ }
19652083 }
19662084
19672085 /// A set of buckets representing the history of where we've seen the minimum- and maximum-
@@ -2120,6 +2238,72 @@ mod bucketed_history {
21202238 Some ( ( cumulative_success_prob * ( 1024.0 * 1024.0 * 1024.0 ) ) as u64 )
21212239 }
21222240 }
2241+
2242+ #[ cfg( test) ]
2243+ mod tests {
2244+ use crate :: routing:: scoring:: ProbabilisticScoringFeeParameters ;
2245+
2246+ use super :: { HistoricalBucketRangeTracker , HistoricalLiquidityTracker } ;
2247+ #[ test]
2248+ fn historical_liquidity_bucket_merge ( ) {
2249+ let mut bucket1 = HistoricalBucketRangeTracker :: new ( ) ;
2250+ bucket1. track_datapoint ( 100 , 1000 ) ;
2251+ assert_eq ! (
2252+ bucket1. buckets,
2253+ [
2254+ 0u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 32 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
2255+ 0 , 0 , 0 , 0 , 0 , 0 , 0
2256+ ]
2257+ ) ;
2258+
2259+ let mut bucket2 = HistoricalBucketRangeTracker :: new ( ) ;
2260+ bucket2. track_datapoint ( 0 , 1000 ) ;
2261+ assert_eq ! (
2262+ bucket2. buckets,
2263+ [
2264+ 32u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
2265+ 0 , 0 , 0 , 0 , 0 , 0 , 0
2266+ ]
2267+ ) ;
2268+
2269+ bucket1. merge ( & bucket2) ;
2270+ assert_eq ! (
2271+ bucket1. buckets,
2272+ [
2273+ 16u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
2274+ 0 , 0 , 0 , 0 , 0 , 0 , 0
2275+ ]
2276+ ) ;
2277+ }
2278+
2279+ #[ test]
2280+ fn historical_liquidity_tracker_merge ( ) {
2281+ let params = ProbabilisticScoringFeeParameters :: default ( ) ;
2282+
2283+ let probability1: Option < u64 > ;
2284+ let mut tracker1 = HistoricalLiquidityTracker :: new ( ) ;
2285+ {
2286+ let mut directed_tracker1 = tracker1. as_directed_mut ( true ) ;
2287+ directed_tracker1. track_datapoint ( 100 , 200 , 1000 ) ;
2288+ probability1 = directed_tracker1
2289+ . calculate_success_probability_times_billion ( & params, 500 , 1000 ) ;
2290+ }
2291+
2292+ let mut tracker2 = HistoricalLiquidityTracker :: new ( ) ;
2293+ {
2294+ let mut directed_tracker2 = tracker2. as_directed_mut ( true ) ;
2295+ directed_tracker2. track_datapoint ( 200 , 300 , 1000 ) ;
2296+ }
2297+
2298+ tracker1. merge ( & tracker2) ;
2299+
2300+ let directed_tracker1 = tracker1. as_directed ( true ) ;
2301+ let probability =
2302+ directed_tracker1. calculate_success_probability_times_billion ( & params, 500 , 1000 ) ;
2303+
2304+ assert_ne ! ( probability1, probability) ;
2305+ }
2306+ }
21232307}
21242308
21252309impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Writeable for ProbabilisticScorer < G , L > where L :: Target : Logger {
@@ -2213,15 +2397,15 @@ impl Readable for ChannelLiquidity {
22132397
22142398#[ cfg( test) ]
22152399mod tests {
2216- use super :: { ChannelLiquidity , HistoricalLiquidityTracker , ProbabilisticScoringFeeParameters , ProbabilisticScoringDecayParameters , ProbabilisticScorer } ;
2400+ use super :: { ChannelLiquidity , HistoricalLiquidityTracker , ProbabilisticScorer , ProbabilisticScoringDecayParameters , ProbabilisticScoringFeeParameters } ;
22172401 use crate :: blinded_path:: BlindedHop ;
22182402 use crate :: util:: config:: UserConfig ;
22192403
22202404 use crate :: ln:: channelmanager;
22212405 use crate :: ln:: msgs:: { ChannelAnnouncement , ChannelUpdate , UnsignedChannelAnnouncement , UnsignedChannelUpdate } ;
22222406 use crate :: routing:: gossip:: { EffectiveCapacity , NetworkGraph , NodeId } ;
22232407 use crate :: routing:: router:: { BlindedTail , Path , RouteHop , CandidateRouteHop , PublicHopCandidate } ;
2224- use crate :: routing:: scoring:: { ChannelUsage , ScoreLookUp , ScoreUpdate } ;
2408+ use crate :: routing:: scoring:: { ChannelLiquidities , ChannelUsage , CombinedScorer , ScoreLookUp , ScoreUpdate } ;
22252409 use crate :: util:: ser:: { ReadableArgs , Writeable } ;
22262410 use crate :: util:: test_utils:: { self , TestLogger } ;
22272411
@@ -2231,6 +2415,7 @@ mod tests {
22312415 use bitcoin:: network:: Network ;
22322416 use bitcoin:: secp256k1:: { PublicKey , Secp256k1 , SecretKey } ;
22332417 use core:: time:: Duration ;
2418+ use std:: rc:: Rc ;
22342419 use crate :: io;
22352420
22362421 fn source_privkey ( ) -> SecretKey {
@@ -3722,6 +3907,68 @@ mod tests {
37223907 assert_eq ! ( scorer. historical_estimated_payment_success_probability( 42 , & target, amount_msat, & params, false ) ,
37233908 Some ( 0.0 ) ) ;
37243909 }
3910+
3911+ #[ test]
3912+ fn combined_scorer ( ) {
3913+ let logger = TestLogger :: new ( ) ;
3914+ let network_graph = network_graph ( & logger) ;
3915+ let params = ProbabilisticScoringFeeParameters :: default ( ) ;
3916+ let mut scorer = ProbabilisticScorer :: new (
3917+ ProbabilisticScoringDecayParameters :: default ( ) ,
3918+ & network_graph,
3919+ & logger,
3920+ ) ;
3921+ scorer. payment_path_failed ( & payment_path_for_amount ( 600 ) , 42 , Duration :: ZERO ) ;
3922+
3923+ let mut combined_scorer = CombinedScorer :: new ( scorer) ;
3924+
3925+ // Verify that the combined_scorer has the correct liquidity range after a failed 600 msat payment.
3926+ let liquidity_range =
3927+ combined_scorer. scorer . estimated_channel_liquidity_range ( 42 , & target_node_id ( ) ) ;
3928+ assert_eq ! ( liquidity_range. unwrap( ) , ( 0 , 600 ) ) ;
3929+
3930+ let source = source_node_id ( ) ;
3931+ let usage = ChannelUsage {
3932+ amount_msat : 750 ,
3933+ inflight_htlc_msat : 0 ,
3934+ effective_capacity : EffectiveCapacity :: Total {
3935+ capacity_msat : 1_000 ,
3936+ htlc_maximum_msat : 1_000 ,
3937+ } ,
3938+ } ;
3939+
3940+ {
3941+ let network_graph = network_graph. read_only ( ) ;
3942+ let channel = network_graph. channel ( 42 ) . unwrap ( ) ;
3943+ let ( info, _) = channel. as_directed_from ( & source) . unwrap ( ) ;
3944+ let candidate =
3945+ CandidateRouteHop :: PublicHop ( PublicHopCandidate { info, short_channel_id : 42 } ) ;
3946+
3947+ let penalty = combined_scorer. channel_penalty_msat ( & candidate, usage, & params) ;
3948+
3949+ let mut external_liquidity = ChannelLiquidity :: new ( Duration :: ZERO ) ;
3950+ let logger_rc = Rc :: new ( & logger) ; // Why necessary and not above for the network graph?
3951+ external_liquidity
3952+ . as_directed_mut ( & source_node_id ( ) , & target_node_id ( ) , 1_000 )
3953+ . successful ( 1000 , Duration :: ZERO , format_args ! ( "test channel" ) , logger_rc. as_ref ( ) ) ;
3954+
3955+ let mut external_scores = ChannelLiquidities :: new ( ) ;
3956+
3957+ external_scores. insert ( 42 , external_liquidity) ;
3958+ combined_scorer. merge ( external_scores, Duration :: ZERO ) ;
3959+
3960+ let penalty_after_merge =
3961+ combined_scorer. channel_penalty_msat ( & candidate, usage, & params) ;
3962+
3963+ // Since the external source observed a successful payment, the penalty should be lower after the merge.
3964+ assert ! ( penalty_after_merge < penalty) ;
3965+ }
3966+
3967+ // Verify that after the merge with a successful payment, the liquidity range is increased.
3968+ let liquidity_range =
3969+ combined_scorer. scorer . estimated_channel_liquidity_range ( 42 , & target_node_id ( ) ) ;
3970+ assert_eq ! ( liquidity_range. unwrap( ) , ( 0 , 300 ) ) ;
3971+ }
37253972}
37263973
37273974#[ cfg( ldk_bench) ]
0 commit comments