9
9
10
10
//! Contains the main bLIP-52 / LSPS2 server-side object, [`LSPS2ServiceHandler`].
11
11
12
+ use alloc:: boxed:: Box ;
12
13
use alloc:: string:: { String , ToString } ;
13
14
use alloc:: vec:: Vec ;
15
+ use lightning:: util:: persist:: KVStore ;
14
16
15
17
use core:: cmp:: Ordering as CmpOrdering ;
18
+ use core:: future:: Future ;
16
19
use core:: ops:: Deref ;
20
+ use core:: pin:: Pin ;
17
21
use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
18
22
19
23
use crate :: events:: EventQueue ;
@@ -28,6 +32,9 @@ use crate::lsps2::utils::{
28
32
compute_opening_fee, is_expired_opening_fee_params, is_valid_opening_fee_params,
29
33
} ;
30
34
use crate :: message_queue:: { MessageQueue , MessageQueueNotifierGuard } ;
35
+ use crate :: persist:: {
36
+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE , LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
37
+ } ;
31
38
use crate :: prelude:: hash_map:: Entry ;
32
39
use crate :: prelude:: { new_hash_map, HashMap } ;
33
40
use crate :: sync:: { Arc , Mutex , MutexGuard , RwLock } ;
@@ -38,6 +45,7 @@ use lightning::ln::msgs::{ErrorAction, LightningError};
38
45
use lightning:: ln:: types:: ChannelId ;
39
46
use lightning:: util:: errors:: APIError ;
40
47
use lightning:: util:: logger:: Level ;
48
+ use lightning:: util:: ser:: Writeable ;
41
49
use lightning:: { impl_writeable_tlv_based, impl_writeable_tlv_based_enum} ;
42
50
43
51
use lightning_types:: payment:: PaymentHash ;
@@ -569,6 +577,7 @@ where
569
577
CM :: Target : AChannelManager ,
570
578
{
571
579
channel_manager : CM ,
580
+ kv_store : Arc < dyn KVStore + Send + Sync > ,
572
581
pending_messages : Arc < MessageQueue > ,
573
582
pending_events : Arc < EventQueue > ,
574
583
per_peer_state : RwLock < HashMap < PublicKey , Mutex < PeerState > > > ,
@@ -585,7 +594,7 @@ where
585
594
/// Constructs a `LSPS2ServiceHandler`.
586
595
pub ( crate ) fn new (
587
596
pending_messages : Arc < MessageQueue > , pending_events : Arc < EventQueue > , channel_manager : CM ,
588
- config : LSPS2ServiceConfig ,
597
+ kv_store : Arc < dyn KVStore + Send + Sync > , config : LSPS2ServiceConfig ,
589
598
) -> Self {
590
599
Self {
591
600
pending_messages,
@@ -595,6 +604,7 @@ where
595
604
peer_by_channel_id : RwLock :: new ( new_hash_map ( ) ) ,
596
605
total_pending_requests : AtomicUsize :: new ( 0 ) ,
597
606
channel_manager,
607
+ kv_store,
598
608
config,
599
609
}
600
610
}
@@ -1442,6 +1452,45 @@ where
1442
1452
) ;
1443
1453
}
1444
1454
1455
+ fn persist_peer_state (
1456
+ & self , counterparty_node_id : PublicKey , peer_state : & PeerState ,
1457
+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + Send > > {
1458
+ let key = counterparty_node_id. to_string ( ) ;
1459
+ let encoded = peer_state. encode ( ) ;
1460
+ self . kv_store . write (
1461
+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
1462
+ LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE ,
1463
+ & key,
1464
+ encoded,
1465
+ )
1466
+ }
1467
+
1468
+ pub ( crate ) fn persist (
1469
+ & self ,
1470
+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + Send > > {
1471
+ let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
1472
+ let mut futures = Vec :: new ( ) ;
1473
+ for ( counterparty_node_id, inner_state_lock) in outer_state_lock. iter ( ) {
1474
+ let peer_state_lock = inner_state_lock. lock ( ) . unwrap ( ) ;
1475
+ let fut = self . persist_peer_state ( * counterparty_node_id, & * peer_state_lock) ;
1476
+ futures. push ( fut) ;
1477
+ }
1478
+
1479
+ // TODO: We should eventually persist in parallel, however, when we do, we probably want to
1480
+ // introduce some batching to upper-bound the number of requests inflight at any given
1481
+ // time.
1482
+ Box :: pin ( async move {
1483
+ let mut ret = Ok ( ( ) ) ;
1484
+ for fut in futures {
1485
+ let res = fut. await ;
1486
+ if res. is_err ( ) {
1487
+ ret = res;
1488
+ }
1489
+ }
1490
+ ret
1491
+ } )
1492
+ }
1493
+
1445
1494
pub ( crate ) fn peer_disconnected ( & self , counterparty_node_id : PublicKey ) {
1446
1495
let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
1447
1496
let is_prunable =
0 commit comments