Skip to content

Commit 49c96ca

Browse files
committed
implement wasi kv 0.2.0-draft2 for redis and cosmosdb
Signed-off-by: David Justice <[email protected]>
1 parent a0a132f commit 49c96ca

File tree

11 files changed

+348
-62
lines changed

11 files changed

+348
-62
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-key-value/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ spin-world = { path = "../world" }
1616
tokio = { workspace = true, features = ["macros", "sync", "rt"] }
1717
toml = { workspace = true }
1818
tracing = { workspace = true }
19+
thiserror = { workspace = true }
1920

2021
[dev-dependencies]
2122
spin-factors-test = { path = "../factors-test" }

crates/factor-key-value/src/host.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::Cas;
1+
use super::{Cas, SwapError};
22
use anyhow::{Context, Result};
33
use spin_core::{async_trait, wasmtime::component::Resource};
44
use spin_resource_table::Table;
@@ -378,22 +378,21 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
378378
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
379379

380380
match cas.swap(value).await {
381-
Ok(cas) => Ok(Ok(())),
382-
Err(err) => {
383-
if err.to_string().contains("CAS_ERROR") {
381+
Ok(_) => Ok(Ok(())),
382+
Err(err) => match err {
383+
SwapError::CasFailed(_) => {
384384
let bucket = Resource::new_own(cas.bucket_rep().await);
385385
let new_cas = self.new(bucket, cas.key().await).await?;
386386
let new_cas_rep = new_cas.rep();
387387
self.current(Resource::new_own(new_cas_rep)).await?;
388388
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
389389
new_cas_rep,
390390
))))
391-
} else {
392-
Err(anyhow::Error::new(CasError::StoreError(
393-
atomics::Error::Other(err.to_string()),
394-
)))
395391
}
396-
}
392+
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
393+
atomics::Error::Other(msg),
394+
))),
395+
},
397396
}
398397
}
399398
}
@@ -403,6 +402,11 @@ pub fn log_error(err: impl std::fmt::Debug) -> Error {
403402
Error::Other(format!("{err:?}"))
404403
}
405404

405+
pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
406+
tracing::warn!("key-value error: {err:?}");
407+
SwapError::Other(format!("{err:?}"))
408+
}
409+
406410
use spin_world::v1::key_value::Error as LegacyError;
407411
use spin_world::wasi::keyvalue::atomics;
408412
use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};

crates/factor-key-value/src/lib.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use spin_locked_app::MetadataKey;
1515

1616
/// Metadata key for key-value stores.
1717
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
18-
pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager};
18+
pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
1919
pub use runtime_config::RuntimeConfig;
2020
use spin_core::async_trait;
2121
pub use util::{CachingStoreManager, DelegatingStoreManager};
@@ -42,6 +42,8 @@ impl Factor for KeyValueFactor {
4242
ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
4343
ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
4444
ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
45+
ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
46+
ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
4547
Ok(())
4648
}
4749

@@ -133,10 +135,33 @@ impl AppState {
133135
}
134136
}
135137

138+
/// `SwapError` are errors that occur during compare and swap operations
139+
#[derive(Debug, thiserror::Error)]
140+
pub enum SwapError {
141+
#[error("{0}")]
142+
CasFailed(String),
143+
144+
#[error("{0}")]
145+
Other(String),
146+
}
147+
148+
/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
149+
///
150+
/// `current` is expected to get the current value for the key associated with the CAS operation
151+
/// while also starting what is needed to ensure the value to be replaced will not have mutated
152+
/// between the time of calling `current` and `swap`. For example, a get from a backend store
153+
/// may provide the caller with an etag (a version stamp), which can be used with an if-match
154+
/// header to ensure the version updated is the version that was read (optimistic concurrency).
155+
/// Rather than an etag, one could start a transaction, if supported by the backing store, which
156+
/// would provide atomicity.
157+
///
158+
/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
159+
/// operation. If there was no key / value with the given key in the store, the `swap` operation
160+
/// should **insert** the key and value, disallowing an update.
136161
#[async_trait]
137162
pub trait Cas: Sync + Send {
138163
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
139-
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), Error>;
164+
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
140165
async fn bucket_rep(&self) -> u32;
141166
async fn key(&self) -> String;
142167
}

crates/factor-key-value/src/util.rs

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -242,49 +242,62 @@ impl Store for CachingStore {
242242
&self,
243243
keys: Vec<String>,
244244
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
245-
// // Retrieve the specified value from the cache, lazily populating the cache as necessary.
246-
// let mut state = self.state.lock().await;
247-
//
248-
// let mut keys_and_values: Vec<Option<(String, Vec<u8>)>> = Vec::new();
249-
// let mut keys_not_found: Vec<String> = Vec::new();
250-
// for key in keys {
251-
// match state.cache.get(key.as_str()).cloned() {
252-
// Some(value) => keys_and_values.push(Some((key, value))),
253-
// None => keys_not_found.push(key),
254-
// }
255-
// }
256-
//
257-
// // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
258-
// // cache prior to their corresponding writes reaching the backing store.
259-
// state.flush().await?;
260-
//
261-
// let value = self.inner.get(key).await?;
262-
//
263-
// state.cache.put(key.to_owned(), value.clone());
264-
//
265-
// Ok(value)
266-
//
245+
let mut state = self.state.lock().await;
246+
let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
247+
let mut not_found: Vec<String> = Vec::new();
248+
for key in keys {
249+
match state.cache.get(key.as_str()) {
250+
Some(res) => match res {
251+
Some(value) => found.push((key, Some(value.clone()))),
252+
None => not_found.push(key),
253+
},
254+
None => not_found.push(key),
255+
}
256+
}
267257

268-
todo!()
258+
let keys_and_values = self.inner.get_many(not_found).await?;
259+
for (key, value) in keys_and_values {
260+
found.push((key.clone(), value.clone()));
261+
state.cache.put(key, value);
262+
}
263+
264+
Ok(found)
269265
}
270266

271267
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
272-
todo!()
268+
let mut state = self.state.lock().await;
269+
270+
for (key, value) in key_values.clone() {
271+
state.cache.put(key, Some(value));
272+
}
273+
274+
self.inner.set_many(key_values).await
273275
}
274276

275277
async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
276-
todo!()
278+
let mut state = self.state.lock().await;
279+
280+
for key in keys.clone() {
281+
state.cache.put(key, None);
282+
}
283+
284+
self.inner.delete_many(keys).await
277285
}
278286

279287
async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
280-
todo!()
288+
let state = self.state.lock().await;
289+
let counter = self.inner.increment(key.clone(), delta).await?;
290+
state
291+
.cache
292+
.put(key, Some(i64::to_le_bytes(counter).to_vec()));
293+
Ok(counter)
281294
}
282295

283296
async fn new_compare_and_swap(
284297
&self,
285298
bucket_rep: u32,
286299
key: &str,
287300
) -> anyhow::Result<Arc<dyn Cas>, Error> {
288-
todo!()
301+
self.inner.new_compare_and_swap(bucket_rep, key).await
289302
}
290303
}

crates/key-value-azure/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ rust-version.workspace = true
1212
anyhow = { workspace = true }
1313
azure_data_cosmos = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
1414
azure_identity = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
15+
azure_core = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
1516
futures = { workspace = true }
1617
serde = { workspace = true }
1718
spin-core = { path = "../core" }

0 commit comments

Comments
 (0)