Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/factor-key-value/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ spin-world = { path = "../world" }
tokio = { workspace = true, features = ["macros", "sync", "rt"] }
toml = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
spin-factors-test = { path = "../factors-test" }
Expand Down
265 changes: 264 additions & 1 deletion crates/factor-key-value/src/host.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::{Cas, SwapError};
use anyhow::{Context, Result};
use spin_core::{async_trait, wasmtime::component::Resource};
use spin_resource_table::Table;
use spin_world::v2::key_value;
use spin_world::wasi::keyvalue as wasi_keyvalue;
use std::{collections::HashSet, sync::Arc};
use tracing::{instrument, Level};

Expand Down Expand Up @@ -30,12 +32,19 @@ pub trait Store: Sync + Send {
async fn delete(&self, key: &str) -> Result<(), Error>;
async fn exists(&self, key: &str) -> Result<bool, Error>;
async fn get_keys(&self) -> Result<Vec<String>, Error>;
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
-> Result<Arc<dyn Cas>, Error>;
}

/// Per-instance host state backing the key-value host interfaces implemented
/// in this file.
pub struct KeyValueDispatch {
/// Store identifiers this instance may open; `open` answers `AccessDenied`
/// for any identifier not in this set.
allowed_stores: HashSet<String>,
/// Resolves a store identifier to a concrete `Store` when a store is opened.
manager: Arc<dyn StoreManager>,
/// Opened stores, indexed by the `rep()` of the resource handed to the guest.
stores: Table<Arc<dyn Store>>,
/// Live compare-and-swap handles, indexed by the `rep()` of their resource.
compare_and_swaps: Table<Arc<dyn Cas>>,
}

impl KeyValueDispatch {
Expand All @@ -52,16 +61,43 @@ impl KeyValueDispatch {
allowed_stores,
manager,
stores: Table::new(capacity),
compare_and_swaps: Table::new(capacity),
}
}

pub fn get_store(&self, store: Resource<key_value::Store>) -> anyhow::Result<&Arc<dyn Store>> {
/// Looks up the opened store behind `store`, using only the resource's
/// `rep()` as the table index.
///
/// # Errors
/// Returns an `anyhow` error with the message `invalid store` when the table
/// holds no entry for that index.
pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
    let rep = store.rep();
    self.stores.get(rep).context("invalid store")
}

/// Looks up the compare-and-swap handle behind `cas`, using only the
/// resource's `rep()` as the table index.
///
/// # Errors
/// Returns an `anyhow` error with the message `invalid compare and swap`
/// when the table holds no entry for that index.
pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
    let rep = cas.rep();
    self.compare_and_swaps
        .get(rep)
        .context("invalid compare and swap")
}

/// Returns the set of store identifiers this dispatch is permitted to open.
pub fn allowed_stores(&self) -> &HashSet<String> {
&self.allowed_stores
}

pub fn get_store_wasi<T: 'static>(
&self,
store: Resource<T>,
) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
self.stores
.get(store.rep())
.ok_or(wasi_keyvalue::store::Error::NoSuchStore)
}

pub fn get_cas_wasi<T: 'static>(
&self,
cas: Resource<T>,
) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
self.compare_and_swaps
.get(cas.rep())
.ok_or(wasi_keyvalue::atomics::Error::Other(
"compare and swap not found".to_string(),
))
}
}

#[async_trait]
Expand Down Expand Up @@ -141,12 +177,239 @@ impl key_value::HostStore for KeyValueDispatch {
}
}

fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
match e {
Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
}
}

#[async_trait]
impl wasi_keyvalue::store::Host for KeyValueDispatch {
async fn open(
&mut self,
identifier: String,
) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
if self.allowed_stores.contains(&identifier) {
let store = self
.stores
.push(self.manager.get(&identifier).await.map_err(to_wasi_err)?)
.map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
Ok(Resource::new_own(store))
} else {
Err(wasi_keyvalue::store::Error::AccessDenied)
}
}

fn convert_error(
&mut self,
error: spin_world::wasi::keyvalue::store::Error,
) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
Ok(error)
}
}

use wasi_keyvalue::store::Bucket;
#[async_trait]
impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
/// Reads the value stored under `key` in the bucket `self_`.
///
/// # Errors
/// `NoSuchStore` when the bucket handle is unknown; otherwise any store
/// error mapped through `to_wasi_err`.
async fn get(
    &mut self,
    self_: Resource<Bucket>,
    key: String,
) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
    self.get_store_wasi(self_)?
        .get(&key)
        .await
        .map_err(to_wasi_err)
}

/// Writes `value` under `key` in the bucket `self_`.
///
/// # Errors
/// `NoSuchStore` when the bucket handle is unknown; otherwise any store
/// error mapped through `to_wasi_err`.
async fn set(
    &mut self,
    self_: Resource<Bucket>,
    key: String,
    value: Vec<u8>,
) -> Result<(), wasi_keyvalue::store::Error> {
    self.get_store_wasi(self_)?
        .set(&key, &value)
        .await
        .map_err(to_wasi_err)
}

/// Deletes `key` from the bucket `self_`.
///
/// # Errors
/// `NoSuchStore` when the bucket handle is unknown; otherwise any store
/// error mapped through `to_wasi_err`.
async fn delete(
    &mut self,
    self_: Resource<Bucket>,
    key: String,
) -> Result<(), wasi_keyvalue::store::Error> {
    self.get_store_wasi(self_)?
        .delete(&key)
        .await
        .map_err(to_wasi_err)
}

/// Asks the store backing `self_` whether `key` exists.
///
/// # Errors
/// `NoSuchStore` when the bucket handle is unknown; otherwise any store
/// error mapped through `to_wasi_err`.
async fn exists(
    &mut self,
    self_: Resource<Bucket>,
    key: String,
) -> Result<bool, wasi_keyvalue::store::Error> {
    self.get_store_wasi(self_)?
        .exists(&key)
        .await
        .map_err(to_wasi_err)
}

async fn list_keys(
&mut self,
self_: Resource<Bucket>,
cursor: Option<String>,
) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
match cursor {
Some(_) => Err(wasi_keyvalue::store::Error::Other(
"list_keys: cursor not supported".to_owned(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever a plan to support a cursor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for data stores that support paging. You could imagine a case where too many keys are in the store and the client would like to fetch them a page at a time.

Personally, I don't like this. I think listing all keys, possibly across partitions, is a really bad idea.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an artefact of the original Spin key-value interface not supporting a cursor. There's no technical reason we couldn't implement one in the WASI implementation (and have the Spin interface sit over that, rather than the other way round). It's just down to time and effort.

)),
None => {
let store = self.get_store_wasi(self_)?;
let keys = store.get_keys().await.map_err(to_wasi_err)?;
Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
}
}
}

/// Removes the bucket's entry from the table of opened stores. Always
/// returns `Ok(())`; the result of the removal itself is discarded.
async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
    let handle = rep.rep();
    self.stores.remove(handle);
    Ok(())
}
}

/// WASI key-value `batch` interface: multi-key reads, writes and deletes.
///
/// Every method resolves the bucket with `get_store_wasi` (so an unknown
/// handle yields `NoSuchStore`) and maps store errors through `to_wasi_err`.
#[async_trait]
impl wasi_keyvalue::batch::Host for KeyValueDispatch {
    /// Fetches the values for `keys` from the store backing `bucket`,
    /// returning the store's `(key, optional value)` pairs unchanged.
    #[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
    async fn get_many(
        &mut self,
        bucket: Resource<wasi_keyvalue::batch::Bucket>,
        keys: Vec<String>,
    ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
        let store = self.get_store_wasi(bucket)?;
        // `keys` is already an owned `Vec<String>`; hand it over instead of
        // cloning every key into a second vector.
        store.get_many(keys).await.map_err(to_wasi_err)
    }

    /// Writes every `(key, value)` pair in `key_values` to the store backing
    /// `bucket`.
    #[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
    async fn set_many(
        &mut self,
        bucket: Resource<wasi_keyvalue::batch::Bucket>,
        key_values: Vec<(String, Vec<u8>)>,
    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
        let store = self.get_store_wasi(bucket)?;
        store.set_many(key_values).await.map_err(to_wasi_err)
    }

    /// Deletes every key in `keys` from the store backing `bucket`.
    // The span name previously read `spin_key_value.get_many` (copy-paste),
    // which made deletes indistinguishable from reads in traces.
    #[instrument(name = "spin_key_value.delete_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
    async fn delete_many(
        &mut self,
        bucket: Resource<wasi_keyvalue::batch::Bucket>,
        keys: Vec<String>,
    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
        let store = self.get_store_wasi(bucket)?;
        store.delete_many(keys).await.map_err(to_wasi_err)
    }
}

#[async_trait]
impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
async fn new(
&mut self,
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
key: String,
) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
let bucket_rep = bucket.rep();
let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
let store = self.get_store_wasi(bucket)?;
let cas = store
.new_compare_and_swap(bucket_rep, &key)
.await
.map_err(to_wasi_err)?;
self.compare_and_swaps
.push(cas)
.map_err(|()| {
spin_world::wasi::keyvalue::store::Error::Other(
"too many compare_and_swaps opened".to_string(),
)
})
.map(Resource::new_own)
}

async fn current(
&mut self,
cas: Resource<wasi_keyvalue::atomics::Cas>,
) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
let cas = self
.get_cas(cas)
.map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
cas.current().await.map_err(to_wasi_err)
}

async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
self.compare_and_swaps.remove(rep.rep());
Ok(())
}
}

#[async_trait]
impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
#[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn increment(
&mut self,
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
key: String,
delta: i64,
) -> Result<i64, wasi_keyvalue::store::Error> {
let store = self.get_store_wasi(bucket)?;
store.increment(key, delta).await.map_err(to_wasi_err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Store::increment documented as requiring to be atomic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what sense? If I give a delta and then ask for it to be applied it will be without regard to the underlying value. The only issue with atomicity would be in the case where one fetches and increments while another has mutated the underlying value. In that case, there should be isolation from each mutation. In the case of Redis, this is handled with the INCR instruction. In other cases, it should be done within a transaction or optimistically.

I guess the short answer is that increment should be serializable and consistent.

}

/// Attempts to replace the value guarded by the CAS handle `cas_res` with
/// `value`.
///
/// Returns `Ok(Ok(()))` when the handle's `swap` succeeds. Every failure in
/// this body is returned through the *outer* `anyhow` error; the inner
/// `Result` is only ever `Ok(())` here.
/// NOTE(review): presumably the generated bindings downcast the outer error
/// back into `CasError` for the guest — confirm against the bindgen config.
///
/// # Errors
/// * `CasError::StoreError(Other(..))` when the handle is unknown or the
///   backend reports `SwapError::Other`.
/// * `CasError::CasFailed(new_handle)` when the backend reports
///   `SwapError::CasFailed`; `new_handle` is a freshly created CAS for the
///   same bucket and key.
/// * Any error from creating or reading that replacement handle.
#[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))]
async fn swap(
&mut self,
cas_res: Resource<atomics::Cas>,
value: Vec<u8>,
) -> Result<std::result::Result<(), CasError>> {
let cas_rep = cas_res.rep();
// `get_cas` is generic over the resource type and only reads `rep()`, so
// the `Bucket` type parameter used for the re-wrap here is not significant.
let cas = self
.get_cas(Resource::<Bucket>::new_own(cas_rep))
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;

match cas.swap(value).await {
Ok(_) => Ok(Ok(())),
Err(err) => match err {
SwapError::CasFailed(_) => {
// Build a replacement CAS for the same bucket/key and call `current`
// on it before handing it back inside the error, so the caller
// receives a handle that has already been read once.
let bucket = Resource::new_own(cas.bucket_rep().await);
let new_cas = self.new(bucket, cas.key().await).await?;
let new_cas_rep = new_cas.rep();
self.current(Resource::new_own(new_cas_rep)).await?;
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
new_cas_rep,
))))
}
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
atomics::Error::Other(msg),
))),
},
}
}
}

pub fn log_error(err: impl std::fmt::Debug) -> Error {
tracing::warn!("key-value error: {err:?}");
Error::Other(format!("{err:?}"))
}

pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
tracing::warn!("key-value error: {err:?}");
SwapError::Other(format!("{err:?}"))
}

use spin_world::v1::key_value::Error as LegacyError;
use spin_world::wasi::keyvalue::atomics;
use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};

fn to_legacy_error(value: key_value::Error) -> LegacyError {
match value {
Expand Down
37 changes: 36 additions & 1 deletion crates/factor-key-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use spin_locked_app::MetadataKey;

/// Metadata key for key-value stores.
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager};
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
pub use runtime_config::RuntimeConfig;
use spin_core::async_trait;
pub use util::{CachingStoreManager, DelegatingStoreManager};

/// A factor that provides key-value storage.
Expand All @@ -40,6 +41,9 @@ impl Factor for KeyValueFactor {
fn init<T: Send + 'static>(&mut self, mut ctx: InitContext<T, Self>) -> anyhow::Result<()> {
ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note: We probably don't want to force all embedders of this factor to support wasi key_value. We can leave it as is for now, but we'll probably want to come up with some sort of feature system that allows embedders to choose which interfaces get linked in.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rylev - would you mind adding an issue so we can track that if we don't already have one?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Ryan says, this seems like a system-wide consideration and principle which we should approach from a system-wide perspective. For example it would apply equally to "embedders shouldn't be forced to support the v1 WITs." I agree with Michelle, let's capture this but let's not ad-hoc it into each PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the linker bindings for batch and atomic.

ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
Ok(())
}

Expand Down Expand Up @@ -131,6 +135,37 @@ impl AppState {
}
}

/// Errors that can occur during a compare-and-swap operation.
#[derive(Debug, thiserror::Error)]
pub enum SwapError {
/// The swap itself was rejected. The host's `swap` implementation reacts to
/// this variant by creating a fresh CAS handle and reporting
/// `CasError::CasFailed` to the caller.
/// NOTE(review): presumably raised when the value changed since `current`
/// was read — confirm against the store implementations.
#[error("{0}")]
CasFailed(String),

/// Any other failure; the message is forwarded as
/// `CasError::StoreError(atomics::Error::Other(..))`.
#[error("{0}")]
Other(String),
}

/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
///
/// `current` is expected to get the current value for the key associated with the CAS operation
/// while also starting what is needed to ensure the value to be replaced will not have mutated
/// between the time of calling `current` and `swap`. For example, a get from a backend store
/// may provide the caller with an etag (a version stamp), which can be used with an if-match
/// header to ensure the version updated is the version that was read (optimistic concurrency).
/// Rather than an etag, one could start a transaction, if supported by the backing store, which
/// would provide atomicity.
///
/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
/// operation. If there was no key / value with the given key in the store, the `swap` operation
/// should **insert** the key and value, disallowing an update.
#[async_trait]
pub trait Cas: Sync + Send {
/// Returns the current value for the key this CAS is bound to.
/// NOTE(review): presumably `None` means the key has no value yet — confirm.
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
/// Replaces the value with `value`, failing with `SwapError::CasFailed` or
/// `SwapError::Other`.
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
/// The bucket resource rep this CAS was created with; the host uses it to
/// open a replacement CAS after a failed swap.
async fn bucket_rep(&self) -> u32;
/// The key this CAS operates on; the host uses it to open a replacement CAS
/// after a failed swap.
async fn key(&self) -> String;
}

pub struct InstanceBuilder {
/// The store manager for the app.
///
Expand Down
Loading
Loading