Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
468 changes: 463 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
keys: Vec<String>,
) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store
.get_many(keys.iter().map(|k| k.to_string()).collect())
.await
.map_err(to_wasi_err)
if keys.is_empty() {
return Ok(vec![]);
}
store.get_many(keys).await.map_err(to_wasi_err)
}

#[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
Expand All @@ -296,6 +296,9 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
key_values: Vec<(String, Vec<u8>)>,
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
if key_values.is_empty() {
return Ok(());
}
store.set_many(key_values).await.map_err(to_wasi_err)
}

Expand All @@ -306,10 +309,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
keys: Vec<String>,
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store
.delete_many(keys.iter().map(|k| k.to_string()).collect())
.await
.map_err(to_wasi_err)
if keys.is_empty() {
return Ok(());
}
store.delete_many(keys).await.map_err(to_wasi_err)
}
}

Expand Down Expand Up @@ -355,6 +358,13 @@ impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {

#[async_trait]
impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
fn convert_cas_error(
&mut self,
error: spin_world::wasi::keyvalue::atomics::CasError,
) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
Ok(error)
}

#[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn increment(
&mut self,
Expand All @@ -371,27 +381,29 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
&mut self,
cas_res: Resource<atomics::Cas>,
value: Vec<u8>,
) -> Result<std::result::Result<(), CasError>> {
) -> Result<(), CasError> {
let cas_rep = cas_res.rep();
let cas = self
.get_cas(Resource::<Bucket>::new_own(cas_rep))
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;

match cas.swap(value).await {
Ok(_) => Ok(Ok(())),
Ok(_) => Ok(()),
Err(err) => match err {
SwapError::CasFailed(_) => {
let bucket = Resource::new_own(cas.bucket_rep().await);
let new_cas = self.new(bucket, cas.key().await).await?;
let new_cas = self
.new(bucket, cas.key().await)
.await
.map_err(CasError::StoreError)?;
let new_cas_rep = new_cas.rep();
self.current(Resource::new_own(new_cas_rep)).await?;
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
new_cas_rep,
))))
self.current(Resource::new_own(new_cas_rep))
.await
.map_err(CasError::StoreError)?;
let res = Resource::new_own(new_cas_rep);
Err(CasError::CasFailed(res))
}
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
atomics::Error::Other(msg),
))),
SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
},
}
}
Expand Down
15 changes: 11 additions & 4 deletions crates/factor-key-value/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ impl Store for CachingStore {
keys: Vec<String>,
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
let mut state = self.state.lock().await;

// Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior
// to their corresponding writes reaching the backing store.
state.flush().await?;

let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
let mut not_found: Vec<String> = Vec::new();
for key in keys {
Expand All @@ -260,10 +265,12 @@ impl Store for CachingStore {
}
}

let keys_and_values = self.inner.get_many(not_found).await?;
for (key, value) in keys_and_values {
found.push((key.clone(), value.clone()));
state.cache.put(key, value);
if !not_found.is_empty() {
let keys_and_values = self.inner.get_many(not_found).await?;
for (key, value) in keys_and_values {
found.push((key.clone(), value.clone()));
state.cache.put(key, value);
}
}

Ok(found)
Expand Down
22 changes: 22 additions & 0 deletions crates/key-value-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "spin-key-value-aws"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
rust-version.workspace = true

[dependencies]
anyhow = { workspace = true }
async-once-cell = "0.5.4"
aws-config = "1.1.7"
aws-credential-types = "1.1.7"
aws-sdk-dynamodb = "1.49.0"
serde = { workspace = true }
spin-core = { path = "../core" }
spin-factor-key-value = { path = "../factor-key-value" }

[lints]
workspace = true
74 changes: 74 additions & 0 deletions crates/key-value-aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
mod store;

use serde::Deserialize;
use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore;
use store::{
KeyValueAwsDynamo, KeyValueAwsDynamoAuthOptions, KeyValueAwsDynamoRuntimeConfigOptions,
};

/// A key-value store that uses AWS Dynamo as the backend.
#[derive(Default)]
pub struct AwsDynamoKeyValueStore {
_priv: (),
}

impl AwsDynamoKeyValueStore {
/// Creates a new `AwsKeyValueStore`.
pub fn new() -> Self {
Self::default()
}
}

/// Runtime configuration for the AWS Dynamo key-value store.
#[derive(Deserialize)]
pub struct AwsDynamoKeyValueRuntimeConfig {
/// The access key for the AWS Dynamo DB account role.
access_key: Option<String>,
/// The secret key for authorization on the AWS Dynamo DB account.
secret_key: Option<String>,
/// The token for authorization on the AWS Dynamo DB account.
token: Option<String>,
/// The AWS region where the database is located
region: String,
/// Boolean determining whether to use strongly consistent reads.
/// Defaults to `false` but can be set to `true` to improve atomicity
consistent_read: Option<bool>,
/// The AWS Dynamo DB table.
table: String,
}

impl MakeKeyValueStore for AwsDynamoKeyValueStore {
const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo";

type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig;

type StoreManager = KeyValueAwsDynamo;

fn make_store(
&self,
runtime_config: Self::RuntimeConfig,
) -> anyhow::Result<Self::StoreManager> {
let AwsDynamoKeyValueRuntimeConfig {
access_key,
secret_key,
token,
region,
consistent_read,
table,
} = runtime_config;
let auth_options = match (access_key, secret_key) {
(Some(access_key), Some(secret_key)) => {
KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(
KeyValueAwsDynamoRuntimeConfigOptions::new(access_key, secret_key, token),
)
}
_ => KeyValueAwsDynamoAuthOptions::Environmental,
};
KeyValueAwsDynamo::new(
region,
consistent_read.unwrap_or(false),
table,
auth_options,
)
}
}
Loading
Loading