@@ -13,7 +13,7 @@ use std::sync::{Arc, Mutex};
1313
1414pub struct KeyValueAzureCosmos {
1515 client : CollectionClient ,
16- app_id : String ,
16+ app_id : Option < String > ,
1717}
1818
1919/// Azure Cosmos Key / Value runtime config literal options for authentication
@@ -72,7 +72,7 @@ impl KeyValueAzureCosmos {
7272 database : String ,
7373 container : String ,
7474 auth_options : KeyValueAzureCosmosAuthOptions ,
75- app_id : String ,
75+ app_id : Option < String > ,
7676 ) -> Result < Self > {
7777 let token = match auth_options {
7878 KeyValueAzureCosmosAuthOptions :: RuntimeConfigValues ( config) => {
@@ -97,7 +97,7 @@ impl StoreManager for KeyValueAzureCosmos {
9797 async fn get ( & self , name : & str ) -> Result < Arc < dyn Store > , Error > {
9898 Ok ( Arc :: new ( AzureCosmosStore {
9999 client : self . client . clone ( ) ,
100- partition_key : format ! ( "{}/{}" , self . app_id , name ) ,
100+ partition_key : self . app_id . as_ref ( ) . map ( |i| format ! ( "{i }/{name}" ) ) ,
101101 } ) )
102102 }
103103
@@ -117,7 +117,10 @@ impl StoreManager for KeyValueAzureCosmos {
117117#[ derive( Clone ) ]
118118struct AzureCosmosStore {
119119 client : CollectionClient ,
120- partition_key : String ,
120+ /// An optional partition key to use for all operations.
121+ ///
122+ /// If the partition key is not set, the store will use `/id` as the partition key.
123+ partition_key : Option < String > ,
121124}
122125
123126#[ async_trait]
@@ -161,15 +164,7 @@ impl Store for AzureCosmosStore {
161164 }
162165
163166 async fn get_many ( & self , keys : Vec < String > ) -> Result < Vec < ( String , Option < Vec < u8 > > ) > , Error > {
164- let in_clause: String = keys
165- . into_iter ( )
166- . map ( |k| format ! ( "'{}'" , k) )
167- . collect :: < Vec < String > > ( )
168- . join ( ", " ) ;
169- let stmt = Query :: new ( format ! (
170- "SELECT * FROM c WHERE c.id IN ({}) AND partition_key='{}'" ,
171- in_clause, self . partition_key
172- ) ) ;
167+ let stmt = Query :: new ( self . get_in_query ( keys) ) ;
173168 let query = self
174169 . client
175170 . query_documents ( stmt)
@@ -243,7 +238,23 @@ struct CompareAndSwap {
243238 client : CollectionClient ,
244239 bucket_rep : u32 ,
245240 etag : Mutex < Option < String > > ,
246- partition_key : String ,
241+ partition_key : Option < String > ,
242+ }
243+
244+ impl CompareAndSwap {
245+ fn get_query ( & self ) -> String {
246+ let mut query = format ! ( "SELECT * FROM c WHERE c.id='{}'" , self . key) ;
247+ self . append_partition_key ( & mut query) ;
248+ query
249+ }
250+
251+ fn append_partition_key ( & self , query : & mut String ) {
252+ if let Some ( pk) = & self . partition_key {
253+ query. push_str ( " AND c.partition_key='" ) ;
254+ query. push_str ( pk) ;
255+ query. push ( '\'' )
256+ }
257+ }
247258}
248259
249260#[ async_trait]
@@ -253,10 +264,7 @@ impl Cas for CompareAndSwap {
253264 async fn current ( & self ) -> Result < Option < Vec < u8 > > , Error > {
254265 let mut stream = self
255266 . client
256- . query_documents ( Query :: new ( format ! (
257- "SELECT * FROM c WHERE c.id='{}' and c.partition_key='{}'" ,
258- self . key, self . partition_key
259- ) ) )
267+ . query_documents ( Query :: new ( self . get_query ( ) ) )
260268 . query_cross_partition ( true )
261269 . max_item_count ( 1 )
262270 . into_stream :: < Pair > ( ) ;
@@ -287,7 +295,11 @@ impl Cas for CompareAndSwap {
287295 /// `swap` updates the value for the key using the etag saved in the `current` function for
288296 /// optimistic concurrency.
289297 async fn swap ( & self , value : Vec < u8 > ) -> Result < ( ) , SwapError > {
290- let pk = PartitionKey :: from ( & self . partition_key ) ;
298+ let pk = PartitionKey :: from (
299+ self . partition_key
300+ . as_deref ( )
301+ . unwrap_or_else ( || self . key . as_str ( ) ) ,
302+ ) ;
291303 let pair = Pair {
292304 id : self . key . clone ( ) ,
293305 value,
@@ -334,10 +346,7 @@ impl AzureCosmosStore {
334346 async fn get_pair ( & self , key : & str ) -> Result < Option < Pair > , Error > {
335347 let query = self
336348 . client
337- . query_documents ( Query :: new ( format ! (
338- "SELECT * FROM c WHERE c.id='{}' AND c.partition_key='{}'" ,
339- key, self . partition_key
340- ) ) )
349+ . query_documents ( Query :: new ( self . get_query ( key) ) )
341350 . query_cross_partition ( true )
342351 . max_item_count ( 1 ) ;
343352
@@ -356,7 +365,7 @@ impl AzureCosmosStore {
356365 async fn get_keys ( & self ) -> Result < Vec < String > , Error > {
357366 let query = self
358367 . client
359- . query_documents ( Query :: new ( "SELECT * FROM c" . to_string ( ) ) )
368+ . query_documents ( Query :: new ( self . get_keys_query ( ) ) )
360369 . query_cross_partition ( true ) ;
361370 let mut res = Vec :: new ( ) ;
362371
@@ -368,19 +377,54 @@ impl AzureCosmosStore {
368377
369378 Ok ( res)
370379 }
380+
381+ fn get_query ( & self , key : & str ) -> String {
382+ let mut query = format ! ( "SELECT * FROM c WHERE c.id='{}'" , key) ;
383+ self . append_partition_key ( & mut query) ;
384+ query
385+ }
386+
387+ fn get_keys_query ( & self ) -> String {
388+ let mut query = "SELECT * FROM c" . to_owned ( ) ;
389+ self . append_partition_key ( & mut query) ;
390+ query
391+ }
392+
393+ fn get_in_query ( & self , keys : Vec < String > ) -> String {
394+ let in_clause: String = keys
395+ . into_iter ( )
396+ . map ( |k| format ! ( "'{}'" , k) )
397+ . collect :: < Vec < String > > ( )
398+ . join ( ", " ) ;
399+
400+ let mut query = format ! ( "SELECT * FROM c WHERE c.id IN ({})" , in_clause) ;
401+ self . append_partition_key ( & mut query) ;
402+ query
403+ }
404+
405+ fn append_partition_key ( & self , query : & mut String ) {
406+ if let Some ( pk) = & self . partition_key {
407+ query. push_str ( " AND c.partition_key='" ) ;
408+ query. push_str ( pk) ;
409+ query. push ( '\'' )
410+ }
411+ }
371412}
372413
373414#[ derive( Serialize , Deserialize , Clone , Debug ) ]
374415pub struct Pair {
375416 pub id : String ,
376417 pub value : Vec < u8 > ,
377- pub partition_key : String ,
418+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
419+ pub partition_key : Option < String > ,
378420}
379421
380422impl CosmosEntity for Pair {
381423 type Entity = String ;
382424
383425 fn partition_key ( & self ) -> Self :: Entity {
384- self . partition_key . clone ( )
426+ self . partition_key
427+ . clone ( )
428+ . unwrap_or_else ( || self . id . clone ( ) )
385429 }
386430}
0 commit comments