11use anyhow:: Result ;
22use azure_data_cosmos:: prelude:: Operation ;
3- use azure_data_cosmos:: resources:: collection:: PartitionKey ;
43use azure_data_cosmos:: {
54 prelude:: { AuthorizationToken , CollectionClient , CosmosClient , Query } ,
65 CosmosEntity ,
@@ -13,6 +12,11 @@ use std::sync::{Arc, Mutex};
1312
1413pub struct KeyValueAzureCosmos {
1514 client : CollectionClient ,
15+ /// An optional app id
16+ ///
17+ /// If provided, the store will handle multiple stores per container using a
18+ /// partition key of `/$app_id/$store_name`, otherwise there will be one container
19+ /// per store, and the partition key will be `/id`.
1620 app_id : Option < String > ,
1721}
1822
@@ -97,7 +101,7 @@ impl StoreManager for KeyValueAzureCosmos {
97101 async fn get ( & self , name : & str ) -> Result < Arc < dyn Store > , Error > {
98102 Ok ( Arc :: new ( AzureCosmosStore {
99103 client : self . client . clone ( ) ,
100- partition_key : self . app_id . as_ref ( ) . map ( |i| format ! ( "{i}/{name}" ) ) ,
104+ store_id : self . app_id . as_ref ( ) . map ( |i| format ! ( "{i}/{name}" ) ) ,
101105 } ) )
102106 }
103107
@@ -117,10 +121,10 @@ impl StoreManager for KeyValueAzureCosmos {
117121#[ derive( Clone ) ]
118122struct AzureCosmosStore {
119123 client : CollectionClient ,
120- /// An optional partition key to use for all operations.
124+ /// An optional store id to use as a partition key for all operations.
121125 ///
122- /// If the partition key is not set, the store will use `/id` as the partition key.
123- partition_key : Option < String > ,
126+ /// If the store id not set, the store will use `/id` as the partition key.
127+ store_id : Option < String > ,
124128}
125129
126130#[ async_trait]
@@ -134,7 +138,7 @@ impl Store for AzureCosmosStore {
134138 let pair = Pair {
135139 id : key. to_string ( ) ,
136140 value : value. to_vec ( ) ,
137- partition_key : self . partition_key . clone ( ) ,
141+ store_id : self . store_id . clone ( ) ,
138142 } ;
139143 self . client
140144 . create_document ( pair)
@@ -148,7 +152,7 @@ impl Store for AzureCosmosStore {
148152 if self . exists ( key) . await ? {
149153 let document_client = self
150154 . client
151- . document_client ( key, & self . partition_key )
155+ . document_client ( key, & self . store_id )
152156 . map_err ( log_error) ?;
153157 document_client. delete_document ( ) . await . map_err ( log_error) ?;
154158 }
@@ -201,7 +205,7 @@ impl Store for AzureCosmosStore {
201205 let operations = vec ! [ Operation :: incr( "/value" , delta) . map_err( log_error) ?] ;
202206 let _ = self
203207 . client
204- . document_client ( key. clone ( ) , & self . partition_key )
208+ . document_client ( key. clone ( ) , & self . store_id )
205209 . map_err ( log_error) ?
206210 . patch_document ( operations)
207211 . await
@@ -228,7 +232,7 @@ impl Store for AzureCosmosStore {
228232 client : self . client . clone ( ) ,
229233 etag : Mutex :: new ( None ) ,
230234 bucket_rep,
231- partition_key : self . partition_key . clone ( ) ,
235+ store_id : self . store_id . clone ( ) ,
232236 } ) )
233237 }
234238}
@@ -238,18 +242,18 @@ struct CompareAndSwap {
238242 client : CollectionClient ,
239243 bucket_rep : u32 ,
240244 etag : Mutex < Option < String > > ,
241- partition_key : Option < String > ,
245+ store_id : Option < String > ,
242246}
243247
244248impl CompareAndSwap {
245249 fn get_query ( & self ) -> String {
246250 let mut query = format ! ( "SELECT * FROM c WHERE c.id='{}'" , self . key) ;
247- self . append_partition_key ( & mut query) ;
251+ self . append_store_id ( & mut query, true ) ;
248252 query
249253 }
250254
251- fn append_partition_key ( & self , query : & mut String ) {
252- append_partition_key_condition ( query, self . partition_key . as_deref ( ) ) ;
255+ fn append_store_id ( & self , query : & mut String , condition_already_exists : bool ) {
256+ append_store_id_condition ( query, self . store_id . as_deref ( ) , condition_already_exists ) ;
253257 }
254258}
255259
@@ -291,20 +295,15 @@ impl Cas for CompareAndSwap {
291295 /// `swap` updates the value for the key using the etag saved in the `current` function for
292296 /// optimistic concurrency.
293297 async fn swap ( & self , value : Vec < u8 > ) -> Result < ( ) , SwapError > {
294- let pk = PartitionKey :: from (
295- self . partition_key
296- . as_deref ( )
297- . unwrap_or_else ( || self . key . as_str ( ) ) ,
298- ) ;
299298 let pair = Pair {
300299 id : self . key . clone ( ) ,
301300 value,
302- partition_key : self . partition_key . clone ( ) ,
301+ store_id : self . store_id . clone ( ) ,
303302 } ;
304303
305304 let doc_client = self
306305 . client
307- . document_client ( & self . key , & pk )
306+ . document_client ( & self . key , & pair . partition_key ( ) )
308307 . map_err ( log_cas_error) ?;
309308
310309 let etag_value = self . etag . lock ( ) . unwrap ( ) . clone ( ) ;
@@ -376,38 +375,47 @@ impl AzureCosmosStore {
376375
377376 fn get_query ( & self , key : & str ) -> String {
378377 let mut query = format ! ( "SELECT * FROM c WHERE c.id='{}'" , key) ;
379- self . append_partition_key ( & mut query) ;
378+ self . append_store_id ( & mut query, true ) ;
380379 query
381380 }
382381
383382 fn get_keys_query ( & self ) -> String {
384383 let mut query = "SELECT * FROM c" . to_owned ( ) ;
385- self . append_partition_key ( & mut query) ;
384+ self . append_store_id ( & mut query, false ) ;
386385 query
387386 }
388387
389388 fn get_in_query ( & self , keys : Vec < String > ) -> String {
390389 let in_clause: String = keys
391390 . into_iter ( )
392- . map ( |k| format ! ( "'{}'" , k ) )
391+ . map ( |k| format ! ( "'{k }'" ) )
393392 . collect :: < Vec < String > > ( )
394393 . join ( ", " ) ;
395394
396395 let mut query = format ! ( "SELECT * FROM c WHERE c.id IN ({})" , in_clause) ;
397- self . append_partition_key ( & mut query) ;
396+ self . append_store_id ( & mut query, true ) ;
398397 query
399398 }
400399
401- fn append_partition_key ( & self , query : & mut String ) {
402- append_partition_key_condition ( query, self . partition_key . as_deref ( ) ) ;
400+ fn append_store_id ( & self , query : & mut String , condition_already_exists : bool ) {
401+ append_store_id_condition ( query, self . store_id . as_deref ( ) , condition_already_exists ) ;
403402 }
404403}
405404
406- /// Appends an option partition key condition to the query.
407- fn append_partition_key_condition ( query : & mut String , partition_key : Option < & str > ) {
408- if let Some ( pk) = partition_key {
409- query. push_str ( " AND c.partition_key='" ) ;
410- query. push_str ( pk) ;
405+ /// Appends an option store id condition to the query.
406+ fn append_store_id_condition (
407+ query : & mut String ,
408+ store_id : Option < & str > ,
409+ condition_already_exists : bool ,
410+ ) {
411+ if let Some ( s) = store_id {
412+ if condition_already_exists {
413+ query. push_str ( " AND" ) ;
414+ } else {
415+ query. push_str ( " WHERE" ) ;
416+ }
417+ query. push_str ( " c.store_id='" ) ;
418+ query. push_str ( s) ;
411419 query. push ( '\'' )
412420 }
413421}
@@ -417,15 +425,13 @@ pub struct Pair {
417425 pub id : String ,
418426 pub value : Vec < u8 > ,
419427 #[ serde( skip_serializing_if = "Option::is_none" ) ]
420- pub partition_key : Option < String > ,
428+ pub store_id : Option < String > ,
421429}
422430
423431impl CosmosEntity for Pair {
424432 type Entity = String ;
425433
426434 fn partition_key ( & self ) -> Self :: Entity {
427- self . partition_key
428- . clone ( )
429- . unwrap_or_else ( || self . id . clone ( ) )
435+ self . store_id . clone ( ) . unwrap_or_else ( || self . id . clone ( ) )
430436 }
431437}
0 commit comments