@@ -478,6 +478,7 @@ where L::Target: Logger {
478478 channel_liquidities : ChannelLiquidities ,
479479}
480480/// Container for live and historical liquidity bounds for each channel.
481+ #[ derive( Clone ) ]
481482pub struct ChannelLiquidities ( HashMap < u64 , ChannelLiquidity > ) ;
482483
483484impl ChannelLiquidities {
@@ -878,6 +879,7 @@ impl ProbabilisticScoringDecayParameters {
878879/// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity
879880/// offset fields gives the opposite direction.
880881#[ repr( C ) ] // Force the fields in memory to be in the order we specify
882+ #[ derive( Clone ) ]
881883pub struct ChannelLiquidity {
882884 /// Lower channel liquidity bound in terms of an offset from zero.
883885 min_liquidity_offset_msat : u64 ,
@@ -1148,6 +1150,15 @@ impl ChannelLiquidity {
11481150 }
11491151 }
11501152
1153+ fn merge ( & mut self , other : & Self ) {
1154+ // Take average for min/max liquidity offsets.
1155+ self . min_liquidity_offset_msat = ( self . min_liquidity_offset_msat + other. min_liquidity_offset_msat ) / 2 ;
1156+ self . max_liquidity_offset_msat = ( self . max_liquidity_offset_msat + other. max_liquidity_offset_msat ) / 2 ;
1157+
1158+ // Merge historical liquidity data.
1159+ self . liquidity_history . merge ( & other. liquidity_history ) ;
1160+ }
1161+
11511162 /// Returns a view of the channel liquidity directed from `source` to `target` assuming
11521163 /// `capacity_msat`.
11531164 fn as_directed (
@@ -1681,6 +1692,96 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
16811692 }
16821693}
16831694
1695+ /// A probabilistic scorer that combines local and external information to score channels. This scorer is
1696+ /// shadow-tracking local only scores, so that it becomes possible to cleanly merge external scores when they become
1697+ /// available.
1698+ ///
1699+ /// This is useful for nodes that have a limited local view of the network and need to augment their view with scores
1700+ /// from an external source to improve payment reliability. The external source may use something like background
1701+ /// probing to gather a more complete view of the network. Merging reduces the likelihood of losing unique local data on
1702+ /// particular channels.
1703+ pub struct CombinedScorer < G : Deref < Target = NetworkGraph < L > > , L : Deref > where L :: Target : Logger {
1704+ local_only_scorer : ProbabilisticScorer < G , L > ,
1705+ scorer : ProbabilisticScorer < G , L > ,
1706+ }
1707+
1708+ impl < G : Deref < Target = NetworkGraph < L > > + Clone , L : Deref + Clone > CombinedScorer < G , L > where L :: Target : Logger {
1709+ /// Create a new combined scorer with the given local scorer.
1710+ pub fn new ( local_scorer : ProbabilisticScorer < G , L > ) -> Self {
1711+ let decay_params = local_scorer. decay_params ;
1712+ let network_graph = local_scorer. network_graph . clone ( ) ;
1713+ let logger = local_scorer. logger . clone ( ) ;
1714+ let mut scorer = ProbabilisticScorer :: new ( decay_params, network_graph, logger) ;
1715+
1716+ scorer. channel_liquidities = local_scorer. channel_liquidities . clone ( ) ;
1717+
1718+ Self {
1719+ local_only_scorer : local_scorer,
1720+ scorer : scorer,
1721+ }
1722+ }
1723+
1724+ /// Merge external channel liquidity information into the scorer.
1725+ pub fn merge ( & mut self , mut external_scores : ChannelLiquidities , duration_since_epoch : Duration ) {
1726+ // Decay both sets of scores to make them comparable and mergeable.
1727+ self . local_only_scorer . time_passed ( duration_since_epoch) ;
1728+ external_scores. time_passed ( duration_since_epoch, self . local_only_scorer . decay_params ) ;
1729+
1730+ let local_scores = & self . local_only_scorer . channel_liquidities ;
1731+
1732+ // For each channel, merge the external liquidity information with the isolated local liquidity information.
1733+ for ( scid, mut liquidity) in external_scores. 0 {
1734+ if let Some ( local_liquidity) = local_scores. get ( & scid) {
1735+ liquidity. merge ( local_liquidity) ;
1736+ }
1737+ self . scorer . channel_liquidities . insert ( scid, liquidity) ;
1738+ }
1739+ }
1740+ }
1741+
1742+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > ScoreLookUp for CombinedScorer < G , L > where L :: Target : Logger {
1743+ type ScoreParams = ProbabilisticScoringFeeParameters ;
1744+
1745+ fn channel_penalty_msat (
1746+ & self , candidate : & CandidateRouteHop , usage : ChannelUsage , score_params : & ProbabilisticScoringFeeParameters
1747+ ) -> u64 {
1748+ self . scorer . channel_penalty_msat ( candidate, usage, score_params)
1749+ }
1750+ }
1751+
1752+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > ScoreUpdate for CombinedScorer < G , L > where L :: Target : Logger {
1753+ fn payment_path_failed ( & mut self , path : & Path , short_channel_id : u64 , duration_since_epoch : Duration ) {
1754+ self . local_only_scorer . payment_path_failed ( path, short_channel_id, duration_since_epoch) ;
1755+ self . scorer . payment_path_failed ( path, short_channel_id, duration_since_epoch) ;
1756+ }
1757+
1758+ fn payment_path_successful ( & mut self , path : & Path , duration_since_epoch : Duration ) {
1759+ self . local_only_scorer . payment_path_successful ( path, duration_since_epoch) ;
1760+ self . scorer . payment_path_successful ( path, duration_since_epoch) ;
1761+ }
1762+
1763+ fn probe_failed ( & mut self , path : & Path , short_channel_id : u64 , duration_since_epoch : Duration ) {
1764+ self . local_only_scorer . probe_failed ( path, short_channel_id, duration_since_epoch) ;
1765+ self . scorer . probe_failed ( path, short_channel_id, duration_since_epoch) ;
1766+ }
1767+
1768+ fn probe_successful ( & mut self , path : & Path , duration_since_epoch : Duration ) {
1769+ self . local_only_scorer . probe_successful ( path, duration_since_epoch) ;
1770+ self . scorer . probe_successful ( path, duration_since_epoch) ;
1771+ }
1772+
1773+ fn time_passed ( & mut self , duration_since_epoch : Duration ) {
1774+ self . local_only_scorer . time_passed ( duration_since_epoch) ;
1775+ self . scorer . time_passed ( duration_since_epoch) ;
1776+ }
1777+ }
1778+
1779+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Writeable for CombinedScorer < G , L > where L :: Target : Logger {
1780+ fn write < W : crate :: util:: ser:: Writer > ( & self , writer : & mut W ) -> Result < ( ) , crate :: io:: Error > {
1781+ self . local_only_scorer . write ( writer)
1782+ }
1783+ }
1784+
16841785#[ cfg( c_bindings) ]
16851786impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Score for ProbabilisticScorer < G , L >
16861787where L :: Target : Logger { }
@@ -1860,6 +1961,13 @@ mod bucketed_history {
18601961 self . buckets [ bucket] = self . buckets [ bucket] . saturating_add ( BUCKET_FIXED_POINT_ONE ) ;
18611962 }
18621963 }
1964+
1965+ /// Returns the average of the buckets between the two trackers.
1966+ pub ( crate ) fn merge ( & mut self , other : & Self ) -> ( ) {
1967+ for ( index, bucket) in self . buckets . iter_mut ( ) . enumerate ( ) {
1968+ * bucket = ( * bucket + other. buckets [ index] ) / 2 ;
1969+ }
1970+ }
18631971 }
18641972
18651973 impl_writeable_tlv_based ! ( HistoricalBucketRangeTracker , { ( 0 , buckets, required) } ) ;
@@ -1956,6 +2064,13 @@ mod bucketed_history {
19562064 -> DirectedHistoricalLiquidityTracker < & ' a mut HistoricalLiquidityTracker > {
19572065 DirectedHistoricalLiquidityTracker { source_less_than_target, tracker : self }
19582066 }
2067+
2068+ /// Merges the historical liquidity data from another tracker into this one.
2069+ pub fn merge ( & mut self , other : & Self ) {
2070+ self . min_liquidity_offset_history . merge ( & other. min_liquidity_offset_history ) ;
2071+ self . max_liquidity_offset_history . merge ( & other. max_liquidity_offset_history ) ;
2072+ self . recalculate_valid_point_count ( ) ;
2073+ }
19592074 }
19602075
19612076 /// A set of buckets representing the history of where we've seen the minimum- and maximum-
@@ -2114,6 +2229,72 @@ mod bucketed_history {
21142229 Some ( ( cumulative_success_prob * ( 1024.0 * 1024.0 * 1024.0 ) ) as u64 )
21152230 }
21162231 }
2232+
2233+ #[ cfg( test) ]
2234+ mod tests {
2235+ use crate :: routing:: scoring:: ProbabilisticScoringFeeParameters ;
2236+
2237+ use super :: { HistoricalBucketRangeTracker , HistoricalLiquidityTracker } ;
2238+ #[ test]
2239+ fn historical_liquidity_bucket_merge ( ) {
2240+ let mut bucket1 = HistoricalBucketRangeTracker :: new ( ) ;
2241+ bucket1. track_datapoint ( 100 , 1000 ) ;
2242+ assert_eq ! (
2243+ bucket1. buckets,
2244+ [
2245+ 0u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 32 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
2246+ 0 , 0 , 0 , 0 , 0 , 0 , 0
2247+ ]
2248+ ) ;
2249+
2250+ let mut bucket2 = HistoricalBucketRangeTracker :: new ( ) ;
2251+ bucket2. track_datapoint ( 0 , 1000 ) ;
2252+ assert_eq ! (
2253+ bucket2. buckets,
2254+ [
2255+ 32u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
2256+ 0 , 0 , 0 , 0 , 0 , 0 , 0
2257+ ]
2258+ ) ;
2259+
2260+ bucket1. merge ( & bucket2) ;
2261+ assert_eq ! (
2262+ bucket1. buckets,
2263+ [
2264+ 16u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
2265+ 0 , 0 , 0 , 0 , 0 , 0 , 0
2266+ ]
2267+ ) ;
2268+ }
2269+
2270+ #[ test]
2271+ fn historical_liquidity_tracker_merge ( ) {
2272+ let params = ProbabilisticScoringFeeParameters :: default ( ) ;
2273+
2274+ let probability1: Option < u64 > ;
2275+ let mut tracker1 = HistoricalLiquidityTracker :: new ( ) ;
2276+ {
2277+ let mut directed_tracker1 = tracker1. as_directed_mut ( true ) ;
2278+ directed_tracker1. track_datapoint ( 100 , 200 , 1000 ) ;
2279+ probability1 = directed_tracker1
2280+ . calculate_success_probability_times_billion ( & params, 500 , 1000 ) ;
2281+ }
2282+
2283+ let mut tracker2 = HistoricalLiquidityTracker :: new ( ) ;
2284+ {
2285+ let mut directed_tracker2 = tracker2. as_directed_mut ( true ) ;
2286+ directed_tracker2. track_datapoint ( 200 , 300 , 1000 ) ;
2287+ }
2288+
2289+ tracker1. merge ( & tracker2) ;
2290+
2291+ let directed_tracker1 = tracker1. as_directed ( true ) ;
2292+ let probability =
2293+ directed_tracker1. calculate_success_probability_times_billion ( & params, 500 , 1000 ) ;
2294+
2295+ assert_ne ! ( probability1, probability) ;
2296+ }
2297+ }
21172298}
21182299
21192300impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Writeable for ProbabilisticScorer < G , L > where L :: Target : Logger {
@@ -2207,15 +2388,15 @@ impl Readable for ChannelLiquidity {
22072388
22082389#[ cfg( test) ]
22092390mod tests {
2210- use super :: { ChannelLiquidity , HistoricalLiquidityTracker , ProbabilisticScoringFeeParameters , ProbabilisticScoringDecayParameters , ProbabilisticScorer } ;
2391+ use super :: { ChannelLiquidity , HistoricalLiquidityTracker , ProbabilisticScorer , ProbabilisticScoringDecayParameters , ProbabilisticScoringFeeParameters } ;
22112392 use crate :: blinded_path:: BlindedHop ;
22122393 use crate :: util:: config:: UserConfig ;
22132394
22142395 use crate :: ln:: channelmanager;
22152396 use crate :: ln:: msgs:: { ChannelAnnouncement , ChannelUpdate , UnsignedChannelAnnouncement , UnsignedChannelUpdate } ;
22162397 use crate :: routing:: gossip:: { EffectiveCapacity , NetworkGraph , NodeId } ;
22172398 use crate :: routing:: router:: { BlindedTail , Path , RouteHop , CandidateRouteHop , PublicHopCandidate } ;
2218- use crate :: routing:: scoring:: { ChannelUsage , ScoreLookUp , ScoreUpdate } ;
2399+ use crate :: routing:: scoring:: { ChannelLiquidities , ChannelUsage , CombinedScorer , ScoreLookUp , ScoreUpdate } ;
22192400 use crate :: util:: ser:: { ReadableArgs , Writeable } ;
22202401 use crate :: util:: test_utils:: { self , TestLogger } ;
22212402
@@ -2225,6 +2406,7 @@ mod tests {
22252406 use bitcoin:: network:: Network ;
22262407 use bitcoin:: secp256k1:: { PublicKey , Secp256k1 , SecretKey } ;
22272408 use core:: time:: Duration ;
2409+ use std:: rc:: Rc ;
22282410 use crate :: io;
22292411
22302412 fn source_privkey ( ) -> SecretKey {
@@ -3716,6 +3898,68 @@ mod tests {
37163898 assert_eq ! ( scorer. historical_estimated_payment_success_probability( 42 , & target, amount_msat, & params, false ) ,
37173899 Some ( 0.0 ) ) ;
37183900 }
3901+
3902+ #[ test]
3903+ fn combined_scorer ( ) {
3904+ let logger = TestLogger :: new ( ) ;
3905+ let network_graph = network_graph ( & logger) ;
3906+ let params = ProbabilisticScoringFeeParameters :: default ( ) ;
3907+ let mut scorer = ProbabilisticScorer :: new (
3908+ ProbabilisticScoringDecayParameters :: default ( ) ,
3909+ & network_graph,
3910+ & logger,
3911+ ) ;
3912+ scorer. payment_path_failed ( & payment_path_for_amount ( 600 ) , 42 , Duration :: ZERO ) ;
3913+
3914+ let mut combined_scorer = CombinedScorer :: new ( scorer) ;
3915+
3916+ // Verify that the combined_scorer has the correct liquidity range after a failed 600 msat payment.
3917+ let liquidity_range =
3918+ combined_scorer. scorer . estimated_channel_liquidity_range ( 42 , & target_node_id ( ) ) ;
3919+ assert_eq ! ( liquidity_range. unwrap( ) , ( 0 , 600 ) ) ;
3920+
3921+ let source = source_node_id ( ) ;
3922+ let usage = ChannelUsage {
3923+ amount_msat : 750 ,
3924+ inflight_htlc_msat : 0 ,
3925+ effective_capacity : EffectiveCapacity :: Total {
3926+ capacity_msat : 1_000 ,
3927+ htlc_maximum_msat : 1_000 ,
3928+ } ,
3929+ } ;
3930+
3931+ {
3932+ let network_graph = network_graph. read_only ( ) ;
3933+ let channel = network_graph. channel ( 42 ) . unwrap ( ) ;
3934+ let ( info, _) = channel. as_directed_from ( & source) . unwrap ( ) ;
3935+ let candidate =
3936+ CandidateRouteHop :: PublicHop ( PublicHopCandidate { info, short_channel_id : 42 } ) ;
3937+
3938+ let penalty = combined_scorer. channel_penalty_msat ( & candidate, usage, & params) ;
3939+
3940+ let mut external_liquidity = ChannelLiquidity :: new ( Duration :: ZERO ) ;
3941+ let logger_rc = Rc :: new ( & logger) ; // Why necessary and not above for the network graph?
3942+ external_liquidity
3943+ . as_directed_mut ( & source_node_id ( ) , & target_node_id ( ) , 1_000 )
3944+ . successful ( 1000 , Duration :: ZERO , format_args ! ( "test channel" ) , logger_rc. as_ref ( ) ) ;
3945+
3946+ let mut external_scores = ChannelLiquidities :: new ( ) ;
3947+
3948+ external_scores. insert ( 42 , external_liquidity) ;
3949+ combined_scorer. merge ( external_scores, Duration :: ZERO ) ;
3950+
3951+ let penalty_after_merge =
3952+ combined_scorer. channel_penalty_msat ( & candidate, usage, & params) ;
3953+
3954+ // Since the external source observed a successful payment, the penalty should be lower after the merge.
3955+ assert ! ( penalty_after_merge < penalty) ;
3956+ }
3957+
3958+ // Verify that after the merge with a successful payment, the liquidity range is increased.
3959+ let liquidity_range =
3960+ combined_scorer. scorer . estimated_channel_liquidity_range ( 42 , & target_node_id ( ) ) ;
3961+ assert_eq ! ( liquidity_range. unwrap( ) , ( 0 , 300 ) ) ;
3962+ }
37193963}
37203964
37213965#[ cfg( ldk_bench) ]
0 commit comments