diff --git a/Cargo.lock b/Cargo.lock index 9d997e7c24..c7ca403378 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,6 +330,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-once-cell" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288f83726785267c6f2ef073a3d83dc3f9b81464e9f99898240cced85fce35a" + [[package]] name = "async-priority-channel" version = "0.1.0" @@ -513,6 +519,329 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "aws-config" +version = "1.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.2.0", + "hex", + "http 0.2.12", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.2.0", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-dynamodb" +version = "1.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6355a9536b92daf4c4b7d4d4f60dd08ea780259ed80bac0312f2e1df4398176" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.2.0", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53dcf5e7d9bd1517b8b998e170e650047cea8a2b85fe1835abe3210713e541b7" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.1.0", + "once_cell", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand 2.2.0", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "http-body 1.0.1", + "httparse", + "hyper 0.14.31", + "hyper-rustls 0.24.2", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls 0.21.12", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.1.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.1.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.7.7" @@ -678,6 +1007,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -865,6 +1204,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "camino" version = "1.1.9" @@ -3431,6 +3780,22 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.31", + "log", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.25.0" @@ -3442,7 +3807,7 @@ dependencies = [ "hyper 0.14.31", "log", "rustls 0.22.4", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pki-types", "tokio", "tokio-rustls 0.25.0", @@ -5224,6 +5589,12 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" @@ -6195,6 +6566,12 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -6386,9 +6763,9 @@ dependencies = [ "flume", "futures-util", "log", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pemfile 2.2.0", - "rustls-webpki", + "rustls-webpki 0.102.8", "thiserror 1.0.69", "tokio", "tokio-rustls 0.25.0", @@ -6521,6 +6898,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.22.4" @@ -6530,7 +6919,7 @@ dependencies = [ "log", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] @@ -6545,11 +6934,23 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.7.3" @@ -6587,6 +6988,16 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.102.8" @@ -6660,6 +7071,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sec1" version = "0.7.3" @@ -7590,6 +8011,20 @@ dependencies = [ "wasmtime-wasi-http", ] +[[package]] +name = "spin-key-value-aws" +version = "3.1.0-pre0" +dependencies = [ + "anyhow", + "async-once-cell", + "aws-config", + "aws-credential-types", + "aws-sdk-dynamodb", + "serde", + "spin-core", + "spin-factor-key-value", +] + [[package]] name = "spin-key-value-azure" version = "3.1.0-pre0" @@ -7797,6 +8232,7 @@ dependencies = [ "spin-factor-wasi", "spin-factors", "spin-factors-test", + "spin-key-value-aws", "spin-key-value-azure", "spin-key-value-redis", "spin-key-value-spin", @@ -8636,6 +9072,16 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.25.0" @@ -9172,6 +9618,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "wac-graph" version = "0.6.1" @@ -10705,6 +11157,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yaml-rust2" version = "0.8.1" diff --git a/crates/factor-key-value/src/host.rs b/crates/factor-key-value/src/host.rs index 06acd0bf40..ce5001a4cb 100644 --- a/crates/factor-key-value/src/host.rs +++ b/crates/factor-key-value/src/host.rs @@ -283,10 +283,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { keys: Vec, ) -> std::result::Result>)>, wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; - store - .get_many(keys.iter().map(|k| k.to_string()).collect()) - .await - .map_err(to_wasi_err) + if keys.is_empty() { + return Ok(vec![]); + } + store.get_many(keys).await.map_err(to_wasi_err) } #[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))] @@ -296,6 +296,9 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { key_values: Vec<(String, Vec)>, ) -> std::result::Result<(), wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; + if key_values.is_empty() { + return Ok(()); + } store.set_many(key_values).await.map_err(to_wasi_err) } @@ -306,10 +309,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { keys: Vec, ) -> std::result::Result<(), wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; - store - .delete_many(keys.iter().map(|k| k.to_string()).collect()) - .await - .map_err(to_wasi_err) + if keys.is_empty() { + return Ok(()); + } + store.delete_many(keys).await.map_err(to_wasi_err) } } @@ -355,6 +358,13 @@ impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch { #[async_trait] impl wasi_keyvalue::atomics::Host for KeyValueDispatch { + fn convert_cas_error( + &mut self, + error: spin_world::wasi::keyvalue::atomics::CasError, + ) -> std::result::Result { + Ok(error) + } + #[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))] async fn increment( &mut self, @@ -371,27 +381,29 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch { &mut self, cas_res: Resource, value: Vec, - ) -> Result> { + ) -> Result<(), CasError> { let cas_rep = cas_res.rep(); let cas = self .get_cas(Resource::::new_own(cas_rep)) .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?; match cas.swap(value).await { - Ok(_) => Ok(Ok(())), + Ok(_) => Ok(()), Err(err) => match err { SwapError::CasFailed(_) => { let bucket = Resource::new_own(cas.bucket_rep().await); - let new_cas = self.new(bucket, cas.key().await).await?; + let new_cas = self + .new(bucket, cas.key().await) + .await + .map_err(CasError::StoreError)?; let new_cas_rep = new_cas.rep(); - self.current(Resource::new_own(new_cas_rep)).await?; - Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own( - new_cas_rep, - )))) + self.current(Resource::new_own(new_cas_rep)) + .await + .map_err(CasError::StoreError)?; + let res = Resource::new_own(new_cas_rep); + Err(CasError::CasFailed(res)) } - SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError( - atomics::Error::Other(msg), - ))), + SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))), }, } } diff --git a/crates/factor-key-value/src/util.rs b/crates/factor-key-value/src/util.rs index cea72c92b6..9fec7e4348 100644 --- a/crates/factor-key-value/src/util.rs +++ b/crates/factor-key-value/src/util.rs @@ -251,6 +251,11 @@ impl Store for CachingStore { keys: Vec, ) -> anyhow::Result>)>, Error> { let mut state = self.state.lock().await; + + // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior + // to their corresponding writes reaching the backing store. + state.flush().await?; + let mut found: Vec<(String, Option>)> = Vec::new(); let mut not_found: Vec = Vec::new(); for key in keys { @@ -260,10 +265,12 @@ impl Store for CachingStore { } } - let keys_and_values = self.inner.get_many(not_found).await?; - for (key, value) in keys_and_values { - found.push((key.clone(), value.clone())); - state.cache.put(key, value); + if !not_found.is_empty() { + let keys_and_values = self.inner.get_many(not_found).await?; + for (key, value) in keys_and_values { + found.push((key.clone(), value.clone())); + state.cache.put(key, value); + } } Ok(found) diff --git a/crates/key-value-aws/Cargo.toml b/crates/key-value-aws/Cargo.toml new file mode 100644 index 0000000000..26f636cbc4 --- /dev/null +++ b/crates/key-value-aws/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "spin-key-value-aws" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = { workspace = true } +async-once-cell = "0.5.4" +aws-config = "1.1.7" +aws-credential-types = "1.1.7" +aws-sdk-dynamodb = "1.49.0" +serde = { workspace = true } +spin-core = { path = "../core" } +spin-factor-key-value = { path = "../factor-key-value" } + +[lints] +workspace = true diff --git a/crates/key-value-aws/src/lib.rs b/crates/key-value-aws/src/lib.rs new file mode 100644 index 0000000000..85775975d6 --- /dev/null +++ b/crates/key-value-aws/src/lib.rs @@ -0,0 +1,74 @@ +mod store; + +use serde::Deserialize; +use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore; +use store::{ + KeyValueAwsDynamo, KeyValueAwsDynamoAuthOptions, KeyValueAwsDynamoRuntimeConfigOptions, +}; + +/// A key-value store that uses AWS Dynamo as the backend. +#[derive(Default)] +pub struct AwsDynamoKeyValueStore { + _priv: (), +} + +impl AwsDynamoKeyValueStore { + /// Creates a new `AwsKeyValueStore`. + pub fn new() -> Self { + Self::default() + } +} + +/// Runtime configuration for the AWS Dynamo key-value store. +#[derive(Deserialize)] +pub struct AwsDynamoKeyValueRuntimeConfig { + /// The access key for the AWS Dynamo DB account role. + access_key: Option, + /// The secret key for authorization on the AWS Dynamo DB account. + secret_key: Option, + /// The token for authorization on the AWS Dynamo DB account. + token: Option, + /// The AWS region where the database is located + region: String, + /// Boolean determining whether to use strongly consistent reads. + /// Defaults to `false` but can be set to `true` to improve atomicity + consistent_read: Option, + /// The AWS Dynamo DB table. + table: String, +} + +impl MakeKeyValueStore for AwsDynamoKeyValueStore { + const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo"; + + type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig; + + type StoreManager = KeyValueAwsDynamo; + + fn make_store( + &self, + runtime_config: Self::RuntimeConfig, + ) -> anyhow::Result { + let AwsDynamoKeyValueRuntimeConfig { + access_key, + secret_key, + token, + region, + consistent_read, + table, + } = runtime_config; + let auth_options = match (access_key, secret_key) { + (Some(access_key), Some(secret_key)) => { + KeyValueAwsDynamoAuthOptions::RuntimeConfigValues( + KeyValueAwsDynamoRuntimeConfigOptions::new(access_key, secret_key, token), + ) + } + _ => KeyValueAwsDynamoAuthOptions::Environmental, + }; + KeyValueAwsDynamo::new( + region, + consistent_read.unwrap_or(false), + table, + auth_options, + ) + } +} diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs new file mode 100644 index 0000000000..ed38e1e0b2 --- /dev/null +++ b/crates/key-value-aws/src/store.rs @@ -0,0 +1,567 @@ +use core::str; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use anyhow::Result; +use aws_config::{BehaviorVersion, Region, SdkConfig}; +use aws_credential_types::Credentials; +use aws_sdk_dynamodb::{ + config::{ProvideCredentials, SharedCredentialsProvider}, + operation::{ + batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput, + get_item::GetItemOutput, + }, + primitives::Blob, + types::{ + AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, TransactWriteItem, Update, + WriteRequest, + }, + Client, +}; +use spin_core::async_trait; +use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; + +pub struct KeyValueAwsDynamo { + /// AWS region + region: String, + /// Whether to use strongly consistent reads + consistent_read: bool, + /// DynamoDB table, needs to be cloned when getting a store + table: Arc, + /// DynamoDB client + client: async_once_cell::Lazy< + Client, + std::pin::Pin + Send>>, + >, +} + +/// AWS Dynamo Key / Value runtime config literal options for authentication +#[derive(Clone, Debug)] +pub struct KeyValueAwsDynamoRuntimeConfigOptions { + access_key: String, + secret_key: String, + token: Option, +} + +impl KeyValueAwsDynamoRuntimeConfigOptions { + pub fn new(access_key: String, secret_key: String, token: Option) -> Self { + Self { + access_key, + secret_key, + token, + } + } +} + +impl ProvideCredentials for KeyValueAwsDynamoRuntimeConfigOptions { + fn provide_credentials<'a>( + &'a self, + ) -> aws_credential_types::provider::future::ProvideCredentials<'a> + where + Self: 'a, + { + aws_credential_types::provider::future::ProvideCredentials::ready(Ok(Credentials::new( + self.access_key.clone(), + self.secret_key.clone(), + self.token.clone(), + None, // Optional expiration time + "spin_custom_aws_provider", + ))) + } +} + +/// AWS Dynamo Key / Value enumeration for the possible authentication options +#[derive(Clone, Debug)] +pub enum KeyValueAwsDynamoAuthOptions { + /// Runtime Config values indicates credentials have been specified directly + RuntimeConfigValues(KeyValueAwsDynamoRuntimeConfigOptions), + /// Environmental indicates that the environment variables of the process should be used to + /// create the SDK Config for the Dynamo client. This will use the AWS Rust SDK's + /// aws_config::load_defaults to derive credentials based on what environment variables + /// have been set. + /// + /// See https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-authentication.html for options. + Environmental, +} + +impl KeyValueAwsDynamo { + pub fn new( + region: String, + consistent_read: bool, + table: String, + auth_options: KeyValueAwsDynamoAuthOptions, + ) -> Result { + let region_clone = region.clone(); + let client_fut = Box::pin(async move { + let sdk_config = match auth_options { + KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(config) => SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(config)) + .region(Region::new(region_clone)) + .behavior_version(BehaviorVersion::latest()) + .build(), + KeyValueAwsDynamoAuthOptions::Environmental => { + aws_config::load_defaults(BehaviorVersion::latest()).await + } + }; + Client::new(&sdk_config) + }); + + Ok(Self { + region, + consistent_read, + table: Arc::new(table), + client: async_once_cell::Lazy::from_future(client_fut), + }) + } +} + +#[async_trait] +impl StoreManager for KeyValueAwsDynamo { + async fn get(&self, _name: &str) -> Result, Error> { + Ok(Arc::new(AwsDynamoStore { + client: self.client.get_unpin().await.clone(), + table: self.table.clone(), + consistent_read: self.consistent_read, + })) + } + + fn is_defined(&self, _store_name: &str) -> bool { + true + } + + fn summary(&self, _store_name: &str) -> Option { + Some(format!( + "AWS DynamoDB region: {}, table: {}", + self.region, self.table + )) + } +} + +struct AwsDynamoStore { + // Client wraps an Arc so should be low cost to clone + client: Client, + table: Arc, + consistent_read: bool, +} + +#[derive(Debug, Clone)] +enum CasState { + // Existing item with version + Versioned(String), + // Existing item without version + Unversioned(Blob), + // Item was missing when fetched during `current`, expected to be new + Unset, + // Potentially new item -- `current` was never called to fetch version + Unknown, +} + +struct CompareAndSwap { + key: String, + client: Client, + table: Arc, + bucket_rep: u32, + state: Mutex, +} + +/// Primary key in DynamoDB items used for querying items +const PK: &str = "PK"; +/// Value key in DynamoDB items storing item value as binary +const VAL: &str = "VAL"; +/// Version key in DynamoDB items used for atomic operations +const VER: &str = "VER"; + +#[async_trait] +impl Store for AwsDynamoStore { + async fn get(&self, key: &str) -> Result>, Error> { + let response = self + .client + .get_item() + .consistent_read(self.consistent_read) + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), + ) + .projection_expression(VAL) + .send() + .await + .map_err(log_error)?; + + let item = response.item.and_then(|mut item| { + if let Some(AttributeValue::B(val)) = item.remove(VAL) { + Some(val.into_inner()) + } else { + None + } + }); + + Ok(item) + } + + async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> { + self.client + .put_item() + .table_name(self.table.as_str()) + .item(PK, AttributeValue::S(key.to_string())) + .item(VAL, AttributeValue::B(Blob::new(value))) + .send() + .await + .map_err(log_error)?; + Ok(()) + } + + async fn delete(&self, key: &str) -> Result<(), Error> { + self.client + .delete_item() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(key.to_string())) + .send() + .await + .map_err(log_error)?; + Ok(()) + } + + async fn exists(&self, key: &str) -> Result { + let GetItemOutput { item, .. } = self + .client + .get_item() + .consistent_read(self.consistent_read) + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), + ) + .projection_expression(PK) + .send() + .await + .map_err(log_error)?; + + Ok(item.map(|item| item.contains_key(PK)).unwrap_or(false)) + } + + async fn get_keys(&self) -> Result, Error> { + let mut primary_keys = Vec::new(); + + let mut scan_paginator = self + .client + .scan() + .table_name(self.table.as_str()) + .projection_expression(PK) + .into_paginator() + .send(); + + while let Some(output) = scan_paginator.next().await { + let scan_output = output.map_err(log_error)?; + if let Some(items) = scan_output.items { + for mut item in items { + if let Some(AttributeValue::S(pk)) = item.remove(PK) { + primary_keys.push(pk); + } + } + } + } + + Ok(primary_keys) + } + + async fn get_many(&self, keys: Vec) -> Result>)>, Error> { + let mut results = Vec::with_capacity(keys.len()); + let mut keys_and_attributes_builder = KeysAndAttributes::builder() + .projection_expression(format!("{PK},{VAL}")) + .consistent_read(self.consistent_read); + for key in keys { + keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([( + PK.to_owned(), + AttributeValue::S(key), + )])) + } + let mut request_items = Some(HashMap::from_iter([( + self.table.to_string(), + keys_and_attributes_builder.build().map_err(log_error)?, + )])); + + while request_items.is_some() { + let BatchGetItemOutput { + responses, + unprocessed_keys, + .. + } = self + .client + .batch_get_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)?; + + if let Some(items) = + responses.and_then(|mut responses| responses.remove(self.table.as_str())) + { + for mut item in items { + match (item.remove(PK), item.remove(VAL)) { + (Some(AttributeValue::S(pk)), Some(AttributeValue::B(val))) => { + results.push((pk, Some(val.into_inner()))); + } + (Some(AttributeValue::S(pk)), None) => { + results.push((pk, None)); + } + _ => (), + } + } + } + + request_items = unprocessed_keys.filter(|unprocessed| !unprocessed.is_empty()); + } + + Ok(results) + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { + let mut data = Vec::with_capacity(key_values.len()); + for (key, val) in key_values { + data.push( + WriteRequest::builder() + .put_request( + PutRequest::builder() + .item(PK, AttributeValue::S(key)) + .item(VAL, AttributeValue::B(Blob::new(val))) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); + + while request_items.is_some() { + let BatchWriteItemOutput { + unprocessed_items, .. + } = self + .client + .batch_write_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)?; + + request_items = unprocessed_items.filter(|unprocessed| !unprocessed.is_empty()); + } + + Ok(()) + } + + async fn delete_many(&self, keys: Vec) -> Result<(), Error> { + let mut data = Vec::with_capacity(keys.len()); + for key in keys { + data.push( + WriteRequest::builder() + .delete_request( + DeleteRequest::builder() + .key(PK, AttributeValue::S(key)) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); + + while request_items.is_some() { + let BatchWriteItemOutput { + unprocessed_items, .. + } = self + .client + .batch_write_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)?; + + request_items = unprocessed_items.filter(|unprocessed| !unprocessed.is_empty()); + } + + Ok(()) + } + + async fn increment(&self, key: String, delta: i64) -> Result { + let GetItemOutput { item, .. } = self + .client + .get_item() + .consistent_read(true) + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(key.clone())) + .projection_expression(VAL) + .send() + .await + .map_err(log_error)?; + + let old_val = match item { + Some(mut current_item) => match current_item.remove(VAL) { + // We're expecting i64, so technically we could transmute but seems risky... + Some(AttributeValue::B(val)) => Some( + str::from_utf8(&val.into_inner()) + .map_err(log_error)? + .parse::() + .map_err(log_error)?, + ), + _ => None, + }, + None => None, + }; + + let new_val = old_val.unwrap_or(0) + delta; + + let mut update = Update::builder() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(key)) + .update_expression("SET #VAL = :new_val") + .expression_attribute_names("#VAL", VAL) + .expression_attribute_values( + ":new_val", + AttributeValue::B(Blob::new(new_val.to_string().as_bytes())), + ); + + if let Some(old_val) = old_val { + update = update + .condition_expression("#VAL = :old_val") + .expression_attribute_values( + ":old_val", + AttributeValue::B(Blob::new(old_val.to_string().as_bytes())), + ) + } else { + update = update.condition_expression("attribute_not_exists (#VAL)") + } + + self.client + .transact_write_items() + .transact_items( + TransactWriteItem::builder() + .update(update.build().map_err(log_error)?) + .build(), + ) + .send() + .await + .map_err(log_error)?; + + Ok(new_val) + } + + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> Result, Error> { + Ok(Arc::new(CompareAndSwap { + key: key.to_string(), + client: self.client.clone(), + table: self.table.clone(), + state: Mutex::new(CasState::Unknown), + bucket_rep, + })) + } +} + +#[async_trait] +impl Cas for CompareAndSwap { + async fn current(&self) -> Result>, Error> { + let GetItemOutput { item, .. } = self + .client + .get_item() + .consistent_read(true) + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(self.key.clone())) + .projection_expression(format!("{VAL},{VER}")) + .send() + .await + .map_err(log_error)?; + + match item { + Some(mut current_item) => match (current_item.remove(VAL), current_item.remove(VER)) { + (Some(AttributeValue::B(val)), Some(AttributeValue::N(ver))) => { + self.state + .lock() + .unwrap() + .clone_from(&CasState::Versioned(ver)); + + Ok(Some(val.into_inner())) + } + (Some(AttributeValue::B(val)), _) => { + self.state + .lock() + .unwrap() + .clone_from(&CasState::Unversioned(val.clone())); + + Ok(Some(val.into_inner())) + } + (_, _) => { + self.state.lock().unwrap().clone_from(&CasState::Unset); + Ok(None) + } + }, + None => { + self.state.lock().unwrap().clone_from(&CasState::Unset); + Ok(None) + } + } + } + + /// `swap` updates the value for the key -- if possible, using the version saved in the `current` function for + /// optimistic concurrency or the previous item value + async fn swap(&self, value: Vec) -> Result<(), SwapError> { + let mut update = Update::builder() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(self.key.clone())) + .update_expression("SET #VAL = :val ADD #VER :increment") + .expression_attribute_names("#VAL", VAL) + .expression_attribute_names("#VER", VER) + .expression_attribute_values(":val", AttributeValue::B(Blob::new(value))) + .expression_attribute_values(":increment", AttributeValue::N("1".to_owned())); + + let state = self.state.lock().unwrap().clone(); + match state { + CasState::Versioned(version) => { + update = update + .condition_expression("#VER = :ver") + .expression_attribute_values(":ver", AttributeValue::N(version)); + } + CasState::Unversioned(old_val) => { + update = update + .condition_expression("#VAL = :old_val") + .expression_attribute_values(":old_val", AttributeValue::B(old_val)); + } + CasState::Unset => { + update = update.condition_expression("attribute_not_exists (#VAL)"); + } + CasState::Unknown => (), + }; + + self.client + .transact_write_items() + .transact_items( + TransactWriteItem::builder() + .update( + update + .build() + .map_err(|e| SwapError::Other(format!("{e:?}")))?, + ) + .build(), + ) + .send() + .await + .map_err(|e| SwapError::CasFailed(format!("{e:?}")))?; + + Ok(()) + } + + async fn bucket_rep(&self) -> u32 { + self.bucket_rep + } + + async fn key(&self) -> String { + self.key.clone() + } +} diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index 258934e4df..001864ea77 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -92,9 +92,8 @@ impl KeyValueAzureCosmos { #[async_trait] impl StoreManager for KeyValueAzureCosmos { - async fn get(&self, name: &str) -> Result, Error> { + async fn get(&self, _name: &str) -> Result, Error> { Ok(Arc::new(AzureCosmosStore { - _name: name.to_owned(), client: self.client.clone(), })) } @@ -114,7 +113,6 @@ impl StoreManager for KeyValueAzureCosmos { #[derive(Clone)] struct AzureCosmosStore { - _name: String, client: CollectionClient, } diff --git a/crates/key-value-spin/src/store.rs b/crates/key-value-spin/src/store.rs index f18b60f7b7..122e63c413 100644 --- a/crates/key-value-spin/src/store.rs +++ b/crates/key-value-spin/src/store.rs @@ -307,20 +307,35 @@ impl Cas for CompareAndSwap { async fn swap(&self, value: Vec) -> Result<(), SwapError> { task::block_in_place(|| { let old_value = self.value.lock().unwrap(); - let rows_changed = self.connection - .lock() - .unwrap() - .prepare_cached( - "UPDATE spin_key_value SET value=:new_value WHERE store=:name and key=:key and value=:old_value", - ) - .map_err(log_cas_error)? - .execute(named_params! { - ":name": &self.name, - ":key": self.key, - ":old_value": old_value.clone().unwrap(), - ":new_value": value, - }) - .map_err(log_cas_error)?; + let mut conn = self.connection.lock().unwrap(); + let rows_changed = match old_value.clone() { + Some(old_val) => { + conn + .prepare_cached( + "UPDATE spin_key_value SET value=:new_value WHERE store=:name and key=:key and value=:old_value") + .map_err(log_cas_error)? + .execute(named_params! { + ":name": &self.name, + ":key": self.key, + ":old_value": old_val, + ":new_value": value, + }) + .map_err(log_cas_error)? + } + None => { + let tx = conn.transaction().map_err(log_cas_error)?; + let rows = tx + .prepare_cached( + "INSERT INTO spin_key_value (store, key, value) VALUES ($1, $2, $3) + ON CONFLICT(store, key) DO UPDATE SET value=$3", + ) + .map_err(log_cas_error)? + .execute(rusqlite::params![&self.name, self.key, value]) + .map_err(log_cas_error)?; + tx.commit().map_err(log_cas_error)?; + rows + } + }; // We expect only 1 row to be updated. If 0, we know that the underlying value has changed. if rows_changed == 1 { @@ -507,15 +522,10 @@ mod test { let res = kv.swap(Resource::new_own(cas.rep()), cas_final_value).await; match res { Ok(_) => panic!("expected a CAS failure"), - Err(err) => { - for cause in err.chain() { - if let Some(cas_err) = cause.downcast_ref::() { - assert!(matches!(cas_err, CasError::CasFailed(_))); - return Ok(()); - } - } - panic!("expected a CAS failure") - } + Err(err) => match err { + CasError::CasFailed(_) => Ok(()), + CasError::StoreError(_) => panic!("expected a CasFailed error"), + }, } } diff --git a/crates/runtime-config/Cargo.toml b/crates/runtime-config/Cargo.toml index 569026129f..75839f47a6 100644 --- a/crates/runtime-config/Cargo.toml +++ b/crates/runtime-config/Cargo.toml @@ -23,6 +23,7 @@ spin-factor-sqlite = { path = "../factor-sqlite" } spin-factor-variables = { path = "../factor-variables" } spin-factor-wasi = { path = "../factor-wasi" } spin-factors = { path = "../factors" } +spin-key-value-aws = { path = "../key-value-aws" } spin-key-value-azure = { path = "../key-value-azure" } spin-key-value-redis = { path = "../key-value-redis" } spin-key-value-spin = { path = "../key-value-spin" } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index fd0687361a..9639243633 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -400,6 +400,9 @@ pub fn key_value_config_resolver( key_value .register_store_type(spin_key_value_azure::AzureKeyValueStore::new()) .unwrap(); + key_value + .register_store_type(spin_key_value_aws::AwsDynamoKeyValueStore::new()) + .unwrap(); // Add handling of "default" store. let default_store_path = default_store_base_path.map(|p| p.join(DEFAULT_SPIN_STORE_FILENAME)); diff --git a/crates/world/src/lib.rs b/crates/world/src/lib.rs index a3ecf810e4..501ca9c568 100644 --- a/crates/world/src/lib.rs +++ b/crates/world/src/lib.rs @@ -33,6 +33,7 @@ wasmtime::component::bindgen!({ "spin:postgres/postgres/error" => spin::postgres::postgres::Error, "wasi:config/store@0.2.0-draft-2024-09-27/error" => wasi::config::store::Error, "wasi:keyvalue/store/error" => wasi::keyvalue::store::Error, + "wasi:keyvalue/atomics/cas-error" => wasi::keyvalue::atomics::CasError, }, trappable_imports: true, }); diff --git a/wit/deps/keyvalue-2024-10-17/atomic.wit b/wit/deps/keyvalue-2024-10-17/atomic.wit index 2c3e0d047b..4a02c58034 100644 --- a/wit/deps/keyvalue-2024-10-17/atomic.wit +++ b/wit/deps/keyvalue-2024-10-17/atomic.wit @@ -13,22 +13,22 @@ interface atomics { /// The error returned by a CAS operation variant cas-error { - /// A store error occurred when performing the operation - store-error(error), + /// A store error occurred when performing the operation + store-error(error), /// The CAS operation failed because the value was too old. This returns a new CAS handle /// for easy retries. Implementors MUST return a CAS handle that has been updated to the /// latest version or transaction. - cas-failed(cas), + cas-failed(cas), } /// A handle to a CAS (compare-and-swap) operation. resource cas { - /// Construct a new CAS operation. Implementors can map the underlying functionality - /// (transactions, versions, etc) as desired. - new: static func(bucket: borrow, key: string) -> result; - /// Get the current value of the key (if it exists). This allows for avoiding reads if all - /// that is needed to ensure the atomicity of the operation - current: func() -> result>, error>; + /// Construct a new CAS operation. Implementors can map the underlying functionality + /// (transactions, versions, etc) as desired. + new: static func(bucket: borrow, key: string) -> result; + /// Get the current value of the key (if it exists). This allows for avoiding reads if all + /// that is needed to ensure the atomicity of the operation + current: func() -> result>, error>; } /// Atomically increment the value associated with the key in the store by the given delta. It diff --git a/wit/deps/keyvalue-2024-10-17/world.wit b/wit/deps/keyvalue-2024-10-17/world.wit index 64eb4e1225..e8fb821a95 100644 --- a/wit/deps/keyvalue-2024-10-17/world.wit +++ b/wit/deps/keyvalue-2024-10-17/world.wit @@ -1,4 +1,4 @@ -package wasi: keyvalue@0.2.0-draft2; +package wasi:keyvalue@0.2.0-draft2; /// The `wasi:keyvalue/imports` world provides common APIs for interacting with key-value stores. /// Components targeting this world will be able to do: