Skip to content

Commit 496552b

Browse files
committed
Implement KVStore for SqliteStore
1 parent c0473cc commit 496552b

File tree

1 file changed

+248
-53
lines changed

1 file changed

+248
-53
lines changed

src/io/sqlite_store/mod.rs

Lines changed: 248 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,17 @@
66
// accordance with one or both of these licenses.
77

88
//! Objects related to [`SqliteStore`] live here.
9+
use std::boxed::Box;
10+
use std::collections::HashMap;
911
use std::fs;
12+
use std::future::Future;
1013
use std::path::PathBuf;
14+
use std::pin::Pin;
15+
use std::sync::atomic::{AtomicU64, Ordering};
1116
use std::sync::{Arc, Mutex};
1217

1318
use lightning::io;
14-
use lightning::util::persist::KVStoreSync;
19+
use lightning::util::persist::{KVStore, KVStoreSync};
1520
use lightning_types::string::PrintableString;
1621
use rusqlite::{named_params, Connection};
1722

@@ -38,6 +43,10 @@ const SCHEMA_USER_VERSION: u16 = 2;
3843
/// [SQLite]: https://sqlite.org
3944
pub struct SqliteStore {
4045
inner: Arc<SqliteStoreInner>,
46+
47+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
48+
// operations aren't sensitive to the order of execution.
49+
next_write_version: AtomicU64,
4150
}
4251

4352
impl SqliteStore {
@@ -51,7 +60,27 @@ impl SqliteStore {
5160
data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
5261
) -> io::Result<Self> {
5362
let inner = Arc::new(SqliteStoreInner::new(data_dir, db_file_name, kv_table_name)?);
54-
Ok(Self { inner })
63+
let next_write_version = AtomicU64::new(1);
64+
Ok(Self { inner, next_write_version })
65+
}
66+
67+
fn build_locking_key(
68+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
69+
) -> String {
70+
format!("{}#{}#{}", primary_namespace, secondary_namespace, key)
71+
}
72+
73+
fn get_new_version_and_lock_ref(&self, locking_key: String) -> (Arc<Mutex<u64>>, u64) {
74+
let version = self.next_write_version.fetch_add(1, Ordering::Relaxed);
75+
if version == u64::MAX {
76+
panic!("SqliteStore version counter overflowed");
77+
}
78+
79+
// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
80+
// cleaning up unused locks.
81+
let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key);
82+
83+
(inner_lock_ref, version)
5584
}
5685

5786
/// Returns the data directory.
@@ -60,6 +89,99 @@ impl SqliteStore {
6089
}
6190
}
6291

92+
impl KVStore for SqliteStore {
93+
fn read(
94+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
95+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + Send>> {
96+
let primary_namespace = primary_namespace.to_string();
97+
let secondary_namespace = secondary_namespace.to_string();
98+
let key = key.to_string();
99+
let inner = Arc::clone(&self.inner);
100+
let fut = tokio::task::spawn_blocking(move || {
101+
inner.read_internal(&primary_namespace, &secondary_namespace, &key)
102+
});
103+
Box::pin(async move {
104+
fut.await.unwrap_or_else(|e| {
105+
let msg = format!("Failed to IO operation due join error: {}", e);
106+
Err(io::Error::new(io::ErrorKind::Other, msg))
107+
})
108+
})
109+
}
110+
111+
fn write(
112+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
113+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
114+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
115+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
116+
let primary_namespace = primary_namespace.to_string();
117+
let secondary_namespace = secondary_namespace.to_string();
118+
let key = key.to_string();
119+
let inner = Arc::clone(&self.inner);
120+
let fut = tokio::task::spawn_blocking(move || {
121+
inner.write_internal(
122+
inner_lock_ref,
123+
locking_key,
124+
version,
125+
&primary_namespace,
126+
&secondary_namespace,
127+
&key,
128+
buf,
129+
)
130+
});
131+
Box::pin(async move {
132+
fut.await.unwrap_or_else(|e| {
133+
let msg = format!("Failed to IO operation due join error: {}", e);
134+
Err(io::Error::new(io::ErrorKind::Other, msg))
135+
})
136+
})
137+
}
138+
139+
fn remove(
140+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
141+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
142+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
143+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
144+
let primary_namespace = primary_namespace.to_string();
145+
let secondary_namespace = secondary_namespace.to_string();
146+
let key = key.to_string();
147+
let inner = Arc::clone(&self.inner);
148+
let fut = tokio::task::spawn_blocking(move || {
149+
inner.remove_internal(
150+
inner_lock_ref,
151+
locking_key,
152+
version,
153+
&primary_namespace,
154+
&secondary_namespace,
155+
&key,
156+
lazy,
157+
)
158+
});
159+
Box::pin(async move {
160+
fut.await.unwrap_or_else(|e| {
161+
let msg = format!("Failed to IO operation due join error: {}", e);
162+
Err(io::Error::new(io::ErrorKind::Other, msg))
163+
})
164+
})
165+
}
166+
167+
fn list(
168+
&self, primary_namespace: &str, secondary_namespace: &str,
169+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + Send>> {
170+
let primary_namespace = primary_namespace.to_string();
171+
let secondary_namespace = secondary_namespace.to_string();
172+
let inner = Arc::clone(&self.inner);
173+
let fut = tokio::task::spawn_blocking(move || {
174+
inner.list_internal(&primary_namespace, &secondary_namespace)
175+
});
176+
Box::pin(async move {
177+
fut.await.unwrap_or_else(|e| {
178+
let msg = format!("Failed to IO operation due join error: {}", e);
179+
Err(io::Error::new(io::ErrorKind::Other, msg))
180+
})
181+
})
182+
}
183+
}
184+
63185
impl KVStoreSync for SqliteStore {
64186
fn read(
65187
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
@@ -70,13 +192,33 @@ impl KVStoreSync for SqliteStore {
70192
fn write(
71193
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
72194
) -> io::Result<()> {
73-
self.inner.write_internal(primary_namespace, secondary_namespace, key, buf)
195+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
196+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
197+
self.inner.write_internal(
198+
inner_lock_ref,
199+
locking_key,
200+
version,
201+
primary_namespace,
202+
secondary_namespace,
203+
key,
204+
buf,
205+
)
74206
}
75207

76208
fn remove(
77209
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
78210
) -> io::Result<()> {
79-
self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy)
211+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
212+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
213+
self.inner.remove_internal(
214+
inner_lock_ref,
215+
locking_key,
216+
version,
217+
primary_namespace,
218+
secondary_namespace,
219+
key,
220+
lazy,
221+
)
80222
}
81223

82224
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
@@ -88,6 +230,7 @@ struct SqliteStoreInner {
88230
connection: Arc<Mutex<Connection>>,
89231
data_dir: PathBuf,
90232
kv_table_name: String,
233+
write_version_locks: Mutex<HashMap<String, Arc<Mutex<u64>>>>,
91234
}
92235

93236
impl SqliteStoreInner {
@@ -161,7 +304,13 @@ impl SqliteStoreInner {
161304
})?;
162305

163306
let connection = Arc::new(Mutex::new(connection));
164-
Ok(Self { connection, data_dir, kv_table_name })
307+
let write_version_locks = Mutex::new(HashMap::new());
308+
Ok(Self { connection, data_dir, kv_table_name, write_version_locks })
309+
}
310+
311+
fn get_inner_lock_ref(&self, locking_key: String) -> Arc<Mutex<u64>> {
312+
let mut outer_lock = self.write_version_locks.lock().unwrap();
313+
Arc::clone(&outer_lock.entry(locking_key).or_default())
165314
}
166315

167316
fn read_internal(
@@ -213,71 +362,77 @@ impl SqliteStoreInner {
213362
}
214363

215364
fn write_internal(
216-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
365+
&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
366+
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
217367
) -> io::Result<()> {
218368
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
219369

220-
let locked_conn = self.connection.lock().unwrap();
370+
self.execute_locked_write(inner_lock_ref, locking_key, version, || {
371+
let locked_conn = self.connection.lock().unwrap();
221372

222-
let sql = format!(
223-
"INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
224-
self.kv_table_name
225-
);
373+
let sql = format!(
374+
"INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
375+
self.kv_table_name
376+
);
226377

227-
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
228-
let msg = format!("Failed to prepare statement: {}", e);
229-
io::Error::new(io::ErrorKind::Other, msg)
230-
})?;
378+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
379+
let msg = format!("Failed to prepare statement: {}", e);
380+
io::Error::new(io::ErrorKind::Other, msg)
381+
})?;
231382

232-
stmt.execute(named_params! {
233-
":primary_namespace": primary_namespace,
234-
":secondary_namespace": secondary_namespace,
235-
":key": key,
236-
":value": buf,
237-
})
238-
.map(|_| ())
239-
.map_err(|e| {
240-
let msg = format!(
241-
"Failed to write to key {}/{}/{}: {}",
242-
PrintableString(primary_namespace),
243-
PrintableString(secondary_namespace),
244-
PrintableString(key),
245-
e
246-
);
247-
io::Error::new(io::ErrorKind::Other, msg)
383+
stmt.execute(named_params! {
384+
":primary_namespace": primary_namespace,
385+
":secondary_namespace": secondary_namespace,
386+
":key": key,
387+
":value": buf,
388+
})
389+
.map(|_| ())
390+
.map_err(|e| {
391+
let msg = format!(
392+
"Failed to write to key {}/{}/{}: {}",
393+
PrintableString(primary_namespace),
394+
PrintableString(secondary_namespace),
395+
PrintableString(key),
396+
e
397+
);
398+
io::Error::new(io::ErrorKind::Other, msg)
399+
})
248400
})
249401
}
250402

251403
fn remove_internal(
252-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
404+
&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
405+
primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
253406
) -> io::Result<()> {
254407
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
255408

256-
let locked_conn = self.connection.lock().unwrap();
409+
self.execute_locked_write(inner_lock_ref, locking_key, version, || {
410+
let locked_conn = self.connection.lock().unwrap();
257411

258-
let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);
412+
let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);
259413

260-
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
261-
let msg = format!("Failed to prepare statement: {}", e);
262-
io::Error::new(io::ErrorKind::Other, msg)
263-
})?;
414+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
415+
let msg = format!("Failed to prepare statement: {}", e);
416+
io::Error::new(io::ErrorKind::Other, msg)
417+
})?;
264418

265-
stmt.execute(named_params! {
266-
":primary_namespace": primary_namespace,
267-
":secondary_namespace": secondary_namespace,
268-
":key": key,
419+
stmt.execute(named_params! {
420+
":primary_namespace": primary_namespace,
421+
":secondary_namespace": secondary_namespace,
422+
":key": key,
423+
})
424+
.map_err(|e| {
425+
let msg = format!(
426+
"Failed to delete key {}/{}/{}: {}",
427+
PrintableString(primary_namespace),
428+
PrintableString(secondary_namespace),
429+
PrintableString(key),
430+
e
431+
);
432+
io::Error::new(io::ErrorKind::Other, msg)
433+
})?;
434+
Ok(())
269435
})
270-
.map_err(|e| {
271-
let msg = format!(
272-
"Failed to delete key {}/{}/{}: {}",
273-
PrintableString(primary_namespace),
274-
PrintableString(secondary_namespace),
275-
PrintableString(key),
276-
e
277-
);
278-
io::Error::new(io::ErrorKind::Other, msg)
279-
})?;
280-
Ok(())
281436
}
282437

283438
fn list_internal(
@@ -320,6 +475,46 @@ impl SqliteStoreInner {
320475

321476
Ok(keys)
322477
}
478+
479+
fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
480+
&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64, callback: F,
481+
) -> Result<(), lightning::io::Error> {
482+
let res = {
483+
let mut last_written_version = inner_lock_ref.lock().unwrap();
484+
485+
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
486+
// consistency.
487+
let is_stale_version = version <= *last_written_version;
488+
489+
// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
490+
if is_stale_version {
491+
Ok(())
492+
} else {
493+
callback().map(|_| {
494+
*last_written_version = version;
495+
})
496+
}
497+
};
498+
499+
self.clean_locks(&inner_lock_ref, locking_key);
500+
501+
res
502+
}
503+
504+
fn clean_locks(&self, inner_lock_ref: &Arc<Mutex<u64>>, locking_key: String) {
505+
// If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
506+
// to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
507+
// inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
508+
// counted.
509+
let mut outer_lock = self.write_version_locks.lock().unwrap();
510+
511+
let strong_count = Arc::strong_count(&inner_lock_ref);
512+
debug_assert!(strong_count >= 2, "Unexpected SqliteStore strong count");
513+
514+
if strong_count == 2 {
515+
outer_lock.remove(&locking_key);
516+
}
517+
}
323518
}
324519

325520
#[cfg(test)]

0 commit comments

Comments
 (0)