@@ -102,12 +102,14 @@ impl StorageService {
102102 ) -> anyhow:: Result < Self > {
103103 let high_volume_backend = create_backend ( high_volume_config) . await ?;
104104 let long_term_backend = create_backend ( long_term_config) . await ?;
105+ Ok ( Self :: from_backends ( high_volume_backend, long_term_backend) )
106+ }
105107
106- let inner = StorageServiceInner {
108+ fn from_backends ( high_volume_backend : BoxedBackend , long_term_backend : BoxedBackend ) -> Self {
109+ Self ( Arc :: new ( StorageServiceInner {
107110 high_volume_backend,
108111 long_term_backend,
109- } ;
110- Ok ( Self ( Arc :: new ( inner) ) )
112+ } ) )
111113 }
112114
113115 /// Creates or overwrites an object.
@@ -430,6 +432,151 @@ mod tests {
430432 assert_eq ! ( file_contents. as_ref( ) , b"oh hai!" ) ;
431433 }
432434
435+ fn make_localfs_service ( ) -> ( StorageService , tempfile:: TempDir , tempfile:: TempDir ) {
436+ let hv_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
437+ let lt_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
438+ let hv = Box :: new ( backend:: local_fs:: LocalFsBackend :: new ( hv_dir. path ( ) ) ) ;
439+ let lt = Box :: new ( backend:: local_fs:: LocalFsBackend :: new ( lt_dir. path ( ) ) ) ;
440+ ( StorageService :: from_backends ( hv, lt) , hv_dir, lt_dir)
441+ }
442+
443+ // --- Tombstone inconsistency tests ---
444+
445+ /// A backend where put_object always fails, but reads/deletes work normally.
446+ #[ derive( Debug ) ]
447+ struct FailingPutBackend ( backend:: local_fs:: LocalFsBackend ) ;
448+
449+ #[ async_trait:: async_trait]
450+ impl backend:: common:: Backend for FailingPutBackend {
451+ fn name ( & self ) -> & ' static str {
452+ "failing-put"
453+ }
454+
455+ async fn put_object (
456+ & self ,
457+ _id : & ObjectId ,
458+ _metadata : & Metadata ,
459+ _stream : PayloadStream ,
460+ ) -> ServiceResult < ( ) > {
461+ Err ( ServiceError :: Io ( std:: io:: Error :: new (
462+ std:: io:: ErrorKind :: ConnectionRefused ,
463+ "simulated tombstone write failure" ,
464+ ) ) )
465+ }
466+
467+ async fn get_object (
468+ & self ,
469+ id : & ObjectId ,
470+ ) -> ServiceResult < Option < ( Metadata , PayloadStream ) > > {
471+ self . 0 . get_object ( id) . await
472+ }
473+
474+ async fn delete_object ( & self , id : & ObjectId ) -> ServiceResult < ( ) > {
475+ self . 0 . delete_object ( id) . await
476+ }
477+ }
478+
479+ /// If the tombstone write to the high-volume backend fails after the long-term
480+ /// write succeeds, the long-term object must be cleaned up so we never leave
481+ /// an unreachable orphan in long-term storage.
482+ #[ tokio:: test]
483+ async fn no_orphan_when_tombstone_write_fails ( ) {
484+ let lt_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
485+ let lt_backend_for_inspection = backend:: local_fs:: LocalFsBackend :: new ( lt_dir. path ( ) ) ;
486+
487+ // High-volume backend always fails on put (simulating BigTable being down).
488+ // This means the tombstone write will fail after the long-term write succeeds.
489+ let hv: BoxedBackend = Box :: new ( FailingPutBackend ( backend:: local_fs:: LocalFsBackend :: new (
490+ tempfile:: tempdir ( ) . unwrap ( ) . path ( ) ,
491+ ) ) ) ;
492+ let lt: BoxedBackend = Box :: new ( backend:: local_fs:: LocalFsBackend :: new ( lt_dir. path ( ) ) ) ;
493+ let service = StorageService :: from_backends ( hv, lt) ;
494+
495+ let payload = vec ! [ 0xABu8 ; 2 * 1024 * 1024 ] ; // 2 MiB -> long-term path
496+ let result = service
497+ . insert_object (
498+ make_context ( ) ,
499+ Some ( "orphan-test" . into ( ) ) ,
500+ & Default :: default ( ) ,
501+ make_stream ( & payload) ,
502+ )
503+ . await ;
504+
505+ // The insert should fail (tombstone write failed)
506+ assert ! ( result. is_err( ) ) ;
507+
508+ // The long-term object must have been cleaned up — no orphan
509+ let id = ObjectId :: from_parts (
510+ "testing" . into ( ) ,
511+ Scopes :: from_iter ( [ Scope :: create ( "testing" , "value" ) . unwrap ( ) ] ) ,
512+ "orphan-test" . into ( ) ,
513+ ) ;
514+ let orphan = lt_backend_for_inspection. get_object ( & id) . await . unwrap ( ) ;
515+ assert ! (
516+ orphan. is_none( ) ,
517+ "long-term object was not cleaned up after tombstone write failure"
518+ ) ;
519+ }
520+
521+ /// If a tombstone exists in high-volume but the corresponding object is
522+ /// missing from long-term storage (e.g. due to a race condition or partial
523+ /// cleanup), reads should gracefully return None rather than error.
524+ #[ tokio:: test]
525+ async fn orphan_tombstone_returns_none_on_get ( ) {
526+ let ( service, _hv_dir, lt_dir) = make_localfs_service ( ) ;
527+ let payload = vec ! [ 0xCDu8 ; 2 * 1024 * 1024 ] ; // 2 MiB
528+
529+ let id = service
530+ . insert_object (
531+ make_context ( ) ,
532+ Some ( "orphan-tombstone" . into ( ) ) ,
533+ & Default :: default ( ) ,
534+ make_stream ( & payload) ,
535+ )
536+ . await
537+ . unwrap ( ) ;
538+
539+ // Manually delete the long-term object, leaving an orphan tombstone
540+ let lt_backend = backend:: local_fs:: LocalFsBackend :: new ( lt_dir. path ( ) ) ;
541+ lt_backend. delete_object ( & id) . await . unwrap ( ) ;
542+
543+ // get_object should gracefully return None, not error
544+ let result = service. get_object ( & id) . await . unwrap ( ) ;
545+ assert ! (
546+ result. is_none( ) ,
547+ "orphan tombstone should resolve to None, not return the tombstone"
548+ ) ;
549+ }
550+
551+ /// Same as above but for get_metadata — an orphan tombstone should return
552+ /// None rather than exposing the tombstone metadata to callers.
553+ #[ tokio:: test]
554+ async fn orphan_tombstone_returns_none_on_get_metadata ( ) {
555+ let ( service, _hv_dir, lt_dir) = make_localfs_service ( ) ;
556+ let payload = vec ! [ 0xEFu8 ; 2 * 1024 * 1024 ] ; // 2 MiB
557+
558+ let id = service
559+ . insert_object (
560+ make_context ( ) ,
561+ Some ( "orphan-tombstone-meta" . into ( ) ) ,
562+ & Default :: default ( ) ,
563+ make_stream ( & payload) ,
564+ )
565+ . await
566+ . unwrap ( ) ;
567+
568+ // Manually delete the long-term object
569+ let lt_backend = backend:: local_fs:: LocalFsBackend :: new ( lt_dir. path ( ) ) ;
570+ lt_backend. delete_object ( & id) . await . unwrap ( ) ;
571+
572+ // get_metadata should gracefully return None
573+ let result = service. get_metadata ( & id) . await . unwrap ( ) ;
574+ assert ! (
575+ result. is_none( ) ,
576+ "orphan tombstone metadata should resolve to None"
577+ ) ;
578+ }
579+
433580 #[ tokio:: test]
434581 async fn test_tombstone_redirect_and_delete ( ) {
435582 let high_volume = StorageConfig :: BigTable {
0 commit comments