@@ -57,13 +57,14 @@ use crate::routing::router::{Path, CandidateRouteHop, PublicHopCandidate};
5757use crate :: routing:: log_approx;
5858use crate :: util:: ser:: { Readable , ReadableArgs , Writeable , Writer } ;
5959use crate :: util:: logger:: Logger ;
60-
6160use crate :: prelude:: * ;
61+ use crate :: prelude:: hash_map:: Entry ;
6262use core:: { cmp, fmt} ;
6363use core:: ops:: { Deref , DerefMut } ;
6464use core:: time:: Duration ;
6565use crate :: io:: { self , Read } ;
6666use crate :: sync:: { RwLock , RwLockReadGuard , RwLockWriteGuard } ;
67+ use bucketed_history:: { LegacyHistoricalBucketRangeTracker , HistoricalBucketRangeTracker , DirectedHistoricalLiquidityTracker , HistoricalLiquidityTracker } ;
6768#[ cfg( not( c_bindings) ) ]
6869use {
6970 core:: cell:: { RefCell , RefMut , Ref } ,
@@ -474,7 +475,86 @@ where L::Target: Logger {
474475 decay_params : ProbabilisticScoringDecayParameters ,
475476 network_graph : G ,
476477 logger : L ,
477- channel_liquidities : HashMap < u64 , ChannelLiquidity > ,
478+ channel_liquidities : ChannelLiquidities ,
479+ }
480+ /// Container for live and historical liquidity bounds for each channel.
481+ pub struct ChannelLiquidities ( HashMap < u64 , ChannelLiquidity > ) ;
482+
483+ impl ChannelLiquidities {
484+ fn new ( ) -> Self {
485+ Self ( new_hash_map ( ) )
486+ }
487+
488+ fn time_passed ( & mut self , duration_since_epoch : Duration , decay_params : ProbabilisticScoringDecayParameters ) {
489+ self . 0 . retain ( |_scid, liquidity| {
490+ liquidity. min_liquidity_offset_msat =
491+ liquidity. decayed_offset ( liquidity. min_liquidity_offset_msat , duration_since_epoch, decay_params) ;
492+ liquidity. max_liquidity_offset_msat =
493+ liquidity. decayed_offset ( liquidity. max_liquidity_offset_msat , duration_since_epoch, decay_params) ;
494+ liquidity. last_updated = duration_since_epoch;
495+
496+ // Only decay the historical buckets if there hasn't been new data for a while. This ties back to our
497+ // earlier conclusion that fixed half-lives for scoring data are inherently flawed—they tend to be either
498+ // too fast or too slow. Ideally, historical buckets should only decay as new data is added, which naturally
499+ // happens when fresh data arrives. However, scoring a channel based on month-old data while treating it the
500+ // same as one with minute-old data is problematic. To address this, we introduced a decay mechanism, but it
501+ // runs very slowly and only activates when no new data has been received for a while, as our preference is
502+ // to decay based on incoming data.
503+ let elapsed_time =
504+ duration_since_epoch. saturating_sub ( liquidity. offset_history_last_updated ) ;
505+ if elapsed_time > decay_params. historical_no_updates_half_life {
506+ let half_life = decay_params. historical_no_updates_half_life . as_secs_f64 ( ) ;
507+ if half_life != 0.0 {
508+ liquidity. liquidity_history . decay_buckets ( elapsed_time. as_secs_f64 ( ) / half_life) ;
509+ liquidity. offset_history_last_updated = duration_since_epoch;
510+ }
511+ }
512+ liquidity. min_liquidity_offset_msat != 0 || liquidity. max_liquidity_offset_msat != 0 ||
513+ liquidity. liquidity_history . has_datapoints ( )
514+ } ) ;
515+ }
516+
517+ fn get ( & self , short_channel_id : & u64 ) -> Option < & ChannelLiquidity > {
518+ self . 0 . get ( short_channel_id)
519+ }
520+
521+ fn insert ( & mut self , short_channel_id : u64 , liquidity : ChannelLiquidity ) -> Option < ChannelLiquidity > {
522+ self . 0 . insert ( short_channel_id, liquidity)
523+ }
524+
525+ fn iter ( & self ) -> impl Iterator < Item = ( & u64 , & ChannelLiquidity ) > {
526+ self . 0 . iter ( )
527+ }
528+
529+ fn entry ( & mut self , short_channel_id : u64 ) -> Entry < u64 , ChannelLiquidity , RandomState > {
530+ self . 0 . entry ( short_channel_id)
531+ }
532+
533+ #[ cfg( test) ]
534+ fn get_mut ( & mut self , short_channel_id : & u64 ) -> Option < & mut ChannelLiquidity > {
535+ self . 0 . get_mut ( short_channel_id)
536+ }
537+ }
538+
539+ impl Readable for ChannelLiquidities {
540+ #[ inline]
541+ fn read < R : Read > ( r : & mut R ) -> Result < Self , DecodeError > {
542+ let mut channel_liquidities = new_hash_map ( ) ;
543+ read_tlv_fields ! ( r, {
544+ ( 0 , channel_liquidities, required) ,
545+ } ) ;
546+ Ok ( ChannelLiquidities ( channel_liquidities) )
547+ }
548+ }
549+
550+ impl Writeable for ChannelLiquidities {
551+ #[ inline]
552+ fn write < W : Writer > ( & self , w : & mut W ) -> Result < ( ) , io:: Error > {
553+ write_tlv_fields ! ( w, {
554+ ( 0 , self . 0 , required) ,
555+ } ) ;
556+ Ok ( ( ) )
557+ }
478558}
479559
480560/// Parameters for configuring [`ProbabilisticScorer`].
@@ -849,7 +929,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ProbabilisticScorer<G, L> whe
849929 decay_params,
850930 network_graph,
851931 logger,
852- channel_liquidities : new_hash_map ( ) ,
932+ channel_liquidities : ChannelLiquidities :: new ( ) ,
853933 }
854934 }
855935
@@ -1603,26 +1683,7 @@ impl<G: Deref<Target = NetworkGraph<L>>, L: Deref> ScoreUpdate for Probabilistic
16031683 }
16041684
16051685 fn time_passed ( & mut self , duration_since_epoch : Duration ) {
1606- let decay_params = self . decay_params ;
1607- self . channel_liquidities . retain ( |_scid, liquidity| {
1608- liquidity. min_liquidity_offset_msat =
1609- liquidity. decayed_offset ( liquidity. min_liquidity_offset_msat , duration_since_epoch, decay_params) ;
1610- liquidity. max_liquidity_offset_msat =
1611- liquidity. decayed_offset ( liquidity. max_liquidity_offset_msat , duration_since_epoch, decay_params) ;
1612- liquidity. last_updated = duration_since_epoch;
1613-
1614- let elapsed_time =
1615- duration_since_epoch. saturating_sub ( liquidity. offset_history_last_updated ) ;
1616- if elapsed_time > decay_params. historical_no_updates_half_life {
1617- let half_life = decay_params. historical_no_updates_half_life . as_secs_f64 ( ) ;
1618- if half_life != 0.0 {
1619- liquidity. liquidity_history . decay_buckets ( elapsed_time. as_secs_f64 ( ) / half_life) ;
1620- liquidity. offset_history_last_updated = duration_since_epoch;
1621- }
1622- }
1623- liquidity. min_liquidity_offset_msat != 0 || liquidity. max_liquidity_offset_msat != 0 ||
1624- liquidity. liquidity_history . has_datapoints ( )
1625- } ) ;
1686+ self . channel_liquidities . time_passed ( duration_since_epoch, self . decay_params ) ;
16261687 }
16271688}
16281689
@@ -2060,15 +2121,11 @@ mod bucketed_history {
20602121 }
20612122 }
20622123}
2063- use bucketed_history:: { LegacyHistoricalBucketRangeTracker , HistoricalBucketRangeTracker , DirectedHistoricalLiquidityTracker , HistoricalLiquidityTracker } ;
20642124
20652125impl < G : Deref < Target = NetworkGraph < L > > , L : Deref > Writeable for ProbabilisticScorer < G , L > where L :: Target : Logger {
20662126 #[ inline]
20672127 fn write < W : Writer > ( & self , w : & mut W ) -> Result < ( ) , io:: Error > {
2068- write_tlv_fields ! ( w, {
2069- ( 0 , self . channel_liquidities, required) ,
2070- } ) ;
2071- Ok ( ( ) )
2128+ self . channel_liquidities . write ( w)
20722129 }
20732130}
20742131
@@ -2079,10 +2136,7 @@ ReadableArgs<(ProbabilisticScoringDecayParameters, G, L)> for ProbabilisticScore
20792136 r : & mut R , args : ( ProbabilisticScoringDecayParameters , G , L )
20802137 ) -> Result < Self , DecodeError > {
20812138 let ( decay_params, network_graph, logger) = args;
2082- let mut channel_liquidities = new_hash_map ( ) ;
2083- read_tlv_fields ! ( r, {
2084- ( 0 , channel_liquidities, required) ,
2085- } ) ;
2139+ let channel_liquidities = ChannelLiquidities :: read ( r) ?;
20862140 Ok ( Self {
20872141 decay_params,
20882142 network_graph,
0 commit comments