@@ -11,7 +11,7 @@ use std::future::Future;
1111#[ cfg( test) ]
1212use std:: panic:: RefUnwindSafe ;
1313use std:: pin:: Pin ;
14- use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
14+ use std:: sync:: atomic:: { AtomicU64 , AtomicUsize , Ordering } ;
1515use std:: sync:: { Arc , Mutex } ;
1616use std:: time:: Duration ;
1717
@@ -44,13 +44,25 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4444 Box < dyn Fn ( & VssError ) -> bool + ' static + Send + Sync > ,
4545> ;
4646
// Number of worker threads, and also the cap on blocking threads, of the VSS-internal runtime.
// Deliberately small, but more than one so that some progress can still be made if one of the
// threads ends up blocked.
const INTERNAL_RUNTIME_WORKERS: usize = 2;
// Upper bound applied to each synchronous VSS operation before it fails with a timeout error.
const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5);
51+
4752/// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend.
4853pub struct VssStore {
4954 inner : Arc < VssStoreInner > ,
5055 // Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
5156 // operations aren't sensitive to the order of execution.
5257 next_version : AtomicU64 ,
5358 runtime : Arc < Runtime > ,
59+ // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned
60+ // blocking task to finish while the blocked thread had acquired the reactor. In particular,
61+ // this works around a previously-hit case where a concurrent call to
62+ // `PeerManager::process_pending_events` -> `ChannelManager::get_and_clear_pending_msg_events`
63+ // would deadlock when trying to acquire sync `Mutex` locks that are held by the thread
64+ // currently being blocked waiting on the VSS operation to finish.
65+ internal_runtime : Option < tokio:: runtime:: Runtime > ,
5466}
5567
5668impl VssStore {
@@ -60,7 +72,21 @@ impl VssStore {
6072 ) -> Self {
6173 let inner = Arc :: new ( VssStoreInner :: new ( base_url, store_id, vss_seed, header_provider) ) ;
6274 let next_version = AtomicU64 :: new ( 1 ) ;
63- Self { inner, next_version, runtime }
75+ let internal_runtime = Some (
76+ tokio:: runtime:: Builder :: new_multi_thread ( )
77+ . enable_all ( )
78+ . thread_name_fn ( || {
79+ static ATOMIC_ID : AtomicUsize = AtomicUsize :: new ( 0 ) ;
80+ let id = ATOMIC_ID . fetch_add ( 1 , Ordering :: SeqCst ) ;
81+ format ! ( "ldk-node-vss-runtime-{}" , id)
82+ } )
83+ . worker_threads ( INTERNAL_RUNTIME_WORKERS )
84+ . max_blocking_threads ( INTERNAL_RUNTIME_WORKERS )
85+ . build ( )
86+ . unwrap ( ) ,
87+ ) ;
88+
89+ Self { inner, next_version, runtime, internal_runtime }
6490 }
6591
6692 // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -94,46 +120,122 @@ impl KVStoreSync for VssStore {
94120 fn read (
95121 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
96122 ) -> io:: Result < Vec < u8 > > {
97- let fut = self . inner . read_internal ( primary_namespace, secondary_namespace, key) ;
98- self . runtime . block_on ( fut)
123+ let internal_runtime = self . internal_runtime . as_ref ( ) . ok_or_else ( || {
124+ debug_assert ! ( false , "Failed to access internal runtime" ) ;
125+ let msg = format ! ( "Failed to access internal runtime" ) ;
126+ Error :: new ( ErrorKind :: Other , msg)
127+ } ) ?;
128+ let primary_namespace = primary_namespace. to_string ( ) ;
129+ let secondary_namespace = secondary_namespace. to_string ( ) ;
130+ let key = key. to_string ( ) ;
131+ let inner = Arc :: clone ( & self . inner ) ;
132+ let fut =
133+ async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ;
134+ // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
135+ // times out.
136+ let spawned_fut = internal_runtime. spawn ( async move {
137+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
138+ let msg = "VssStore::read timed out" ;
139+ Error :: new ( ErrorKind :: Other , msg)
140+ } )
141+ } ) ;
142+ self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
99143 }
100144
101145 fn write (
102146 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
103147 ) -> io:: Result < ( ) > {
104- let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
148+ let internal_runtime = self . internal_runtime . as_ref ( ) . ok_or_else ( || {
149+ debug_assert ! ( false , "Failed to access internal runtime" ) ;
150+ let msg = format ! ( "Failed to access internal runtime" ) ;
151+ Error :: new ( ErrorKind :: Other , msg)
152+ } ) ?;
153+ let primary_namespace = primary_namespace. to_string ( ) ;
154+ let secondary_namespace = secondary_namespace. to_string ( ) ;
155+ let key = key. to_string ( ) ;
156+ let inner = Arc :: clone ( & self . inner ) ;
157+ let locking_key = self . build_locking_key ( & primary_namespace, & secondary_namespace, & key) ;
105158 let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
106- let fut = self . inner . write_internal (
107- inner_lock_ref,
108- locking_key,
109- version,
110- primary_namespace,
111- secondary_namespace,
112- key,
113- buf,
114- ) ;
115- self . runtime . block_on ( fut)
159+ let fut = async move {
160+ inner
161+ . write_internal (
162+ inner_lock_ref,
163+ locking_key,
164+ version,
165+ primary_namespace,
166+ secondary_namespace,
167+ key,
168+ buf,
169+ )
170+ . await
171+ } ;
172+ // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
173+ // times out.
174+ let spawned_fut = internal_runtime. spawn ( async move {
175+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
176+ let msg = "VssStore::write timed out" ;
177+ Error :: new ( ErrorKind :: Other , msg)
178+ } )
179+ } ) ;
180+ self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
116181 }
117182
118183 fn remove (
119184 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
120185 ) -> io:: Result < ( ) > {
121- let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
186+ let internal_runtime = self . internal_runtime . as_ref ( ) . ok_or_else ( || {
187+ debug_assert ! ( false , "Failed to access internal runtime" ) ;
188+ let msg = format ! ( "Failed to access internal runtime" ) ;
189+ Error :: new ( ErrorKind :: Other , msg)
190+ } ) ?;
191+ let primary_namespace = primary_namespace. to_string ( ) ;
192+ let secondary_namespace = secondary_namespace. to_string ( ) ;
193+ let key = key. to_string ( ) ;
194+ let inner = Arc :: clone ( & self . inner ) ;
195+ let locking_key = self . build_locking_key ( & primary_namespace, & secondary_namespace, & key) ;
122196 let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
123- let fut = self . inner . remove_internal (
124- inner_lock_ref,
125- locking_key,
126- version,
127- primary_namespace,
128- secondary_namespace,
129- key,
130- ) ;
131- self . runtime . block_on ( fut)
197+ let fut = async move {
198+ inner
199+ . remove_internal (
200+ inner_lock_ref,
201+ locking_key,
202+ version,
203+ primary_namespace,
204+ secondary_namespace,
205+ key,
206+ )
207+ . await
208+ } ;
209+ // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
210+ // times out.
211+ let spawned_fut = internal_runtime. spawn ( async move {
212+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
213+ let msg = "VssStore::remove timed out" ;
214+ Error :: new ( ErrorKind :: Other , msg)
215+ } )
216+ } ) ;
217+ self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
132218 }
133219
134220 fn list ( & self , primary_namespace : & str , secondary_namespace : & str ) -> io:: Result < Vec < String > > {
135- let fut = self . inner . list_internal ( primary_namespace, secondary_namespace) ;
136- self . runtime . block_on ( fut)
221+ let internal_runtime = self . internal_runtime . as_ref ( ) . ok_or_else ( || {
222+ debug_assert ! ( false , "Failed to access internal runtime" ) ;
223+ let msg = format ! ( "Failed to access internal runtime" ) ;
224+ Error :: new ( ErrorKind :: Other , msg)
225+ } ) ?;
226+ let primary_namespace = primary_namespace. to_string ( ) ;
227+ let secondary_namespace = secondary_namespace. to_string ( ) ;
228+ let inner = Arc :: clone ( & self . inner ) ;
229+ let fut = async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } ;
230+ // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always
231+ // times out.
232+ let spawned_fut = internal_runtime. spawn ( async move {
233+ tokio:: time:: timeout ( VSS_IO_TIMEOUT , fut) . await . map_err ( |_| {
234+ let msg = "VssStore::list timed out" ;
235+ Error :: new ( ErrorKind :: Other , msg)
236+ } )
237+ } ) ;
238+ self . runtime . block_on ( spawned_fut) . expect ( "We should always finish" ) ?
137239 }
138240}
139241
@@ -145,9 +247,9 @@ impl KVStore for VssStore {
145247 let secondary_namespace = secondary_namespace. to_string ( ) ;
146248 let key = key. to_string ( ) ;
147249 let inner = Arc :: clone ( & self . inner ) ;
148- Box :: pin ( async move {
149- inner. read_internal ( & primary_namespace, & secondary_namespace, & key) . await
150- } )
250+ Box :: pin (
251+ async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ,
252+ )
151253 }
152254 fn write (
153255 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
@@ -164,9 +266,9 @@ impl KVStore for VssStore {
164266 inner_lock_ref,
165267 locking_key,
166268 version,
167- & primary_namespace,
168- & secondary_namespace,
169- & key,
269+ primary_namespace,
270+ secondary_namespace,
271+ key,
170272 buf,
171273 )
172274 . await
@@ -187,9 +289,9 @@ impl KVStore for VssStore {
187289 inner_lock_ref,
188290 locking_key,
189291 version,
190- & primary_namespace,
191- & secondary_namespace,
192- & key,
292+ primary_namespace,
293+ secondary_namespace,
294+ key,
193295 )
194296 . await
195297 } )
@@ -200,7 +302,14 @@ impl KVStore for VssStore {
200302 let primary_namespace = primary_namespace. to_string ( ) ;
201303 let secondary_namespace = secondary_namespace. to_string ( ) ;
202304 let inner = Arc :: clone ( & self . inner ) ;
203- Box :: pin ( async move { inner. list_internal ( & primary_namespace, & secondary_namespace) . await } )
305+ Box :: pin ( async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } )
306+ }
307+ }
308+
309+ impl Drop for VssStore {
310+ fn drop ( & mut self ) {
311+ let internal_runtime = self . internal_runtime . take ( ) ;
312+ tokio:: task:: block_in_place ( move || drop ( internal_runtime) ) ;
204313 }
205314}
206315
@@ -300,11 +409,12 @@ impl VssStoreInner {
300409 }
301410
302411 async fn read_internal (
303- & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
412+ & self , primary_namespace : String , secondary_namespace : String , key : String ,
304413 ) -> io:: Result < Vec < u8 > > {
305- check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "read" ) ?;
414+ check_namespace_key_validity ( & primary_namespace, & secondary_namespace, Some ( & key) , "read" ) ?;
306415
307- let obfuscated_key = self . build_obfuscated_key ( primary_namespace, secondary_namespace, key) ;
416+ let obfuscated_key =
417+ self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
308418 let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : obfuscated_key } ;
309419 let resp = self . client . get_object ( & request) . await . map_err ( |e| {
310420 let msg = format ! (
@@ -332,13 +442,18 @@ impl VssStoreInner {
332442
333443 async fn write_internal (
334444 & self , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > , locking_key : String , version : u64 ,
335- primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
445+ primary_namespace : String , secondary_namespace : String , key : String , buf : Vec < u8 > ,
336446 ) -> io:: Result < ( ) > {
337- check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "write" ) ?;
447+ check_namespace_key_validity (
448+ & primary_namespace,
449+ & secondary_namespace,
450+ Some ( & key) ,
451+ "write" ,
452+ ) ?;
338453
339454 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
340455 let obfuscated_key =
341- self . build_obfuscated_key ( primary_namespace, secondary_namespace, key) ;
456+ self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
342457 let vss_version = -1 ;
343458 let storable = self . storable_builder . build ( buf, vss_version) ;
344459 let request = PutObjectRequest {
@@ -367,13 +482,18 @@ impl VssStoreInner {
367482
368483 async fn remove_internal (
369484 & self , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > , locking_key : String , version : u64 ,
370- primary_namespace : & str , secondary_namespace : & str , key : & str ,
485+ primary_namespace : String , secondary_namespace : String , key : String ,
371486 ) -> io:: Result < ( ) > {
372- check_namespace_key_validity ( primary_namespace, secondary_namespace, Some ( key) , "remove" ) ?;
487+ check_namespace_key_validity (
488+ & primary_namespace,
489+ & secondary_namespace,
490+ Some ( & key) ,
491+ "remove" ,
492+ ) ?;
373493
374494 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
375495 let obfuscated_key =
376- self . build_obfuscated_key ( primary_namespace, secondary_namespace, key) ;
496+ self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
377497 let request = DeleteObjectRequest {
378498 store_id : self . store_id . clone ( ) ,
379499 key_value : Some ( KeyValue { key : obfuscated_key, version : -1 , value : vec ! [ ] } ) ,
@@ -393,12 +513,12 @@ impl VssStoreInner {
393513 }
394514
395515 async fn list_internal (
396- & self , primary_namespace : & str , secondary_namespace : & str ,
516+ & self , primary_namespace : String , secondary_namespace : String ,
397517 ) -> io:: Result < Vec < String > > {
398- check_namespace_key_validity ( primary_namespace, secondary_namespace, None , "list" ) ?;
518+ check_namespace_key_validity ( & primary_namespace, & secondary_namespace, None , "list" ) ?;
399519
400520 let keys =
401- self . list_all_keys ( primary_namespace, secondary_namespace) . await . map_err ( |e| {
521+ self . list_all_keys ( & primary_namespace, & secondary_namespace) . await . map_err ( |e| {
402522 let msg = format ! (
403523 "Failed to retrieve keys in namespace: {}/{} : {}" ,
404524 primary_namespace, secondary_namespace, e
0 commit comments