Skip to content

Commit 5ee121a

Browse files
committed
implement kv factor for 0.2.0draft2 and stub impls
Signed-off-by: David Justice <[email protected]>
1 parent 470a264 commit 5ee121a

File tree

6 files changed

+217
-7
lines changed

6 files changed

+217
-7
lines changed

crates/factor-key-value/src/host.rs

Lines changed: 128 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use spin_world::v2::key_value;
55
use spin_world::wasi::keyvalue as wasi_keyvalue;
66
use std::{collections::HashSet, sync::Arc};
77
use tracing::{instrument, Level};
8+
use super::Cas;
89

910
const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
1011

@@ -31,12 +32,20 @@ pub trait Store: Sync + Send {
3132
async fn delete(&self, key: &str) -> Result<(), Error>;
3233
async fn exists(&self, key: &str) -> Result<bool, Error>;
3334
async fn get_keys(&self) -> Result<Vec<String>, Error>;
35+
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error>;
36+
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
37+
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
38+
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
39+
async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn Cas>, Error>;
3440
}
3541

42+
43+
3644
pub struct KeyValueDispatch {
3745
allowed_stores: HashSet<String>,
3846
manager: Arc<dyn StoreManager>,
3947
stores: Table<Arc<dyn Store>>,
48+
compare_and_swaps: Table<Arc<dyn Cas>>,
4049
}
4150

4251
impl KeyValueDispatch {
@@ -53,13 +62,20 @@ impl KeyValueDispatch {
5362
allowed_stores,
5463
manager,
5564
stores: Table::new(capacity),
65+
compare_and_swaps: Table::new(capacity),
5666
}
5767
}
5868

5969
pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
6070
self.stores.get(store.rep()).context("invalid store")
6171
}
6272

73+
pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
74+
self.compare_and_swaps
75+
.get(cas.rep())
76+
.context("invalid compare and swap")
77+
}
78+
6379
pub fn allowed_stores(&self) -> &HashSet<String> {
6480
&self.allowed_stores
6581
}
@@ -72,6 +88,17 @@ impl KeyValueDispatch {
7288
.get(store.rep())
7389
.ok_or(wasi_keyvalue::store::Error::NoSuchStore)
7490
}
91+
92+
pub fn get_cas_wasi<T: 'static>(
93+
&self,
94+
cas: Resource<T>,
95+
) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
96+
self.compare_and_swaps
97+
.get(cas.rep())
98+
.ok_or(wasi_keyvalue::atomics::Error::Other(
99+
"compare and swap not found".to_string(),
100+
))
101+
}
75102
}
76103

77104
#[async_trait]
@@ -231,11 +258,9 @@ impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
231258
cursor: Option<String>,
232259
) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
233260
match cursor {
234-
Some(_) => {
235-
Err(wasi_keyvalue::store::Error::Other(
236-
"list_keys: cursor not supported".to_owned(),
237-
))
238-
},
261+
Some(_) => Err(wasi_keyvalue::store::Error::Other(
262+
"list_keys: cursor not supported".to_owned(),
263+
)),
239264
None => {
240265
let store = self.get_store_wasi(self_)?;
241266
let keys = store.get_keys().await.map_err(to_wasi_err)?;
@@ -250,6 +275,104 @@ impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
250275
}
251276
}
252277

278+
#[async_trait]
279+
impl wasi_keyvalue::batch::Host for KeyValueDispatch {
280+
#[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
281+
async fn get_many(
282+
&mut self,
283+
bucket: Resource<wasi_keyvalue::batch::Bucket>,
284+
keys: Vec<String>,
285+
) -> std::result::Result<Vec<Option<(String, Vec<u8>)>>, wasi_keyvalue::store::Error> {
286+
let store = self.get_store_wasi(bucket)?;
287+
store
288+
.get_many(keys.iter().map(|k| k.to_string()).collect())
289+
.await
290+
.map_err(to_wasi_err)
291+
}
292+
293+
#[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
294+
async fn set_many(
295+
&mut self,
296+
bucket: Resource<wasi_keyvalue::batch::Bucket>,
297+
key_values: Vec<(String, Vec<u8>)>,
298+
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
299+
let store = self.get_store_wasi(bucket)?;
300+
store.set_many(key_values).await.map_err(to_wasi_err)
301+
}
302+
303+
#[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
304+
async fn delete_many(
305+
&mut self,
306+
bucket: Resource<wasi_keyvalue::batch::Bucket>,
307+
keys: Vec<String>,
308+
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
309+
let store = self.get_store_wasi(bucket)?;
310+
store
311+
.delete_many(keys.iter().map(|k| k.to_string()).collect())
312+
.await
313+
.map_err(to_wasi_err)
314+
}
315+
}
316+
317+
#[async_trait]
318+
impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
319+
async fn new(
320+
&mut self,
321+
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
322+
key: String,
323+
) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
324+
let store = self.get_store_wasi(bucket)?;
325+
let cas = store.new_compare_and_swap(&key).await.map_err(to_wasi_err)?;
326+
self.compare_and_swaps
327+
.push(cas)
328+
.map_err(|()| spin_world::wasi::keyvalue::store::Error::Other("too many compare_and_swaps opened".to_string()))
329+
.map(Resource::new_own)
330+
}
331+
332+
async fn current(
333+
&mut self,
334+
cas: Resource<wasi_keyvalue::atomics::Cas>,
335+
) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
336+
let cas = self
337+
.get_cas(cas)
338+
.map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
339+
cas.current().await.map_err(to_wasi_err)
340+
}
341+
342+
async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
343+
self.compare_and_swaps.remove(rep.rep());
344+
Ok(())
345+
}
346+
}
347+
348+
#[async_trait]
349+
impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
350+
#[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
351+
async fn increment(
352+
&mut self,
353+
bucket: Resource<wasi_keyvalue::atomics::Bucket>,
354+
key: String,
355+
delta: i64,
356+
) -> Result<i64, wasi_keyvalue::store::Error> {
357+
let store = self.get_store_wasi(bucket)?;
358+
store.increment(key, delta).await.map_err(to_wasi_err)
359+
}
360+
361+
#[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))]
362+
async fn swap(
363+
&mut self,
364+
cas_res: Resource<wasi_keyvalue::atomics::Cas>,
365+
value: Vec<u8>,
366+
) -> Result<std::result::Result<(), wasi_keyvalue::atomics::CasError>> {
367+
let cas = self
368+
.get_cas(cas_res)
369+
.map_err(|e| wasi_keyvalue::atomics::CasError::StoreError(wasi_keyvalue::atomics::Error::Other(e.to_string())))?;
370+
Ok(cas.swap(value)
371+
.await
372+
.map_err(|e| wasi_keyvalue::atomics::CasError::StoreError(wasi_keyvalue::atomics::Error::Other(e.to_string()))))
373+
}
374+
}
375+
253376
pub fn log_error(err: impl std::fmt::Debug) -> Error {
254377
tracing::warn!("key-value error: {err:?}");
255378
Error::Other(format!("{err:?}"))

crates/factor-key-value/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use spin_locked_app::MetadataKey;
1717
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
1818
pub use host::{log_error, Error, KeyValueDispatch, Store, StoreManager};
1919
pub use runtime_config::RuntimeConfig;
20+
use spin_core::async_trait;
2021
pub use util::{CachingStoreManager, DelegatingStoreManager};
2122

2223
/// A factor that provides key-value storage.
@@ -132,6 +133,12 @@ impl AppState {
132133
}
133134
}
134135

136+
#[async_trait]
137+
pub trait Cas: Sync + Send {
138+
async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
139+
async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), Error>;
140+
}
141+
135142
pub struct InstanceBuilder {
136143
/// The store manager for the app.
137144
///

crates/factor-key-value/src/util.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{Error, Store, StoreManager};
1+
use crate::{Cas, Error, Store, StoreManager};
22
use lru::LruCache;
33
use spin_core::async_trait;
44
use std::{
@@ -237,4 +237,24 @@ impl Store for CachingStore {
237237
.into_iter()
238238
.collect())
239239
}
240+
241+
async fn get_many(&self, keys: Vec<String>) -> anyhow::Result<Vec<Option<(String, Vec<u8>)>>, Error> {
242+
todo!()
243+
}
244+
245+
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
246+
todo!()
247+
}
248+
249+
async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
250+
todo!()
251+
}
252+
253+
async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
254+
todo!()
255+
}
256+
257+
async fn new_compare_and_swap(&self, key: &str) -> anyhow::Result<Arc<dyn Cas>, Error> {
258+
todo!()
259+
}
240260
}

crates/key-value-azure/src/store.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,26 @@ impl Store for AzureCosmosStore {
151151
async fn get_keys(&self) -> Result<Vec<String>, Error> {
152152
self.get_keys().await
153153
}
154+
155+
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> {
156+
todo!()
157+
}
158+
159+
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
160+
todo!()
161+
}
162+
163+
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
164+
todo!()
165+
}
166+
167+
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
168+
todo!()
169+
}
170+
171+
async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
172+
todo!()
173+
}
154174
}
155175

156176
impl AzureCosmosStore {

crates/key-value-redis/src/store.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,24 @@ impl Store for RedisStore {
9898
.await
9999
.map_err(log_error)
100100
}
101+
102+
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> {
103+
todo!()
104+
}
105+
106+
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
107+
todo!()
108+
}
109+
110+
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
111+
todo!()
112+
}
113+
114+
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
115+
todo!()
116+
}
117+
118+
async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
119+
todo!()
120+
}
101121
}

crates/key-value-spin/src/store.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::Result;
22
use rusqlite::Connection;
33
use spin_core::async_trait;
4-
use spin_factor_key_value::{log_error, Error, Store, StoreManager};
4+
use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager};
55
use std::{
66
path::PathBuf,
77
sync::OnceLock,
@@ -156,6 +156,26 @@ impl Store for SqliteStore {
156156
.collect()
157157
})
158158
}
159+
160+
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> {
161+
todo!()
162+
}
163+
164+
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
165+
todo!()
166+
}
167+
168+
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
169+
todo!()
170+
}
171+
172+
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
173+
todo!()
174+
}
175+
176+
async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn Cas>, Error> {
177+
todo!()
178+
}
159179
}
160180

161181
#[cfg(test)]

0 commit comments

Comments
 (0)