@@ -477,6 +477,7 @@ where L::Target: Logger {
477477 channel_liquidities : ChannelLiquidities ,
478478}
479479/// ChannelLiquidities contains live and historical liquidity bounds for each channel.
480+ #[ derive( Clone ) ]
480481pub struct ChannelLiquidities ( HashMap < u64 , ChannelLiquidity > ) ;
481482
482483impl ChannelLiquidities {
@@ -881,6 +882,7 @@ impl ProbabilisticScoringDecayParameters {
881882/// first node in the ordering of the channel's counterparties. Thus, swapping the two liquidity
882883/// offset fields gives the opposite direction.
883884#[ repr( C ) ] // Force the fields in memory to be in the order we specify
885+ #[ derive( Clone ) ]
884886pub struct ChannelLiquidity {
885887 /// Lower channel liquidity bound in terms of an offset from zero.
886888 min_liquidity_offset_msat : u64 ,
@@ -1151,6 +1153,15 @@ impl ChannelLiquidity {
11511153 }
11521154 }
11531155
1156+ fn merge ( & mut self , other : & Self ) {
1157+ // Take average for min/max liquidity offsets.
1158+ self . min_liquidity_offset_msat = ( self . min_liquidity_offset_msat + other. min_liquidity_offset_msat ) / 2 ;
1159+ self . max_liquidity_offset_msat = ( self . max_liquidity_offset_msat + other. max_liquidity_offset_msat ) / 2 ;
1160+
1161+ // Merge historical liquidity data.
1162+ self . liquidity_history . merge ( & other. liquidity_history ) ;
1163+ }
1164+
11541165 /// Returns a view of the channel liquidity directed from `source` to `target` assuming
11551166 /// `capacity_msat`.
11561167 fn as_directed (
@@ -1684,6 +1695,91 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
16841695 }
16851696}
16861697
1698+ /// A probabilistic scorer that combines local and external information to score channels. This scorer is
1699+ /// shadow-tracking local only scores, so that it becomes possible to cleanly merge external scores when they become
1700+ /// available.
1701+ pub struct CombinedScorer < G : Deref < Target = NetworkGraph < L > > , L : Deref > where L :: Target : Logger {
1702+ local_only_scorer : ProbabilisticScorer < G , L > ,
1703+ scorer : ProbabilisticScorer < G , L > ,
1704+ }
1705+
1706+ impl < G : Deref < Target = NetworkGraph < L > > + Clone , L : Deref + Clone > CombinedScorer < G , L > where L :: Target : Logger {
1707+ /// Create a new combined scorer with the given local scorer.
1708+ pub fn new ( local_scorer : ProbabilisticScorer < G , L > ) -> Self {
1709+ let decay_params = local_scorer. decay_params ;
1710+ let network_graph = local_scorer. network_graph . clone ( ) ;
1711+ let logger = local_scorer. logger . clone ( ) ;
1712+ let mut scorer = ProbabilisticScorer :: new ( decay_params, network_graph, logger) ;
1713+
1714+ scorer. channel_liquidities = local_scorer. channel_liquidities . clone ( ) ;
1715+
1716+ Self {
1717+ local_only_scorer : local_scorer,
1718+ scorer : scorer,
1719+ }
1720+ }
1721+
1722+ /// Merge external channel liquidity information into the scorer.
1723+ pub fn merge ( & mut self , mut external_scores : ChannelLiquidities , duration_since_epoch : Duration ) {
1724+ // Decay both sets of scores to make them comparable and mergeable.
1725+ self . local_only_scorer . time_passed ( duration_since_epoch) ;
1726+ external_scores. time_passed ( duration_since_epoch, self . local_only_scorer . decay_params ) ;
1727+
1728+ let local_scores = & self . local_only_scorer . channel_liquidities ;
1729+
1730+ // For each channel, merge the external liquidity information with the isolated local liquidity information.
1731+ for ( scid, mut liquidity) in external_scores. 0 {
1732+ if let Some ( local_liquidity) = local_scores. get ( & scid) {
1733+ liquidity. merge ( local_liquidity) ;
1734+ }
1735+ self . scorer . channel_liquidities . insert ( scid, liquidity) ;
1736+ }
1737+ }
1738+ }
1739+
1740+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > ScoreLookUp for CombinedScorer < G , L > where L :: Target : Logger {
1741+ type ScoreParams = ProbabilisticScoringFeeParameters ;
1742+
1743+ fn channel_penalty_msat (
1744+ & self , candidate : & CandidateRouteHop , usage : ChannelUsage , score_params : & ProbabilisticScoringFeeParameters
1745+ ) -> u64 {
1746+ self . scorer . channel_penalty_msat ( candidate, usage, score_params)
1747+ }
1748+ }
1749+
1750+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > ScoreUpdate for CombinedScorer < G , L > where L :: Target : Logger {
1751+ fn payment_path_failed ( & mut self , path : & Path , short_channel_id : u64 , duration_since_epoch : Duration ) {
1752+ self . local_only_scorer . payment_path_failed ( path, short_channel_id, duration_since_epoch) ;
1753+ self . scorer . payment_path_failed ( path, short_channel_id, duration_since_epoch) ;
1754+ }
1755+
1756+ fn payment_path_successful ( & mut self , path : & Path , duration_since_epoch : Duration ) {
1757+ self . local_only_scorer . payment_path_successful ( path, duration_since_epoch) ;
1758+ self . scorer . payment_path_successful ( path, duration_since_epoch) ;
1759+ }
1760+
1761+ fn probe_failed ( & mut self , path : & Path , short_channel_id : u64 , duration_since_epoch : Duration ) {
1762+ self . local_only_scorer . probe_failed ( path, short_channel_id, duration_since_epoch) ;
1763+ self . scorer . probe_failed ( path, short_channel_id, duration_since_epoch) ;
1764+ }
1765+
1766+ fn probe_successful ( & mut self , path : & Path , duration_since_epoch : Duration ) {
1767+ self . local_only_scorer . probe_successful ( path, duration_since_epoch) ;
1768+ self . scorer . probe_successful ( path, duration_since_epoch) ;
1769+ }
1770+
1771+ fn time_passed ( & mut self , duration_since_epoch : Duration ) {
1772+ self . local_only_scorer . time_passed ( duration_since_epoch) ;
1773+ self . scorer . time_passed ( duration_since_epoch) ;
1774+ }
1775+ }
1776+
1777+ impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Writeable for CombinedScorer < G , L > where L :: Target : Logger {
1778+ fn write < W : crate :: util:: ser:: Writer > ( & self , writer : & mut W ) -> Result < ( ) , crate :: io:: Error > {
1779+ self . local_only_scorer . write ( writer)
1780+ }
1781+ }
1782+
16871783#[ cfg( c_bindings) ]
16881784impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Score for ProbabilisticScorer < G , L >
16891785where L :: Target : Logger { }
@@ -1863,6 +1959,13 @@ mod bucketed_history {
18631959 self . buckets [ bucket] = self . buckets [ bucket] . saturating_add ( BUCKET_FIXED_POINT_ONE ) ;
18641960 }
18651961 }
1962+
1963+ /// Returns the average of the buckets between the two trackers.
1964+ pub ( crate ) fn merge ( & mut self , other : & Self ) -> ( ) {
1965+ for ( index, bucket) in self . buckets . iter_mut ( ) . enumerate ( ) {
1966+ * bucket = ( * bucket + other. buckets [ index] ) / 2 ;
1967+ }
1968+ }
18661969 }
18671970
18681971 impl_writeable_tlv_based ! ( HistoricalBucketRangeTracker , { ( 0 , buckets, required) } ) ;
@@ -1959,6 +2062,13 @@ mod bucketed_history {
19592062 -> DirectedHistoricalLiquidityTracker < & ' a mut HistoricalLiquidityTracker > {
19602063 DirectedHistoricalLiquidityTracker { source_less_than_target, tracker : self }
19612064 }
2065+
2066+ /// Merges the historical liquidity data from another tracker into this one.
2067+ pub fn merge ( & mut self , other : & Self ) {
2068+ self . min_liquidity_offset_history . merge ( & other. min_liquidity_offset_history ) ;
2069+ self . max_liquidity_offset_history . merge ( & other. max_liquidity_offset_history ) ;
2070+ self . recalculate_valid_point_count ( ) ;
2071+ }
19622072 }
19632073
19642074 /// A set of buckets representing the history of where we've seen the minimum- and maximum-
@@ -2117,7 +2227,54 @@ mod bucketed_history {
21172227 Some ( ( cumulative_success_prob * ( 1024.0 * 1024.0 * 1024.0 ) ) as u64 )
21182228 }
21192229 }
2230+
2231+ #[ cfg( test) ]
2232+ mod tests {
2233+ use crate :: routing:: scoring:: ProbabilisticScoringFeeParameters ;
2234+
2235+ use super :: { HistoricalBucketRangeTracker , HistoricalLiquidityTracker } ;
2236+ #[ test]
2237+ fn historical_liquidity_bucket_merge ( ) {
2238+ let mut bucket1 = HistoricalBucketRangeTracker :: new ( ) ;
2239+ bucket1. track_datapoint ( 100 , 1000 ) ;
2240+ assert_eq ! ( bucket1. buckets, [ 0u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 32 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ) ;
2241+
2242+ let mut bucket2 = HistoricalBucketRangeTracker :: new ( ) ;
2243+ bucket2. track_datapoint ( 0 , 1000 ) ;
2244+ assert_eq ! ( bucket2. buckets, [ 32u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ) ;
2245+
2246+ bucket1. merge ( & bucket2) ;
2247+ assert_eq ! ( bucket1. buckets, [ 16u16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 16 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ] ) ;
2248+ }
2249+
2250+ #[ test]
2251+ fn historical_liquidity_tracker_merge ( ) {
2252+ let params = ProbabilisticScoringFeeParameters :: default ( ) ;
2253+
2254+ let probability1: Option < u64 > ;
2255+ let mut tracker1 = HistoricalLiquidityTracker :: new ( ) ;
2256+ {
2257+ let mut directed_tracker1 = tracker1. as_directed_mut ( true ) ;
2258+ directed_tracker1. track_datapoint ( 100 , 200 , 1000 ) ;
2259+ probability1 = directed_tracker1. calculate_success_probability_times_billion ( & params, 500 , 1000 ) ;
2260+ }
2261+
2262+ let mut tracker2 = HistoricalLiquidityTracker :: new ( ) ;
2263+ {
2264+ let mut directed_tracker2 = tracker2. as_directed_mut ( true ) ;
2265+ directed_tracker2. track_datapoint ( 200 , 300 , 1000 ) ;
2266+ }
2267+
2268+ tracker1. merge ( & tracker2) ;
2269+
2270+ let directed_tracker1 = tracker1. as_directed ( true ) ;
2271+ let probability = directed_tracker1. calculate_success_probability_times_billion ( & params, 500 , 1000 ) ;
2272+
2273+ assert_ne ! ( probability1, probability) ;
2274+ }
2275+ }
21202276}
2277+
21212278use bucketed_history:: { LegacyHistoricalBucketRangeTracker , HistoricalBucketRangeTracker , DirectedHistoricalLiquidityTracker , HistoricalLiquidityTracker } ;
21222279use hashbrown:: hash_map:: Entry ;
21232280
@@ -2212,15 +2369,15 @@ impl Readable for ChannelLiquidity {
22122369
22132370#[ cfg( test) ]
22142371mod tests {
2215- use super :: { ChannelLiquidity , HistoricalLiquidityTracker , ProbabilisticScoringFeeParameters , ProbabilisticScoringDecayParameters , ProbabilisticScorer } ;
2372+ use super :: { ChannelLiquidity , HistoricalLiquidityTracker , ProbabilisticScorer , ProbabilisticScoringDecayParameters , ProbabilisticScoringFeeParameters } ;
22162373 use crate :: blinded_path:: BlindedHop ;
22172374 use crate :: util:: config:: UserConfig ;
22182375
22192376 use crate :: ln:: channelmanager;
22202377 use crate :: ln:: msgs:: { ChannelAnnouncement , ChannelUpdate , UnsignedChannelAnnouncement , UnsignedChannelUpdate } ;
22212378 use crate :: routing:: gossip:: { EffectiveCapacity , NetworkGraph , NodeId } ;
22222379 use crate :: routing:: router:: { BlindedTail , Path , RouteHop , CandidateRouteHop , PublicHopCandidate } ;
2223- use crate :: routing:: scoring:: { ChannelUsage , ScoreLookUp , ScoreUpdate } ;
2380+ use crate :: routing:: scoring:: { ChannelLiquidities , ChannelUsage , CombinedScorer , ScoreLookUp , ScoreUpdate } ;
22242381 use crate :: util:: ser:: { ReadableArgs , Writeable } ;
22252382 use crate :: util:: test_utils:: { self , TestLogger } ;
22262383
@@ -2230,6 +2387,7 @@ mod tests {
22302387 use bitcoin:: network:: Network ;
22312388 use bitcoin:: secp256k1:: { PublicKey , Secp256k1 , SecretKey } ;
22322389 use core:: time:: Duration ;
2390+ use std:: rc:: Rc ;
22332391 use crate :: io;
22342392
22352393 fn source_privkey ( ) -> SecretKey {
@@ -3721,6 +3879,47 @@ mod tests {
37213879 assert_eq ! ( scorer. historical_estimated_payment_success_probability( 42 , & target, amount_msat, & params, false ) ,
37223880 Some ( 0.0 ) ) ;
37233881 }
3882+
3883+ #[ test]
3884+ fn combined_scorer ( ) {
3885+ let logger = TestLogger :: new ( ) ;
3886+ let network_graph = network_graph ( & logger) ;
3887+ let params = ProbabilisticScoringFeeParameters :: default ( ) ;
3888+ let mut scorer = ProbabilisticScorer :: new ( ProbabilisticScoringDecayParameters :: default ( ) , & network_graph, & logger) ;
3889+ scorer. payment_path_failed ( & payment_path_for_amount ( 600 ) , 42 , Duration :: ZERO ) ;
3890+
3891+ let mut combined_scorer = CombinedScorer :: new ( scorer) ;
3892+
3893+ let source = source_node_id ( ) ;
3894+ let usage = ChannelUsage {
3895+ amount_msat : 750 ,
3896+ inflight_htlc_msat : 0 ,
3897+ effective_capacity : EffectiveCapacity :: Total { capacity_msat : 1_000 , htlc_maximum_msat : 1_000 } ,
3898+ } ;
3899+ let network_graph = network_graph. read_only ( ) ;
3900+ let channel = network_graph. channel ( 42 ) . unwrap ( ) ;
3901+ let ( info, _) = channel. as_directed_from ( & source) . unwrap ( ) ;
3902+ let candidate = CandidateRouteHop :: PublicHop ( PublicHopCandidate {
3903+ info,
3904+ short_channel_id : 42 ,
3905+ } ) ;
3906+ let penalty = combined_scorer. channel_penalty_msat ( & candidate, usage, & params) ;
3907+
3908+ let mut external_liquidity = ChannelLiquidity :: new ( Duration :: ZERO ) ;
3909+ let logger_rc = Rc :: new ( & logger) ; // Why necessary and not above for the network graph?
3910+ external_liquidity. as_directed_mut ( & source_node_id ( ) , & target_node_id ( ) , 1_000 ) .
3911+ successful ( 1000 , Duration :: ZERO , format_args ! ( "test channel" ) , logger_rc. as_ref ( ) ) ;
3912+
3913+ let mut external_scores = ChannelLiquidities :: new ( ) ;
3914+
3915+ external_scores. insert ( 42 , external_liquidity) ;
3916+ combined_scorer. merge ( external_scores, Duration :: ZERO ) ;
3917+
3918+ let penalty_after_merge = combined_scorer. channel_penalty_msat ( & candidate, usage, & params) ;
3919+
3920+ // Since the external source observed a successful payment, the penalty should be lower after the merge.
3921+ assert ! ( penalty_after_merge < penalty) ;
3922+ }
37243923}
37253924
37263925#[ cfg( ldk_bench) ]
0 commit comments