Skip to content

Commit ba4f3e0

Browse files
committed
implement kv 0.2.0draft2 for sqlite
Signed-off-by: David Justice <[email protected]>
1 parent 5ee121a commit ba4f3e0

File tree

2 files changed

+121
-6
lines changed

2 files changed

+121
-6
lines changed

crates/key-value-spin/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ edition = { workspace = true }
66

77
[dependencies]
88
anyhow = { workspace = true }
9-
rusqlite = { version = "0.32", features = ["bundled"] }
9+
rusqlite = { version = "0.32", features = ["bundled", "array"] }
1010
serde = { workspace = true }
1111
spin-core = { path = "../core" }
1212
spin-factor-key-value = { path = "../factor-key-value" }

crates/key-value-spin/src/store.rs

Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use anyhow::Result;
22
use rusqlite::Connection;
33
use spin_core::async_trait;
44
use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager};
5+
use std::rc::Rc;
56
use std::{
67
path::PathBuf,
78
sync::OnceLock,
@@ -158,23 +159,137 @@ impl Store for SqliteStore {
158159
}
159160

160161
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<Option<(String, Vec<u8>)>>, Error> {
161-
todo!()
162+
task::block_in_place(|| {
163+
let sql_value_keys: Vec<rusqlite::types::Value> =
164+
keys.into_iter().map(rusqlite::types::Value::from).collect();
165+
let ptr = Rc::new(sql_value_keys);
166+
let row_iter: Vec<Result<(String, Vec<u8>), Error>> = self.connection
167+
.lock()
168+
.unwrap()
169+
.prepare_cached("SELECT key, value FROM spin_key_value WHERE store=:name AND key IN rarray(:keys)")
170+
.map_err(log_error)?
171+
.query_map((":name", &self.name, ":keys", ptr), |row| {
172+
<(String, Vec<u8>)>::try_from(row)
173+
})
174+
.map_err(log_error)?
175+
.map(|r: Result<(String, Vec<u8>), rusqlite::Error>| r.map_err(log_error))
176+
.collect();
177+
178+
let mut keys_and_values: Vec<Option<(String, Vec<u8>)>> = Vec::new();
179+
for row in row_iter {
180+
let res = row.map_err(log_error)?;
181+
keys_and_values.push(Some(res));
182+
}
183+
Ok(keys_and_values)
184+
})
162185
}
163186

164187
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
165-
todo!()
188+
task::block_in_place(|| {
189+
let mut binding = self.connection.lock().unwrap();
190+
let tx = binding.transaction().map_err(log_error)?;
191+
for kv in key_values {
192+
tx.prepare_cached(
193+
"INSERT INTO spin_key_value (store, key, value) VALUES ($1, $2, $3)
194+
ON CONFLICT(store, key) DO UPDATE SET value=$3",
195+
)
196+
.map_err(log_error)?
197+
.execute(rusqlite::params![&self.name, kv.0, kv.1])
198+
.map_err(log_error)
199+
.map(drop)?;
200+
}
201+
tx.commit().map_err(log_error)
202+
})
166203
}
167204

168205
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
169-
todo!()
206+
task::block_in_place(|| {
207+
let sql_value_keys: Vec<rusqlite::types::Value> =
208+
keys.into_iter().map(rusqlite::types::Value::from).collect();
209+
let ptr = Rc::new(sql_value_keys);
210+
self.connection
211+
.lock()
212+
.unwrap()
213+
.prepare_cached("DELETE FROM spin_key_value WHERE store=:name AND key IN (:keys)")
214+
.map_err(log_error)?
215+
.execute((":name", &self.name, ":keys", ptr))
216+
.map_err(log_error)
217+
.map(drop)
218+
})
170219
}
171220

221+
// The assumption with increment is that if the value for the key does not exist, it will be
222+
// assumed to be zero. In the case that we are unable to unmarshal the value into an i64 an error will be returned.
172223
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
173-
todo!()
224+
task::block_in_place(|| {
225+
let mut binding = self.connection.lock().unwrap();
226+
227+
let tx = binding.transaction().map_err(log_error)?;
228+
229+
let value: Option<Vec<u8>> = tx
230+
.prepare_cached("SELECT value FROM spin_key_value WHERE store=$1 AND key=$2")
231+
.map_err(log_error)?
232+
.query_map([&self.name, &key], |row| row.get(0))
233+
.map_err(log_error)?
234+
.next()
235+
.transpose()
236+
.map_err(log_error)?;
237+
238+
let numeric: i64 = match value {
239+
Some(v) => i64::from_be_bytes(v.try_into().expect("incorrect length")),
240+
None => 0,
241+
};
242+
243+
let new_value = numeric + delta;
244+
tx.prepare_cached(
245+
"INSERT INTO spin_key_value (store, key, value) VALUES ($1, $2, $3)
246+
ON CONFLICT(store, key) DO UPDATE SET value=$3",
247+
)
248+
.map_err(log_error)?
249+
.execute(rusqlite::params![&self.name, key, new_value])
250+
.map_err(log_error)
251+
.map(drop)?;
252+
253+
tx.commit().map_err(log_error)?;
254+
Ok(new_value)
255+
})
174256
}
175257

176258
async fn new_compare_and_swap(&self, key: &str) -> Result<Arc<dyn Cas>, Error> {
177-
todo!()
259+
let value = self.get(key).await?;
260+
Ok(Arc::new(CompareAndSwap {
261+
name: self.name.clone(),
262+
key: key.to_string(),
263+
connection: self.connection.clone(),
264+
value,
265+
}))
266+
}
267+
}
268+
269+
struct CompareAndSwap {
270+
name: String,
271+
key: String,
272+
value: Option<Vec<u8>>,
273+
connection: Arc<Mutex<Connection>>,
274+
}
275+
276+
#[async_trait]
277+
impl Cas for CompareAndSwap {
278+
async fn current(&self) -> Result<Option<Vec<u8>>, Error> {
279+
Ok(self.value.clone())
280+
}
281+
282+
async fn swap(&self, value: Vec<u8>) -> Result<(), Error> {
283+
task::block_in_place(|| {
284+
self.connection
285+
.lock()
286+
.unwrap()
287+
.prepare_cached("UPDATE spin_key_value SET value=$3 WHERE store=$1 and key=$2")
288+
.map_err(log_error)?
289+
.execute(rusqlite::params![&self.name, self.key, value])
290+
.map_err(log_error)
291+
.map(drop)
292+
})
178293
}
179294
}
180295

0 commit comments

Comments
 (0)