Skip to content

Commit 48d0c31

Browse files
committed
Async FilesystemStore
1 parent c3a4c24 commit 48d0c31

File tree

4 files changed

+238
-30
lines changed

4 files changed

+238
-30
lines changed

lightning-persister/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
bitcoin = "0.32.2"
1818
lightning = { version = "0.2.0", path = "../lightning" }
1919

20+
# TODO: Make conditional?
21+
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time", "fs", "io-util" ] }
22+
2023
[target.'cfg(windows)'.dependencies]
2124
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2225

lightning-persister/src/fs_store.rs

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ pub struct FilesystemStore {
3838
data_dir: PathBuf,
3939
tmp_file_counter: AtomicUsize,
4040
gc_counter: AtomicUsize,
41-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
41+
42+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
43+
// latest written version per key.
44+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<HashMap<String, u64>>>>>,
4245
}
4346

4447
impl FilesystemStore {
@@ -90,36 +93,12 @@ impl FilesystemStore {
9093

9194
Ok(dest_dir_path)
9295
}
93-
}
9496

95-
impl KVStoreSync for FilesystemStore {
96-
fn read(
97-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
98-
) -> lightning::io::Result<Vec<u8>> {
99-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
100-
101-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
102-
dest_file_path.push(key);
103-
104-
let mut buf = Vec::new();
105-
{
106-
let inner_lock_ref = {
107-
let mut outer_lock = self.locks.lock().unwrap();
108-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
109-
};
110-
let _guard = inner_lock_ref.read().unwrap();
111-
112-
let mut f = fs::File::open(dest_file_path)?;
113-
f.read_to_end(&mut buf)?;
114-
}
115-
116-
self.garbage_collect_locks();
117-
118-
Ok(buf)
119-
}
120-
121-
fn write(
97+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
98+
/// returns early without writing.
99+
pub(crate) fn write_version(
122100
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
101+
version: Option<u64>,
123102
) -> lightning::io::Result<()> {
124103
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
125104

@@ -153,7 +132,24 @@ impl KVStoreSync for FilesystemStore {
153132
let mut outer_lock = self.locks.lock().unwrap();
154133
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
155134
};
156-
let _guard = inner_lock_ref.write().unwrap();
135+
let mut guard = inner_lock_ref.write().unwrap();
136+
137+
// If a version is provided, we check if we already have a newer version written. This is used in async
138+
// contexts to realize eventual consistency.
139+
if let Some(version) = version {
140+
match guard.entry(key.to_string()) {
141+
std::collections::hash_map::Entry::Vacant(e) => {
142+
e.insert(version);
143+
},
144+
std::collections::hash_map::Entry::Occupied(mut e) => {
145+
if version <= *e.get() {
146+
// If the version is not greater, we don't write the file.
147+
return Ok(());
148+
}
149+
e.insert(version);
150+
},
151+
}
152+
}
157153

158154
#[cfg(not(target_os = "windows"))]
159155
{
@@ -204,6 +200,39 @@ impl KVStoreSync for FilesystemStore {
204200

205201
res
206202
}
203+
}
204+
205+
impl KVStoreSync for FilesystemStore {
206+
fn read(
207+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
208+
) -> lightning::io::Result<Vec<u8>> {
209+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
210+
211+
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
212+
dest_file_path.push(key);
213+
214+
let mut buf = Vec::new();
215+
{
216+
let inner_lock_ref = {
217+
let mut outer_lock = self.locks.lock().unwrap();
218+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
219+
};
220+
let _guard = inner_lock_ref.read().unwrap();
221+
222+
let mut f = fs::File::open(dest_file_path)?;
223+
f.read_to_end(&mut buf)?;
224+
}
225+
226+
self.garbage_collect_locks();
227+
228+
Ok(buf)
229+
}
230+
231+
fn write(
232+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
233+
) -> lightning::io::Result<()> {
234+
self.write_version(primary_namespace, secondary_namespace, key, buf, None)
235+
}
207236

208237
fn remove(
209238
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
//! Objects related to [`FilesystemStoreAsync`] live here.
2+
3+
use std::sync::{
4+
atomic::{AtomicU64, Ordering},
5+
Arc,
6+
};
7+
8+
use lightning::util::persist::{KVStore, KVStoreSync};
9+
10+
use crate::fs_store::FilesystemStore;
11+
12+
/// An asynchronous extension of FilesystemStore, implementing the `KVStore` trait for async operations. It is shaped as
13+
/// a wrapper around an existing [`FilesystemStore`] so that the same locks are used. This allows both the sync and
14+
/// async interface to be used simultaneously.
15+
pub struct FilesystemStoreAsync {
16+
inner: Arc<FilesystemStore>,
17+
version_counter: Arc<AtomicU64>,
18+
}
19+
20+
impl FilesystemStoreAsync {
21+
/// Creates a new instance of [`FilesystemStoreAsync`].
22+
pub fn new(inner: Arc<FilesystemStore>) -> Self {
23+
Self { inner, version_counter: Arc::new(AtomicU64::new(0)) }
24+
}
25+
}
26+
27+
impl KVStore for FilesystemStoreAsync {
28+
fn read(
29+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
30+
) -> std::pin::Pin<
31+
Box<
32+
dyn std::prelude::rust_2024::Future<Output = Result<Vec<u8>, lightning::io::Error>>
33+
+ 'static
34+
+ Send,
35+
>,
36+
> {
37+
let primary_namespace = primary_namespace.to_string();
38+
let secondary_namespace = secondary_namespace.to_string();
39+
let key = key.to_string();
40+
let this = Arc::clone(&self.inner);
41+
42+
Box::pin(async move {
43+
tokio::task::spawn_blocking(move || {
44+
this.read(&primary_namespace, &secondary_namespace, &key)
45+
})
46+
.await
47+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
48+
})
49+
}
50+
51+
fn write(
52+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
53+
) -> std::pin::Pin<
54+
Box<
55+
dyn std::prelude::rust_2024::Future<Output = Result<(), lightning::io::Error>>
56+
+ 'static
57+
+ Send,
58+
>,
59+
> {
60+
let primary_namespace = primary_namespace.to_string();
61+
let secondary_namespace = secondary_namespace.to_string();
62+
let key = key.to_string();
63+
let buf = buf.to_vec();
64+
let this = Arc::clone(&self.inner);
65+
66+
// Obtain a version number to retain the call sequence.
67+
let version = self.version_counter.fetch_add(1, Ordering::SeqCst);
68+
69+
Box::pin(async move {
70+
tokio::task::spawn_blocking(move || {
71+
this.write_version(
72+
&primary_namespace,
73+
&secondary_namespace,
74+
&key,
75+
&buf,
76+
Some(version),
77+
)
78+
})
79+
.await
80+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
81+
})
82+
}
83+
84+
fn remove(
85+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
86+
) -> std::pin::Pin<
87+
Box<
88+
dyn std::prelude::rust_2024::Future<Output = Result<(), lightning::io::Error>>
89+
+ 'static
90+
+ Send,
91+
>,
92+
> {
93+
let primary_namespace = primary_namespace.to_string();
94+
let secondary_namespace = secondary_namespace.to_string();
95+
let key = key.to_string();
96+
let this = Arc::clone(&self.inner);
97+
98+
Box::pin(async move {
99+
tokio::task::spawn_blocking(move || {
100+
this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
101+
})
102+
.await
103+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
104+
})
105+
}
106+
107+
fn list(
108+
&self, primary_namespace: &str, secondary_namespace: &str,
109+
) -> std::pin::Pin<
110+
Box<
111+
dyn std::prelude::rust_2024::Future<Output = Result<Vec<String>, lightning::io::Error>>
112+
+ 'static
113+
+ Send,
114+
>,
115+
> {
116+
let primary_namespace = primary_namespace.to_string();
117+
let secondary_namespace = secondary_namespace.to_string();
118+
let this = Arc::clone(&self.inner);
119+
120+
Box::pin(async move {
121+
tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
122+
.await
123+
.unwrap_or_else(|e| {
124+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
125+
})
126+
})
127+
}
128+
}
129+
130+
mod test {
131+
use crate::{fs_store::FilesystemStore, fs_store_async::FilesystemStoreAsync};
132+
use lightning::util::persist::KVStore;
133+
use std::sync::Arc;
134+
135+
#[tokio::test]
136+
async fn read_write_remove_list_persist() {
137+
let mut temp_path = std::env::temp_dir();
138+
temp_path.push("test_read_write_remove_list_persist");
139+
let fs_store = Arc::new(FilesystemStore::new(temp_path));
140+
let fs_store_async = FilesystemStoreAsync::new(Arc::clone(&fs_store));
141+
142+
let data1 = [42u8; 32];
143+
let data2 = [43u8; 32];
144+
145+
let primary_namespace = "testspace";
146+
let secondary_namespace = "testsubspace";
147+
let key = "testkey";
148+
149+
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
150+
// that eventual consistency works.
151+
let fut1 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data1);
152+
let fut2 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data2);
153+
154+
fut2.await.unwrap();
155+
fut1.await.unwrap();
156+
157+
// Test list.
158+
let listed_keys =
159+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
160+
assert_eq!(listed_keys.len(), 1);
161+
assert_eq!(listed_keys[0], key);
162+
163+
// Test read. We expect to read data2, as the write call was initiated later.
164+
let read_data =
165+
fs_store_async.read(primary_namespace, secondary_namespace, key).await.unwrap();
166+
assert_eq!(data2, &*read_data);
167+
168+
// Test remove.
169+
fs_store_async.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
170+
171+
let listed_keys =
172+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
173+
assert_eq!(listed_keys.len(), 0);
174+
}
175+
}

lightning-persister/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
extern crate criterion;
1010

1111
pub mod fs_store;
12+
pub mod fs_store_async;
1213

1314
mod utils;
1415

0 commit comments

Comments
 (0)