Skip to content

Commit 9f74c9d

Browse files
committed
FIX: Add postgres storage adapter for samod
1 parent 8f9dc9c commit 9f74c9d

File tree

8 files changed

+350
-1
lines changed

8 files changed

+350
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ jsonrpsee = "0.24.6"
2222
jsonrpsee-server = "0.24.6"
2323
notebook-types = { version = "0.1.0", path = "../notebook-types" }
2424
qubit = { version = "1.0.0-beta.0", features = ["ts-serde-json", "ts-uuid"] }
25+
rand = "0.8"
2526
regex = "1.11.1"
2627
samod = { git = "https://github.com/alexjg/samod", features = [
2728
"tokio",

packages/backend/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod storage;

packages/backend/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod auth;
2626
mod automerge_json;
2727
mod document;
2828
mod rpc;
29+
mod storage;
2930
mod user;
3031

3132
use app::AppStatus;
@@ -104,7 +105,7 @@ async fn main() {
104105

105106
// Create samod repo
106107
let repo = samod::Repo::builder(tokio::runtime::Handle::current())
107-
.with_storage(samod::storage::InMemoryStorage::new())
108+
.with_storage(storage::PostgresStorage::new(db.clone()))
108109
.load()
109110
.await;
110111

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
mod postgres;
2+
pub mod testing;
3+
4+
pub use postgres::PostgresStorage;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use samod::storage::{Storage, StorageKey};
2+
use sqlx::PgPool;
3+
use std::collections::HashMap;
4+
5+
/// A PostgreSQL-backed storage adapter for samod
6+
///
7+
/// ## Database Schema
8+
///
9+
/// The adapter requires a table with the following structure:
10+
/// ```sql
11+
/// CREATE TABLE storage (
12+
/// key text[] PRIMARY KEY,
13+
/// data bytea NOT NULL
14+
/// );
15+
/// ```
16+
#[derive(Clone)]
17+
pub struct PostgresStorage {
18+
pool: PgPool,
19+
}
20+
21+
impl PostgresStorage {
22+
pub fn new(pool: PgPool) -> Self {
23+
Self { pool }
24+
}
25+
}
26+
27+
impl Storage for PostgresStorage {
28+
async fn load(&self, key: StorageKey) -> Option<Vec<u8>> {
29+
let key_parts: Vec<String> = key.into_iter().collect();
30+
31+
let result = sqlx::query_scalar::<_, Vec<u8>>("SELECT data FROM storage WHERE key = $1")
32+
.bind(&key_parts)
33+
.fetch_optional(&self.pool)
34+
.await;
35+
36+
match result {
37+
Ok(data) => data,
38+
Err(e) => {
39+
tracing::error!("Failed to load from storage: {}", e);
40+
None
41+
}
42+
}
43+
}
44+
45+
async fn load_range(&self, prefix: StorageKey) -> HashMap<StorageKey, Vec<u8>> {
46+
let prefix_parts: Vec<String> = prefix.into_iter().collect();
47+
48+
let result = if prefix_parts.is_empty() {
49+
sqlx::query_as::<_, (Vec<String>, Vec<u8>)>("SELECT key, data FROM storage")
50+
.fetch_all(&self.pool)
51+
.await
52+
} else {
53+
sqlx::query_as::<_, (Vec<String>, Vec<u8>)>(
54+
"SELECT key, data FROM storage WHERE key[1:cardinality($1::text[])] = $1::text[]",
55+
)
56+
.bind(&prefix_parts)
57+
.fetch_all(&self.pool)
58+
.await
59+
};
60+
61+
match result {
62+
Ok(rows) => {
63+
let mut map = HashMap::new();
64+
for (key_parts, data) in rows {
65+
if let Ok(storage_key) = StorageKey::from_parts(key_parts) {
66+
map.insert(storage_key, data);
67+
}
68+
}
69+
map
70+
}
71+
Err(e) => {
72+
tracing::error!("Failed to load range from storage: {}", e);
73+
HashMap::new()
74+
}
75+
}
76+
}
77+
78+
async fn put(&self, key: StorageKey, data: Vec<u8>) {
79+
let key_parts: Vec<String> = key.into_iter().collect();
80+
81+
let result = sqlx::query(
82+
"
83+
INSERT INTO storage (key, data)
84+
VALUES ($1, $2)
85+
ON CONFLICT (key) DO UPDATE SET data = $2
86+
",
87+
)
88+
.bind(&key_parts)
89+
.bind(&data)
90+
.execute(&self.pool)
91+
.await;
92+
93+
if let Err(e) = result {
94+
tracing::error!("Failed to put to storage: {}", e);
95+
}
96+
}
97+
98+
async fn delete(&self, key: StorageKey) {
99+
let key_parts: Vec<String> = key.into_iter().collect();
100+
101+
let result = sqlx::query("DELETE FROM storage WHERE key = $1")
102+
.bind(&key_parts)
103+
.execute(&self.pool)
104+
.await;
105+
106+
if let Err(e) = result {
107+
tracing::error!("Failed to delete from storage: {}", e);
108+
}
109+
}
110+
}
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
//! Storage adapter testing utilities
2+
//!
3+
//! Provides a test suite for any implementation of the `Storage` trait.
4+
//! Based on the TypeScript `runStorageAdapterTests` from automerge-repo.
5+
6+
#![allow(dead_code)]
7+
8+
use rand::Rng;
9+
use samod::storage::{Storage, StorageKey};
10+
use std::future::Future;
11+
use std::pin::Pin;
12+
use std::sync::LazyLock;
13+
14+
pub fn payload_a() -> Vec<u8> {
15+
vec![0, 1, 127, 99, 154, 235]
16+
}
17+
18+
pub fn payload_b() -> Vec<u8> {
19+
vec![1, 76, 160, 53, 57, 10, 230]
20+
}
21+
22+
pub fn payload_c() -> Vec<u8> {
23+
vec![2, 111, 74, 131, 236, 96, 142, 193]
24+
}
25+
26+
static LARGE_PAYLOAD: LazyLock<Vec<u8>> = LazyLock::new(|| {
27+
let mut vec = vec![0u8; 100000];
28+
rand::thread_rng().fill(&mut vec[..]);
29+
vec
30+
});
31+
32+
pub fn large_payload() -> Vec<u8> {
33+
LARGE_PAYLOAD.clone()
34+
}
35+
36+
/// Trait for storage test fixtures
37+
pub trait StorageTestFixture: Sized + Send {
38+
/// The storage type being tested
39+
type Storage: Storage + Send + Sync + 'static;
40+
41+
/// Setup the test fixture
42+
fn setup() -> impl std::future::Future<Output = Self> + Send;
43+
44+
/// Get reference to the storage adapter
45+
fn storage(&self) -> &Self::Storage;
46+
47+
/// Optional cleanup
48+
fn teardown(self) -> impl std::future::Future<Output = ()> + Send {
49+
async {}
50+
}
51+
}
52+
53+
/// Helper to run a single test with setup and teardown
54+
async fn run_test<F, TestFn>(test_fn: TestFn)
55+
where
56+
F: StorageTestFixture,
57+
TestFn: for<'a> FnOnce(&'a F::Storage) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> + Send,
58+
{
59+
let fixture = F::setup().await;
60+
test_fn(fixture.storage()).await;
61+
fixture.teardown().await;
62+
}
63+
64+
/// Run all storage adapter acceptance tests
65+
pub async fn run_storage_adapter_tests<F: StorageTestFixture>() {
66+
run_test::<F, _>(|a| Box::pin(test_load_should_return_none_if_no_data(a))).await;
67+
run_test::<F, _>(|a| Box::pin(test_save_and_load_should_return_data_that_was_saved(a))).await;
68+
run_test::<F, _>(|a| Box::pin(test_save_and_load_should_work_with_composite_keys(a))).await;
69+
run_test::<F, _>(|a| Box::pin(test_save_and_load_should_work_with_large_payload(a))).await;
70+
run_test::<F, _>(|a| Box::pin(test_load_range_should_return_empty_if_no_data(a))).await;
71+
run_test::<F, _>(|a| Box::pin(test_save_and_load_range_should_return_all_matching_data(a)))
72+
.await;
73+
run_test::<F, _>(|a| Box::pin(test_save_and_load_range_should_only_load_matching_values(a)))
74+
.await;
75+
run_test::<F, _>(|a| Box::pin(test_save_and_remove_should_be_empty_after_removing(a))).await;
76+
run_test::<F, _>(|a| Box::pin(test_save_and_save_should_overwrite(a))).await;
77+
}
78+
79+
// describe("load")
80+
pub async fn test_load_should_return_none_if_no_data<S: Storage>(adapter: &S) {
81+
let actual = adapter
82+
.load(StorageKey::from_parts(["AAAAA", "sync-state", "xxxxx"]).unwrap())
83+
.await;
84+
85+
assert_eq!(actual, None);
86+
}
87+
88+
// describe("save and load")
89+
pub async fn test_save_and_load_should_return_data_that_was_saved<S: Storage>(adapter: &S) {
90+
let key = StorageKey::from_parts(["storage-adapter-id"]).unwrap();
91+
adapter.put(key.clone(), payload_a()).await;
92+
93+
let actual = adapter.load(key).await;
94+
95+
assert_eq!(actual, Some(payload_a()));
96+
}
97+
98+
pub async fn test_save_and_load_should_work_with_composite_keys<S: Storage>(adapter: &S) {
99+
let key = StorageKey::from_parts(["AAAAA", "sync-state", "xxxxx"]).unwrap();
100+
adapter.put(key.clone(), payload_a()).await;
101+
102+
let actual = adapter.load(key).await;
103+
104+
assert_eq!(actual, Some(payload_a()));
105+
}
106+
107+
pub async fn test_save_and_load_should_work_with_large_payload<S: Storage>(adapter: &S) {
108+
let key = StorageKey::from_parts(["AAAAA", "sync-state", "xxxxx"]).unwrap();
109+
adapter.put(key.clone(), large_payload()).await;
110+
111+
let actual = adapter.load(key).await;
112+
113+
assert_eq!(actual, Some(large_payload()));
114+
}
115+
116+
// describe("loadRange")
117+
pub async fn test_load_range_should_return_empty_if_no_data<S: Storage>(adapter: &S) {
118+
let result = adapter.load_range(StorageKey::from_parts(["AAAAA"]).unwrap()).await;
119+
120+
assert_eq!(result.len(), 0);
121+
}
122+
123+
// describe("save and loadRange")
124+
pub async fn test_save_and_load_range_should_return_all_matching_data<S: Storage>(adapter: &S) {
125+
let key_a = StorageKey::from_parts(["AAAAA", "sync-state", "xxxxx"]).unwrap();
126+
let key_b = StorageKey::from_parts(["AAAAA", "snapshot", "yyyyy"]).unwrap();
127+
let key_c = StorageKey::from_parts(["AAAAA", "sync-state", "zzzzz"]).unwrap();
128+
129+
adapter.put(key_a.clone(), payload_a()).await;
130+
adapter.put(key_b.clone(), payload_b()).await;
131+
adapter.put(key_c.clone(), payload_c()).await;
132+
133+
let result = adapter.load_range(StorageKey::from_parts(["AAAAA"]).unwrap()).await;
134+
135+
assert_eq!(result.len(), 3);
136+
assert_eq!(result.get(&key_a), Some(&payload_a()));
137+
assert_eq!(result.get(&key_b), Some(&payload_b()));
138+
assert_eq!(result.get(&key_c), Some(&payload_c()));
139+
140+
let sync_result = adapter
141+
.load_range(StorageKey::from_parts(["AAAAA", "sync-state"]).unwrap())
142+
.await;
143+
144+
assert_eq!(sync_result.len(), 2);
145+
assert_eq!(sync_result.get(&key_a), Some(&payload_a()));
146+
assert_eq!(sync_result.get(&key_c), Some(&payload_c()));
147+
}
148+
149+
pub async fn test_save_and_load_range_should_only_load_matching_values<S: Storage>(adapter: &S) {
150+
let key_a = StorageKey::from_parts(["AAAAA", "sync-state", "xxxxx"]).unwrap();
151+
let key_c = StorageKey::from_parts(["BBBBB", "sync-state", "zzzzz"]).unwrap();
152+
153+
adapter.put(key_a.clone(), payload_a()).await;
154+
adapter.put(key_c.clone(), payload_c()).await;
155+
156+
let actual = adapter.load_range(StorageKey::from_parts(["AAAAA"]).unwrap()).await;
157+
158+
assert_eq!(actual.len(), 1);
159+
assert_eq!(actual.get(&key_a), Some(&payload_a()));
160+
}
161+
162+
// describe("save and remove")
163+
pub async fn test_save_and_remove_should_be_empty_after_removing<S: Storage>(adapter: &S) {
164+
let key = StorageKey::from_parts(["AAAAA", "snapshot", "xxxxx"]).unwrap();
165+
adapter.put(key.clone(), payload_a()).await;
166+
adapter.delete(key.clone()).await;
167+
168+
let range_result = adapter.load_range(StorageKey::from_parts(["AAAAA"]).unwrap()).await;
169+
assert_eq!(range_result.len(), 0);
170+
171+
let load_result = adapter.load(key).await;
172+
assert_eq!(load_result, None);
173+
}
174+
175+
// describe("save and save")
176+
pub async fn test_save_and_save_should_overwrite<S: Storage>(adapter: &S) {
177+
let key = StorageKey::from_parts(["AAAAA", "sync-state", "xxxxx"]).unwrap();
178+
adapter.put(key.clone(), payload_a()).await;
179+
adapter.put(key.clone(), payload_b()).await;
180+
181+
let result = adapter
182+
.load_range(StorageKey::from_parts(["AAAAA", "sync-state"]).unwrap())
183+
.await;
184+
185+
assert_eq!(result.len(), 1);
186+
assert_eq!(result.get(&key), Some(&payload_b()));
187+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use backend::storage::{PostgresStorage, testing};
2+
use sqlx::PgPool;
3+
4+
async fn cleanup_test_data(pool: &PgPool) {
5+
let _ = sqlx::query("DELETE FROM storage WHERE key[1] = ANY($1)")
6+
.bind(&["AAAAA", "BBBBB", "storage-adapter-id"])
7+
.execute(pool)
8+
.await;
9+
}
10+
11+
struct PostgresTestFixture {
12+
storage: PostgresStorage,
13+
pool: PgPool,
14+
}
15+
16+
impl testing::StorageTestFixture for PostgresTestFixture {
17+
type Storage = PostgresStorage;
18+
19+
async fn setup() -> Self {
20+
let database_url =
21+
std::env::var("DATABASE_URL").expect("DATABASE_URL must be set for tests");
22+
23+
let pool = PgPool::connect(&database_url).await.expect("Failed to connect to database");
24+
25+
cleanup_test_data(&pool).await;
26+
27+
let storage = PostgresStorage::new(pool.clone());
28+
29+
Self { storage, pool }
30+
}
31+
32+
fn storage(&self) -> &PostgresStorage {
33+
&self.storage
34+
}
35+
36+
async fn teardown(self) {
37+
cleanup_test_data(&self.pool).await;
38+
}
39+
}
40+
41+
#[tokio::test]
42+
async fn postgres_storage_adapter_tests() {
43+
testing::run_storage_adapter_tests::<PostgresTestFixture>().await;
44+
}

0 commit comments

Comments
 (0)