Skip to content

Commit 6c54656

Browse files
authored
Merge pull request #1880 from fermyon/key-value-tweaks
Key value interface changes
2 parents 1c37552 + c828c8e commit 6c54656

File tree

32 files changed

+542
-2347
lines changed

32 files changed

+542
-2347
lines changed

crates/key-value-azure/src/lib.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ struct AzureCosmosStore {
4646

4747
#[async_trait]
4848
impl Store for AzureCosmosStore {
49-
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
49+
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
5050
let pair = self.get_pair(key).await?;
51-
Ok(pair.value)
51+
Ok(pair.map(|p| p.value))
5252
}
5353

5454
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
@@ -73,11 +73,7 @@ impl Store for AzureCosmosStore {
7373
}
7474

7575
async fn exists(&self, key: &str) -> Result<bool, Error> {
76-
match self.get_pair(key).await {
77-
Ok(_) => Ok(true),
78-
Err(Error::NoSuchKey) => Ok(false),
79-
Err(e) => Err(e),
80-
}
76+
Ok(self.get_pair(key).await?.is_some())
8177
}
8278

8379
async fn get_keys(&self) -> Result<Vec<String>, Error> {
@@ -86,7 +82,7 @@ impl Store for AzureCosmosStore {
8682
}
8783

8884
impl AzureCosmosStore {
89-
async fn get_pair(&self, key: &str) -> Result<Pair, Error> {
85+
async fn get_pair(&self, key: &str) -> Result<Option<Pair>, Error> {
9086
let query = self
9187
.client
9288
.query_documents(Query::new(format!("SELECT * FROM c WHERE c.id='{}'", key)))
@@ -100,11 +96,11 @@ impl AzureCosmosStore {
10096
Some(r) => {
10197
let r = r.map_err(log_error)?;
10298
match r.results.first().cloned() {
103-
Some((p, _)) => Ok(p),
104-
None => Err(Error::NoSuchKey),
99+
Some((p, _)) => Ok(Some(p)),
100+
None => Ok(None),
105101
}
106102
}
107-
None => Err(Error::NoSuchKey),
103+
None => Ok(None),
108104
}
109105
}
110106

crates/key-value-redis/src/lib.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,9 @@ struct RedisStore {
5353

5454
#[async_trait]
5555
impl Store for RedisStore {
56-
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
56+
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
5757
let mut conn = self.connection.lock().await;
58-
let result: Vec<u8> = conn.get(key).await.map_err(log_error)?;
59-
60-
if result.is_empty() {
61-
Err(Error::NoSuchKey)
62-
} else {
63-
Ok(result)
64-
}
58+
conn.get(key).await.map_err(log_error)
6559
}
6660

6761
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {

crates/key-value-sqlite/src/lib.rs

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ struct SqliteStore {
7474

7575
#[async_trait]
7676
impl Store for SqliteStore {
77-
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
77+
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
7878
task::block_in_place(|| {
7979
self.connection
8080
.lock()
@@ -84,7 +84,7 @@ impl Store for SqliteStore {
8484
.query_map([&self.name, key], |row| row.get(0))
8585
.map_err(log_error)?
8686
.next()
87-
.ok_or(Error::NoSuchKey)?
87+
.transpose()
8888
.map_err(log_error)
8989
})
9090
}
@@ -119,11 +119,7 @@ impl Store for SqliteStore {
119119
}
120120

121121
async fn exists(&self, key: &str) -> Result<bool, Error> {
122-
match self.get(key).await {
123-
Ok(_) => Ok(true),
124-
Err(Error::NoSuchKey) => Ok(false),
125-
Err(e) => Err(e),
126-
}
122+
Ok(self.get(key).await?.is_some())
127123
}
128124

129125
async fn get_keys(&self) -> Result<Vec<String>, Error> {
@@ -162,11 +158,6 @@ mod test {
162158
)])),
163159
);
164160

165-
assert!(matches!(
166-
kv.exists(Resource::new_own(42), "bar".to_owned()).await?,
167-
Err(Error::InvalidStore)
168-
));
169-
170161
assert!(matches!(
171162
kv.open("foo".to_owned()).await?,
172163
Err(Error::NoSuchStore)
@@ -186,7 +177,7 @@ mod test {
186177

187178
assert!(matches!(
188179
kv.get(Resource::new_own(rep), "bar".to_owned()).await?,
189-
Err(Error::NoSuchKey)
180+
Ok(None)
190181
));
191182

192183
kv.set(Resource::new_own(rep), "bar".to_owned(), b"baz".to_vec())
@@ -198,16 +189,20 @@ mod test {
198189
);
199190

200191
assert_eq!(
201-
b"baz" as &[_],
202-
&kv.get(Resource::new_own(rep), "bar".to_owned()).await??
192+
Some(b"baz" as &[_]),
193+
kv.get(Resource::new_own(rep), "bar".to_owned())
194+
.await??
195+
.as_deref()
203196
);
204197

205198
kv.set(Resource::new_own(rep), "bar".to_owned(), b"wow".to_vec())
206199
.await??;
207200

208201
assert_eq!(
209-
b"wow" as &[_],
210-
&kv.get(Resource::new_own(rep), "bar".to_owned()).await??
202+
Some(b"wow" as &[_]),
203+
kv.get(Resource::new_own(rep), "bar".to_owned())
204+
.await??
205+
.as_deref()
211206
);
212207

213208
assert_eq!(
@@ -230,16 +225,11 @@ mod test {
230225

231226
assert!(matches!(
232227
kv.get(Resource::new_own(rep), "bar".to_owned()).await?,
233-
Err(Error::NoSuchKey)
228+
Ok(None)
234229
));
235230

236231
kv.drop(Resource::new_own(rep))?;
237232

238-
assert!(matches!(
239-
kv.exists(Resource::new_own(rep), "bar".to_owned()).await?,
240-
Err(Error::InvalidStore)
241-
));
242-
243233
Ok(())
244234
}
245235
}

crates/key-value/src/lib.rs

Lines changed: 22 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::Result;
1+
use anyhow::{Context, Result};
22
use spin_app::MetadataKey;
33
use spin_core::{async_trait, wasmtime::component::Resource};
44
use spin_world::v2::key_value;
@@ -25,7 +25,7 @@ pub trait StoreManager: Sync + Send {
2525

2626
#[async_trait]
2727
pub trait Store: Sync + Send {
28-
async fn get(&self, key: &str) -> Result<Vec<u8>, Error>;
28+
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error>;
2929
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
3030
async fn delete(&self, key: &str) -> Result<(), Error>;
3131
async fn exists(&self, key: &str) -> Result<bool, Error>;
@@ -55,6 +55,10 @@ impl KeyValueDispatch {
5555
self.allowed_stores = allowed_stores;
5656
self.manager = manager;
5757
}
58+
59+
pub fn get_store(&self, store: Resource<key_value::Store>) -> anyhow::Result<&Arc<dyn Store>> {
60+
self.stores.get(store.rep()).context("invalid store")
61+
}
5862
}
5963

6064
impl Default for KeyValueDispatch {
@@ -87,15 +91,9 @@ impl key_value::HostStore for KeyValueDispatch {
8791
&mut self,
8892
store: Resource<key_value::Store>,
8993
key: String,
90-
) -> Result<Result<Vec<u8>, Error>> {
91-
Ok(async {
92-
self.stores
93-
.get(store.rep())
94-
.ok_or(Error::InvalidStore)?
95-
.get(&key)
96-
.await
97-
}
98-
.await)
94+
) -> Result<Result<Option<Vec<u8>>, Error>> {
95+
let store = self.get_store(store)?;
96+
Ok(store.get(&key).await)
9997
}
10098

10199
async fn set(
@@ -104,58 +102,34 @@ impl key_value::HostStore for KeyValueDispatch {
104102
key: String,
105103
value: Vec<u8>,
106104
) -> Result<Result<(), Error>> {
107-
Ok(async {
108-
self.stores
109-
.get(store.rep())
110-
.ok_or(Error::InvalidStore)?
111-
.set(&key, &value)
112-
.await
113-
}
114-
.await)
105+
let store = self.get_store(store)?;
106+
Ok(store.set(&key, &value).await)
115107
}
116108

117109
async fn delete(
118110
&mut self,
119111
store: Resource<key_value::Store>,
120112
key: String,
121113
) -> Result<Result<(), Error>> {
122-
Ok(async {
123-
self.stores
124-
.get(store.rep())
125-
.ok_or(Error::InvalidStore)?
126-
.delete(&key)
127-
.await
128-
}
129-
.await)
114+
let store = self.get_store(store)?;
115+
Ok(store.delete(&key).await)
130116
}
131117

132118
async fn exists(
133119
&mut self,
134120
store: Resource<key_value::Store>,
135121
key: String,
136122
) -> Result<Result<bool, Error>> {
137-
Ok(async {
138-
self.stores
139-
.get(store.rep())
140-
.ok_or(Error::InvalidStore)?
141-
.exists(&key)
142-
.await
143-
}
144-
.await)
123+
let store = self.get_store(store)?;
124+
Ok(store.exists(&key).await)
145125
}
146126

147127
async fn get_keys(
148128
&mut self,
149129
store: Resource<key_value::Store>,
150130
) -> Result<Result<Vec<String>, Error>> {
151-
Ok(async {
152-
self.stores
153-
.get(store.rep())
154-
.ok_or(Error::InvalidStore)?
155-
.get_keys()
156-
.await
157-
}
158-
.await)
131+
let store = self.get_store(store)?;
132+
Ok(store.get_keys().await)
159133
}
160134

161135
fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
@@ -166,7 +140,7 @@ impl key_value::HostStore for KeyValueDispatch {
166140

167141
pub fn log_error(err: impl std::fmt::Debug) -> Error {
168142
tracing::warn!("key-value error: {err:?}");
169-
Error::Io(format!("{err:?}"))
143+
Error::Other(format!("{err:?}"))
170144
}
171145

172146
use spin_world::v1::key_value::Error as LegacyError;
@@ -176,9 +150,7 @@ fn to_legacy_error(value: key_value::Error) -> LegacyError {
176150
Error::StoreTableFull => LegacyError::StoreTableFull,
177151
Error::NoSuchStore => LegacyError::NoSuchStore,
178152
Error::AccessDenied => LegacyError::AccessDenied,
179-
Error::InvalidStore => LegacyError::InvalidStore,
180-
Error::NoSuchKey => LegacyError::NoSuchKey,
181-
Error::Io(s) => LegacyError::Io(s),
153+
Error::Other(s) => LegacyError::Io(s),
182154
}
183155
}
184156

@@ -192,7 +164,9 @@ impl spin_world::v1::key_value::Host for KeyValueDispatch {
192164
async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
193165
let this = Resource::new_borrow(store);
194166
let result = <Self as key_value::HostStore>::get(self, this, key).await?;
195-
Ok(result.map_err(to_legacy_error))
167+
Ok(result
168+
.map_err(to_legacy_error)
169+
.and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
196170
}
197171

198172
async fn set(

crates/key-value/src/util.rs

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl CachingStoreState {
123123
if let Some(previous_task) = previous_task {
124124
previous_task
125125
.await
126-
.map_err(|e| Error::Io(format!("{e:?}")))??
126+
.map_err(|e| Error::Other(format!("{e:?}")))??
127127
}
128128

129129
task.await
@@ -134,7 +134,7 @@ impl CachingStoreState {
134134
if let Some(previous_task) = self.previous_task.take() {
135135
previous_task
136136
.await
137-
.map_err(|e| Error::Io(format!("{e:?}")))??
137+
.map_err(|e| Error::Other(format!("{e:?}")))??
138138
}
139139

140140
Ok(())
@@ -148,30 +148,25 @@ struct CachingStore {
148148

149149
#[async_trait]
150150
impl Store for CachingStore {
151-
async fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
151+
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
152152
// Retrieve the specified value from the cache, lazily populating the cache as necessary.
153153

154154
let mut state = self.state.lock().await;
155155

156156
if let Some(value) = state.cache.get(key).cloned() {
157-
value
158-
} else {
159-
// Flush any outstanding writes prior to reading from store. This is necessary because we need to
160-
// guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
161-
// cache prior to their corresponding writes reaching the backing store.
162-
state.flush().await?;
163-
164-
let value = match self.inner.get(key).await {
165-
Ok(value) => Some(value),
166-
Err(Error::NoSuchKey) => None,
167-
e => return e,
168-
};
169-
170-
state.cache.put(key.to_owned(), value.clone());
171-
172-
value
157+
return Ok(value);
173158
}
174-
.ok_or(Error::NoSuchKey)
159+
160+
// Flush any outstanding writes prior to reading from store. This is necessary because we need to
161+
// guarantee the guest will read its own writes even if entries have been popped off the end of the LRU
162+
// cache prior to their corresponding writes reaching the backing store.
163+
state.flush().await?;
164+
165+
let value = self.inner.get(key).await?;
166+
167+
state.cache.put(key.to_owned(), value.clone());
168+
169+
Ok(value)
175170
}
176171

177172
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
@@ -204,11 +199,7 @@ impl Store for CachingStore {
204199
}
205200

206201
async fn exists(&self, key: &str) -> Result<bool, Error> {
207-
match self.get(key).await {
208-
Ok(_) => Ok(true),
209-
Err(Error::NoSuchKey) => Ok(false),
210-
Err(e) => Err(e),
211-
}
202+
Ok(self.get(key).await?.is_some())
212203
}
213204

214205
async fn get_keys(&self) -> Result<Vec<String>, Error> {

0 commit comments

Comments
 (0)