@@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1515use std:: sync:: { Arc , Mutex } ;
1616use std:: time:: Duration ;
1717
18+ use bdk_chain:: Merge ;
1819use bitcoin:: hashes:: { sha256, Hash , HashEngine , Hmac , HmacEngine } ;
1920use lightning:: io:: { self , Error , ErrorKind } ;
2021use lightning:: util:: persist:: { KVStore , KVStoreSync } ;
@@ -181,7 +182,7 @@ impl KVStoreSync for VssStore {
181182 }
182183
183184 fn remove (
184- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
185+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
185186 ) -> io:: Result < ( ) > {
186187 let internal_runtime = self . internal_runtime . as_ref ( ) . ok_or_else ( || {
187188 debug_assert ! ( false , "Failed to access internal runtime" ) ;
@@ -203,6 +204,7 @@ impl KVStoreSync for VssStore {
203204 primary_namespace,
204205 secondary_namespace,
205206 key,
207+ lazy,
206208 )
207209 . await
208210 } ;
@@ -275,7 +277,7 @@ impl KVStore for VssStore {
275277 } )
276278 }
277279 fn remove (
278- & self , primary_namespace : & str , secondary_namespace : & str , key : & str , _lazy : bool ,
280+ & self , primary_namespace : & str , secondary_namespace : & str , key : & str , lazy : bool ,
279281 ) -> Pin < Box < dyn Future < Output = Result < ( ) , io:: Error > > + Send > > {
280282 let locking_key = self . build_locking_key ( primary_namespace, secondary_namespace, key) ;
281283 let ( inner_lock_ref, version) = self . get_new_version_and_lock_ref ( locking_key. clone ( ) ) ;
@@ -292,6 +294,7 @@ impl KVStore for VssStore {
292294 primary_namespace,
293295 secondary_namespace,
294296 key,
297+ lazy,
295298 )
296299 . await
297300 } )
@@ -321,6 +324,7 @@ struct VssStoreInner {
321324 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
322325 // The lock also encapsulates the latest written version per key.
323326 locks : Mutex < HashMap < String , Arc < tokio:: sync:: Mutex < u64 > > > > ,
327+ pending_lazy_deletes : Mutex < Vec < KeyValue > > ,
324328}
325329
326330impl VssStoreInner {
@@ -347,7 +351,8 @@ impl VssStoreInner {
347351
348352 let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
349353 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
350- Self { client, store_id, storable_builder, key_obfuscator, locks }
354+ let pending_lazy_deletes = Mutex :: new ( Vec :: new ( ) ) ;
355+ Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes }
351356 }
352357
353358 fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio:: sync:: Mutex < u64 > > {
@@ -451,6 +456,12 @@ impl VssStoreInner {
451456 "write" ,
452457 ) ?;
453458
459+ let delete_items = self
460+ . pending_lazy_deletes
461+ . try_lock ( )
462+ . ok ( )
463+ . and_then ( |mut guard| guard. take ( ) )
464+ . unwrap_or_default ( ) ;
454465 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
455466 let obfuscated_key =
456467 self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
@@ -464,10 +475,15 @@ impl VssStoreInner {
464475 version: vss_version,
465476 value: storable. encode_to_vec( ) ,
466477 } ] ,
467- delete_items : vec ! [ ] ,
478+ delete_items : delete_items . clone ( ) ,
468479 } ;
469480
470481 self . client . put_object ( & request) . await . map_err ( |e| {
482+ // Restore delete items so they'll be retried on next write.
483+ if !delete_items. is_empty ( ) {
484+ self . pending_lazy_deletes . lock ( ) . unwrap ( ) . extend ( delete_items) ;
485+ }
486+
471487 let msg = format ! (
472488 "Failed to write to key {}/{}/{}: {}" ,
473489 primary_namespace, secondary_namespace, key, e
@@ -482,7 +498,7 @@ impl VssStoreInner {
482498
483499 async fn remove_internal (
484500 & self , inner_lock_ref : Arc < tokio:: sync:: Mutex < u64 > > , locking_key : String , version : u64 ,
485- primary_namespace : String , secondary_namespace : String , key : String ,
501+ primary_namespace : String , secondary_namespace : String , key : String , lazy : bool ,
486502 ) -> io:: Result < ( ) > {
487503 check_namespace_key_validity (
488504 & primary_namespace,
@@ -491,13 +507,19 @@ impl VssStoreInner {
491507 "remove" ,
492508 ) ?;
493509
510+ let obfuscated_key =
511+ self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
512+
513+ let key_value = KeyValue { key : obfuscated_key, version : -1 , value : vec ! [ ] } ;
514+ if lazy {
515+ let mut pending_lazy_deletes = self . pending_lazy_deletes . lock ( ) . unwrap ( ) ;
516+ pending_lazy_deletes. push ( key_value) ;
517+ return Ok ( ( ) ) ;
518+ }
519+
494520 self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
495- let obfuscated_key =
496- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
497- let request = DeleteObjectRequest {
498- store_id : self . store_id . clone ( ) ,
499- key_value : Some ( KeyValue { key : obfuscated_key, version : -1 , value : vec ! [ ] } ) ,
500- } ;
521+ let request =
522+ DeleteObjectRequest { store_id : self . store_id . clone ( ) , key_value : Some ( key_value) } ;
501523
502524 self . client . delete_object ( & request) . await . map_err ( |e| {
503525 let msg = format ! (
@@ -644,4 +666,87 @@ mod tests {
644666 do_read_write_remove_list_persist ( & vss_store) ;
645667 drop ( vss_store)
646668 }
669+
670+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
671+ async fn vss_lazy_delete ( ) {
672+ let vss_base_url = std:: env:: var ( "TEST_VSS_BASE_URL" ) . unwrap ( ) ;
673+ let mut rng = rng ( ) ;
674+ let rand_store_id: String = ( 0 ..7 ) . map ( |_| rng. sample ( Alphanumeric ) as char ) . collect ( ) ;
675+ let mut vss_seed = [ 0u8 ; 32 ] ;
676+ rng. fill_bytes ( & mut vss_seed) ;
677+ let header_provider = Arc :: new ( FixedHeaders :: new ( HashMap :: new ( ) ) ) ;
678+ let logger = Arc :: new ( Logger :: new_log_facade ( ) ) ;
679+ let runtime = Arc :: new ( Runtime :: new ( logger) . unwrap ( ) ) ;
680+ let vss_store =
681+ VssStore :: new ( vss_base_url, rand_store_id, vss_seed, header_provider, runtime) ;
682+
683+ let primary_namespace = "test_namespace" ;
684+ let secondary_namespace = "" ;
685+ let key_to_delete = "key_to_delete" ;
686+ let key_for_trigger = "key_for_trigger" ;
687+ let data_to_delete = b"data_to_delete" . to_vec ( ) ;
688+ let trigger_data = b"trigger_data" . to_vec ( ) ;
689+
690+ // Write the key that we'll later lazily delete
691+ KVStore :: write (
692+ & vss_store,
693+ primary_namespace,
694+ secondary_namespace,
695+ key_to_delete,
696+ data_to_delete. clone ( ) ,
697+ )
698+ . await
699+ . unwrap ( ) ;
700+
701+ // Verify the key exists
702+ let read_data =
703+ KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_to_delete)
704+ . await
705+ . unwrap ( ) ;
706+ assert_eq ! ( read_data, data_to_delete) ;
707+
708+ // Perform a lazy delete
709+ KVStore :: remove ( & vss_store, primary_namespace, secondary_namespace, key_to_delete, true )
710+ . await
711+ . unwrap ( ) ;
712+
713+ // Verify the key still exists (lazy delete doesn't immediately remove it)
714+ let read_data =
715+ KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_to_delete)
716+ . await
717+ . unwrap ( ) ;
718+ assert_eq ! ( read_data, data_to_delete) ;
719+
720+ // Verify the key is still in the list
721+ let keys = KVStore :: list ( & vss_store, primary_namespace, secondary_namespace) . await . unwrap ( ) ;
722+ assert ! ( keys. contains( & key_to_delete. to_string( ) ) ) ;
723+
724+ // Trigger the actual deletion by performing a write operation
725+ KVStore :: write (
726+ & vss_store,
727+ primary_namespace,
728+ secondary_namespace,
729+ key_for_trigger,
730+ trigger_data. clone ( ) ,
731+ )
732+ . await
733+ . unwrap ( ) ;
734+
735+ // Now verify the key is actually deleted
736+ let read_result =
737+ KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_to_delete) . await ;
738+ assert ! ( read_result. is_err( ) ) ;
739+ assert_eq ! ( read_result. unwrap_err( ) . kind( ) , ErrorKind :: NotFound ) ;
740+
741+ // Verify the key is no longer in the list
742+ let keys = KVStore :: list ( & vss_store, primary_namespace, secondary_namespace) . await . unwrap ( ) ;
743+ assert ! ( !keys. contains( & key_to_delete. to_string( ) ) ) ;
744+
745+ // Verify the trigger key still exists
746+ let read_data =
747+ KVStore :: read ( & vss_store, primary_namespace, secondary_namespace, key_for_trigger)
748+ . await
749+ . unwrap ( ) ;
750+ assert_eq ! ( read_data, trigger_data) ;
751+ }
647752}
0 commit comments