Skip to content

Commit 1cea3ba

Browse files
committed
f Use versioned write for SqliteStore
1 parent 082c588 commit 1cea3ba

File tree

1 file changed

+156
-33
lines changed

1 file changed

+156
-33
lines changed

src/io/sqlite_store/mod.rs

Lines changed: 156 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77

88
//! Objects related to [`SqliteStore`] live here.
99
use std::boxed::Box;
10+
use std::collections::HashMap;
1011
use std::fs;
1112
use std::future::Future;
1213
use std::path::PathBuf;
1314
use std::pin::Pin;
15+
use std::sync::atomic::{AtomicU64, Ordering};
1416
use std::sync::{Arc, Mutex};
1517

1618
use lightning::io;
@@ -41,6 +43,10 @@ const SCHEMA_USER_VERSION: u16 = 2;
4143
/// [SQLite]: https://sqlite.org
4244
pub struct SqliteStore {
4345
inner: Arc<SqliteStoreInner>,
46+
47+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read and list
48+
// operations aren't sensitive to the order of execution.
49+
next_write_version: AtomicU64,
4450
}
4551

4652
impl SqliteStore {
@@ -54,7 +60,31 @@ impl SqliteStore {
5460
data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>,
5561
) -> io::Result<Self> {
5662
let inner = Arc::new(SqliteStoreInner::new(data_dir, db_file_name, kv_table_name)?);
57-
Ok(Self { inner })
63+
let next_write_version = AtomicU64::new(1);
64+
Ok(Self { inner, next_write_version })
65+
}
66+
67+
fn build_locking_key(
68+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
69+
) -> String {
70+
if primary_namespace.is_empty() {
71+
key.to_owned()
72+
} else {
73+
format!("{}#{}#{}", primary_namespace, secondary_namespace, key)
74+
}
75+
}
76+
77+
fn get_new_version_and_lock_ref(&self, locking_key: String) -> (Arc<Mutex<u64>>, u64) {
78+
let version = self.next_write_version.fetch_add(1, Ordering::Relaxed);
79+
if version == u64::MAX {
80+
panic!("SqliteStore version counter overflowed");
81+
}
82+
83+
// Get a reference to the inner lock. We do this early so that the arc can double as an in-flight counter for
84+
// cleaning up unused locks.
85+
let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key);
86+
87+
(inner_lock_ref, version)
5888
}
5989

6090
/// Returns the data directory.
@@ -85,12 +115,22 @@ impl KVStore for SqliteStore {
85115
fn write(
86116
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
87117
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
118+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
119+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
88120
let primary_namespace = primary_namespace.to_string();
89121
let secondary_namespace = secondary_namespace.to_string();
90122
let key = key.to_string();
91123
let inner = Arc::clone(&self.inner);
92124
let fut = tokio::task::spawn_blocking(move || {
93-
inner.write_internal(&primary_namespace, &secondary_namespace, &key, buf)
125+
inner.write_internal(
126+
inner_lock_ref,
127+
locking_key,
128+
version,
129+
&primary_namespace,
130+
&secondary_namespace,
131+
&key,
132+
buf,
133+
)
94134
});
95135
Box::pin(async move {
96136
fut.await.unwrap_or_else(|e| {
@@ -103,12 +143,22 @@ impl KVStore for SqliteStore {
103143
fn remove(
104144
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
105145
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
146+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
147+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
106148
let primary_namespace = primary_namespace.to_string();
107149
let secondary_namespace = secondary_namespace.to_string();
108150
let key = key.to_string();
109151
let inner = Arc::clone(&self.inner);
110152
let fut = tokio::task::spawn_blocking(move || {
111-
inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy)
153+
inner.remove_internal(
154+
inner_lock_ref,
155+
locking_key,
156+
version,
157+
&primary_namespace,
158+
&secondary_namespace,
159+
&key,
160+
lazy,
161+
)
112162
});
113163
Box::pin(async move {
114164
fut.await.unwrap_or_else(|e| {
@@ -146,13 +196,33 @@ impl KVStoreSync for SqliteStore {
146196
fn write(
147197
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
148198
) -> io::Result<()> {
149-
self.inner.write_internal(primary_namespace, secondary_namespace, key, buf)
199+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
200+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
201+
self.inner.write_internal(
202+
inner_lock_ref,
203+
locking_key,
204+
version,
205+
primary_namespace,
206+
secondary_namespace,
207+
key,
208+
buf,
209+
)
150210
}
151211

152212
fn remove(
153213
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
154214
) -> io::Result<()> {
155-
self.inner.remove_internal(primary_namespace, secondary_namespace, key, lazy)
215+
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
216+
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
217+
self.inner.remove_internal(
218+
inner_lock_ref,
219+
locking_key,
220+
version,
221+
primary_namespace,
222+
secondary_namespace,
223+
key,
224+
lazy,
225+
)
156226
}
157227

158228
fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
@@ -164,6 +234,7 @@ struct SqliteStoreInner {
164234
connection: Arc<Mutex<Connection>>,
165235
data_dir: PathBuf,
166236
kv_table_name: String,
237+
write_version_locks: Mutex<HashMap<String, Arc<Mutex<u64>>>>,
167238
}
168239

169240
impl SqliteStoreInner {
@@ -237,7 +308,13 @@ impl SqliteStoreInner {
237308
})?;
238309

239310
let connection = Arc::new(Mutex::new(connection));
240-
Ok(Self { connection, data_dir, kv_table_name })
311+
let write_version_locks = Mutex::new(HashMap::new());
312+
Ok(Self { connection, data_dir, kv_table_name, write_version_locks })
313+
}
314+
315+
fn get_inner_lock_ref(&self, locking_key: String) -> Arc<Mutex<u64>> {
316+
let mut outer_lock = self.write_version_locks.lock().unwrap();
317+
Arc::clone(&outer_lock.entry(locking_key).or_default())
241318
}
242319

243320
fn read_internal(
@@ -289,46 +366,51 @@ impl SqliteStoreInner {
289366
}
290367

291368
fn write_internal(
292-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
369+
&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
370+
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
293371
) -> io::Result<()> {
294372
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
295373

296-
let locked_conn = self.connection.lock().unwrap();
374+
self.execute_locked_write(inner_lock_ref, locking_key, version, || {
375+
let locked_conn = self.connection.lock().unwrap();
297376

298-
let sql = format!(
299-
"INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
300-
self.kv_table_name
301-
);
377+
let sql = format!(
378+
"INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);",
379+
self.kv_table_name
380+
);
302381

303-
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
304-
let msg = format!("Failed to prepare statement: {}", e);
305-
io::Error::new(io::ErrorKind::Other, msg)
306-
})?;
382+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
383+
let msg = format!("Failed to prepare statement: {}", e);
384+
io::Error::new(io::ErrorKind::Other, msg)
385+
})?;
307386

308-
stmt.execute(named_params! {
309-
":primary_namespace": primary_namespace,
310-
":secondary_namespace": secondary_namespace,
311-
":key": key,
312-
":value": buf,
313-
})
314-
.map(|_| ())
315-
.map_err(|e| {
316-
let msg = format!(
317-
"Failed to write to key {}/{}/{}: {}",
318-
PrintableString(primary_namespace),
319-
PrintableString(secondary_namespace),
320-
PrintableString(key),
321-
e
322-
);
323-
io::Error::new(io::ErrorKind::Other, msg)
387+
stmt.execute(named_params! {
388+
":primary_namespace": primary_namespace,
389+
":secondary_namespace": secondary_namespace,
390+
":key": key,
391+
":value": buf,
392+
})
393+
.map(|_| ())
394+
.map_err(|e| {
395+
let msg = format!(
396+
"Failed to write to key {}/{}/{}: {}",
397+
PrintableString(primary_namespace),
398+
PrintableString(secondary_namespace),
399+
PrintableString(key),
400+
e
401+
);
402+
io::Error::new(io::ErrorKind::Other, msg)
403+
})
324404
})
325405
}
326406

327407
fn remove_internal(
328-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
408+
&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64,
409+
primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
329410
) -> io::Result<()> {
330411
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
331412

413+
self.execute_locked_write(inner_lock_ref, locking_key, version, || {
332414
let locked_conn = self.connection.lock().unwrap();
333415

334416
let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name);
@@ -354,6 +436,7 @@ impl SqliteStoreInner {
354436
io::Error::new(io::ErrorKind::Other, msg)
355437
})?;
356438
Ok(())
439+
})
357440
}
358441

359442
fn list_internal(
@@ -396,6 +479,46 @@ impl SqliteStoreInner {
396479

397480
Ok(keys)
398481
}
482+
483+
fn execute_locked_write<F: FnOnce() -> Result<(), lightning::io::Error>>(
484+
&self, inner_lock_ref: Arc<Mutex<u64>>, locking_key: String, version: u64, callback: F,
485+
) -> Result<(), lightning::io::Error> {
486+
let res = {
487+
let mut last_written_version = inner_lock_ref.lock().unwrap();
488+
489+
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
490+
// consistency.
491+
let is_stale_version = version <= *last_written_version;
492+
493+
// If the version is not stale, we execute the callback. Otherwise we can and must skip writing.
494+
if is_stale_version {
495+
Ok(())
496+
} else {
497+
callback().map(|_| {
498+
*last_written_version = version;
499+
})
500+
}
501+
};
502+
503+
self.clean_locks(&inner_lock_ref, locking_key);
504+
505+
res
506+
}
507+
508+
fn clean_locks(&self, inner_lock_ref: &Arc<Mutex<u64>>, locking_key: String) {
509+
// If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry
510+
// to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in
511+
// inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already
512+
// counted.
513+
let mut outer_lock = self.write_version_locks.lock().unwrap();
514+
515+
let strong_count = Arc::strong_count(&inner_lock_ref);
516+
debug_assert!(strong_count >= 2, "Unexpected SqliteStore strong count");
517+
518+
if strong_count == 2 {
519+
outer_lock.remove(&locking_key);
520+
}
521+
}
399522
}
400523

401524
#[cfg(test)]

0 commit comments

Comments
 (0)