From 12f3dad3814a0283c6b422e44d9ff0fe93c8f2e9 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Sun, 6 Oct 2024 16:57:52 -0600 Subject: [PATCH 01/13] Add AWS key value store Signed-off-by: Darwin Boersma --- Cargo.lock | 468 +++++++++++++++++++++++++++- crates/factor-key-value/src/host.rs | 10 +- crates/key-value-aws/Cargo.toml | 22 ++ crates/key-value-aws/src/lib.rs | 65 ++++ crates/key-value-aws/src/store.rs | 232 ++++++++++++++ crates/runtime-config/Cargo.toml | 1 + crates/runtime-config/src/lib.rs | 3 + 7 files changed, 788 insertions(+), 13 deletions(-) create mode 100644 crates/key-value-aws/Cargo.toml create mode 100644 crates/key-value-aws/src/lib.rs create mode 100644 crates/key-value-aws/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index 9d997e7c24..c7ca403378 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,6 +330,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-once-cell" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288f83726785267c6f2ef073a3d83dc3f9b81464e9f99898240cced85fce35a" + [[package]] name = "async-priority-channel" version = "0.1.0" @@ -513,6 +519,329 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "aws-config" +version = "1.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.2.0", + "hex", + "http 0.2.12", + "ring", + "time", + "tokio", + "tracing", + "url", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.2.0", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-dynamodb" +version = "1.53.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6355a9536b92daf4c4b7d4d4f60dd08ea780259ed80bac0312f2e1df4398176" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.2.0", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53dcf5e7d9bd1517b8b998e170e650047cea8a2b85fe1835abe3210713e541b7" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.12", + "http 1.1.0", + "once_cell", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62220bc6e97f946ddd51b5f1361f78996e704677afc518a4ff66b7a72ea1378c" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4683df9469ef09468dad3473d129960119a0d3593617542b7d52086c8486f2d6" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fbd61ceb3fe8a1cb7352e42689cec5335833cd9f94103a61e98f9bb61c64bb" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand 2.2.0", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "http-body 1.0.1", + "httparse", + "hyper 0.14.31", + "hyper-rustls 0.24.2", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls 0.21.12", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.12", + "http 1.1.0", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.12", + "http 1.1.0", + "http-body 0.4.6", + "http-body 1.0.1", + "http-body-util", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5221b91b3e441e6675310829fd8984801b772cb1546ef6c0e54dec9f1ac13fef" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.7.7" @@ -678,6 +1007,16 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -865,6 +1204,16 @@ dependencies = [ "serde", ] +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "camino" version = "1.1.9" @@ -3431,6 +3780,22 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.31", + "log", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.25.0" @@ -3442,7 +3807,7 @@ dependencies = [ "hyper 0.14.31", "log", "rustls 0.22.4", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pki-types", "tokio", "tokio-rustls 0.25.0", @@ -5224,6 +5589,12 @@ version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" @@ -6195,6 +6566,12 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "regex-lite" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -6386,9 +6763,9 @@ dependencies = [ "flume", "futures-util", "log", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pemfile 2.2.0", - "rustls-webpki", + "rustls-webpki 0.102.8", "thiserror 1.0.69", "tokio", "tokio-rustls 0.25.0", @@ -6521,6 +6898,18 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.22.4" @@ -6530,7 +6919,7 @@ dependencies = [ "log", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] @@ -6545,11 +6934,23 @@ dependencies = [ "once_cell", "ring", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.102.8", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.7.3" @@ -6587,6 +6988,16 @@ version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.102.8" @@ -6660,6 +7071,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sec1" version = "0.7.3" @@ -7590,6 +8011,20 @@ dependencies = [ "wasmtime-wasi-http", ] +[[package]] +name = "spin-key-value-aws" +version = "3.1.0-pre0" +dependencies = [ + "anyhow", + "async-once-cell", + "aws-config", + "aws-credential-types", + "aws-sdk-dynamodb", + "serde", + "spin-core", + "spin-factor-key-value", +] + [[package]] name = "spin-key-value-azure" version = "3.1.0-pre0" @@ -7797,6 +8232,7 @@ dependencies = [ "spin-factor-wasi", "spin-factors", "spin-factors-test", + "spin-key-value-aws", "spin-key-value-azure", "spin-key-value-redis", "spin-key-value-spin", @@ -8636,6 +9072,16 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.25.0" @@ -9172,6 +9618,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "wac-graph" version = "0.6.1" @@ -10705,6 +11157,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yaml-rust2" version = "0.8.1" diff --git a/crates/factor-key-value/src/host.rs b/crates/factor-key-value/src/host.rs index 06acd0bf40..60d08b509d 100644 --- a/crates/factor-key-value/src/host.rs +++ b/crates/factor-key-value/src/host.rs @@ -283,10 +283,7 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { keys: Vec, ) -> std::result::Result>)>, wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; - store - .get_many(keys.iter().map(|k| k.to_string()).collect()) - .await - .map_err(to_wasi_err) + store.get_many(keys).await.map_err(to_wasi_err) } #[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))] @@ -306,10 +303,7 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { keys: Vec, ) -> std::result::Result<(), wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; - store - .delete_many(keys.iter().map(|k| k.to_string()).collect()) - .await - .map_err(to_wasi_err) + store.delete_many(keys).await.map_err(to_wasi_err) } } diff --git a/crates/key-value-aws/Cargo.toml b/crates/key-value-aws/Cargo.toml new file mode 100644 index 0000000000..26f636cbc4 --- /dev/null +++ b/crates/key-value-aws/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "spin-key-value-aws" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow = { workspace = true } +async-once-cell = "0.5.4" +aws-config = "1.1.7" +aws-credential-types = "1.1.7" +aws-sdk-dynamodb = "1.49.0" +serde = { workspace = true } +spin-core = { path = "../core" } +spin-factor-key-value = { path = "../factor-key-value" } + +[lints] +workspace = true diff --git a/crates/key-value-aws/src/lib.rs b/crates/key-value-aws/src/lib.rs new file mode 100644 index 0000000000..aaf3ecca71 --- /dev/null +++ b/crates/key-value-aws/src/lib.rs @@ -0,0 +1,65 @@ +mod store; + +use serde::Deserialize; +use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore; +use store::{ + KeyValueAwsDynamo, KeyValueAwsDynamoAuthOptions, KeyValueAwsDynamoRuntimeConfigOptions, +}; + +/// A key-value store that uses AWS Dynamo as the backend. +#[derive(Default)] +pub struct AwsKeyValueStore { + _priv: (), +} + +impl AwsKeyValueStore { + /// Creates a new `AwsKeyValueStore`. + pub fn new() -> Self { + Self::default() + } +} + +/// Runtime configuration for the AWS Dynamo key-value store. +#[derive(Deserialize)] +pub struct AwsDynamoKeyValueRuntimeConfig { + /// The access key for the AWS Dynamo DB account role. + access_key: Option, + /// The secret key for authorization on the AWS Dynamo DB account. + secret_key: Option, + /// The token for authorization on the AWS Dynamo DB account. + token: Option, + /// The AWS region where the database is located + region: String, + /// The AWS Dynamo DB table. + table: String, +} + +impl MakeKeyValueStore for AwsKeyValueStore { + const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo"; + + type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig; + + type StoreManager = KeyValueAwsDynamo; + + fn make_store( + &self, + runtime_config: Self::RuntimeConfig, + ) -> anyhow::Result { + let AwsDynamoKeyValueRuntimeConfig { + access_key, + secret_key, + token, + region, + table, + } = runtime_config; + let auth_options = match (access_key, secret_key) { + (Some(access_key), Some(secret_key)) => { + KeyValueAwsDynamoAuthOptions::RuntimeConfigValues( + KeyValueAwsDynamoRuntimeConfigOptions::new(access_key, secret_key, token), + ) + } + _ => KeyValueAwsDynamoAuthOptions::Environmental, + }; + KeyValueAwsDynamo::new(region, table, auth_options) + } +} diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs new file mode 100644 index 0000000000..7a1b8548b3 --- /dev/null +++ b/crates/key-value-aws/src/store.rs @@ -0,0 +1,232 @@ +use std::sync::Arc; + +use anyhow::Result; +use aws_config::{BehaviorVersion, Region, SdkConfig}; +use aws_credential_types::Credentials; +use aws_sdk_dynamodb::{ + config::{ProvideCredentials, SharedCredentialsProvider}, + primitives::Blob, + types::AttributeValue, + Client, +}; +use spin_core::async_trait; +use spin_factor_key_value::{log_error, Error, Store, StoreManager}; + +pub struct KeyValueAwsDynamo { + table: String, + region: String, + client: async_once_cell::Lazy< + Client, + std::pin::Pin + Send>>, + >, +} + +/// AWS Dynamo Key / Value runtime config literal options for authentication +#[derive(Clone, Debug)] +pub struct KeyValueAwsDynamoRuntimeConfigOptions { + access_key: String, + secret_key: String, + token: Option, +} + +impl KeyValueAwsDynamoRuntimeConfigOptions { + pub fn new(access_key: String, secret_key: String, token: Option) -> Self { + Self { + access_key, + secret_key, + token, + } + } +} + +impl ProvideCredentials for KeyValueAwsDynamoRuntimeConfigOptions { + fn provide_credentials<'a>( + &'a self, + ) -> aws_credential_types::provider::future::ProvideCredentials<'a> + where + Self: 'a, + { + aws_credential_types::provider::future::ProvideCredentials::ready(Ok(Credentials::new( + self.access_key.clone(), + self.secret_key.clone(), + self.token.clone(), + None, // Optional expiration time + "spin_custom_aws_provider", + ))) + } +} + +/// AWS Dynamo Key / Value enumeration for the possible authentication options +#[derive(Clone, Debug)] +pub enum KeyValueAwsDynamoAuthOptions { + /// Runtime Config values indicates credentials have been specified directly + RuntimeConfigValues(KeyValueAwsDynamoRuntimeConfigOptions), + /// Environmental indicates that the environment variables of the process should be used to + /// create the SDK Config for the Dynamo client. This will use the AWS Rust SDK's + /// aws_config::load_defaults to derive credentials based on what environment variables + /// have been set. + /// + /// See https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-authentication.html for options. + Environmental, +} + +impl KeyValueAwsDynamo { + pub fn new( + region: String, + table: String, + auth_options: KeyValueAwsDynamoAuthOptions, + ) -> Result { + let region_clone = region.clone(); + let client_fut: std::pin::Pin + Send>> = + Box::pin(async move { + let config = match auth_options { + KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(config) => { + SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(config)) + .region(Region::new(region_clone)) + .behavior_version(BehaviorVersion::latest()) + .build() + } + KeyValueAwsDynamoAuthOptions::Environmental => { + aws_config::load_defaults(BehaviorVersion::latest()).await + } + }; + Client::new(&config) + }); + + Ok(Self { + client: async_once_cell::Lazy::from_future(client_fut), + table, + region, + }) + } +} + +#[async_trait] +impl StoreManager for KeyValueAwsDynamo { + async fn get(&self, name: &str) -> Result, Error> { + Ok(Arc::new(AwsDynamoStore { + _name: name.to_owned(), + client: self.client.get_unpin().await.clone(), + table: self.table.clone(), + })) + } + + fn is_defined(&self, _store_name: &str) -> bool { + true + } + + fn summary(&self, _store_name: &str) -> Option { + Some(format!( + "AWS DynamoDB region: {:?}, table: {}", + self.region, self.table + )) + } +} + +struct AwsDynamoStore { + _name: String, + client: Client, + table: String, +} + +const PK: &str = "PK"; +const VAL: &str = "val"; + +#[async_trait] +impl Store for AwsDynamoStore { + async fn get(&self, key: &str) -> Result>, Error> { + let item = self.get_item(key).await?; + Ok(item) + } + + async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> { + self.client + .put_item() + .table_name(self.table.clone()) + .item(PK, AttributeValue::S(key.to_string())) + .item(VAL, AttributeValue::B(Blob::new(value))) + .send() + .await + .map_err(log_error)?; + Ok(()) + } + + async fn delete(&self, key: &str) -> Result<(), Error> { + if self.exists(key).await? { + self.client + .delete_item() + .table_name(self.table.clone()) + .key(PK, AttributeValue::S(key.to_string())) + .send() + .await + .map_err(log_error)?; + } + Ok(()) + } + + async fn exists(&self, key: &str) -> Result { + Ok(self.get_item(key).await?.is_some()) + } + + async fn get_keys(&self) -> Result, Error> { + self.get_keys().await + } +} + +impl AwsDynamoStore { + async fn get_item(&self, key: &str) -> Result>, Error> { + let query = self + .client + .get_item() + .table_name(self.table.clone()) + .key(PK, aws_sdk_dynamodb::types::AttributeValue::S(key.into())) + .send() + .await + .map_err(log_error)?; + + Ok(query.item.and_then(|item| { + if let Some(AttributeValue::B(val)) = item.get(VAL) { + Some(val.clone().into_inner()) + } else { + None + } + })) + } + + async fn get_keys(&self) -> Result, Error> { + let mut primary_keys = Vec::new(); + let mut last_evaluated_key = None; + + loop { + let mut scan_builder = self + .client + .scan() + .table_name(self.table.clone()) + .projection_expression(PK); + + if let Some(keys) = last_evaluated_key { + for (key, val) in keys { + scan_builder = scan_builder.exclusive_start_key(key, val); + } + } + + let scan_output = scan_builder.send().await.map_err(log_error)?; + + if let Some(items) = scan_output.items { + for item in items { + if let Some(AttributeValue::S(pk)) = item.get(PK) { + primary_keys.push(pk.clone()); + } + } + } + + last_evaluated_key = scan_output.last_evaluated_key; + if last_evaluated_key.is_none() { + break; + } + } + + Ok(primary_keys) + } +} diff --git a/crates/runtime-config/Cargo.toml b/crates/runtime-config/Cargo.toml index 569026129f..75839f47a6 100644 --- a/crates/runtime-config/Cargo.toml +++ b/crates/runtime-config/Cargo.toml @@ -23,6 +23,7 @@ spin-factor-sqlite = { path = "../factor-sqlite" } spin-factor-variables = { path = "../factor-variables" } spin-factor-wasi = { path = "../factor-wasi" } spin-factors = { path = "../factors" } +spin-key-value-aws = { path = "../key-value-aws" } spin-key-value-azure = { path = "../key-value-azure" } spin-key-value-redis = { path = "../key-value-redis" } spin-key-value-spin = { path = "../key-value-spin" } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index fd0687361a..aa14c5dc47 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -400,6 +400,9 @@ pub fn key_value_config_resolver( key_value .register_store_type(spin_key_value_azure::AzureKeyValueStore::new()) .unwrap(); + key_value + .register_store_type(spin_key_value_aws::AwsKeyValueStore::new()) + .unwrap(); // Add handling of "default" store. let default_store_path = default_store_base_path.map(|p| p.join(DEFAULT_SPIN_STORE_FILENAME)); From 3b6e6aa3a55b797dc870aa10eb11a3ace6ce7b60 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Wed, 30 Oct 2024 09:00:46 -0600 Subject: [PATCH 02/13] Update to DynamoKeyValueStore to make other AWS KV store implementations easier Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/lib.rs | 6 +-- crates/key-value-aws/src/store.rs | 62 ++++++++++++++++--------------- crates/runtime-config/src/lib.rs | 2 +- 3 files changed, 36 insertions(+), 34 deletions(-) diff --git a/crates/key-value-aws/src/lib.rs b/crates/key-value-aws/src/lib.rs index aaf3ecca71..a0305be668 100644 --- a/crates/key-value-aws/src/lib.rs +++ b/crates/key-value-aws/src/lib.rs @@ -8,11 +8,11 @@ use store::{ /// A key-value store that uses AWS Dynamo as the backend. #[derive(Default)] -pub struct AwsKeyValueStore { +pub struct AwsDynamoKeyValueStore { _priv: (), } -impl AwsKeyValueStore { +impl AwsDynamoKeyValueStore { /// Creates a new `AwsKeyValueStore`. pub fn new() -> Self { Self::default() @@ -34,7 +34,7 @@ pub struct AwsDynamoKeyValueRuntimeConfig { table: String, } -impl MakeKeyValueStore for AwsKeyValueStore { +impl MakeKeyValueStore for AwsDynamoKeyValueStore { const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo"; type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig; diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index 7a1b8548b3..ca9d0d5737 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -77,27 +77,24 @@ impl KeyValueAwsDynamo { auth_options: KeyValueAwsDynamoAuthOptions, ) -> Result { let region_clone = region.clone(); - let client_fut: std::pin::Pin + Send>> = - Box::pin(async move { - let config = match auth_options { - KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(config) => { - SdkConfig::builder() - .credentials_provider(SharedCredentialsProvider::new(config)) - .region(Region::new(region_clone)) - .behavior_version(BehaviorVersion::latest()) - .build() - } - KeyValueAwsDynamoAuthOptions::Environmental => { - aws_config::load_defaults(BehaviorVersion::latest()).await - } - }; - Client::new(&config) - }); + let client_fut = Box::pin(async move { + let sdk_config = match auth_options { + KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(config) => SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(config)) + .region(Region::new(region_clone)) + .behavior_version(BehaviorVersion::latest()) + .build(), + KeyValueAwsDynamoAuthOptions::Environmental => { + aws_config::load_defaults(BehaviorVersion::latest()).await + } + }; + Client::new(&sdk_config) + }); Ok(Self { - client: async_once_cell::Lazy::from_future(client_fut), table, region, + client: async_once_cell::Lazy::from_future(client_fut), }) } } @@ -143,7 +140,7 @@ impl Store for AwsDynamoStore { async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> { self.client .put_item() - .table_name(self.table.clone()) + .table_name(&self.table) .item(PK, AttributeValue::S(key.to_string())) .item(VAL, AttributeValue::B(Blob::new(value))) .send() @@ -156,7 +153,7 @@ impl Store for AwsDynamoStore { if self.exists(key).await? { self.client .delete_item() - .table_name(self.table.clone()) + .table_name(&self.table) .key(PK, AttributeValue::S(key.to_string())) .send() .await @@ -176,22 +173,27 @@ impl Store for AwsDynamoStore { impl AwsDynamoStore { async fn get_item(&self, key: &str) -> Result>, Error> { - let query = self + let response = self .client .get_item() - .table_name(self.table.clone()) - .key(PK, aws_sdk_dynamodb::types::AttributeValue::S(key.into())) + .table_name(&self.table) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), + ) .send() .await .map_err(log_error)?; - Ok(query.item.and_then(|item| { - if let Some(AttributeValue::B(val)) = item.get(VAL) { - Some(val.clone().into_inner()) + let val = response.item.and_then(|mut item| { + if let Some(AttributeValue::B(val)) = item.remove(VAL) { + Some(val.into_inner()) } else { None } - })) + }); + + Ok(val) } async fn get_keys(&self) -> Result, Error> { @@ -202,7 +204,7 @@ impl AwsDynamoStore { let mut scan_builder = self .client .scan() - .table_name(self.table.clone()) + .table_name(&self.table) .projection_expression(PK); if let Some(keys) = last_evaluated_key { @@ -214,9 +216,9 @@ impl AwsDynamoStore { let scan_output = scan_builder.send().await.map_err(log_error)?; if let Some(items) = scan_output.items { - for item in items { - if let Some(AttributeValue::S(pk)) = item.get(PK) { - primary_keys.push(pk.clone()); + for mut item in items { + if let Some(AttributeValue::S(pk)) = item.remove(PK) { + primary_keys.push(pk); } } } diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index aa14c5dc47..9639243633 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -401,7 +401,7 @@ pub fn key_value_config_resolver( .register_store_type(spin_key_value_azure::AzureKeyValueStore::new()) .unwrap(); key_value - .register_store_type(spin_key_value_aws::AwsKeyValueStore::new()) + .register_store_type(spin_key_value_aws::AwsDynamoKeyValueStore::new()) .unwrap(); // Add handling of "default" store. From 74cfa2bda65b897ccc8003d4cbc46fcb562f52d5 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Tue, 5 Nov 2024 07:43:47 -0700 Subject: [PATCH 03/13] Compiling with todos for new functionality Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 35 +++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index ca9d0d5737..86bc8cb27d 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use anyhow::Result; use aws_config::{BehaviorVersion, Region, SdkConfig}; @@ -6,7 +6,7 @@ use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, primitives::Blob, - types::AttributeValue, + types::{AttributeValue, KeysAndAttributes}, Client, }; use spin_core::async_trait; @@ -127,6 +127,13 @@ struct AwsDynamoStore { table: String, } +// struct CompareAndSwap { +// key: String, +// client: CollectionClient, +// bucket_rep: u32, +// etag: Mutex>, +// } + const PK: &str = "PK"; const VAL: &str = "val"; @@ -169,6 +176,30 @@ impl Store for AwsDynamoStore { async fn get_keys(&self) -> Result, Error> { self.get_keys().await } + + async fn get_many(&self, keys: Vec) -> Result>)>, Error> { + todo!() + } + + async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { + todo!() + } + + async fn delete_many(&self, keys: Vec) -> Result<(), Error> { + todo!() + } + + async fn increment(&self, key: String, delta: i64) -> Result { + todo!() + } + + async fn new_compare_and_swap( + &self, + bucket_rep: u32, + key: &str, + ) -> Result, Error> { + todo!() + } } impl AwsDynamoStore { From ef2e26e2031b56edc5e6a9a9f8e85d0c997f6871 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Tue, 5 Nov 2024 08:36:35 -0700 Subject: [PATCH 04/13] Implemented first draft batch operations Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 122 +++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 4 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index 86bc8cb27d..e20ca08fa3 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -5,8 +5,9 @@ use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, + operation::batch_get_item::BatchGetItemOutput, primitives::Blob, - types::{AttributeValue, KeysAndAttributes}, + types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, Client, }; use spin_core::async_trait; @@ -178,15 +179,128 @@ impl Store for AwsDynamoStore { } async fn get_many(&self, keys: Vec) -> Result>)>, Error> { - todo!() + let mut results = Vec::with_capacity(keys.len()); + + let mut keys_and_attributes_builder = KeysAndAttributes::builder(); + for key in keys { + keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([( + PK.to_owned(), + AttributeValue::S(key), + )])) + } + let mut request_items = Some(HashMap::from_iter([( + self.table.clone(), + keys_and_attributes_builder.build().map_err(log_error)?, + )])); + + loop { + let BatchGetItemOutput { + responses: Some(mut responses), + unprocessed_keys, + .. + } = self + .client + .batch_get_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)? + else { + return Err(Error::Other("No results".into())); + }; + + if let Some(items) = responses.remove(&self.table) { + for mut item in items { + let Some(AttributeValue::S(pk)) = item.remove(PK) else { + return Err(Error::Other( + "Could not find 'PK' key on DynamoDB item".into(), + )); + }; + let Some(AttributeValue::B(val)) = item.remove(VAL) else { + return Err(Error::Other( + "Could not find 'val' key on DynamoDB item".into(), + )); + }; + + results.push((pk, Some(val.into_inner()))); + } + } + + match unprocessed_keys { + None => return Ok(results), + // TODO: break out if we have retried 10+ times? + remaining_keys => request_items = remaining_keys, + } + } } async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { - todo!() + let mut data = Vec::with_capacity(key_values.len()); + for (key, val) in key_values { + data.push( + WriteRequest::builder() + .put_request( + PutRequest::builder() + .item(PK, AttributeValue::S(key)) + .item(VAL, AttributeValue::B(Blob::new(val))) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut request_items = Some(HashMap::from_iter([(self.table.clone(), data)])); + + loop { + let results = self + .client + .batch_write_item() + .set_request_items(request_items) + .send() + .await + .map_err(log_error)?; + + match results.unprocessed_items { + None => return Ok(()), + // TODO: break out if we have retried 10+ times? + remaining_items => request_items = remaining_items, + } + } } async fn delete_many(&self, keys: Vec) -> Result<(), Error> { - todo!() + let mut data = Vec::with_capacity(keys.len()); + for key in keys { + data.push( + WriteRequest::builder() + .delete_request( + DeleteRequest::builder() + .key(PK, AttributeValue::S(key)) + .build() + .map_err(log_error)?, + ) + .build(), + ) + } + + let mut input = Some(HashMap::from_iter([(self.table.clone(), data)])); + + loop { + let results = self + .client + .batch_write_item() + .set_request_items(input) + .send() + .await + .map_err(log_error)?; + + match results.unprocessed_items { + None => return Ok(()), + // TODO: break out if we have retried 10+ times? + remaining_items => input = remaining_items, + } + } } async fn increment(&self, key: String, delta: i64) -> Result { From 2460378fcba14a0317d8e71c54dcc534ff84cebf Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Tue, 5 Nov 2024 08:54:08 -0700 Subject: [PATCH 05/13] More updates, partial CAS implementation Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 70 ++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index e20ca08fa3..e5d1553170 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -1,17 +1,20 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; use anyhow::Result; use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, - operation::batch_get_item::BatchGetItemOutput, + operation::{batch_get_item::BatchGetItemOutput, update_item::UpdateItemOutput}, primitives::Blob, types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, Client, }; use spin_core::async_trait; -use spin_factor_key_value::{log_error, Error, Store, StoreManager}; +use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; pub struct KeyValueAwsDynamo { table: String, @@ -128,12 +131,12 @@ struct AwsDynamoStore { table: String, } -// struct CompareAndSwap { -// key: String, -// client: CollectionClient, -// bucket_rep: u32, -// etag: Mutex>, -// } +struct CompareAndSwap { + key: String, + client: Client, + bucket_rep: u32, + etag: Mutex>, +} const PK: &str = "PK"; const VAL: &str = "val"; @@ -304,7 +307,26 @@ impl Store for AwsDynamoStore { } async fn increment(&self, key: String, delta: i64) -> Result { - todo!() + let result = self + .client + .update_item() + .table_name(&self.table) + .key(PK, AttributeValue::S(key)) + .update_expression("ADD #val :delta") + .expression_attribute_names("#val", VAL) + .expression_attribute_values(":delta", AttributeValue::N(delta.to_string())) + .return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew) + .send() + .await + .map_err(log_error)?; + + if let Some(updated_attributes) = result.attributes { + if let Some(AttributeValue::N(new_value)) = updated_attributes.get(VAL) { + return Ok(new_value.parse::().map_err(log_error))?; + } + } + + Err(Error::Other("Failed to increment value".into())) } async fn new_compare_and_swap( @@ -312,7 +334,33 @@ impl Store for AwsDynamoStore { bucket_rep: u32, key: &str, ) -> Result, Error> { - todo!() + Ok(Arc::new(CompareAndSwap { + key: key.to_string(), + client: self.client.clone(), + etag: Mutex::new(None), + bucket_rep, + })) + } +} + +#[async_trait] +impl Cas for CompareAndSwap { + async fn current(&self) -> Result>, Error> { + todo!(); + } + + /// `swap` updates the value for the key using the etag saved in the `current` function for + /// optimistic concurrency. + async fn swap(&self, value: Vec) -> Result<(), SwapError> { + todo!(); + } + + async fn bucket_rep(&self) -> u32 { + self.bucket_rep + } + + async fn key(&self) -> String { + self.key.clone() } } From c3d587b42a4044cf0f6b5474fd94a52e3d852406 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Wed, 6 Nov 2024 20:43:36 -0700 Subject: [PATCH 06/13] CAS implementation Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 136 ++++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 36 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index e5d1553170..4171eb0cff 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -8,7 +8,7 @@ use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, - operation::{batch_get_item::BatchGetItemOutput, update_item::UpdateItemOutput}, + operation::{batch_get_item::BatchGetItemOutput, get_item::GetItemOutput}, primitives::Blob, types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, Client, @@ -17,8 +17,8 @@ use spin_core::async_trait; use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; pub struct KeyValueAwsDynamo { - table: String, - region: String, + table: Arc, + region: Arc, client: async_once_cell::Lazy< Client, std::pin::Pin + Send>>, @@ -96,8 +96,8 @@ impl KeyValueAwsDynamo { }); Ok(Self { - table, - region, + table: Arc::new(table), + region: Arc::new(region), client: async_once_cell::Lazy::from_future(client_fut), }) } @@ -128,18 +128,23 @@ impl StoreManager for KeyValueAwsDynamo { struct AwsDynamoStore { _name: String, client: Client, - table: String, + table: Arc, } struct CompareAndSwap { key: String, client: Client, + table: Arc, bucket_rep: u32, etag: Mutex>, } +/// Primary key in DynamoDB items used for querying items const PK: &str = "PK"; +/// Value key in DynamoDB items storing item value as binary const VAL: &str = "val"; +/// Version key in DynamoDB items used for optimistic locking +const VER: &str = "ver"; #[async_trait] impl Store for AwsDynamoStore { @@ -151,7 +156,7 @@ impl Store for AwsDynamoStore { async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> { self.client .put_item() - .table_name(&self.table) + .table_name(self.table.as_str()) .item(PK, AttributeValue::S(key.to_string())) .item(VAL, AttributeValue::B(Blob::new(value))) .send() @@ -164,7 +169,7 @@ impl Store for AwsDynamoStore { if self.exists(key).await? { self.client .delete_item() - .table_name(&self.table) + .table_name(self.table.as_str()) .key(PK, AttributeValue::S(key.to_string())) .send() .await @@ -192,11 +197,11 @@ impl Store for AwsDynamoStore { )])) } let mut request_items = Some(HashMap::from_iter([( - self.table.clone(), + self.table.to_string(), keys_and_attributes_builder.build().map_err(log_error)?, )])); - loop { + while request_items.is_some() { let BatchGetItemOutput { responses: Some(mut responses), unprocessed_keys, @@ -212,7 +217,7 @@ impl Store for AwsDynamoStore { return Err(Error::Other("No results".into())); }; - if let Some(items) = responses.remove(&self.table) { + if let Some(items) = responses.remove(self.table.as_str()) { for mut item in items { let Some(AttributeValue::S(pk)) = item.remove(PK) else { return Err(Error::Other( @@ -229,12 +234,10 @@ impl Store for AwsDynamoStore { } } - match unprocessed_keys { - None => return Ok(results), - // TODO: break out if we have retried 10+ times? - remaining_keys => request_items = remaining_keys, - } + request_items = unprocessed_keys; } + + Ok(results) } async fn set_many(&self, key_values: Vec<(String, Vec)>) -> Result<(), Error> { @@ -253,9 +256,9 @@ impl Store for AwsDynamoStore { ) } - let mut request_items = Some(HashMap::from_iter([(self.table.clone(), data)])); + let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); - loop { + while request_items.is_some() { let results = self .client .batch_write_item() @@ -264,12 +267,10 @@ impl Store for AwsDynamoStore { .await .map_err(log_error)?; - match results.unprocessed_items { - None => return Ok(()), - // TODO: break out if we have retried 10+ times? - remaining_items => request_items = remaining_items, - } + request_items = results.unprocessed_items; } + + Ok(()) } async fn delete_many(&self, keys: Vec) -> Result<(), Error> { @@ -287,30 +288,28 @@ impl Store for AwsDynamoStore { ) } - let mut input = Some(HashMap::from_iter([(self.table.clone(), data)])); + let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); - loop { + while request_items.is_some() { let results = self .client .batch_write_item() - .set_request_items(input) + .set_request_items(request_items) .send() .await .map_err(log_error)?; - match results.unprocessed_items { - None => return Ok(()), - // TODO: break out if we have retried 10+ times? - remaining_items => input = remaining_items, - } + request_items = results.unprocessed_items; } + + Ok(()) } async fn increment(&self, key: String, delta: i64) -> Result { let result = self .client .update_item() - .table_name(&self.table) + .table_name(self.table.as_str()) .key(PK, AttributeValue::S(key)) .update_expression("ADD #val :delta") .expression_attribute_names("#val", VAL) @@ -337,6 +336,7 @@ impl Store for AwsDynamoStore { Ok(Arc::new(CompareAndSwap { key: key.to_string(), client: self.client.clone(), + table: self.table.clone(), etag: Mutex::new(None), bucket_rep, })) @@ -346,13 +346,77 @@ impl Store for AwsDynamoStore { #[async_trait] impl Cas for CompareAndSwap { async fn current(&self) -> Result>, Error> { - todo!(); + let GetItemOutput { + item: Some(mut current_item), + .. + } = self + .client + .get_item() + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()), + ) + .send() + .await + .map_err(log_error)? + else { + return Ok(None); + }; + + if let Some(AttributeValue::B(val)) = current_item.remove(VAL) { + let version = if let Some(AttributeValue::N(ver)) = current_item.remove(VER) { + Some(ver) + } else { + Some(String::from("0")) + }; + self.etag.lock().unwrap().clone_from(&version); + Ok(Some(val.into_inner())) + } else { + Ok(None) + } } /// `swap` updates the value for the key using the etag saved in the `current` function for /// optimistic concurrency. async fn swap(&self, value: Vec) -> Result<(), SwapError> { - todo!(); + let mut update_item = self + .client + .update_item() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(self.key.clone())) + .update_expression("SET #val=:val, ADD #ver :increment") + .expression_attribute_names("#val", VAL) + .expression_attribute_names("#ver", VER) + .expression_attribute_values(":val", AttributeValue::B(Blob::new(value))) + .expression_attribute_values(":increment", AttributeValue::N("1".to_owned())) + .return_values(aws_sdk_dynamodb::types::ReturnValue::None); + + let current_version = self.etag.lock().unwrap().clone(); + match current_version { + // Existing item with no version key, update under condition that version key still does not exist in Dynamo when operation is executed + Some(version) if version == "0" => { + update_item = update_item.condition_expression("attribute_not_exists(#ver)"); + } + // Existing item with version key, update under condition that version in Dynamo matches stored version + Some(version) => { + update_item = update_item + .condition_expression("#ver = :ver") + .expression_attribute_values(":ver", AttributeValue::N(version)); + } + // Assume new item, insert under condition that item does not already exist + None => { + update_item = update_item + .condition_expression("attribute_not_exists(#pk)") + .expression_attribute_names("#pk", PK); + } + } + + update_item + .send() + .await + .map(|_| ()) + .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) } async fn bucket_rep(&self) -> u32 { @@ -369,7 +433,7 @@ impl AwsDynamoStore { let response = self .client .get_item() - .table_name(&self.table) + .table_name(self.table.as_str()) .key( PK, aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), @@ -397,7 +461,7 @@ impl AwsDynamoStore { let mut scan_builder = self .client .scan() - .table_name(&self.table) + .table_name(self.table.as_str()) .projection_expression(PK); if let Some(keys) = last_evaluated_key { From 2400dc0dd52126127a92a16b9070a10b10bf54dd Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Sun, 10 Nov 2024 13:34:30 -0700 Subject: [PATCH 07/13] Updates from testing Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 60 ++++++++++++++++--------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index 4171eb0cff..70c9f23ed6 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -8,7 +8,10 @@ use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, - operation::{batch_get_item::BatchGetItemOutput, get_item::GetItemOutput}, + operation::{ + batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput, + get_item::GetItemOutput, + }, primitives::Blob, types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, Client, @@ -136,7 +139,7 @@ struct CompareAndSwap { client: Client, table: Arc, bucket_rep: u32, - etag: Mutex>, + version: Mutex>, } /// Primary key in DynamoDB items used for querying items @@ -234,7 +237,7 @@ impl Store for AwsDynamoStore { } } - request_items = unprocessed_keys; + request_items = unprocessed_keys.filter(|unprocessed| !unprocessed.is_empty()); } Ok(results) @@ -259,7 +262,9 @@ impl Store for AwsDynamoStore { let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); while request_items.is_some() { - let results = self + let BatchWriteItemOutput { + unprocessed_items, .. + } = self .client .batch_write_item() .set_request_items(request_items) @@ -267,7 +272,7 @@ impl Store for AwsDynamoStore { .await .map_err(log_error)?; - request_items = results.unprocessed_items; + request_items = unprocessed_items.filter(|unprocessed| !unprocessed.is_empty()); } Ok(()) @@ -291,7 +296,9 @@ impl Store for AwsDynamoStore { let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)])); while request_items.is_some() { - let results = self + let BatchWriteItemOutput { + unprocessed_items, .. + } = self .client .batch_write_item() .set_request_items(request_items) @@ -299,7 +306,7 @@ impl Store for AwsDynamoStore { .await .map_err(log_error)?; - request_items = results.unprocessed_items; + request_items = unprocessed_items.filter(|unprocessed| !unprocessed.is_empty()); } Ok(()) @@ -337,7 +344,7 @@ impl Store for AwsDynamoStore { key: key.to_string(), client: self.client.clone(), table: self.table.clone(), - etag: Mutex::new(None), + version: Mutex::new(None), bucket_rep, })) } @@ -365,12 +372,12 @@ impl Cas for CompareAndSwap { }; if let Some(AttributeValue::B(val)) = current_item.remove(VAL) { - let version = if let Some(AttributeValue::N(ver)) = current_item.remove(VER) { - Some(ver) - } else { - Some(String::from("0")) + let version = match current_item.remove(VER) { + Some(AttributeValue::N(ver)) => Some(ver), + _ => None, }; - self.etag.lock().unwrap().clone_from(&version); + + self.version.lock().unwrap().clone_from(&version); Ok(Some(val.into_inner())) } else { Ok(None) @@ -385,38 +392,35 @@ impl Cas for CompareAndSwap { .update_item() .table_name(self.table.as_str()) .key(PK, AttributeValue::S(self.key.clone())) - .update_expression("SET #val=:val, ADD #ver :increment") .expression_attribute_names("#val", VAL) - .expression_attribute_names("#ver", VER) .expression_attribute_values(":val", AttributeValue::B(Blob::new(value))) + .expression_attribute_names("#ver", VER) .expression_attribute_values(":increment", AttributeValue::N("1".to_owned())) - .return_values(aws_sdk_dynamodb::types::ReturnValue::None); + .return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew); - let current_version = self.etag.lock().unwrap().clone(); + let current_version = self.version.lock().unwrap().clone(); match current_version { - // Existing item with no version key, update under condition that version key still does not exist in Dynamo when operation is executed - Some(version) if version == "0" => { - update_item = update_item.condition_expression("attribute_not_exists(#ver)"); - } - // Existing item with version key, update under condition that version in Dynamo matches stored version + // Existing item with version key, update under condition that version in DynamoDB matches stored version (optimistic lock) Some(version) => { update_item = update_item + .update_expression("SET #val=:val ADD #ver :increment") .condition_expression("#ver = :ver") .expression_attribute_values(":ver", AttributeValue::N(version)); } - // Assume new item, insert under condition that item does not already exist + // Assume new/unversioned item, upsert under condition that item does not already have a version -- if it does, another atomic operation has already started None => { update_item = update_item - .condition_expression("attribute_not_exists(#pk)") - .expression_attribute_names("#pk", PK); + .condition_expression("attribute_not_exists(#ver)") + .update_expression("SET #val=:val, #ver=:increment"); } - } + }; update_item .send() .await - .map(|_| ()) - .map_err(|e| SwapError::CasFailed(format!("{e:?}"))) + .map_err(|e| SwapError::CasFailed(format!("{e:?}")))?; + + Ok(()) } async fn bucket_rep(&self) -> u32 { From e805726e6a7572ef1bb418aafa2e806bc53dcfd7 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Sun, 10 Nov 2024 17:31:30 -0700 Subject: [PATCH 08/13] Final updates to ensure better atomicity Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 76 ++++++++++++++++++++----------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index 70c9f23ed6..b692788ff0 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -8,12 +8,12 @@ use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, - operation::{ - batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput, - get_item::GetItemOutput, - }, + operation::{batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput}, primitives::Blob, - types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, + types::{ + AttributeValue, DeleteRequest, Get, KeysAndAttributes, PutRequest, TransactGetItem, + TransactWriteItem, Update, WriteRequest, + }, Client, }; use spin_core::async_trait; @@ -353,21 +353,34 @@ impl Store for AwsDynamoStore { #[async_trait] impl Cas for CompareAndSwap { async fn current(&self) -> Result>, Error> { - let GetItemOutput { - item: Some(mut current_item), - .. - } = self + // TransactGetItems fails if concurrent writes are in progress on an item + let output = self .client - .get_item() - .table_name(self.table.as_str()) - .key( - PK, - aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()), + .transact_get_items() + .transact_items( + TransactGetItem::builder() + .get( + Get::builder() + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()), + ) + .build() + .map_err(log_error)?, + ) + .build(), ) .send() .await - .map_err(log_error)? - else { + .map_err(log_error)?; + + let item = output + .responses + .and_then(|responses| responses.into_iter().next()) + .and_then(|response| response.item); + + let Some(mut current_item) = item else { return Ok(None); }; @@ -384,38 +397,47 @@ impl Cas for CompareAndSwap { } } - /// `swap` updates the value for the key using the etag saved in the `current` function for + /// `swap` updates the value for the key using the version saved in the `current` function for /// optimistic concurrency. async fn swap(&self, value: Vec) -> Result<(), SwapError> { - let mut update_item = self - .client - .update_item() + let mut update_item = Update::builder() .table_name(self.table.as_str()) .key(PK, AttributeValue::S(self.key.clone())) .expression_attribute_names("#val", VAL) .expression_attribute_values(":val", AttributeValue::B(Blob::new(value))) .expression_attribute_names("#ver", VER) .expression_attribute_values(":increment", AttributeValue::N("1".to_owned())) - .return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew); + .return_values_on_condition_check_failure( + aws_sdk_dynamodb::types::ReturnValuesOnConditionCheckFailure::None, + ); let current_version = self.version.lock().unwrap().clone(); match current_version { - // Existing item with version key, update under condition that version in DynamoDB matches stored version (optimistic lock) + // Existing item with version, update under condition that version in DynamoDB matches cached version Some(version) => { update_item = update_item .update_expression("SET #val=:val ADD #ver :increment") .condition_expression("#ver = :ver") .expression_attribute_values(":ver", AttributeValue::N(version)); } - // Assume new/unversioned item, upsert under condition that item does not already have a version -- if it does, another atomic operation has already started + // New/unversioned item, upsert atomically but without optimistic locking guarantee None => { - update_item = update_item - .condition_expression("attribute_not_exists(#ver)") - .update_expression("SET #val=:val, #ver=:increment"); + update_item = update_item.update_expression("SET #val=:val, #ver=:increment"); } }; - update_item + // TransactWriteItems fails if concurrent writes are in progress on an item. + self.client + .transact_write_items() + .transact_items( + TransactWriteItem::builder() + .update( + update_item + .build() + .map_err(|e| SwapError::Other(format!("{e:?}")))?, + ) + .build(), + ) .send() .await .map_err(|e| SwapError::CasFailed(format!("{e:?}")))?; From de4f334d7423c2f0f851077d50499806fbf92342 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Mon, 11 Nov 2024 21:08:45 -0700 Subject: [PATCH 09/13] Removed arcs, adjusted atomic functions to use updateItem, minimized returned values for getItem calls Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/store.rs | 174 +++++++++++++--------------- crates/key-value-azure/src/store.rs | 4 +- 2 files changed, 79 insertions(+), 99 deletions(-) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index b692788ff0..f21262a9bc 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -8,11 +8,14 @@ use aws_config::{BehaviorVersion, Region, SdkConfig}; use aws_credential_types::Credentials; use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, - operation::{batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput}, + operation::{ + batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput, + get_item::GetItemOutput, update_item::UpdateItemOutput, + }, primitives::Blob, types::{ - AttributeValue, DeleteRequest, Get, KeysAndAttributes, PutRequest, TransactGetItem, - TransactWriteItem, Update, WriteRequest, + AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, TransactWriteItem, Update, + WriteRequest, }, Client, }; @@ -20,8 +23,9 @@ use spin_core::async_trait; use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; pub struct KeyValueAwsDynamo { + region: String, + // Needs to be cloned when getting a store table: Arc, - region: Arc, client: async_once_cell::Lazy< Client, std::pin::Pin + Send>>, @@ -99,8 +103,8 @@ impl KeyValueAwsDynamo { }); Ok(Self { + region, table: Arc::new(table), - region: Arc::new(region), client: async_once_cell::Lazy::from_future(client_fut), }) } @@ -108,9 +112,8 @@ impl KeyValueAwsDynamo { #[async_trait] impl StoreManager for KeyValueAwsDynamo { - async fn get(&self, name: &str) -> Result, Error> { + async fn get(&self, _name: &str) -> Result, Error> { Ok(Arc::new(AwsDynamoStore { - _name: name.to_owned(), client: self.client.get_unpin().await.clone(), table: self.table.clone(), })) @@ -122,14 +125,14 @@ impl StoreManager for KeyValueAwsDynamo { fn summary(&self, _store_name: &str) -> Option { Some(format!( - "AWS DynamoDB region: {:?}, table: {}", + "AWS DynamoDB region: {}, table: {}", self.region, self.table )) } } struct AwsDynamoStore { - _name: String, + // Client wraps an Arc so should be low cost to clone client: Client, table: Arc, } @@ -139,20 +142,40 @@ struct CompareAndSwap { client: Client, table: Arc, bucket_rep: u32, - version: Mutex>, + has_lock: Mutex, } /// Primary key in DynamoDB items used for querying items const PK: &str = "PK"; /// Value key in DynamoDB items storing item value as binary const VAL: &str = "val"; -/// Version key in DynamoDB items used for optimistic locking -const VER: &str = "ver"; +/// Lock key in DynamoDB items used for atomic operations +const LOCK: &str = "lock"; #[async_trait] impl Store for AwsDynamoStore { async fn get(&self, key: &str) -> Result>, Error> { - let item = self.get_item(key).await?; + let response = self + .client + .get_item() + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), + ) + .projection_expression(VAL) + .send() + .await + .map_err(log_error)?; + + let item = response.item.and_then(|mut item| { + if let Some(AttributeValue::B(val)) = item.remove(VAL) { + Some(val.into_inner()) + } else { + None + } + }); + Ok(item) } @@ -182,7 +205,20 @@ impl Store for AwsDynamoStore { } async fn exists(&self, key: &str) -> Result { - Ok(self.get_item(key).await?.is_some()) + let GetItemOutput { item, .. } = self + .client + .get_item() + .table_name(self.table.as_str()) + .key( + PK, + aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), + ) + .projection_expression(PK) + .send() + .await + .map_err(log_error)?; + + Ok(item.map(|item| item.contains_key(PK)).unwrap_or(false)) } async fn get_keys(&self) -> Result, Error> { @@ -192,7 +228,8 @@ impl Store for AwsDynamoStore { async fn get_many(&self, keys: Vec) -> Result>)>, Error> { let mut results = Vec::with_capacity(keys.len()); - let mut keys_and_attributes_builder = KeysAndAttributes::builder(); + let mut keys_and_attributes_builder = + KeysAndAttributes::builder().projection_expression(format!("{PK},{VAL}")); for key in keys { keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([( PK.to_owned(), @@ -344,7 +381,7 @@ impl Store for AwsDynamoStore { key: key.to_string(), client: self.client.clone(), table: self.table.clone(), - version: Mutex::new(None), + has_lock: Mutex::new(false), bucket_rep, })) } @@ -353,47 +390,28 @@ impl Store for AwsDynamoStore { #[async_trait] impl Cas for CompareAndSwap { async fn current(&self) -> Result>, Error> { - // TransactGetItems fails if concurrent writes are in progress on an item - let output = self + let UpdateItemOutput { attributes, .. } = self .client - .transact_get_items() - .transact_items( - TransactGetItem::builder() - .get( - Get::builder() - .table_name(self.table.as_str()) - .key( - PK, - aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()), - ) - .build() - .map_err(log_error)?, - ) - .build(), - ) + .update_item() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(self.key.clone())) + .update_expression("SET #lock=:lock") + .expression_attribute_names("#lock", LOCK) + .expression_attribute_values(":lock", AttributeValue::Null(true)) + .condition_expression("attribute_not_exists (#lock)") + .return_values(aws_sdk_dynamodb::types::ReturnValue::AllNew) .send() .await .map_err(log_error)?; - let item = output - .responses - .and_then(|responses| responses.into_iter().next()) - .and_then(|response| response.item); - - let Some(mut current_item) = item else { - return Ok(None); - }; - - if let Some(AttributeValue::B(val)) = current_item.remove(VAL) { - let version = match current_item.remove(VER) { - Some(AttributeValue::N(ver)) => Some(ver), - _ => None, - }; + self.has_lock.lock().unwrap().clone_from(&true); - self.version.lock().unwrap().clone_from(&version); - Ok(Some(val.into_inner())) - } else { - Ok(None) + match attributes { + Some(mut item) => match item.remove(VAL) { + Some(AttributeValue::B(val)) => Ok(Some(val.into_inner())), + _ => Ok(None), + }, + None => Ok(None), } } @@ -403,30 +421,18 @@ impl Cas for CompareAndSwap { let mut update_item = Update::builder() .table_name(self.table.as_str()) .key(PK, AttributeValue::S(self.key.clone())) + .update_expression("SET #val=:val REMOVE #lock") .expression_attribute_names("#val", VAL) .expression_attribute_values(":val", AttributeValue::B(Blob::new(value))) - .expression_attribute_names("#ver", VER) - .expression_attribute_values(":increment", AttributeValue::N("1".to_owned())) - .return_values_on_condition_check_failure( - aws_sdk_dynamodb::types::ReturnValuesOnConditionCheckFailure::None, - ); - - let current_version = self.version.lock().unwrap().clone(); - match current_version { - // Existing item with version, update under condition that version in DynamoDB matches cached version - Some(version) => { - update_item = update_item - .update_expression("SET #val=:val ADD #ver :increment") - .condition_expression("#ver = :ver") - .expression_attribute_values(":ver", AttributeValue::N(version)); - } - // New/unversioned item, upsert atomically but without optimistic locking guarantee - None => { - update_item = update_item.update_expression("SET #val=:val, #ver=:increment"); - } - }; + .expression_attribute_names("#lock", LOCK); + + let has_lock = *self.has_lock.lock().unwrap(); + // Ensure exclusive access between fetching the current value of the item and swapping + if has_lock { + update_item = update_item.condition_expression("attribute_exists (#lock)"); + } - // TransactWriteItems fails if concurrent writes are in progress on an item. + // TransactWriteItems fails if concurrent writes are in progress on an item, so even without locking, we get atomicity in overwriting self.client .transact_write_items() .transact_items( @@ -455,30 +461,6 @@ impl Cas for CompareAndSwap { } impl AwsDynamoStore { - async fn get_item(&self, key: &str) -> Result>, Error> { - let response = self - .client - .get_item() - .table_name(self.table.as_str()) - .key( - PK, - aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()), - ) - .send() - .await - .map_err(log_error)?; - - let val = response.item.and_then(|mut item| { - if let Some(AttributeValue::B(val)) = item.remove(VAL) { - Some(val.into_inner()) - } else { - None - } - }); - - Ok(val) - } - async fn get_keys(&self) -> Result, Error> { let mut primary_keys = Vec::new(); let mut last_evaluated_key = None; diff --git a/crates/key-value-azure/src/store.rs b/crates/key-value-azure/src/store.rs index 258934e4df..001864ea77 100644 --- a/crates/key-value-azure/src/store.rs +++ b/crates/key-value-azure/src/store.rs @@ -92,9 +92,8 @@ impl KeyValueAzureCosmos { #[async_trait] impl StoreManager for KeyValueAzureCosmos { - async fn get(&self, name: &str) -> Result, Error> { + async fn get(&self, _name: &str) -> Result, Error> { Ok(Arc::new(AzureCosmosStore { - _name: name.to_owned(), client: self.client.clone(), })) } @@ -114,7 +113,6 @@ impl StoreManager for KeyValueAzureCosmos { #[derive(Clone)] struct AzureCosmosStore { - _name: String, client: CollectionClient, } From ed7299fecd24fa532cd7c5a8b5d451cd1f3baa41 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Tue, 12 Nov 2024 21:37:39 -0700 Subject: [PATCH 10/13] enum for CAS states, use paginator, add configuration for strong consistency Signed-off-by: Darwin Boersma --- crates/key-value-aws/src/lib.rs | 11 +- crates/key-value-aws/src/store.rs | 209 +++++++++++++++++------------- 2 files changed, 129 insertions(+), 91 deletions(-) diff --git a/crates/key-value-aws/src/lib.rs b/crates/key-value-aws/src/lib.rs index a0305be668..85775975d6 100644 --- a/crates/key-value-aws/src/lib.rs +++ b/crates/key-value-aws/src/lib.rs @@ -30,6 +30,9 @@ pub struct AwsDynamoKeyValueRuntimeConfig { token: Option, /// The AWS region where the database is located region: String, + /// Boolean determining whether to use strongly consistent reads. + /// Defaults to `false` but can be set to `true` to improve atomicity + consistent_read: Option, /// The AWS Dynamo DB table. table: String, } @@ -50,6 +53,7 @@ impl MakeKeyValueStore for AwsDynamoKeyValueStore { secret_key, token, region, + consistent_read, table, } = runtime_config; let auth_options = match (access_key, secret_key) { @@ -60,6 +64,11 @@ impl MakeKeyValueStore for AwsDynamoKeyValueStore { } _ => KeyValueAwsDynamoAuthOptions::Environmental, }; - KeyValueAwsDynamo::new(region, table, auth_options) + KeyValueAwsDynamo::new( + region, + consistent_read.unwrap_or(false), + table, + auth_options, + ) } } diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index f21262a9bc..96a6e33828 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -10,22 +10,23 @@ use aws_sdk_dynamodb::{ config::{ProvideCredentials, SharedCredentialsProvider}, operation::{ batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput, - get_item::GetItemOutput, update_item::UpdateItemOutput, + get_item::GetItemOutput, }, primitives::Blob, - types::{ - AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, TransactWriteItem, Update, - WriteRequest, - }, + types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, Client, }; use spin_core::async_trait; use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; pub struct KeyValueAwsDynamo { + /// AWS region region: String, - // Needs to be cloned when getting a store + /// Whether to use strongly consistent reads + consistent_read: bool, + /// DynamoDB table, needs to be cloned when getting a store table: Arc, + /// DynamoDB client client: async_once_cell::Lazy< Client, std::pin::Pin + Send>>, @@ -84,6 +85,7 @@ pub enum KeyValueAwsDynamoAuthOptions { impl KeyValueAwsDynamo { pub fn new( region: String, + consistent_read: bool, table: String, auth_options: KeyValueAwsDynamoAuthOptions, ) -> Result { @@ -104,6 +106,7 @@ impl KeyValueAwsDynamo { Ok(Self { region, + consistent_read, table: Arc::new(table), client: async_once_cell::Lazy::from_future(client_fut), }) @@ -116,6 +119,7 @@ impl StoreManager for KeyValueAwsDynamo { Ok(Arc::new(AwsDynamoStore { client: self.client.get_unpin().await.clone(), table: self.table.clone(), + consistent_read: self.consistent_read, })) } @@ -135,6 +139,19 @@ struct AwsDynamoStore { // Client wraps an Arc so should be low cost to clone client: Client, table: Arc, + consistent_read: bool, +} + +#[derive(Debug, Clone)] +enum CasState { + // Existing item with version + Versioned(String), + // Existing item without version + Unversioned(Blob), + // Item was null when fetched during `current` + Unset, + // Potentially new item -- `current` was never called to fetch version + Unknown, } struct CompareAndSwap { @@ -142,15 +159,15 @@ struct CompareAndSwap { client: Client, table: Arc, bucket_rep: u32, - has_lock: Mutex, + state: Mutex, } /// Primary key in DynamoDB items used for querying items const PK: &str = "PK"; /// Value key in DynamoDB items storing item value as binary -const VAL: &str = "val"; -/// Lock key in DynamoDB items used for atomic operations -const LOCK: &str = "lock"; +const VAL: &str = "VAL"; +/// Version key in DynamoDB items used for atomic operations +const VER: &str = "VER"; #[async_trait] impl Store for AwsDynamoStore { @@ -158,6 +175,7 @@ impl Store for AwsDynamoStore { let response = self .client .get_item() + .consistent_read(self.consistent_read) .table_name(self.table.as_str()) .key( PK, @@ -208,6 +226,7 @@ impl Store for AwsDynamoStore { let GetItemOutput { item, .. } = self .client .get_item() + .consistent_read(self.consistent_read) .table_name(self.table.as_str()) .key( PK, @@ -228,8 +247,13 @@ impl Store for AwsDynamoStore { async fn get_many(&self, keys: Vec) -> Result>)>, Error> { let mut results = Vec::with_capacity(keys.len()); - let mut keys_and_attributes_builder = - KeysAndAttributes::builder().projection_expression(format!("{PK},{VAL}")); + if keys.is_empty() { + return Ok(results); + } + + let mut keys_and_attributes_builder = KeysAndAttributes::builder() + .projection_expression(format!("{PK},{VAL}")) + .consistent_read(self.consistent_read); for key in keys { keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([( PK.to_owned(), @@ -243,7 +267,7 @@ impl Store for AwsDynamoStore { while request_items.is_some() { let BatchGetItemOutput { - responses: Some(mut responses), + responses, unprocessed_keys, .. } = self @@ -252,25 +276,21 @@ impl Store for AwsDynamoStore { .set_request_items(request_items) .send() .await - .map_err(log_error)? - else { - return Err(Error::Other("No results".into())); - }; + .map_err(log_error)?; - if let Some(items) = responses.remove(self.table.as_str()) { + if let Some(items) = + responses.and_then(|mut responses| responses.remove(self.table.as_str())) + { for mut item in items { - let Some(AttributeValue::S(pk)) = item.remove(PK) else { - return Err(Error::Other( - "Could not find 'PK' key on DynamoDB item".into(), - )); - }; - let Some(AttributeValue::B(val)) = item.remove(VAL) else { - return Err(Error::Other( - "Could not find 'val' key on DynamoDB item".into(), - )); - }; - - results.push((pk, Some(val.into_inner()))); + match (item.remove(PK), item.remove(VAL)) { + (Some(AttributeValue::S(pk)), Some(AttributeValue::B(val))) => { + results.push((pk, Some(val.into_inner()))); + } + (Some(AttributeValue::S(pk)), None) => { + results.push((pk, None)); + } + _ => (), + } } } @@ -355,8 +375,8 @@ impl Store for AwsDynamoStore { .update_item() .table_name(self.table.as_str()) .key(PK, AttributeValue::S(key)) - .update_expression("ADD #val :delta") - .expression_attribute_names("#val", VAL) + .update_expression("ADD #VAL :delta") + .expression_attribute_names("#VAL", VAL) .expression_attribute_values(":delta", AttributeValue::N(delta.to_string())) .return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew) .send() @@ -381,7 +401,7 @@ impl Store for AwsDynamoStore { key: key.to_string(), client: self.client.clone(), table: self.table.clone(), - has_lock: Mutex::new(false), + state: Mutex::new(CasState::Unknown), bucket_rep, })) } @@ -390,60 +410,80 @@ impl Store for AwsDynamoStore { #[async_trait] impl Cas for CompareAndSwap { async fn current(&self) -> Result>, Error> { - let UpdateItemOutput { attributes, .. } = self + let GetItemOutput { item, .. } = self .client - .update_item() + .get_item() + .consistent_read(true) .table_name(self.table.as_str()) .key(PK, AttributeValue::S(self.key.clone())) - .update_expression("SET #lock=:lock") - .expression_attribute_names("#lock", LOCK) - .expression_attribute_values(":lock", AttributeValue::Null(true)) - .condition_expression("attribute_not_exists (#lock)") - .return_values(aws_sdk_dynamodb::types::ReturnValue::AllNew) + .projection_expression(format!("{VAL},{VER}")) .send() .await .map_err(log_error)?; - self.has_lock.lock().unwrap().clone_from(&true); + match item { + Some(mut current_item) => match (current_item.remove(VAL), current_item.remove(VER)) { + (Some(AttributeValue::B(val)), Some(AttributeValue::N(ver))) => { + self.state + .lock() + .unwrap() + .clone_from(&CasState::Versioned(ver)); + + Ok(Some(val.into_inner())) + } + (Some(AttributeValue::B(val)), _) => { + self.state + .lock() + .unwrap() + .clone_from(&CasState::Unversioned(val.clone())); - match attributes { - Some(mut item) => match item.remove(VAL) { - Some(AttributeValue::B(val)) => Ok(Some(val.into_inner())), - _ => Ok(None), + Ok(Some(val.into_inner())) + } + (_, _) => { + self.state.lock().unwrap().clone_from(&CasState::Unset); + Ok(None) + } }, - None => Ok(None), + None => { + self.state.lock().unwrap().clone_from(&CasState::Unset); + Ok(None) + } } } - /// `swap` updates the value for the key using the version saved in the `current` function for - /// optimistic concurrency. + /// `swap` updates the value for the key -- if possible, using the version saved in the `current` function for + /// optimistic concurrency or the previous item value async fn swap(&self, value: Vec) -> Result<(), SwapError> { - let mut update_item = Update::builder() + let mut update_item = self + .client + .update_item() .table_name(self.table.as_str()) .key(PK, AttributeValue::S(self.key.clone())) - .update_expression("SET #val=:val REMOVE #lock") - .expression_attribute_names("#val", VAL) + .update_expression("SET #VAL = :val ADD #VER :increment") + .expression_attribute_names("#VAL", VAL) + .expression_attribute_names("#VER", VER) .expression_attribute_values(":val", AttributeValue::B(Blob::new(value))) - .expression_attribute_names("#lock", LOCK); - - let has_lock = *self.has_lock.lock().unwrap(); - // Ensure exclusive access between fetching the current value of the item and swapping - if has_lock { - update_item = update_item.condition_expression("attribute_exists (#lock)"); - } + .expression_attribute_values(":increment", AttributeValue::N("1".to_owned())); + + let state = self.state.lock().unwrap().clone(); + match state { + CasState::Versioned(version) => { + update_item = update_item + .condition_expression("#VER = :ver") + .expression_attribute_values(":ver", AttributeValue::N(version)); + } + CasState::Unversioned(old_val) => { + update_item = update_item + .condition_expression("#VAL = :old_val") + .expression_attribute_values(":old_val", AttributeValue::B(old_val)); + } + CasState::Unset => { + update_item = update_item.condition_expression("attribute_not_exists (#VAL)"); + } + CasState::Unknown => (), + }; - // TransactWriteItems fails if concurrent writes are in progress on an item, so even without locking, we get atomicity in overwriting - self.client - .transact_write_items() - .transact_items( - TransactWriteItem::builder() - .update( - update_item - .build() - .map_err(|e| SwapError::Other(format!("{e:?}")))?, - ) - .build(), - ) + update_item .send() .await .map_err(|e| SwapError::CasFailed(format!("{e:?}")))?; @@ -463,23 +503,17 @@ impl Cas for CompareAndSwap { impl AwsDynamoStore { async fn get_keys(&self) -> Result, Error> { let mut primary_keys = Vec::new(); - let mut last_evaluated_key = None; - loop { - let mut scan_builder = self - .client - .scan() - .table_name(self.table.as_str()) - .projection_expression(PK); - - if let Some(keys) = last_evaluated_key { - for (key, val) in keys { - scan_builder = scan_builder.exclusive_start_key(key, val); - } - } - - let scan_output = scan_builder.send().await.map_err(log_error)?; + let mut scan_paginator = self + .client + .scan() + .table_name(self.table.as_str()) + .projection_expression(PK) + .into_paginator() + .send(); + while let Some(output) = scan_paginator.next().await { + let scan_output = output.map_err(log_error)?; if let Some(items) = scan_output.items { for mut item in items { if let Some(AttributeValue::S(pk)) = item.remove(PK) { @@ -487,11 +521,6 @@ impl AwsDynamoStore { } } } - - last_evaluated_key = scan_output.last_evaluated_key; - if last_evaluated_key.is_none() { - break; - } } Ok(primary_keys) From 1cc913cc6b2a78d13f66e6e3ba8b8e7901d7df1b Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Wed, 13 Nov 2024 14:56:41 -0700 Subject: [PATCH 11/13] Use transaction in increment and swap for better atomicity, remove unneeded exists check, higher level filtering of empty get_all queries, sqlite handle null value before swap Signed-off-by: Darwin Boersma --- crates/factor-key-value/src/host.rs | 9 ++ crates/factor-key-value/src/util.rs | 10 +- crates/key-value-aws/src/store.rs | 165 +++++++++++++++++----------- crates/key-value-spin/src/store.rs | 43 +++++--- 4 files changed, 146 insertions(+), 81 deletions(-) diff --git a/crates/factor-key-value/src/host.rs b/crates/factor-key-value/src/host.rs index 60d08b509d..efb473fb1b 100644 --- a/crates/factor-key-value/src/host.rs +++ b/crates/factor-key-value/src/host.rs @@ -283,6 +283,9 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { keys: Vec, ) -> std::result::Result>)>, wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; + if keys.is_empty() { + return Ok(vec![]); + } store.get_many(keys).await.map_err(to_wasi_err) } @@ -293,6 +296,9 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { key_values: Vec<(String, Vec)>, ) -> std::result::Result<(), wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; + if key_values.is_empty() { + return Ok(()); + } store.set_many(key_values).await.map_err(to_wasi_err) } @@ -303,6 +309,9 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch { keys: Vec, ) -> std::result::Result<(), wasi_keyvalue::store::Error> { let store = self.get_store_wasi(bucket)?; + if keys.is_empty() { + return Ok(()); + } store.delete_many(keys).await.map_err(to_wasi_err) } } diff --git a/crates/factor-key-value/src/util.rs b/crates/factor-key-value/src/util.rs index cea72c92b6..82dbb59611 100644 --- a/crates/factor-key-value/src/util.rs +++ b/crates/factor-key-value/src/util.rs @@ -260,10 +260,12 @@ impl Store for CachingStore { } } - let keys_and_values = self.inner.get_many(not_found).await?; - for (key, value) in keys_and_values { - found.push((key.clone(), value.clone())); - state.cache.put(key, value); + if !not_found.is_empty() { + let keys_and_values = self.inner.get_many(not_found).await?; + for (key, value) in keys_and_values { + found.push((key.clone(), value.clone())); + state.cache.put(key, value); + } } Ok(found) diff --git a/crates/key-value-aws/src/store.rs b/crates/key-value-aws/src/store.rs index 96a6e33828..ed38e1e0b2 100644 --- a/crates/key-value-aws/src/store.rs +++ b/crates/key-value-aws/src/store.rs @@ -1,3 +1,4 @@ +use core::str; use std::{ collections::HashMap, sync::{Arc, Mutex}, @@ -13,7 +14,10 @@ use aws_sdk_dynamodb::{ get_item::GetItemOutput, }, primitives::Blob, - types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, + types::{ + AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, TransactWriteItem, Update, + WriteRequest, + }, Client, }; use spin_core::async_trait; @@ -148,7 +152,7 @@ enum CasState { Versioned(String), // Existing item without version Unversioned(Blob), - // Item was null when fetched during `current` + // Item was missing when fetched during `current`, expected to be new Unset, // Potentially new item -- `current` was never called to fetch version Unknown, @@ -210,15 +214,13 @@ impl Store for AwsDynamoStore { } async fn delete(&self, key: &str) -> Result<(), Error> { - if self.exists(key).await? { - self.client - .delete_item() - .table_name(self.table.as_str()) - .key(PK, AttributeValue::S(key.to_string())) - .send() - .await - .map_err(log_error)?; - } + self.client + .delete_item() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(key.to_string())) + .send() + .await + .map_err(log_error)?; Ok(()) } @@ -241,16 +243,32 @@ impl Store for AwsDynamoStore { } async fn get_keys(&self) -> Result, Error> { - self.get_keys().await - } + let mut primary_keys = Vec::new(); - async fn get_many(&self, keys: Vec) -> Result>)>, Error> { - let mut results = Vec::with_capacity(keys.len()); + let mut scan_paginator = self + .client + .scan() + .table_name(self.table.as_str()) + .projection_expression(PK) + .into_paginator() + .send(); - if keys.is_empty() { - return Ok(results); + while let Some(output) = scan_paginator.next().await { + let scan_output = output.map_err(log_error)?; + if let Some(items) = scan_output.items { + for mut item in items { + if let Some(AttributeValue::S(pk)) = item.remove(PK) { + primary_keys.push(pk); + } + } + } } + Ok(primary_keys) + } + + async fn get_many(&self, keys: Vec) -> Result>)>, Error> { + let mut results = Vec::with_capacity(keys.len()); let mut keys_and_attributes_builder = KeysAndAttributes::builder() .projection_expression(format!("{PK},{VAL}")) .consistent_read(self.consistent_read); @@ -370,26 +388,66 @@ impl Store for AwsDynamoStore { } async fn increment(&self, key: String, delta: i64) -> Result { - let result = self + let GetItemOutput { item, .. } = self .client - .update_item() + .get_item() + .consistent_read(true) .table_name(self.table.as_str()) - .key(PK, AttributeValue::S(key)) - .update_expression("ADD #VAL :delta") - .expression_attribute_names("#VAL", VAL) - .expression_attribute_values(":delta", AttributeValue::N(delta.to_string())) - .return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew) + .key(PK, AttributeValue::S(key.clone())) + .projection_expression(VAL) .send() .await .map_err(log_error)?; - if let Some(updated_attributes) = result.attributes { - if let Some(AttributeValue::N(new_value)) = updated_attributes.get(VAL) { - return Ok(new_value.parse::().map_err(log_error))?; - } + let old_val = match item { + Some(mut current_item) => match current_item.remove(VAL) { + // We're expecting i64, so technically we could transmute but seems risky... + Some(AttributeValue::B(val)) => Some( + str::from_utf8(&val.into_inner()) + .map_err(log_error)? + .parse::() + .map_err(log_error)?, + ), + _ => None, + }, + None => None, + }; + + let new_val = old_val.unwrap_or(0) + delta; + + let mut update = Update::builder() + .table_name(self.table.as_str()) + .key(PK, AttributeValue::S(key)) + .update_expression("SET #VAL = :new_val") + .expression_attribute_names("#VAL", VAL) + .expression_attribute_values( + ":new_val", + AttributeValue::B(Blob::new(new_val.to_string().as_bytes())), + ); + + if let Some(old_val) = old_val { + update = update + .condition_expression("#VAL = :old_val") + .expression_attribute_values( + ":old_val", + AttributeValue::B(Blob::new(old_val.to_string().as_bytes())), + ) + } else { + update = update.condition_expression("attribute_not_exists (#VAL)") } - Err(Error::Other("Failed to increment value".into())) + self.client + .transact_write_items() + .transact_items( + TransactWriteItem::builder() + .update(update.build().map_err(log_error)?) + .build(), + ) + .send() + .await + .map_err(log_error)?; + + Ok(new_val) } async fn new_compare_and_swap( @@ -454,9 +512,7 @@ impl Cas for CompareAndSwap { /// `swap` updates the value for the key -- if possible, using the version saved in the `current` function for /// optimistic concurrency or the previous item value async fn swap(&self, value: Vec) -> Result<(), SwapError> { - let mut update_item = self - .client - .update_item() + let mut update = Update::builder() .table_name(self.table.as_str()) .key(PK, AttributeValue::S(self.key.clone())) .update_expression("SET #VAL = :val ADD #VER :increment") @@ -468,22 +524,32 @@ impl Cas for CompareAndSwap { let state = self.state.lock().unwrap().clone(); match state { CasState::Versioned(version) => { - update_item = update_item + update = update .condition_expression("#VER = :ver") .expression_attribute_values(":ver", AttributeValue::N(version)); } CasState::Unversioned(old_val) => { - update_item = update_item + update = update .condition_expression("#VAL = :old_val") .expression_attribute_values(":old_val", AttributeValue::B(old_val)); } CasState::Unset => { - update_item = update_item.condition_expression("attribute_not_exists (#VAL)"); + update = update.condition_expression("attribute_not_exists (#VAL)"); } CasState::Unknown => (), }; - update_item + self.client + .transact_write_items() + .transact_items( + TransactWriteItem::builder() + .update( + update + .build() + .map_err(|e| SwapError::Other(format!("{e:?}")))?, + ) + .build(), + ) .send() .await .map_err(|e| SwapError::CasFailed(format!("{e:?}")))?; @@ -499,30 +565,3 @@ impl Cas for CompareAndSwap { self.key.clone() } } - -impl AwsDynamoStore { - async fn get_keys(&self) -> Result, Error> { - let mut primary_keys = Vec::new(); - - let mut scan_paginator = self - .client - .scan() - .table_name(self.table.as_str()) - .projection_expression(PK) - .into_paginator() - .send(); - - while let Some(output) = scan_paginator.next().await { - let scan_output = output.map_err(log_error)?; - if let Some(items) = scan_output.items { - for mut item in items { - if let Some(AttributeValue::S(pk)) = item.remove(PK) { - primary_keys.push(pk); - } - } - } - } - - Ok(primary_keys) - } -} diff --git a/crates/key-value-spin/src/store.rs b/crates/key-value-spin/src/store.rs index f18b60f7b7..7c3e50101d 100644 --- a/crates/key-value-spin/src/store.rs +++ b/crates/key-value-spin/src/store.rs @@ -307,20 +307,35 @@ impl Cas for CompareAndSwap { async fn swap(&self, value: Vec) -> Result<(), SwapError> { task::block_in_place(|| { let old_value = self.value.lock().unwrap(); - let rows_changed = self.connection - .lock() - .unwrap() - .prepare_cached( - "UPDATE spin_key_value SET value=:new_value WHERE store=:name and key=:key and value=:old_value", - ) - .map_err(log_cas_error)? - .execute(named_params! { - ":name": &self.name, - ":key": self.key, - ":old_value": old_value.clone().unwrap(), - ":new_value": value, - }) - .map_err(log_cas_error)?; + let mut conn = self.connection.lock().unwrap(); + let rows_changed = match old_value.clone() { + Some(old_val) => { + conn + .prepare_cached( + "UPDATE spin_key_value SET value=:new_value WHERE store=:name and key=:key and value=:old_value") + .map_err(log_cas_error)? + .execute(named_params! { + ":name": &self.name, + ":key": self.key, + ":old_value": old_val, + ":new_value": value, + }) + .map_err(log_cas_error)? + } + None => { + let tx = conn.transaction().map_err(log_cas_error)?; + let rows = tx + .prepare_cached( + "INSERT INTO spin_key_value (store, key, value) VALUES ($1, $2, $3) + ON CONFLICT(store, key) DO UPDATE SET value=$3", + ) + .map_err(log_cas_error)? + .execute(rusqlite::params![&self.name, self.key, value]) + .map_err(log_cas_error)?; + tx.commit().map_err(log_cas_error)?; + rows + } + }; // We expect only 1 row to be updated. If 0, we know that the underlying value has changed. if rows_changed == 1 { From 8db1174fb5d3d32be88c5b01b574e4411fc203d2 Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Thu, 14 Nov 2024 06:52:34 -0700 Subject: [PATCH 12/13] Updated to handle CasError in wasm mapping, ensure new cas is returned to client Signed-off-by: Darwin Boersma --- crates/factor-key-value/src/host.rs | 29 ++++++++++++++++--------- crates/key-value-spin/src/store.rs | 13 ++++------- crates/world/src/lib.rs | 1 + wit/deps/keyvalue-2024-10-17/atomic.wit | 18 +++++++-------- wit/deps/keyvalue-2024-10-17/world.wit | 2 +- 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/crates/factor-key-value/src/host.rs b/crates/factor-key-value/src/host.rs index efb473fb1b..ce5001a4cb 100644 --- a/crates/factor-key-value/src/host.rs +++ b/crates/factor-key-value/src/host.rs @@ -358,6 +358,13 @@ impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch { #[async_trait] impl wasi_keyvalue::atomics::Host for KeyValueDispatch { + fn convert_cas_error( + &mut self, + error: spin_world::wasi::keyvalue::atomics::CasError, + ) -> std::result::Result { + Ok(error) + } + #[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))] async fn increment( &mut self, @@ -374,27 +381,29 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch { &mut self, cas_res: Resource, value: Vec, - ) -> Result> { + ) -> Result<(), CasError> { let cas_rep = cas_res.rep(); let cas = self .get_cas(Resource::::new_own(cas_rep)) .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?; match cas.swap(value).await { - Ok(_) => Ok(Ok(())), + Ok(_) => Ok(()), Err(err) => match err { SwapError::CasFailed(_) => { let bucket = Resource::new_own(cas.bucket_rep().await); - let new_cas = self.new(bucket, cas.key().await).await?; + let new_cas = self + .new(bucket, cas.key().await) + .await + .map_err(CasError::StoreError)?; let new_cas_rep = new_cas.rep(); - self.current(Resource::new_own(new_cas_rep)).await?; - Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own( - new_cas_rep, - )))) + self.current(Resource::new_own(new_cas_rep)) + .await + .map_err(CasError::StoreError)?; + let res = Resource::new_own(new_cas_rep); + Err(CasError::CasFailed(res)) } - SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError( - atomics::Error::Other(msg), - ))), + SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))), }, } } diff --git a/crates/key-value-spin/src/store.rs b/crates/key-value-spin/src/store.rs index 7c3e50101d..122e63c413 100644 --- a/crates/key-value-spin/src/store.rs +++ b/crates/key-value-spin/src/store.rs @@ -522,15 +522,10 @@ mod test { let res = kv.swap(Resource::new_own(cas.rep()), cas_final_value).await; match res { Ok(_) => panic!("expected a CAS failure"), - Err(err) => { - for cause in err.chain() { - if let Some(cas_err) = cause.downcast_ref::() { - assert!(matches!(cas_err, CasError::CasFailed(_))); - return Ok(()); - } - } - panic!("expected a CAS failure") - } + Err(err) => match err { + CasError::CasFailed(_) => Ok(()), + CasError::StoreError(_) => panic!("expected a CasFailed error"), + }, } } diff --git a/crates/world/src/lib.rs b/crates/world/src/lib.rs index a3ecf810e4..501ca9c568 100644 --- a/crates/world/src/lib.rs +++ b/crates/world/src/lib.rs @@ -33,6 +33,7 @@ wasmtime::component::bindgen!({ "spin:postgres/postgres/error" => spin::postgres::postgres::Error, "wasi:config/store@0.2.0-draft-2024-09-27/error" => wasi::config::store::Error, "wasi:keyvalue/store/error" => wasi::keyvalue::store::Error, + "wasi:keyvalue/atomics/cas-error" => wasi::keyvalue::atomics::CasError, }, trappable_imports: true, }); diff --git a/wit/deps/keyvalue-2024-10-17/atomic.wit b/wit/deps/keyvalue-2024-10-17/atomic.wit index 2c3e0d047b..4a02c58034 100644 --- a/wit/deps/keyvalue-2024-10-17/atomic.wit +++ b/wit/deps/keyvalue-2024-10-17/atomic.wit @@ -13,22 +13,22 @@ interface atomics { /// The error returned by a CAS operation variant cas-error { - /// A store error occurred when performing the operation - store-error(error), + /// A store error occurred when performing the operation + store-error(error), /// The CAS operation failed because the value was too old. This returns a new CAS handle /// for easy retries. Implementors MUST return a CAS handle that has been updated to the /// latest version or transaction. - cas-failed(cas), + cas-failed(cas), } /// A handle to a CAS (compare-and-swap) operation. resource cas { - /// Construct a new CAS operation. Implementors can map the underlying functionality - /// (transactions, versions, etc) as desired. - new: static func(bucket: borrow, key: string) -> result; - /// Get the current value of the key (if it exists). This allows for avoiding reads if all - /// that is needed to ensure the atomicity of the operation - current: func() -> result>, error>; + /// Construct a new CAS operation. Implementors can map the underlying functionality + /// (transactions, versions, etc) as desired. + new: static func(bucket: borrow, key: string) -> result; + /// Get the current value of the key (if it exists). This allows for avoiding reads if all + /// that is needed to ensure the atomicity of the operation + current: func() -> result>, error>; } /// Atomically increment the value associated with the key in the store by the given delta. It diff --git a/wit/deps/keyvalue-2024-10-17/world.wit b/wit/deps/keyvalue-2024-10-17/world.wit index 64eb4e1225..e8fb821a95 100644 --- a/wit/deps/keyvalue-2024-10-17/world.wit +++ b/wit/deps/keyvalue-2024-10-17/world.wit @@ -1,4 +1,4 @@ -package wasi: keyvalue@0.2.0-draft2; +package wasi:keyvalue@0.2.0-draft2; /// The `wasi:keyvalue/imports` world provides common APIs for interacting with key-value stores. /// Components targeting this world will be able to do: From 6d8570bd0325f1a7eb9fffc79b9b2fcf13f56a4a Mon Sep 17 00:00:00 2001 From: Darwin Boersma Date: Thu, 14 Nov 2024 07:22:18 -0700 Subject: [PATCH 13/13] Flush in get_many to ensure updated cache Signed-off-by: Darwin Boersma --- crates/factor-key-value/src/util.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/factor-key-value/src/util.rs b/crates/factor-key-value/src/util.rs index 82dbb59611..9fec7e4348 100644 --- a/crates/factor-key-value/src/util.rs +++ b/crates/factor-key-value/src/util.rs @@ -251,6 +251,11 @@ impl Store for CachingStore { keys: Vec, ) -> anyhow::Result>)>, Error> { let mut state = self.state.lock().await; + + // Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior + // to their corresponding writes reaching the backing store. + state.flush().await?; + let mut found: Vec<(String, Option>)> = Vec::new(); let mut not_found: Vec = Vec::new(); for key in keys {