@@ -44,11 +44,15 @@ type CustomRetryPolicy = FilteredRetryPolicy<
4444 Box < dyn Fn ( & VssError ) -> bool + ' static + Send + Sync > ,
4545> ;
4646
47+ #[ derive( Debug , PartialEq ) ]
4748enum VssSchemaVersion {
4849 // The initial schema version.
4950 // This used an empty `aad` and unobfuscated `primary_namespace`/`secondary_namespace`s in the
5051 // stored key.
5152 V0 ,
53+ // The second deployed schema version.
54+ // Here we started to obfuscate the primary and secondary namespaces and the obfuscated `store_key` (`obfuscate(primary_namespace#secondary_namespace)#obfuscate(key)`) is now used as `aad` for encryption, ensuring that the encrypted blobs commit to the key they're stored under.
55+ V1 ,
5256}
5357
5458// We set this to a small number of threads that would still allow to make some progress if one
@@ -321,9 +325,10 @@ impl Drop for VssStore {
321325}
322326
323327struct VssStoreInner {
328+ schema_version : VssSchemaVersion ,
324329 client : VssClient < CustomRetryPolicy > ,
325330 store_id : String ,
326- storable_builder : StorableBuilder < RandEntropySource > ,
331+ data_encryption_key : [ u8 ; 32 ] ,
327332 key_obfuscator : KeyObfuscator ,
328333 // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
329334 // The lock also encapsulates the latest written version per key.
@@ -335,10 +340,10 @@ impl VssStoreInner {
335340 base_url : String , store_id : String , vss_seed : [ u8 ; 32 ] ,
336341 header_provider : Arc < dyn VssHeaderProvider > ,
337342 ) -> Self {
343+ let schema_version = VssSchemaVersion :: V0 ;
338344 let ( data_encryption_key, obfuscation_master_key) =
339345 derive_data_encryption_and_obfuscation_keys ( & vss_seed) ;
340346 let key_obfuscator = KeyObfuscator :: new ( obfuscation_master_key) ;
341- let storable_builder = StorableBuilder :: new ( data_encryption_key, RandEntropySource ) ;
342347 let retry_policy = ExponentialBackoffRetryPolicy :: new ( Duration :: from_millis ( 10 ) )
343348 . with_max_attempts ( 10 )
344349 . with_max_total_delay ( Duration :: from_secs ( 15 ) )
@@ -354,7 +359,7 @@ impl VssStoreInner {
354359
355360 let client = VssClient :: new_with_headers ( base_url, retry_policy, header_provider) ;
356361 let locks = Mutex :: new ( HashMap :: new ( ) ) ;
357- Self { client, store_id, storable_builder , key_obfuscator, locks }
362+ Self { schema_version , client, store_id, data_encryption_key , key_obfuscator, locks }
358363 }
359364
360365 fn get_inner_lock_ref ( & self , locking_key : String ) -> Arc < tokio:: sync:: Mutex < u64 > > {
@@ -365,17 +370,45 @@ impl VssStoreInner {
365370 fn build_obfuscated_key (
366371 & self , primary_namespace : & str , secondary_namespace : & str , key : & str ,
367372 ) -> String {
368- let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
369- if primary_namespace. is_empty ( ) {
370- obfuscated_key
373+ if self . schema_version == VssSchemaVersion :: V1 {
374+ let obfuscated_prefix =
375+ self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
376+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
377+ format ! ( "{}#{}" , obfuscated_prefix, obfuscated_key)
378+ } else {
379+ // Default to V0 schema
380+ let obfuscated_key = self . key_obfuscator . obfuscate ( key) ;
381+ if primary_namespace. is_empty ( ) {
382+ obfuscated_key
383+ } else {
384+ format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
385+ }
386+ }
387+ }
388+
389+ fn build_obfuscated_prefix (
390+ & self , primary_namespace : & str , secondary_namespace : & str ,
391+ ) -> String {
392+ if self . schema_version == VssSchemaVersion :: V1 {
393+ let prefix = format ! ( "{}#{}" , primary_namespace, secondary_namespace) ;
394+ self . key_obfuscator . obfuscate ( & prefix)
371395 } else {
372- format ! ( "{}#{}#{}" , primary_namespace, secondary_namespace, obfuscated_key)
396+ // Default to V0 schema
397+ format ! ( "{}#{}" , primary_namespace, secondary_namespace)
373398 }
374399 }
375400
376401 fn extract_key ( & self , unified_key : & str ) -> io:: Result < String > {
377- let mut parts = unified_key. splitn ( 3 , '#' ) ;
378- let ( _primary_namespace, _secondary_namespace) = ( parts. next ( ) , parts. next ( ) ) ;
402+ let mut parts = if self . schema_version == VssSchemaVersion :: V1 {
403+ let mut parts = unified_key. splitn ( 2 , '#' ) ;
404+ let _obfuscated_namespace = parts. next ( ) ;
405+ parts
406+ } else {
407+ // Default to V0 schema
408+ let mut parts = unified_key. splitn ( 3 , '#' ) ;
409+ let ( _primary_namespace, _secondary_namespace) = ( parts. next ( ) , parts. next ( ) ) ;
410+ parts
411+ } ;
379412 match parts. next ( ) {
380413 Some ( obfuscated_key) => {
381414 let actual_key = self . key_obfuscator . deobfuscate ( obfuscated_key) ?;
@@ -390,7 +423,7 @@ impl VssStoreInner {
390423 ) -> io:: Result < Vec < String > > {
391424 let mut page_token = None ;
392425 let mut keys = vec ! [ ] ;
393- let key_prefix = format ! ( "{}#{}" , primary_namespace, secondary_namespace) ;
426+ let key_prefix = self . build_obfuscated_prefix ( primary_namespace, secondary_namespace) ;
394427 while page_token != Some ( "" . to_string ( ) ) {
395428 let request = ListKeyVersionsRequest {
396429 store_id : self . store_id . clone ( ) ,
@@ -420,9 +453,8 @@ impl VssStoreInner {
420453 ) -> io:: Result < Vec < u8 > > {
421454 check_namespace_key_validity ( & primary_namespace, & secondary_namespace, Some ( & key) , "read" ) ?;
422455
423- let obfuscated_key =
424- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
425- let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : obfuscated_key } ;
456+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
457+ let request = GetObjectRequest { store_id : self . store_id . clone ( ) , key : store_key. clone ( ) } ;
426458 let resp = self . client . get_object ( & request) . await . map_err ( |e| {
427459 let msg = format ! (
428460 "Failed to read from key {}/{}/{}: {}" ,
@@ -444,7 +476,11 @@ impl VssStoreInner {
444476 Error :: new ( ErrorKind :: Other , msg)
445477 } ) ?;
446478
447- Ok ( self . storable_builder . deconstruct ( storable) ?. 0 )
479+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
480+ let aad =
481+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
482+ let decrypted = storable_builder. deconstruct ( storable, & self . data_encryption_key , aad) ?. 0 ;
483+ Ok ( decrypted)
448484 }
449485
450486 async fn write_internal (
@@ -458,22 +494,25 @@ impl VssStoreInner {
458494 "write" ,
459495 ) ?;
460496
461- self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
462- let obfuscated_key =
463- self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
464- let vss_version = -1 ;
465- let storable = self . storable_builder . build ( buf, vss_version) ;
466- let request = PutObjectRequest {
467- store_id : self . store_id . clone ( ) ,
468- global_version : None ,
469- transaction_items : vec ! [ KeyValue {
470- key: obfuscated_key,
471- version: vss_version,
472- value: storable. encode_to_vec( ) ,
473- } ] ,
474- delete_items : vec ! [ ] ,
475- } ;
497+ let store_key = self . build_obfuscated_key ( & primary_namespace, & secondary_namespace, & key) ;
498+ let vss_version = -1 ;
499+ let storable_builder = StorableBuilder :: new ( RandEntropySource ) ;
500+ let aad =
501+ if self . schema_version == VssSchemaVersion :: V1 { store_key. as_bytes ( ) } else { & [ ] } ;
502+ let storable =
503+ storable_builder. build ( buf. to_vec ( ) , vss_version, & self . data_encryption_key , aad) ;
504+ let request = PutObjectRequest {
505+ store_id : self . store_id . clone ( ) ,
506+ global_version : None ,
507+ transaction_items : vec ! [ KeyValue {
508+ key: store_key,
509+ version: vss_version,
510+ value: storable. encode_to_vec( ) ,
511+ } ] ,
512+ delete_items : vec ! [ ] ,
513+ } ;
476514
515+ self . execute_locked_write ( inner_lock_ref, locking_key, version, async move || {
477516 self . client . put_object ( & request) . await . map_err ( |e| {
478517 let msg = format ! (
479518 "Failed to write to key {}/{}/{}: {}" ,
0 commit comments