Skip to content

Commit a9ff6e7

Browse files
committed
Async FilesystemStore
1 parent c3a4c24 commit a9ff6e7

File tree

4 files changed

+215
-30
lines changed

4 files changed

+215
-30
lines changed

lightning-persister/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
bitcoin = "0.32.2"
1818
lightning = { version = "0.2.0", path = "../lightning" }
1919

20+
# TODO: Make conditional?
21+
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time", "fs", "io-util" ] }
22+
2023
[target.'cfg(windows)'.dependencies]
2124
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2225

lightning-persister/src/fs_store.rs

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ pub struct FilesystemStore {
3838
data_dir: PathBuf,
3939
tmp_file_counter: AtomicUsize,
4040
gc_counter: AtomicUsize,
41-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
41+
42+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
43+
// latest written version per key.
44+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<HashMap<String, u64>>>>>,
4245
}
4346

4447
impl FilesystemStore {
@@ -90,36 +93,12 @@ impl FilesystemStore {
9093

9194
Ok(dest_dir_path)
9295
}
93-
}
9496

95-
impl KVStoreSync for FilesystemStore {
96-
fn read(
97-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
98-
) -> lightning::io::Result<Vec<u8>> {
99-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
100-
101-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
102-
dest_file_path.push(key);
103-
104-
let mut buf = Vec::new();
105-
{
106-
let inner_lock_ref = {
107-
let mut outer_lock = self.locks.lock().unwrap();
108-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
109-
};
110-
let _guard = inner_lock_ref.read().unwrap();
111-
112-
let mut f = fs::File::open(dest_file_path)?;
113-
f.read_to_end(&mut buf)?;
114-
}
115-
116-
self.garbage_collect_locks();
117-
118-
Ok(buf)
119-
}
120-
121-
fn write(
97+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
98+
/// returns early without writing.
99+
pub(crate) fn write_version(
122100
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
101+
version: Option<u64>,
123102
) -> lightning::io::Result<()> {
124103
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
125104

@@ -153,7 +132,24 @@ impl KVStoreSync for FilesystemStore {
153132
let mut outer_lock = self.locks.lock().unwrap();
154133
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
155134
};
156-
let _guard = inner_lock_ref.write().unwrap();
135+
let mut guard = inner_lock_ref.write().unwrap();
136+
137+
// If a version is provided, we check if we already have a newer version written. This is used in async
138+
// contexts to realize eventual consistency.
139+
if let Some(version) = version {
140+
match guard.entry(key.to_string()) {
141+
std::collections::hash_map::Entry::Vacant(e) => {
142+
e.insert(version);
143+
},
144+
std::collections::hash_map::Entry::Occupied(mut e) => {
145+
if version <= *e.get() {
146+
// If the version is not greater, we don't write the file.
147+
return Ok(());
148+
}
149+
e.insert(version);
150+
},
151+
}
152+
}
157153

158154
#[cfg(not(target_os = "windows"))]
159155
{
@@ -204,6 +200,39 @@ impl KVStoreSync for FilesystemStore {
204200

205201
res
206202
}
203+
}
204+
205+
impl KVStoreSync for FilesystemStore {
206+
fn read(
207+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
208+
) -> lightning::io::Result<Vec<u8>> {
209+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
210+
211+
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
212+
dest_file_path.push(key);
213+
214+
let mut buf = Vec::new();
215+
{
216+
let inner_lock_ref = {
217+
let mut outer_lock = self.locks.lock().unwrap();
218+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
219+
};
220+
let _guard = inner_lock_ref.read().unwrap();
221+
222+
let mut f = fs::File::open(dest_file_path)?;
223+
f.read_to_end(&mut buf)?;
224+
}
225+
226+
self.garbage_collect_locks();
227+
228+
Ok(buf)
229+
}
230+
231+
fn write(
232+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
233+
) -> lightning::io::Result<()> {
234+
self.write_version(primary_namespace, secondary_namespace, key, buf, None)
235+
}
207236

208237
fn remove(
209238
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
//! Objects related to [`FilesystemStoreAsync`] live here.
2+
3+
use std::sync::{
4+
atomic::{AtomicU64, Ordering},
5+
Arc,
6+
};
7+
8+
use crate::fs_store::FilesystemStore;
9+
use core::future::Future;
10+
use core::pin::Pin;
11+
use lightning::util::persist::{KVStore, KVStoreSync};
12+
13+
/// An asynchronous extension of FilesystemStore, implementing the `KVStore` trait for async operations. It is shaped as
14+
/// a wrapper around an existing [`FilesystemStore`] so that the same locks are used. This allows both the sync and
15+
/// async interface to be used simultaneously.
16+
pub struct FilesystemStoreAsync {
17+
inner: Arc<FilesystemStore>,
18+
version_counter: Arc<AtomicU64>,
19+
}
20+
21+
impl FilesystemStoreAsync {
22+
/// Creates a new instance of [`FilesystemStoreAsync`].
23+
pub fn new(inner: Arc<FilesystemStore>) -> Self {
24+
Self { inner, version_counter: Arc::new(AtomicU64::new(0)) }
25+
}
26+
}
27+
28+
impl KVStore for FilesystemStoreAsync {
29+
fn read(
30+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
31+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
32+
let primary_namespace = primary_namespace.to_string();
33+
let secondary_namespace = secondary_namespace.to_string();
34+
let key = key.to_string();
35+
let this = Arc::clone(&self.inner);
36+
37+
Box::pin(async move {
38+
tokio::task::spawn_blocking(move || {
39+
this.read(&primary_namespace, &secondary_namespace, &key)
40+
})
41+
.await
42+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
43+
})
44+
}
45+
46+
fn write(
47+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
48+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
49+
let primary_namespace = primary_namespace.to_string();
50+
let secondary_namespace = secondary_namespace.to_string();
51+
let key = key.to_string();
52+
let buf = buf.to_vec();
53+
let this = Arc::clone(&self.inner);
54+
55+
// Obtain a version number to retain the call sequence.
56+
let version = self.version_counter.fetch_add(1, Ordering::SeqCst);
57+
58+
Box::pin(async move {
59+
tokio::task::spawn_blocking(move || {
60+
this.write_version(
61+
&primary_namespace,
62+
&secondary_namespace,
63+
&key,
64+
&buf,
65+
Some(version),
66+
)
67+
})
68+
.await
69+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
70+
})
71+
}
72+
73+
fn remove(
74+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
75+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
76+
let primary_namespace = primary_namespace.to_string();
77+
let secondary_namespace = secondary_namespace.to_string();
78+
let key = key.to_string();
79+
let this = Arc::clone(&self.inner);
80+
81+
Box::pin(async move {
82+
tokio::task::spawn_blocking(move || {
83+
this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
84+
})
85+
.await
86+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
87+
})
88+
}
89+
90+
fn list(
91+
&self, primary_namespace: &str, secondary_namespace: &str,
92+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
93+
let primary_namespace = primary_namespace.to_string();
94+
let secondary_namespace = secondary_namespace.to_string();
95+
let this = Arc::clone(&self.inner);
96+
97+
Box::pin(async move {
98+
tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
99+
.await
100+
.unwrap_or_else(|e| {
101+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
102+
})
103+
})
104+
}
105+
}
106+
107+
mod test {
108+
use crate::{fs_store::FilesystemStore, fs_store_async::FilesystemStoreAsync};
109+
use lightning::util::persist::KVStore;
110+
use std::sync::Arc;
111+
112+
#[tokio::test]
113+
async fn read_write_remove_list_persist() {
114+
let mut temp_path = std::env::temp_dir();
115+
temp_path.push("test_read_write_remove_list_persist");
116+
let fs_store = Arc::new(FilesystemStore::new(temp_path));
117+
let fs_store_async = FilesystemStoreAsync::new(Arc::clone(&fs_store));
118+
119+
let data1 = [42u8; 32];
120+
let data2 = [43u8; 32];
121+
122+
let primary_namespace = "testspace";
123+
let secondary_namespace = "testsubspace";
124+
let key = "testkey";
125+
126+
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
127+
// that eventual consistency works.
128+
let fut1 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data1);
129+
let fut2 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data2);
130+
131+
fut2.await.unwrap();
132+
fut1.await.unwrap();
133+
134+
// Test list.
135+
let listed_keys =
136+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
137+
assert_eq!(listed_keys.len(), 1);
138+
assert_eq!(listed_keys[0], key);
139+
140+
// Test read. We expect to read data2, as the write call was initiated later.
141+
let read_data =
142+
fs_store_async.read(primary_namespace, secondary_namespace, key).await.unwrap();
143+
assert_eq!(data2, &*read_data);
144+
145+
// Test remove.
146+
fs_store_async.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
147+
148+
let listed_keys =
149+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
150+
assert_eq!(listed_keys.len(), 0);
151+
}
152+
}

lightning-persister/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
extern crate criterion;
1010

1111
pub mod fs_store;
12+
pub mod fs_store_async;
1213

1314
mod utils;
1415

0 commit comments

Comments
 (0)