@@ -97,24 +97,22 @@ impl VssStore {
9797 let ( data_encryption_key, obfuscation_master_key) =
9898 derive_data_encryption_and_obfuscation_keys ( & vss_seed) ;
9999 let key_obfuscator = KeyObfuscator :: new ( obfuscation_master_key) ;
100- let retry_policy = ExponentialBackoffRetryPolicy :: new ( Duration :: from_millis ( 10 ) )
101- . with_max_attempts ( 100 )
102- . with_max_total_delay ( Duration :: from_secs ( 60 ) )
103- . with_max_jitter ( Duration :: from_millis ( 50 ) )
104- . skip_retry_on_error ( Box :: new ( |e : & VssError | {
105- matches ! (
106- e,
107- VssError :: NoSuchKeyError ( ..)
108- | VssError :: InvalidRequestError ( ..)
109- | VssError :: ConflictError ( ..)
110- )
111- } ) as _ ) ;
112100
113- let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
101+ let sync_retry_policy = retry_policy ( ) ;
102+ let blocking_client = VssClient :: new_with_headers (
103+ base_url. clone ( ) ,
104+ sync_retry_policy,
105+ header_provider. clone ( ) ,
106+ ) ;
107+
108+ let async_retry_policy = retry_policy ( ) ;
109+ let async_client =
110+ VssClient :: new_with_headers ( base_url, async_retry_policy, header_provider) ;
114111
115112 let inner = Arc :: new ( VssStoreInner :: new (
116113 schema_version,
117- client,
114+ blocking_client,
115+ async_client,
118116 store_id,
119117 data_encryption_key,
120118 key_obfuscator,
@@ -163,8 +161,11 @@ impl KVStoreSync for VssStore {
163161 let secondary_namespace = secondary_namespace. to_string ( ) ;
164162 let key = key. to_string ( ) ;
165163 let inner = Arc :: clone ( & self . inner ) ;
166- let fut =
167- async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ;
164+ let fut = async move {
165+ inner
166+ . read_internal ( & inner. blocking_client , primary_namespace, secondary_namespace, key)
167+ . await
168+ } ;
168169 tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
169170 }
170171
@@ -185,6 +186,7 @@ impl KVStoreSync for VssStore {
185186 let fut = async move {
186187 inner
187188 . write_internal (
189+ & inner. blocking_client ,
188190 inner_lock_ref,
189191 locking_key,
190192 version,
@@ -215,6 +217,7 @@ impl KVStoreSync for VssStore {
215217 let fut = async move {
216218 inner
217219 . remove_internal (
220+ & inner. blocking_client ,
218221 inner_lock_ref,
219222 locking_key,
220223 version,
@@ -236,7 +239,11 @@ impl KVStoreSync for VssStore {
236239 let primary_namespace = primary_namespace. to_string ( ) ;
237240 let secondary_namespace = secondary_namespace. to_string ( ) ;
238241 let inner = Arc :: clone ( & self . inner ) ;
239- let fut = async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } ;
242+ let fut = async move {
243+ inner
244+ . list_internal ( & inner. blocking_client , primary_namespace, secondary_namespace)
245+ . await
246+ } ;
240247 tokio:: task:: block_in_place ( move || internal_runtime. block_on ( fut) )
241248 }
242249}
@@ -249,9 +256,11 @@ impl KVStore for VssStore {
249256 let secondary_namespace = secondary_namespace. to_string ( ) ;
250257 let key = key. to_string ( ) ;
251258 let inner = Arc :: clone ( & self . inner ) ;
252- Box :: pin (
253- async move { inner. read_internal ( primary_namespace, secondary_namespace, key) . await } ,
254- )
259+ Box :: pin ( async move {
260+ inner
261+ . read_internal ( & inner. async_client , primary_namespace, secondary_namespace, key)
262+ . await
263+ } )
255264 }
256265 fn write (
257266 & self , primary_namespace : & str , secondary_namespace : & str , key : & str , buf : Vec < u8 > ,
@@ -265,6 +274,7 @@ impl KVStore for VssStore {
265274 Box :: pin ( async move {
266275 inner
267276 . write_internal (
277+ & inner. async_client ,
268278 inner_lock_ref,
269279 locking_key,
270280 version,
@@ -288,6 +298,7 @@ impl KVStore for VssStore {
288298 Box :: pin ( async move {
289299 inner
290300 . remove_internal (
301+ & inner. async_client ,
291302 inner_lock_ref,
292303 locking_key,
293304 version,
@@ -304,7 +315,9 @@ impl KVStore for VssStore {
304315 let primary_namespace = primary_namespace. to_string ( ) ;
305316 let secondary_namespace = secondary_namespace. to_string ( ) ;
306317 let inner = Arc :: clone ( & self . inner ) ;
307- Box :: pin ( async move { inner. list_internal ( primary_namespace, secondary_namespace) . await } )
318+ Box :: pin ( async move {
319+ inner. list_internal ( & inner. async_client , primary_namespace, secondary_namespace) . await
320+ } )
308321 }
309322}
310323
@@ -317,7 +330,10 @@ impl Drop for VssStore {
317330
318331struct VssStoreInner {
319332 schema_version : VssSchemaVersion ,
320- client : VssClient < CustomRetryPolicy > ,
333+ blocking_client : VssClient < CustomRetryPolicy > ,
334+ // A secondary client that will only be used for async persistence via `KVStore`, to ensure TCP
335+ // connections aren't shared between our outer and the internal runtime.
336+ async_client : VssClient < CustomRetryPolicy > ,
321337 store_id : String ,
322338 data_encryption_key : [ u8 ; 32 ] ,
323339 key_obfuscator : KeyObfuscator ,
@@ -328,11 +344,20 @@ struct VssStoreInner {
328344
329345impl VssStoreInner {
330346 pub ( crate ) fn new (
331- schema_version : VssSchemaVersion , client : VssClient < CustomRetryPolicy > , store_id : String ,
347+ schema_version : VssSchemaVersion , blocking_client : VssClient < CustomRetryPolicy > ,
348+ async_client : VssClient < CustomRetryPolicy > , store_id : String ,
332349 data_encryption_key : [ u8 ; 32 ] , key_obfuscator : KeyObfuscator ,
333350 ) -> Self {
334351 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
335- Self { schema_version, client, store_id, data_encryption_key, key_obfuscator, locks }
352+ Self {
353+ schema_version,
354+ blocking_client,
355+ async_client,
356+ store_id,
357+ data_encryption_key,
358+ key_obfuscator,
359+ locks,
360+ }
336361 }
337362
338363 fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio:: sync:: Mutex < u64 > > {
@@ -392,7 +417,8 @@ impl VssStoreInner {
392417 }
393418
394419 async fn list_all_keys (
395- & self , primary_namespace : & str , secondary_namespace : & str ,
420+ & self , client : & VssClient < CustomRetryPolicy > , primary_namespace : & str ,
421+ secondary_namespace : & str ,
396422 ) -> io:: Result < Vec < String > > {
397423 let mut page_token = None ;
398424 let mut keys = vec ! [ ] ;
@@ -405,7 +431,7 @@ impl VssStoreInner {
405431 page_size : None ,
406432 } ;
407433
408- let response = self . client . list_key_versions ( & request) . await . map_err ( |e| {
434+ let response = client. list_key_versions ( & request) . await . map_err ( |e| {
409435 let msg = format ! (
410436 "Failed to list keys in {}/{}: {}" ,
411437 primary_namespace, secondary_namespace, e
@@ -422,13 +448,14 @@ impl VssStoreInner {
422448 }
423449
424450 async fn read_internal (
425- & self , primary_namespace : String , secondary_namespace : String , key : String ,
451+ & self , client : & VssClient < CustomRetryPolicy > , primary_namespace : String ,
452+ secondary_namespace : String , key : String ,
426453 ) -> io:: Result < Vec < u8 > > {
427454 check_namespace_key_validity ( & primary_namespace, & secondary_namespace, Some ( & key) , "read" ) ?;
428455
429456 let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
430457 let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : store_key. clone ( ) } ;
431- let resp = self . client . get_object ( & request) . await . map_err ( |e| {
458+ let resp = client. get_object ( & request) . await . map_err ( |e| {
432459 let msg = format ! (
433460 "Failed to read from key {}/{}/{}: {}" ,
434461 primary_namespace, secondary_namespace, key, e
@@ -457,8 +484,9 @@ impl VssStoreInner {
457484 }
458485
459486 async fn write_internal (
460- & self , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > , locking_key : String , version : u64 ,
461- primary_namespace : String , secondary_namespace : String , key : String , buf : Vec < u8 > ,
487+ & self , client : & VssClient < CustomRetryPolicy > , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > ,
488+ locking_key : String , version : u64 , primary_namespace : String , secondary_namespace : String ,
489+ key : String , buf : Vec < u8 > ,
462490 ) -> io:: Result < ( ) > {
463491 check_namespace_key_validity (
464492 & primary_namespace,
@@ -486,7 +514,7 @@ impl VssStoreInner {
486514 } ;
487515
488516 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
489- self . client . put_object ( & request) . await . map_err ( |e| {
517+ client. put_object ( & request) . await . map_err ( |e| {
490518 let msg = format ! (
491519 "Failed to write to key {}/{}/{}: {}" ,
492520 primary_namespace, secondary_namespace, key, e
@@ -500,8 +528,9 @@ impl VssStoreInner {
500528 }
501529
502530 async fn remove_internal (
503- & self , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > , locking_key : String , version : u64 ,
504- primary_namespace : String , secondary_namespace : String , key : String ,
531+ & self , client : & VssClient < CustomRetryPolicy > , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > ,
532+ locking_key : String , version : u64 , primary_namespace : String , secondary_namespace : String ,
533+ key : String ,
505534 ) -> io:: Result < ( ) > {
506535 check_namespace_key_validity (
507536 & primary_namespace,
@@ -518,7 +547,7 @@ impl VssStoreInner {
518547 key_value : Some ( KeyValue { key : obfuscated_key, version : -1 , value : vec ! [ ] } ) ,
519548 } ;
520549
521- self . client . delete_object ( & request) . await . map_err ( |e| {
550+ client. delete_object ( & request) . await . map_err ( |e| {
522551 let msg = format ! (
523552 "Failed to delete key {}/{}/{}: {}" ,
524553 primary_namespace, secondary_namespace, key, e
@@ -532,12 +561,15 @@ impl VssStoreInner {
532561 }
533562
534563 async fn list_internal (
535- & self , primary_namespace : String , secondary_namespace : String ,
564+ & self , client : & VssClient < CustomRetryPolicy > , primary_namespace : String ,
565+ secondary_namespace : String ,
536566 ) -> io:: Result < Vec < String > > {
537567 check_namespace_key_validity ( & primary_namespace, & secondary_namespace, None , "list" ) ?;
538568
539- let keys =
540- self . list_all_keys ( & primary_namespace, & secondary_namespace) . await . map_err ( |e| {
569+ let keys = self
570+ . list_all_keys ( client, & primary_namespace, & secondary_namespace)
571+ . await
572+ . map_err ( |e| {
541573 let msg = format ! (
542574 "Failed to retrieve keys in namespace: {}/{} : {}" ,
543575 primary_namespace, secondary_namespace, e
@@ -606,6 +638,21 @@ fn derive_data_encryption_and_obfuscation_keys(vss_seed: &[u8; 32]) -> ([u8; 32]
606638 ( k1, k2)
607639}
608640
641+ fn retry_policy ( ) -> CustomRetryPolicy {
642+ ExponentialBackoffRetryPolicy :: new ( Duration :: from_millis ( 10 ) )
643+ . with_max_attempts ( 100 )
644+ . with_max_total_delay ( Duration :: from_secs ( 60 ) )
645+ . with_max_jitter ( Duration :: from_millis ( 50 ) )
646+ . skip_retry_on_error ( Box :: new ( |e : & VssError | {
647+ matches ! (
648+ e,
649+ VssError :: NoSuchKeyError ( ..)
650+ | VssError :: InvalidRequestError ( ..)
651+ | VssError :: ConflictError ( ..)
652+ )
653+ } ) as _ )
654+ }
655+
609656/// A source for generating entropy/randomness using [`rand`].
610657pub ( crate ) struct RandEntropySource ;
611658
0 commit comments