Skip to content

Commit 258b635

Browse files
committed
Implement KVStore for SqliteStore
1 parent de64ae4 commit 258b635

File tree

1 file changed

+74
-1
lines changed

1 file changed

+74
-1
lines changed

src/io/sqlite_store/mod.rs

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99
use crate::io::utils::check_namespace_key_validity;
1010

1111
use lightning::io;
12-
use lightning::util::persist::KVStoreSync;
12+
use lightning::util::persist::{KVStore, KVStoreSync};
1313

1414
use lightning_types::string::PrintableString;
1515

1616
use rusqlite::{named_params, Connection};
1717

18+
use std::boxed::Box;
1819
use std::fs;
20+
use std::future::Future;
1921
use std::path::PathBuf;
22+
use std::pin::Pin;
2023
use std::sync::{Arc, Mutex};
2124

2225
mod migrations;
@@ -62,6 +65,76 @@ impl SqliteStore {
6265
}
6366
}
6467

68+
impl KVStore for SqliteStore {
69+
fn read(
70+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
71+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + Send>> {
72+
let primary_namespace = primary_namespace.to_string();
73+
let secondary_namespace = secondary_namespace.to_string();
74+
let key = key.to_string();
75+
let inner = Arc::clone(&self.inner);
76+
let fut = tokio::task::spawn_blocking(move || {
77+
inner.read_internal(&primary_namespace, &secondary_namespace, &key)
78+
});
79+
Box::pin(async move {
80+
fut.await.unwrap_or_else(|e| {
81+
let msg = format!("Failed to IO operation due join error: {}", e);
82+
Err(io::Error::new(io::ErrorKind::Other, msg))
83+
})
84+
})
85+
}
86+
fn write(
87+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
88+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
89+
let primary_namespace = primary_namespace.to_string();
90+
let secondary_namespace = secondary_namespace.to_string();
91+
let key = key.to_string();
92+
let inner = Arc::clone(&self.inner);
93+
let fut = tokio::task::spawn_blocking(move || {
94+
inner.write_internal(&primary_namespace, &secondary_namespace, &key, buf)
95+
});
96+
Box::pin(async move {
97+
fut.await.unwrap_or_else(|e| {
98+
let msg = format!("Failed to IO operation due join error: {}", e);
99+
Err(io::Error::new(io::ErrorKind::Other, msg))
100+
})
101+
})
102+
}
103+
fn remove(
104+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
105+
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>> {
106+
let primary_namespace = primary_namespace.to_string();
107+
let secondary_namespace = secondary_namespace.to_string();
108+
let key = key.to_string();
109+
let inner = Arc::clone(&self.inner);
110+
let fut = tokio::task::spawn_blocking(move || {
111+
inner.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy)
112+
});
113+
Box::pin(async move {
114+
fut.await.unwrap_or_else(|e| {
115+
let msg = format!("Failed to IO operation due join error: {}", e);
116+
Err(io::Error::new(io::ErrorKind::Other, msg))
117+
})
118+
})
119+
}
120+
fn list(
121+
&self, primary_namespace: &str, secondary_namespace: &str,
122+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + Send>> {
123+
let primary_namespace = primary_namespace.to_string();
124+
let secondary_namespace = secondary_namespace.to_string();
125+
let inner = Arc::clone(&self.inner);
126+
let fut = tokio::task::spawn_blocking(move || {
127+
inner.list_internal(&primary_namespace, &secondary_namespace)
128+
});
129+
Box::pin(async move {
130+
fut.await.unwrap_or_else(|e| {
131+
let msg = format!("Failed to IO operation due join error: {}", e);
132+
Err(io::Error::new(io::ErrorKind::Other, msg))
133+
})
134+
})
135+
}
136+
}
137+
65138
impl KVStoreSync for SqliteStore {
66139
fn read(
67140
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,

0 commit comments

Comments
 (0)