1
1
use super :: LiquidityEvent ;
2
+
3
+ use crate :: persist:: {
4
+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY ,
5
+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE ,
6
+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
7
+ } ;
2
8
use crate :: sync:: { Arc , Mutex } ;
3
9
10
+ use alloc:: boxed:: Box ;
4
11
use alloc:: collections:: VecDeque ;
5
12
use alloc:: vec:: Vec ;
6
13
7
14
use core:: future:: Future ;
15
+ use core:: pin:: Pin ;
8
16
use core:: task:: { Poll , Waker } ;
9
17
18
+ use lightning:: util:: persist:: KVStore ;
19
+ use lightning:: util:: ser:: { CollectionLength , MaybeReadable , Readable , Writeable , Writer } ;
20
+
10
21
/// The maximum queue size we allow before starting to drop events.
11
22
pub const MAX_EVENT_QUEUE_SIZE : usize = 1000 ;
12
23
@@ -15,17 +26,19 @@ pub(crate) struct EventQueue {
15
26
waker : Arc < Mutex < Option < Waker > > > ,
16
27
#[ cfg( feature = "std" ) ]
17
28
condvar : Arc < crate :: sync:: Condvar > ,
29
+ kv_store : Arc < dyn KVStore + Send + Sync > ,
18
30
}
19
31
20
32
impl EventQueue {
21
- pub fn new ( ) -> Self {
33
+ pub fn new ( kv_store : Arc < dyn KVStore + Send + Sync > ) -> Self {
22
34
let queue = Arc :: new ( Mutex :: new ( VecDeque :: new ( ) ) ) ;
23
35
let waker = Arc :: new ( Mutex :: new ( None ) ) ;
24
36
Self {
25
37
queue,
26
38
waker,
27
39
#[ cfg( feature = "std" ) ]
28
40
condvar : Arc :: new ( crate :: sync:: Condvar :: new ( ) ) ,
41
+ kv_store,
29
42
}
30
43
}
31
44
@@ -70,6 +83,20 @@ impl EventQueue {
70
83
pub fn notifier ( & self ) -> EventQueueNotifierGuard < ' _ > {
71
84
EventQueueNotifierGuard ( self )
72
85
}
86
+
87
+ pub fn persist (
88
+ & self ,
89
+ ) -> Pin < Box < dyn Future < Output = Result < ( ) , lightning:: io:: Error > > + Send > > {
90
+ let queue = self . queue . lock ( ) . unwrap ( ) ;
91
+ let encoded = EventQueueSerWrapper ( & queue) . encode ( ) ;
92
+
93
+ self . kv_store . write (
94
+ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
95
+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE ,
96
+ LIQUIDITY_MANAGER_EVENT_QUEUE_PERSISTENCE_KEY ,
97
+ encoded,
98
+ )
99
+ }
73
100
}
74
101
75
102
// A guard type that will notify about new events when dropped.
@@ -122,6 +149,35 @@ impl Future for EventFuture {
122
149
}
123
150
}
124
151
152
+ pub ( crate ) struct EventQueueDeserWrapper ( pub VecDeque < LiquidityEvent > ) ;
153
+
154
+ impl Readable for EventQueueDeserWrapper {
155
+ fn read < R : lightning:: io:: Read > (
156
+ reader : & mut R ,
157
+ ) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
158
+ let len: CollectionLength = Readable :: read ( reader) ?;
159
+ let mut queue = VecDeque :: with_capacity ( len. 0 as usize ) ;
160
+ for _ in 0 ..len. 0 {
161
+ if let Some ( event) = MaybeReadable :: read ( reader) ? {
162
+ queue. push_back ( event) ;
163
+ }
164
+ }
165
+ Ok ( Self ( queue) )
166
+ }
167
+ }
168
+
169
+ struct EventQueueSerWrapper < ' a > ( & ' a VecDeque < LiquidityEvent > ) ;
170
+
171
+ impl Writeable for EventQueueSerWrapper < ' _ > {
172
+ fn write < W : Writer > ( & self , writer : & mut W ) -> Result < ( ) , lightning:: io:: Error > {
173
+ CollectionLength ( self . 0 . len ( ) as u64 ) . write ( writer) ?;
174
+ for e in self . 0 . iter ( ) {
175
+ e. write ( writer) ?;
176
+ }
177
+ Ok ( ( ) )
178
+ }
179
+ }
180
+
125
181
#[ cfg( test) ]
126
182
mod tests {
127
183
#[ tokio:: test]
@@ -131,10 +187,13 @@ mod tests {
131
187
use crate :: lsps0:: event:: LSPS0ClientEvent ;
132
188
use bitcoin:: secp256k1:: { PublicKey , Secp256k1 , SecretKey } ;
133
189
use core:: sync:: atomic:: { AtomicU16 , Ordering } ;
190
+ use lightning:: util:: persist:: KVStoreSyncWrapper ;
191
+ use lightning:: util:: test_utils:: TestStore ;
134
192
use std:: sync:: Arc ;
135
193
use std:: time:: Duration ;
136
194
137
- let event_queue = Arc :: new ( EventQueue :: new ( ) ) ;
195
+ let kv_store = Arc :: new ( KVStoreSyncWrapper ( Arc :: new ( TestStore :: new ( false ) ) ) ) ;
196
+ let event_queue = Arc :: new ( EventQueue :: new ( kv_store) ) ;
138
197
assert_eq ! ( event_queue. next_event( ) , None ) ;
139
198
140
199
let secp_ctx = Secp256k1 :: new ( ) ;
0 commit comments