@@ -45,11 +45,15 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4545 Box < dyn Fn ( & VssError ) -> bool + ' static + Send + Sync > ,
4646> ;
4747
48+ #[ derive( Debug , PartialEq ) ]
4849enum VssSchemaVersion {
4950 // The initial schema version.
5051 // This used an empty `aad` and unobfuscated `primary_namespace`/`secondary_namespace`s in the
5152 // stored key.
5253 V0 ,
54+ // The second deployed schema version.
55+ // Here we started to obfuscate the primary and secondary namespaces and the obfuscated `store_key` (`obfuscate(primary_namespace#secondary_namespace)#obfuscate(key)`) is now used as `aad` for encryption, ensuring that the encrypted blobs commit to the key they're stored under.
56+ V1 ,
5357}
5458
5559// We set this to a small number of threads that would still allow to make some progress if one
@@ -324,9 +328,10 @@ impl Drop for VssStore {
324328}
325329
326330struct VssStoreInner {
331+ schema_version : VssSchemaVersion ,
327332 client : VssClient < CustomRetryPolicy > ,
328333 store_id : String ,
329- storable_builder : StorableBuilder < RandEntropySource > ,
334+ data_encryption_key : [ u8 ; 32 ] ,
330335 key_obfuscator : KeyObfuscator ,
331336 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
332337 // The lock also encapsulates the latest written version per key.
@@ -339,10 +344,10 @@ impl VssStoreInner {
339344 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
340345 header_provider : Arc < dyn VssHeaderProvider > ,
341346 ) -> Self {
347+ let schema_version = VssSchemaVersion :: V0 ;
342348 let ( data_encryption_key, obfuscation_master_key) =
343349 derive_data_encryption_and_obfuscation_keys ( & vss_seed) ;
344350 let key_obfuscator = KeyObfuscator :: new ( obfuscation_master_key) ;
345- let storable_builder = StorableBuilder :: new ( data_encryption_key, RandEntropySource ) ;
346351 let retry_policy = ExponentialBackoffRetryPolicy :: new ( Duration :: from_millis ( 10 ) )
347352 . with_max_attempts ( 10 )
348353 . with_max_total_delay ( Duration :: from_secs ( 15 ) )
@@ -359,7 +364,15 @@ impl VssStoreInner {
359364 let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
360365 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
361366 let pending_lazy_deletes = Mutex :: new ( Vec :: new ( ) ) ;
362- Self { client, store_id, storable_builder, key_obfuscator, locks, pending_lazy_deletes }
367+ Self {
368+ schema_version,
369+ client,
370+ store_id,
371+ data_encryption_key,
372+ key_obfuscator,
373+ locks,
374+ pending_lazy_deletes,
375+ }
363376 }
364377
365378 fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio:: sync:: Mutex < u64 > > {
@@ -370,17 +383,45 @@ impl VssStoreInner {
370383 fn build_obfuscated_key (
371384 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
372385 ) -> String {
373- let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
374- if primary_namespace. is_empty ( ) {
375- obfuscated_key
386+ if self . schema_version == VssSchemaVersion :: V1 {
387+ let obfuscated_prefix =
388+ self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
389+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
390+ format ! ( "{}#{}" , obfuscated_prefix, obfuscated_key)
391+ } else {
392+ // Default to V0 schema
393+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
394+ if primary_namespace. is_empty ( ) {
395+ obfuscated_key
396+ } else {
397+ format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
398+ }
399+ }
400+ }
401+
402+ fn build_obfuscated_prefix (
403+ & self , primary_namespace : & str , secondary_namespace : & str ,
404+ ) -> String {
405+ if self . schema_version == VssSchemaVersion :: V1 {
406+ let prefix = format ! ( "{}#{}" , primary_namespace, secondary_namespace) ;
407+ self . key_obfuscator . obfuscate ( & prefix)
376408 } else {
377- format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
409+ // Default to V0 schema
410+ format ! ( "{}#{}" , primary_namespace, secondary_namespace)
378411 }
379412 }
380413
381414 fn extract_key ( & self , unified_key : & str ) -> io:: Result < String > {
382- let mut parts = unified_key. splitn ( 3 , '#' ) ;
383- let ( _primary_namespace, _secondary_namespace) = ( parts. next ( ) , parts. next ( ) ) ;
415+ let mut parts = if self . schema_version == VssSchemaVersion :: V1 {
416+ let mut parts = unified_key. splitn ( 2 , '#' ) ;
417+ let _obfuscated_namespace = parts. next ( ) ;
418+ parts
419+ } else {
420+ // Default to V0 schema
421+ let mut parts = unified_key. splitn ( 3 , '#' ) ;
422+ let ( _primary_namespace, _secondary_namespace) = ( parts. next ( ) , parts. next ( ) ) ;
423+ parts
424+ } ;
384425 match parts. next ( ) {
385426 Some ( obfuscated_key) => {
386427 let actual_key = self . key_obfuscator . deobfuscate ( obfuscated_key) ?;
@@ -395,7 +436,7 @@ impl VssStoreInner {
395436 ) -> io:: Result < Vec < String > > {
396437 let mut page_token = None ;
397438 let mut keys = vec ! [ ] ;
398- let key_prefix = format ! ( "{}#{}" , primary_namespace, secondary_namespace) ;
439+ let key_prefix = self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
399440 while page_token != Some ( "" . to_string ( ) ) {
400441 let request = ListKeyVersionsRequest {
401442 store_id : self . store_id . clone ( ) ,
@@ -425,9 +466,8 @@ impl VssStoreInner {
425466 ) -> io:: Result < Vec < u8 > > {
426467 check_namespace_key_validity ( & primary_namespace, & secondary_namespace, Some ( & key) , "read" ) ?;
427468
428- let obfuscated_key =
429- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
430- let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : obfuscated_key } ;
469+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
470+ let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : store_key. clone ( ) } ;
431471 let resp = self . client . get_object ( & request) . await . map_err ( |e| {
432472 let msg = format ! (
433473 "Failed to read from key {}/{}/{}: {}" ,
@@ -449,7 +489,11 @@ impl VssStoreInner {
449489 Error :: new ( ErrorKind :: Other , msg)
450490 } ) ?;
451491
452- Ok ( self . storable_builder . deconstruct ( storable) ?. 0 )
492+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
493+ let aad =
494+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
495+ let decrypted = storable_builder. deconstruct ( storable, & self . data_encryption_key , aad) ?. 0 ;
496+ Ok ( decrypted)
453497 }
454498
455499 async fn write_internal (
@@ -469,22 +513,25 @@ impl VssStoreInner {
469513 . ok ( )
470514 . and_then ( |mut guard| guard. take ( ) )
471515 . unwrap_or_default ( ) ;
472- self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
473- let obfuscated_key =
474- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
475- let vss_version = -1 ;
476- let storable = self . storable_builder . build ( buf, vss_version) ;
477- let request = PutObjectRequest {
478- store_id : self . store_id . clone ( ) ,
479- global_version : None ,
480- transaction_items : vec ! [ KeyValue {
481- key: obfuscated_key,
482- version: vss_version,
483- value: storable. encode_to_vec( ) ,
484- } ] ,
485- delete_items : delete_items. clone ( ) ,
486- } ;
516+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
517+ let vss_version = -1 ;
518+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
519+ let aad =
520+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
521+ let storable =
522+ storable_builder. build ( buf. to_vec ( ) , vss_version, & self . data_encryption_key , aad) ;
523+ let request = PutObjectRequest {
524+ store_id : self . store_id . clone ( ) ,
525+ global_version : None ,
526+ transaction_items : vec ! [ KeyValue {
527+ key: store_key,
528+ version: vss_version,
529+ value: storable. encode_to_vec( ) ,
530+ } ] ,
531+ delete_items : delete_items. clone ( ) ,
532+ } ;
487533
534+ self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
488535 self . client . put_object ( & request) . await . map_err ( |e| {
489536 // Restore delete items so they'll be retried on next write.
490537 if !delete_items. is_empty ( ) {
0 commit comments