Skip to content

Commit 7e436ef

Browse files
committed
implement wasi kv 0.2.0-draft2 for redis and cosmosdb
Signed-off-by: David Justice <[email protected]>
1 parent 58871fa commit 7e436ef

File tree

7 files changed

+284
-45
lines changed

7 files changed

+284
-45
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-key-value/src/host.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
378378
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
379379

380380
match cas.swap(value).await {
381-
Ok(cas) => Ok(Ok(())),
381+
Ok(_) => Ok(Ok(())),
382382
Err(err) => {
383383
if err.to_string().contains("CAS_ERROR") {
384384
let bucket = Resource::new_own(cas.bucket_rep().await);

crates/factor-key-value/src/util.rs

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -242,49 +242,63 @@ impl Store for CachingStore {
242242
&self,
243243
keys: Vec<String>,
244244
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
245-
// // Retrieve the specified value from the cache, lazily populating the cache as necessary.
246-
// let mut state = self.state.lock().await;
247-
//
248-
// let mut keys_and_values: Vec<Option<(String, Vec<u8>)>> = Vec::new();
249-
// let mut keys_not_found: Vec<String> = Vec::new();
250-
// for key in keys {
251-
// match state.cache.get(key.as_str()).cloned() {
252-
// Some(value) => keys_and_values.push(Some((key, value))),
253-
// None => keys_not_found.push(key),
254-
// }
255-
// }
256-
//
257-
// // guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
258-
// // cache prior to their corresponding writes reaching the backing store.
259-
// state.flush().await?;
260-
//
261-
// let value = self.inner.get(key).await?;
262-
//
263-
// state.cache.put(key.to_owned(), value.clone());
264-
//
265-
// Ok(value)
266-
//
245+
let mut state = self.state.lock().await;
246+
let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
247+
let mut not_found: Vec<String> = Vec::new();
248+
for key in keys {
249+
match state.cache.get(key.as_str()) {
250+
Some(res) => match res {
251+
Some(value) => found.push((key, Some(value.clone()))),
252+
None => not_found.push(key),
253+
},
254+
None => not_found.push(key),
255+
}
256+
}
267257

268-
todo!()
258+
let keys_and_values = self.inner.get_many(not_found).await?;
259+
for (key, value) in keys_and_values {
260+
found.push((key.clone(), value.clone()));
261+
state.cache.put(key, value);
262+
}
263+
264+
Ok(found)
269265
}
270266

271267
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> anyhow::Result<(), Error> {
272-
todo!()
268+
let mut state = self.state.lock().await;
269+
270+
for (key, value) in key_values.clone() {
271+
state.cache.put(key.to_owned(), Some(value));
272+
}
273+
274+
self.inner.set_many(key_values).await
273275
}
274276

275277
async fn delete_many(&self, keys: Vec<String>) -> anyhow::Result<(), Error> {
276-
todo!()
278+
let mut state = self.state.lock().await;
279+
280+
for key in keys.clone() {
281+
state.cache.put(key.to_owned(), None);
282+
}
283+
284+
self.inner.delete_many(keys).await
277285
}
278286

279287
async fn increment(&self, key: String, delta: i64) -> anyhow::Result<i64, Error> {
280-
todo!()
288+
let counter = self.inner.increment(key.clone(), delta).await?;
289+
self.state
290+
.lock()
291+
.await
292+
.cache
293+
.put(key, Some(i64::to_le_bytes(counter).to_vec()));
294+
Ok(counter)
281295
}
282296

283297
async fn new_compare_and_swap(
284298
&self,
285299
bucket_rep: u32,
286300
key: &str,
287301
) -> anyhow::Result<Arc<dyn Cas>, Error> {
288-
todo!()
302+
self.inner.new_compare_and_swap(bucket_rep, key).await
289303
}
290304
}

crates/key-value-azure/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ rust-version.workspace = true
1212
anyhow = { workspace = true }
1313
azure_data_cosmos = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
1414
azure_identity = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
15+
azure_core = { git = "https://github.com/azure/azure-sdk-for-rust.git", rev = "8c4caa251c3903d5eae848b41bb1d02a4d65231c" }
1516
futures = { workspace = true }
1617
serde = { workspace = true }
1718
spin-core = { path = "../core" }

crates/key-value-azure/src/store.rs

Lines changed: 137 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
use std::sync::Arc;
2-
31
use anyhow::Result;
2+
use azure_data_cosmos::prelude::Operation;
3+
use azure_data_cosmos::resources::collection::PartitionKey;
44
use azure_data_cosmos::{
55
prelude::{AuthorizationToken, CollectionClient, CosmosClient, Query},
66
CosmosEntity,
77
};
88
use futures::StreamExt;
99
use serde::{Deserialize, Serialize};
1010
use spin_core::async_trait;
11-
use spin_factor_key_value::{log_error, Error, Store, StoreManager};
11+
use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager};
12+
use std::sync::{Arc, Mutex};
1213

1314
pub struct KeyValueAzureCosmos {
1415
client: CollectionClient,
@@ -111,11 +112,19 @@ impl StoreManager for KeyValueAzureCosmos {
111112
}
112113
}
113114

115+
#[derive(Clone)]
114116
struct AzureCosmosStore {
115117
_name: String,
116118
client: CollectionClient,
117119
}
118120

121+
struct CompareAndSwap {
122+
key: String,
123+
client: CollectionClient,
124+
bucket_rep: u32,
125+
etag: Mutex<Option<String>>,
126+
}
127+
119128
#[async_trait]
120129
impl Store for AzureCosmosStore {
121130
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
@@ -153,27 +162,147 @@ impl Store for AzureCosmosStore {
153162
}
154163

155164
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> {
156-
todo!()
165+
let in_clause: String = keys
166+
.into_iter()
167+
.map(|k| format!("'{}'", k))
168+
.collect::<Vec<String>>()
169+
.join(", ");
170+
let stmt = Query::new(format!("SELECT * FROM c WHERE c.id IN ({})", in_clause));
171+
let query = self
172+
.client
173+
.query_documents(stmt)
174+
.query_cross_partition(true);
175+
176+
let mut res = Vec::new();
177+
let mut stream = query.into_stream::<Pair>();
178+
while let Some(resp) = stream.next().await {
179+
let resp = resp.map_err(log_error)?;
180+
for (pair, _) in resp.results {
181+
res.push((pair.id, Some(pair.value)));
182+
}
183+
}
184+
Ok(res)
157185
}
158186

159187
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
160-
todo!()
188+
for (key, value) in key_values {
189+
self.set(key.as_ref(), &value).await?
190+
}
191+
Ok(())
161192
}
162193

163194
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
164-
todo!()
195+
for key in keys {
196+
self.delete(key.as_ref()).await?
197+
}
198+
Ok(())
165199
}
166200

167201
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
168-
todo!()
202+
let operations = vec![Operation::incr("/value", delta).map_err(log_error)?];
203+
let _ = self
204+
.client
205+
.document_client(key.clone(), &key.as_str())
206+
.map_err(log_error)?
207+
.patch_document(operations)
208+
.await
209+
.map_err(log_error)?;
210+
let pair = self.get_pair(key.as_ref()).await?;
211+
match pair {
212+
Some(p) => Ok(i64::from_le_bytes(
213+
p.value.try_into().expect("incorrect length"),
214+
)),
215+
None => Err(Error::Other(
216+
"increment returned an empty value after patching, which indicates a bug"
217+
.to_string(),
218+
)),
219+
}
169220
}
170221

171222
async fn new_compare_and_swap(
172223
&self,
173224
bucket_rep: u32,
174225
key: &str,
175226
) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> {
176-
todo!()
227+
Ok(Arc::new(CompareAndSwap {
228+
key: key.to_string(),
229+
client: self.client.clone(),
230+
etag: Mutex::new(None),
231+
bucket_rep,
232+
}))
233+
}
234+
}
235+
236+
#[async_trait]
237+
impl Cas for CompareAndSwap {
238+
async fn current(&self) -> Result<Option<Vec<u8>>, Error> {
239+
let mut stream = self
240+
.client
241+
.query_documents(Query::new(format!(
242+
"SELECT * FROM c WHERE c.id='{}'",
243+
self.key
244+
)))
245+
.query_cross_partition(true)
246+
.max_item_count(1)
247+
.into_stream::<Pair>();
248+
249+
let current_value: Option<(Vec<u8>, String)> = match stream.next().await {
250+
Some(r) => {
251+
let r = r.map_err(log_error)?;
252+
match r.results.first() {
253+
Some((item, attr)) => {
254+
Some((item.clone().value, attr.clone().unwrap().etag().to_string()))
255+
}
256+
None => None,
257+
}
258+
}
259+
None => None,
260+
};
261+
262+
match current_value {
263+
Some((value, etag)) => {
264+
*self.etag.lock().unwrap() = Some(etag);
265+
Ok(Some(value))
266+
}
267+
None => Ok(None),
268+
}
269+
}
270+
271+
async fn swap(&self, value: Vec<u8>) -> Result<(), Error> {
272+
let pk = PartitionKey::from(&self.key);
273+
let pair = Pair {
274+
id: self.key.clone(),
275+
value,
276+
};
277+
278+
let replace_builder = self
279+
.client
280+
.document_client(&self.key, &pk)
281+
.map_err(log_error)?
282+
.replace_document(pair);
283+
284+
let etag_value = self.etag.lock().unwrap().clone();
285+
let res = match etag_value {
286+
Some(etag) => {
287+
replace_builder
288+
.if_match_condition(azure_core::request_options::IfMatchCondition::Match(etag))
289+
.await
290+
}
291+
None => replace_builder.await,
292+
};
293+
294+
match res {
295+
Ok(_) => Ok(()),
296+
Err(_) => Err(Error::Other("CAS_ERROR".to_owned())),
297+
}
298+
}
299+
300+
async fn bucket_rep(&self) -> u32 {
301+
self.bucket_rep
302+
}
303+
304+
async fn key(&self) -> String {
305+
self.key.clone()
177306
}
178307
}
179308

crates/key-value-redis/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ spin-core = { path = "../core" }
1212
spin-factor-key-value = { path = "../factor-key-value" }
1313
tokio = { workspace = true }
1414
url = { workspace = true }
15+
log = "0.4.22"
1516

1617
[lints]
1718
workspace = true

0 commit comments

Comments
 (0)