From b9ae38ce73084b5fcb4240994cc3ee3e7c282644 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Fri, 13 Dec 2024 11:46:28 +0100 Subject: [PATCH 1/4] First working version Signed-off-by: Ryan Levick --- crates/key-value-azure/src/lib.rs | 8 +-- crates/key-value-azure/src/store.rs | 84 +++++++++++++++++------------ crates/runtime-config/src/lib.rs | 4 +- 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/crates/key-value-azure/src/lib.rs b/crates/key-value-azure/src/lib.rs index f36fde8abf..4966cd34ba 100644 --- a/crates/key-value-azure/src/lib.rs +++ b/crates/key-value-azure/src/lib.rs @@ -7,15 +7,14 @@ use store::{ }; /// A key-value store that uses Azure Cosmos as the backend. -#[derive(Default)] pub struct AzureKeyValueStore { - _priv: (), + app_id: String, } impl AzureKeyValueStore { /// Creates a new `AzureKeyValueStore`. - pub fn new() -> Self { - Self::default() + pub fn new(app_id: String) -> Self { + Self { app_id } } } @@ -55,6 +54,7 @@ impl MakeKeyValueStore for AzureKeyValueStore { runtime_config.database, runtime_config.container, auth_options, + self.app_id.clone(), ) } } diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index 001864ea77..923793b105 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -13,6 +13,7 @@ use std::sync::{Arc, Mutex}; pub struct KeyValueAzureCosmos { client: CollectionClient, + app_id: String, } /// Azure Cosmos Key / Value runtime config literal options for authentication @@ -71,6 +72,7 @@ impl KeyValueAzureCosmos { database: String, container: String, auth_options: KeyValueAzureCosmosAuthOptions, + app_id: String, ) -> Result { let token = match auth_options { KeyValueAzureCosmosAuthOptions::RuntimeConfigValues(config) => { @@ -86,15 +88,16 @@ impl KeyValueAzureCosmos { let database_client = cosmos_client.database_client(database); let client = database_client.collection_client(container); - Ok(Self { client }) + Ok(Self { client, 
app_id }) } } #[async_trait] impl StoreManager for KeyValueAzureCosmos { - async fn get(&self, _name: &str) -> Result, Error> { + async fn get(&self, name: &str) -> Result, Error> { Ok(Arc::new(AzureCosmosStore { client: self.client.clone(), + partition_key: format!("{}/{}", self.app_id, name), })) } @@ -114,13 +117,7 @@ impl StoreManager for KeyValueAzureCosmos { #[derive(Clone)] struct AzureCosmosStore { client: CollectionClient, -} - -struct CompareAndSwap { - key: String, - client: CollectionClient, - bucket_rep: u32, - etag: Mutex>, + partition_key: String, } #[async_trait] @@ -134,6 +131,7 @@ impl Store for AzureCosmosStore { let pair = Pair { id: key.to_string(), value: value.to_vec(), + partition_key: self.partition_key.clone(), }; self.client .create_document(pair) @@ -145,7 +143,10 @@ impl Store for AzureCosmosStore { async fn delete(&self, key: &str) -> Result<(), Error> { if self.exists(key).await? { - let document_client = self.client.document_client(key, &key).map_err(log_error)?; + let document_client = self + .client + .document_client(key, &self.partition_key) + .map_err(log_error)?; document_client.delete_document().await.map_err(log_error)?; } Ok(()) @@ -165,7 +166,10 @@ impl Store for AzureCosmosStore { .map(|k| format!("'{}'", k)) .collect::>() .join(", "); - let stmt = Query::new(format!("SELECT * FROM c WHERE c.id IN ({})", in_clause)); + let stmt = Query::new(format!( + "SELECT * FROM c WHERE c.id IN ({}) AND partition_key='{}'", + in_clause, self.partition_key + )); let query = self .client .query_documents(stmt) @@ -175,9 +179,11 @@ impl Store for AzureCosmosStore { let mut stream = query.into_stream::(); while let Some(resp) = stream.next().await { let resp = resp.map_err(log_error)?; - for (pair, _) in resp.results { - res.push((pair.id, Some(pair.value))); - } + res.extend( + resp.results + .into_iter() + .map(|(pair, _)| (pair.id, Some(pair.value))), + ); } Ok(res) } @@ -200,7 +206,7 @@ impl Store for AzureCosmosStore { let operations 
= vec![Operation::incr("/value", delta).map_err(log_error)?]; let _ = self .client - .document_client(key.clone(), &key.as_str()) + .document_client(key.clone(), &self.partition_key) .map_err(log_error)? .patch_document(operations) .await @@ -227,10 +233,19 @@ impl Store for AzureCosmosStore { client: self.client.clone(), etag: Mutex::new(None), bucket_rep, + partition_key: self.partition_key.clone(), })) } } +struct CompareAndSwap { + key: String, + client: CollectionClient, + bucket_rep: u32, + etag: Mutex>, + partition_key: String, +} + #[async_trait] impl Cas for CompareAndSwap { /// `current` will fetch the current value for the key and store the etag for the record. The @@ -239,8 +254,8 @@ impl Cas for CompareAndSwap { let mut stream = self .client .query_documents(Query::new(format!( - "SELECT * FROM c WHERE c.id='{}'", - self.key + "SELECT * FROM c WHERE c.id='{}' and c.partition_key='{}'", + self.key, self.partition_key ))) .query_cross_partition(true) .max_item_count(1) @@ -272,10 +287,11 @@ impl Cas for CompareAndSwap { /// `swap` updates the value for the key using the etag saved in the `current` function for /// optimistic concurrency. async fn swap(&self, value: Vec) -> Result<(), SwapError> { - let pk = PartitionKey::from(&self.key); + let pk = PartitionKey::from(&self.partition_key); let pair = Pair { id: self.key.clone(), value, + partition_key: self.partition_key.clone(), }; let doc_client = self @@ -318,23 +334,23 @@ impl AzureCosmosStore { async fn get_pair(&self, key: &str) -> Result, Error> { let query = self .client - .query_documents(Query::new(format!("SELECT * FROM c WHERE c.id='{}'", key))) + .query_documents(Query::new(format!( + "SELECT * FROM c WHERE c.id='{}' AND c.partition_key='{}'", + key, self.partition_key + ))) .query_cross_partition(true) .max_item_count(1); // There can be no duplicated keys, so we create the stream and only take the first result. 
let mut stream = query.into_stream::(); - let res = stream.next().await; - match res { - Some(r) => { - let r = r.map_err(log_error)?; - match r.results.first().cloned() { - Some((p, _)) => Ok(Some(p)), - None => Ok(None), - } - } - None => Ok(None), - } + let Some(res) = stream.next().await else { + return Ok(None); + }; + Ok(res + .map_err(log_error)? + .results + .first() + .map(|(p, _)| p.clone())) } async fn get_keys(&self) -> Result, Error> { @@ -347,9 +363,7 @@ impl AzureCosmosStore { let mut stream = query.into_stream::(); while let Some(resp) = stream.next().await { let resp = resp.map_err(log_error)?; - for (pair, _) in resp.results { - res.push(pair.id); - } + res.extend(resp.results.into_iter().map(|(pair, _)| pair.id)); } Ok(res) @@ -358,15 +372,15 @@ impl AzureCosmosStore { #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Pair { - // In Azure CosmosDB, the default partition key is "/id", and this implementation assumes that partition ID is not changed. pub id: String, pub value: Vec, + pub partition_key: String, } impl CosmosEntity for Pair { type Entity = String; fn partition_key(&self) -> Self::Entity { - self.id.clone() + self.partition_key.clone() } } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index f1fb92e02d..062a8815b3 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -403,7 +403,9 @@ pub fn key_value_config_resolver( .register_store_type(spin_key_value_redis::RedisKeyValueStore::new()) .unwrap(); key_value - .register_store_type(spin_key_value_azure::AzureKeyValueStore::new()) + .register_store_type(spin_key_value_azure::AzureKeyValueStore::new( + "MY_APP".to_owned(), + )) .unwrap(); key_value .register_store_type(spin_key_value_aws::AwsDynamoKeyValueStore::new()) From 40dfc1e6c0f92cc6822413fe67705f0c14f2bae3 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Fri, 13 Dec 2024 12:33:51 +0100 Subject: [PATCH 2/4] Make partition key optional Signed-off-by: Ryan 
Levick --- crates/key-value-azure/src/lib.rs | 7 ++- crates/key-value-azure/src/store.rs | 97 +++++++++++++++++++++-------- crates/runtime-config/src/lib.rs | 4 +- 3 files changed, 77 insertions(+), 31 deletions(-) diff --git a/crates/key-value-azure/src/lib.rs b/crates/key-value-azure/src/lib.rs index 4966cd34ba..911d2e5ab5 100644 --- a/crates/key-value-azure/src/lib.rs +++ b/crates/key-value-azure/src/lib.rs @@ -8,12 +8,15 @@ use store::{ /// A key-value store that uses Azure Cosmos as the backend. pub struct AzureKeyValueStore { - app_id: String, + app_id: Option, } impl AzureKeyValueStore { /// Creates a new `AzureKeyValueStore`. - pub fn new(app_id: String) -> Self { + /// + /// When `app_id` is provided, the store will use a partition key of `$app_id/$store_name`, + /// otherwise the partition key will be `id`. + pub fn new(app_id: Option) -> Self { Self { app_id } } } diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index 923793b105..198afabd96 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -13,7 +13,7 @@ use std::sync::{Arc, Mutex}; pub struct KeyValueAzureCosmos { client: CollectionClient, - app_id: String, + app_id: Option, } /// Azure Cosmos Key / Value runtime config literal options for authentication @@ -72,7 +72,7 @@ impl KeyValueAzureCosmos { database: String, container: String, auth_options: KeyValueAzureCosmosAuthOptions, - app_id: String, + app_id: Option, ) -> Result { let token = match auth_options { KeyValueAzureCosmosAuthOptions::RuntimeConfigValues(config) => { @@ -97,7 +97,7 @@ impl StoreManager for KeyValueAzureCosmos { async fn get(&self, name: &str) -> Result, Error> { Ok(Arc::new(AzureCosmosStore { client: self.client.clone(), - partition_key: format!("{}/{}", self.app_id, name), + partition_key: self.app_id.as_ref().map(|i| format!("{i}/{name}")), })) } @@ -117,7 +117,10 @@ impl StoreManager for KeyValueAzureCosmos { #[derive(Clone)] struct AzureCosmosStore 
{ client: CollectionClient, - partition_key: String, + /// An optional partition key to use for all operations. + /// + /// If the partition key is not set, the store will use `/id` as the partition key. + partition_key: Option, } #[async_trait] @@ -161,15 +164,7 @@ impl Store for AzureCosmosStore { } async fn get_many(&self, keys: Vec) -> Result>)>, Error> { - let in_clause: String = keys - .into_iter() - .map(|k| format!("'{}'", k)) - .collect::>() - .join(", "); - let stmt = Query::new(format!( - "SELECT * FROM c WHERE c.id IN ({}) AND partition_key='{}'", - in_clause, self.partition_key - )); + let stmt = Query::new(self.get_in_query(keys)); let query = self .client .query_documents(stmt) @@ -243,7 +238,19 @@ struct CompareAndSwap { client: CollectionClient, bucket_rep: u32, etag: Mutex>, - partition_key: String, + partition_key: Option, +} + +impl CompareAndSwap { + fn get_query(&self) -> String { + let mut query = format!("SELECT * FROM c WHERE c.id='{}'", self.key); + self.append_partition_key(&mut query); + query + } + + fn append_partition_key(&self, query: &mut String) { + append_partition_key_condition(query, self.partition_key.as_deref()); + } } #[async_trait] @@ -253,10 +260,7 @@ impl Cas for CompareAndSwap { async fn current(&self) -> Result>, Error> { let mut stream = self .client - .query_documents(Query::new(format!( - "SELECT * FROM c WHERE c.id='{}' and c.partition_key='{}'", - self.key, self.partition_key - ))) + .query_documents(Query::new(self.get_query())) .query_cross_partition(true) .max_item_count(1) .into_stream::(); @@ -287,7 +291,11 @@ impl Cas for CompareAndSwap { /// `swap` updates the value for the key using the etag saved in the `current` function for /// optimistic concurrency. 
async fn swap(&self, value: Vec) -> Result<(), SwapError> { - let pk = PartitionKey::from(&self.partition_key); + let pk = PartitionKey::from( + self.partition_key + .as_deref() + .unwrap_or_else(|| self.key.as_str()), + ); let pair = Pair { id: self.key.clone(), value, @@ -334,10 +342,7 @@ impl AzureCosmosStore { async fn get_pair(&self, key: &str) -> Result, Error> { let query = self .client - .query_documents(Query::new(format!( - "SELECT * FROM c WHERE c.id='{}' AND c.partition_key='{}'", - key, self.partition_key - ))) + .query_documents(Query::new(self.get_query(key))) .query_cross_partition(true) .max_item_count(1); @@ -356,7 +361,7 @@ impl AzureCosmosStore { async fn get_keys(&self) -> Result, Error> { let query = self .client - .query_documents(Query::new("SELECT * FROM c".to_string())) + .query_documents(Query::new(self.get_keys_query())) .query_cross_partition(true); let mut res = Vec::new(); @@ -368,19 +373,59 @@ impl AzureCosmosStore { Ok(res) } + + fn get_query(&self, key: &str) -> String { + let mut query = format!("SELECT * FROM c WHERE c.id='{}'", key); + self.append_partition_key(&mut query); + query + } + + fn get_keys_query(&self) -> String { + let mut query = "SELECT * FROM c".to_owned(); + self.append_partition_key(&mut query); + query + } + + fn get_in_query(&self, keys: Vec) -> String { + let in_clause: String = keys + .into_iter() + .map(|k| format!("'{}'", k)) + .collect::>() + .join(", "); + + let mut query = format!("SELECT * FROM c WHERE c.id IN ({})", in_clause); + self.append_partition_key(&mut query); + query + } + + fn append_partition_key(&self, query: &mut String) { + append_partition_key_condition(query, self.partition_key.as_deref()); + } +} + +/// Appends an option partition key condition to the query. 
+fn append_partition_key_condition(query: &mut String, partition_key: Option<&str>) { + if let Some(pk) = partition_key { + query.push_str(" AND c.partition_key='"); + query.push_str(pk); + query.push('\'') + } } #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Pair { pub id: String, pub value: Vec, - pub partition_key: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub partition_key: Option, } impl CosmosEntity for Pair { type Entity = String; fn partition_key(&self) -> Self::Entity { - self.partition_key.clone() + self.partition_key + .clone() + .unwrap_or_else(|| self.id.clone()) } } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index 062a8815b3..3e7f22ada7 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -403,9 +403,7 @@ pub fn key_value_config_resolver( .register_store_type(spin_key_value_redis::RedisKeyValueStore::new()) .unwrap(); key_value - .register_store_type(spin_key_value_azure::AzureKeyValueStore::new( - "MY_APP".to_owned(), - )) + .register_store_type(spin_key_value_azure::AzureKeyValueStore::new(None)) .unwrap(); key_value .register_store_type(spin_key_value_aws::AwsDynamoKeyValueStore::new()) From 0576dda85171c9cc8c67585de656bf423f546fc0 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Fri, 3 Jan 2025 16:55:56 +0100 Subject: [PATCH 3/4] Make some azure types public Signed-off-by: Ryan Levick --- crates/key-value-azure/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/key-value-azure/src/lib.rs b/crates/key-value-azure/src/lib.rs index 911d2e5ab5..82195fc5ab 100644 --- a/crates/key-value-azure/src/lib.rs +++ b/crates/key-value-azure/src/lib.rs @@ -2,7 +2,8 @@ mod store; use serde::Deserialize; use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore; -use store::{ + +pub use store::{ KeyValueAzureCosmos, KeyValueAzureCosmosAuthOptions, KeyValueAzureCosmosRuntimeConfigOptions, }; From 
6463da8c8effa99257e68ebebc3772ff41d0ff45 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Mon, 13 Jan 2025 15:34:07 +0100 Subject: [PATCH 4/4] Rename partition_key to store_id Signed-off-by: Ryan Levick --- crates/app/src/lib.rs | 2 +- crates/key-value-azure/src/store.rs | 76 ++++++++++++++++------------- 2 files changed, 42 insertions(+), 36 deletions(-) diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index 806894bfa1..8c5b84b0e5 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -141,7 +141,7 @@ impl App { pub fn triggers_with_type<'a>( &'a self, trigger_type: &'a str, - ) -> impl Iterator { + ) -> impl Iterator> { self.triggers() .filter(move |trigger| trigger.locked.trigger_type == trigger_type) } diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index 198afabd96..86f0a8a92d 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -1,6 +1,5 @@ use anyhow::Result; use azure_data_cosmos::prelude::Operation; -use azure_data_cosmos::resources::collection::PartitionKey; use azure_data_cosmos::{ prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query}, CosmosEntity, @@ -13,6 +12,11 @@ use std::sync::{Arc, Mutex}; pub struct KeyValueAzureCosmos { client: CollectionClient, + /// An optional app id + /// + /// If provided, the store will handle multiple stores per container using a + /// partition key of `/$app_id/$store_name`, otherwise there will be one container + /// per store, and the partition key will be `/id`. 
app_id: Option, } @@ -97,7 +101,7 @@ impl StoreManager for KeyValueAzureCosmos { async fn get(&self, name: &str) -> Result, Error> { Ok(Arc::new(AzureCosmosStore { client: self.client.clone(), - partition_key: self.app_id.as_ref().map(|i| format!("{i}/{name}")), + store_id: self.app_id.as_ref().map(|i| format!("{i}/{name}")), })) } @@ -117,10 +121,10 @@ impl StoreManager for KeyValueAzureCosmos { #[derive(Clone)] struct AzureCosmosStore { client: CollectionClient, - /// An optional partition key to use for all operations. + /// An optional store id to use as a partition key for all operations. /// - /// If the partition key is not set, the store will use `/id` as the partition key. - partition_key: Option, + /// If the store id is not set, the store will use `/id` as the partition key. + store_id: Option, } #[async_trait] @@ -134,7 +138,7 @@ impl Store for AzureCosmosStore { let pair = Pair { id: key.to_string(), value: value.to_vec(), - partition_key: self.partition_key.clone(), + store_id: self.store_id.clone(), }; self.client .create_document(pair) @@ -148,7 +152,7 @@ impl Store for AzureCosmosStore { if self.exists(key).await? { let document_client = self .client - .document_client(key, &self.partition_key) + .document_client(key, &self.store_id) .map_err(log_error)?; document_client.delete_document().await.map_err(log_error)?; } @@ -201,7 +205,7 @@ impl Store for AzureCosmosStore { let operations = vec![Operation::incr("/value", delta).map_err(log_error)?]; let _ = self .client - .document_client(key.clone(), &self.partition_key) + .document_client(key.clone(), &self.store_id) .map_err(log_error)? 
.patch_document(operations) .await @@ -228,7 +232,7 @@ impl Store for AzureCosmosStore { client: self.client.clone(), etag: Mutex::new(None), bucket_rep, - partition_key: self.partition_key.clone(), + store_id: self.store_id.clone(), })) } } @@ -238,18 +242,18 @@ struct CompareAndSwap { client: CollectionClient, bucket_rep: u32, etag: Mutex>, - partition_key: Option, + store_id: Option, } impl CompareAndSwap { fn get_query(&self) -> String { let mut query = format!("SELECT * FROM c WHERE c.id='{}'", self.key); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, true); query } - fn append_partition_key(&self, query: &mut String) { - append_partition_key_condition(query, self.partition_key.as_deref()); + fn append_store_id(&self, query: &mut String, condition_already_exists: bool) { + append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists); } } @@ -291,20 +295,15 @@ impl Cas for CompareAndSwap { /// `swap` updates the value for the key using the etag saved in the `current` function for /// optimistic concurrency. 
async fn swap(&self, value: Vec) -> Result<(), SwapError> { - let pk = PartitionKey::from( - self.partition_key - .as_deref() - .unwrap_or_else(|| self.key.as_str()), - ); let pair = Pair { id: self.key.clone(), value, - partition_key: self.partition_key.clone(), + store_id: self.store_id.clone(), }; let doc_client = self .client - .document_client(&self.key, &pk) + .document_client(&self.key, &pair.partition_key()) .map_err(log_cas_error)?; let etag_value = self.etag.lock().unwrap().clone(); @@ -376,38 +375,47 @@ impl AzureCosmosStore { fn get_query(&self, key: &str) -> String { let mut query = format!("SELECT * FROM c WHERE c.id='{}'", key); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, true); query } fn get_keys_query(&self) -> String { let mut query = "SELECT * FROM c".to_owned(); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, false); query } fn get_in_query(&self, keys: Vec) -> String { let in_clause: String = keys .into_iter() - .map(|k| format!("'{}'", k)) + .map(|k| format!("'{k}'")) .collect::>() .join(", "); let mut query = format!("SELECT * FROM c WHERE c.id IN ({})", in_clause); - self.append_partition_key(&mut query); + self.append_store_id(&mut query, true); query } - fn append_partition_key(&self, query: &mut String) { - append_partition_key_condition(query, self.partition_key.as_deref()); + fn append_store_id(&self, query: &mut String, condition_already_exists: bool) { + append_store_id_condition(query, self.store_id.as_deref(), condition_already_exists); } } -/// Appends an option partition key condition to the query. -fn append_partition_key_condition(query: &mut String, partition_key: Option<&str>) { - if let Some(pk) = partition_key { - query.push_str(" AND c.partition_key='"); - query.push_str(pk); +/// Appends an optional store id condition to the query. 
+fn append_store_id_condition( + query: &mut String, + store_id: Option<&str>, + condition_already_exists: bool, +) { + if let Some(s) = store_id { + if condition_already_exists { + query.push_str(" AND"); + } else { + query.push_str(" WHERE"); + } + query.push_str(" c.store_id='"); + query.push_str(s); query.push('\'') } } @@ -417,15 +425,13 @@ pub struct Pair { pub id: String, pub value: Vec, #[serde(skip_serializing_if = "Option::is_none")] - pub partition_key: Option, + pub store_id: Option, } impl CosmosEntity for Pair { type Entity = String; fn partition_key(&self) -> Self::Entity { - self.partition_key - .clone() - .unwrap_or_else(|| self.id.clone()) + self.store_id.clone().unwrap_or_else(|| self.id.clone()) } }