11use anyhow:: Result ;
22use azure_data_cosmos:: prelude:: Operation ;
3- use azure_data_cosmos:: resources:: collection:: PartitionKey ;
43use azure_data_cosmos:: {
54 prelude:: { AuthorizationToken , CollectionClient , CosmosClient , Query } ,
65 CosmosEntity ,
@@ -13,6 +12,12 @@ use std::sync::{Arc, Mutex};
1312
1413pub struct KeyValueAzureCosmos {
1514 client : CollectionClient ,
15+ /// An optional app id
16+ ///
17+ /// If provided, the store will handle multiple stores per container using a
18+ /// partition key of `/$app_id/$store_name`, otherwise there will be one container
19+ /// per store, and the partition key will be `/id`.
20+ app_id : Option < String > ,
1621}
1722
1823/// Azure Cosmos Key / Value runtime config literal options for authentication
@@ -71,6 +76,7 @@ impl KeyValueAzureCosmos {
7176 database : String ,
7277 container : String ,
7378 auth_options : KeyValueAzureCosmosAuthOptions ,
79+ app_id : Option < String > ,
7480 ) -> Result < Self > {
7581 let token = match auth_options {
7682 KeyValueAzureCosmosAuthOptions :: RuntimeConfigValues ( config) => {
@@ -86,15 +92,16 @@ impl KeyValueAzureCosmos {
8692 let database_client = cosmos_client. database_client ( database) ;
8793 let client = database_client. collection_client ( container) ;
8894
89- Ok ( Self { client } )
95+ Ok ( Self { client, app_id } )
9096 }
9197}
9298
9399#[ async_trait]
94100impl StoreManager for KeyValueAzureCosmos {
95- async fn get ( & self , _name : & str ) -> Result < Arc < dyn Store > , Error > {
101+ async fn get ( & self , name : & str ) -> Result < Arc < dyn Store > , Error > {
96102 Ok ( Arc :: new ( AzureCosmosStore {
97103 client : self . client . clone ( ) ,
104+ store_id : self . app_id . as_ref ( ) . map ( |i| format ! ( "{i}/{name}" ) ) ,
98105 } ) )
99106 }
100107
@@ -114,13 +121,10 @@ impl StoreManager for KeyValueAzureCosmos {
114121#[ derive( Clone ) ]
115122struct AzureCosmosStore {
116123 client : CollectionClient ,
117- }
118-
119- struct CompareAndSwap {
120- key : String ,
121- client : CollectionClient ,
122- bucket_rep : u32 ,
123- etag : Mutex < Option < String > > ,
124+ /// An optional store id to use as a partition key for all operations.
125+ ///
126+ /// If the store id not set, the store will use `/id` as the partition key.
127+ store_id : Option < String > ,
124128}
125129
126130#[ async_trait]
@@ -134,6 +138,7 @@ impl Store for AzureCosmosStore {
134138 let pair = Pair {
135139 id : key. to_string ( ) ,
136140 value : value. to_vec ( ) ,
141+ store_id : self . store_id . clone ( ) ,
137142 } ;
138143 self . client
139144 . create_document ( pair)
@@ -145,7 +150,10 @@ impl Store for AzureCosmosStore {
145150
146151 async fn delete ( & self , key : & str ) -> Result < ( ) , Error > {
147152 if self . exists ( key) . await ? {
148- let document_client = self . client . document_client ( key, & key) . map_err ( log_error) ?;
153+ let document_client = self
154+ . client
155+ . document_client ( key, & self . store_id )
156+ . map_err ( log_error) ?;
149157 document_client. delete_document ( ) . await . map_err ( log_error) ?;
150158 }
151159 Ok ( ( ) )
@@ -160,12 +168,7 @@ impl Store for AzureCosmosStore {
160168 }
161169
162170 async fn get_many ( & self , keys : Vec < String > ) -> Result < Vec < ( String , Option < Vec < u8 > > ) > , Error > {
163- let in_clause: String = keys
164- . into_iter ( )
165- . map ( |k| format ! ( "'{}'" , k) )
166- . collect :: < Vec < String > > ( )
167- . join ( ", " ) ;
168- let stmt = Query :: new ( format ! ( "SELECT * FROM c WHERE c.id IN ({})" , in_clause) ) ;
171+ let stmt = Query :: new ( self . get_in_query ( keys) ) ;
169172 let query = self
170173 . client
171174 . query_documents ( stmt)
@@ -175,9 +178,11 @@ impl Store for AzureCosmosStore {
175178 let mut stream = query. into_stream :: < Pair > ( ) ;
176179 while let Some ( resp) = stream. next ( ) . await {
177180 let resp = resp. map_err ( log_error) ?;
178- for ( pair, _) in resp. results {
179- res. push ( ( pair. id , Some ( pair. value ) ) ) ;
180- }
181+ res. extend (
182+ resp. results
183+ . into_iter ( )
184+ . map ( |( pair, _) | ( pair. id , Some ( pair. value ) ) ) ,
185+ ) ;
181186 }
182187 Ok ( res)
183188 }
@@ -200,7 +205,7 @@ impl Store for AzureCosmosStore {
200205 let operations = vec ! [ Operation :: incr( "/value" , delta) . map_err( log_error) ?] ;
201206 let _ = self
202207 . client
203- . document_client ( key. clone ( ) , & key . as_str ( ) )
208+ . document_client ( key. clone ( ) , & self . store_id )
204209 . map_err ( log_error) ?
205210 . patch_document ( operations)
206211 . await
@@ -227,21 +232,39 @@ impl Store for AzureCosmosStore {
227232 client : self . client . clone ( ) ,
228233 etag : Mutex :: new ( None ) ,
229234 bucket_rep,
235+ store_id : self . store_id . clone ( ) ,
230236 } ) )
231237 }
232238}
233239
240+ struct CompareAndSwap {
241+ key : String ,
242+ client : CollectionClient ,
243+ bucket_rep : u32 ,
244+ etag : Mutex < Option < String > > ,
245+ store_id : Option < String > ,
246+ }
247+
248+ impl CompareAndSwap {
249+ fn get_query ( & self ) -> String {
250+ let mut query = format ! ( "SELECT * FROM c WHERE c.id='{}'" , self . key) ;
251+ self . append_store_id ( & mut query, true ) ;
252+ query
253+ }
254+
255+ fn append_store_id ( & self , query : & mut String , condition_already_exists : bool ) {
256+ append_store_id_condition ( query, self . store_id . as_deref ( ) , condition_already_exists) ;
257+ }
258+ }
259+
234260#[ async_trait]
235261impl Cas for CompareAndSwap {
236262 /// `current` will fetch the current value for the key and store the etag for the record. The
237263 /// etag will be used to perform and optimistic concurrency update using the `if-match` header.
238264 async fn current ( & self ) -> Result < Option < Vec < u8 > > , Error > {
239265 let mut stream = self
240266 . client
241- . query_documents ( Query :: new ( format ! (
242- "SELECT * FROM c WHERE c.id='{}'" ,
243- self . key
244- ) ) )
267+ . query_documents ( Query :: new ( self . get_query ( ) ) )
245268 . query_cross_partition ( true )
246269 . max_item_count ( 1 )
247270 . into_stream :: < Pair > ( ) ;
@@ -272,15 +295,15 @@ impl Cas for CompareAndSwap {
272295 /// `swap` updates the value for the key using the etag saved in the `current` function for
273296 /// optimistic concurrency.
274297 async fn swap ( & self , value : Vec < u8 > ) -> Result < ( ) , SwapError > {
275- let pk = PartitionKey :: from ( & self . key ) ;
276298 let pair = Pair {
277299 id : self . key . clone ( ) ,
278300 value,
301+ store_id : self . store_id . clone ( ) ,
279302 } ;
280303
281304 let doc_client = self
282305 . client
283- . document_client ( & self . key , & pk )
306+ . document_client ( & self . key , & pair . partition_key ( ) )
284307 . map_err ( log_cas_error) ?;
285308
286309 let etag_value = self . etag . lock ( ) . unwrap ( ) . clone ( ) ;
@@ -318,55 +341,97 @@ impl AzureCosmosStore {
318341 async fn get_pair ( & self , key : & str ) -> Result < Option < Pair > , Error > {
319342 let query = self
320343 . client
321- . query_documents ( Query :: new ( format ! ( "SELECT * FROM c WHERE c.id='{}'" , key) ) )
344+ . query_documents ( Query :: new ( self . get_query ( key) ) )
322345 . query_cross_partition ( true )
323346 . max_item_count ( 1 ) ;
324347
325348 // There can be no duplicated keys, so we create the stream and only take the first result.
326349 let mut stream = query. into_stream :: < Pair > ( ) ;
327- let res = stream. next ( ) . await ;
328- match res {
329- Some ( r) => {
330- let r = r. map_err ( log_error) ?;
331- match r. results . first ( ) . cloned ( ) {
332- Some ( ( p, _) ) => Ok ( Some ( p) ) ,
333- None => Ok ( None ) ,
334- }
335- }
336- None => Ok ( None ) ,
337- }
350+ let Some ( res) = stream. next ( ) . await else {
351+ return Ok ( None ) ;
352+ } ;
353+ Ok ( res
354+ . map_err ( log_error) ?
355+ . results
356+ . first ( )
357+ . map ( |( p, _) | p. clone ( ) ) )
338358 }
339359
340360 async fn get_keys ( & self ) -> Result < Vec < String > , Error > {
341361 let query = self
342362 . client
343- . query_documents ( Query :: new ( "SELECT * FROM c" . to_string ( ) ) )
363+ . query_documents ( Query :: new ( self . get_keys_query ( ) ) )
344364 . query_cross_partition ( true ) ;
345365 let mut res = Vec :: new ( ) ;
346366
347367 let mut stream = query. into_stream :: < Pair > ( ) ;
348368 while let Some ( resp) = stream. next ( ) . await {
349369 let resp = resp. map_err ( log_error) ?;
350- for ( pair, _) in resp. results {
351- res. push ( pair. id ) ;
352- }
370+ res. extend ( resp. results . into_iter ( ) . map ( |( pair, _) | pair. id ) ) ;
353371 }
354372
355373 Ok ( res)
356374 }
375+
376+ fn get_query ( & self , key : & str ) -> String {
377+ let mut query = format ! ( "SELECT * FROM c WHERE c.id='{}'" , key) ;
378+ self . append_store_id ( & mut query, true ) ;
379+ query
380+ }
381+
382+ fn get_keys_query ( & self ) -> String {
383+ let mut query = "SELECT * FROM c" . to_owned ( ) ;
384+ self . append_store_id ( & mut query, false ) ;
385+ query
386+ }
387+
388+ fn get_in_query ( & self , keys : Vec < String > ) -> String {
389+ let in_clause: String = keys
390+ . into_iter ( )
391+ . map ( |k| format ! ( "'{k}'" ) )
392+ . collect :: < Vec < String > > ( )
393+ . join ( ", " ) ;
394+
395+ let mut query = format ! ( "SELECT * FROM c WHERE c.id IN ({})" , in_clause) ;
396+ self . append_store_id ( & mut query, true ) ;
397+ query
398+ }
399+
400+ fn append_store_id ( & self , query : & mut String , condition_already_exists : bool ) {
401+ append_store_id_condition ( query, self . store_id . as_deref ( ) , condition_already_exists) ;
402+ }
403+ }
404+
405+ /// Appends an option store id condition to the query.
406+ fn append_store_id_condition (
407+ query : & mut String ,
408+ store_id : Option < & str > ,
409+ condition_already_exists : bool ,
410+ ) {
411+ if let Some ( s) = store_id {
412+ if condition_already_exists {
413+ query. push_str ( " AND" ) ;
414+ } else {
415+ query. push_str ( " WHERE" ) ;
416+ }
417+ query. push_str ( " c.store_id='" ) ;
418+ query. push_str ( s) ;
419+ query. push ( '\'' )
420+ }
357421}
358422
359423#[ derive( Serialize , Deserialize , Clone , Debug ) ]
360424pub struct Pair {
361- // In Azure CosmosDB, the default partition key is "/id", and this implementation assumes that partition ID is not changed.
362425 pub id : String ,
363426 pub value : Vec < u8 > ,
427+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
428+ pub store_id : Option < String > ,
364429}
365430
366431impl CosmosEntity for Pair {
367432 type Entity = String ;
368433
369434 fn partition_key ( & self ) -> Self :: Entity {
370- self . id . clone ( )
435+ self . store_id . clone ( ) . unwrap_or_else ( || self . id . clone ( ) )
371436 }
372437}
0 commit comments