Skip to content

Commit fa82549

Browse files
committed
FIX: Add postgres storage adapter for samod
1 parent 8f9dc9c commit fa82549

File tree

9 files changed

+368
-74
lines changed

9 files changed

+368
-74
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/backend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ jsonrpsee = "0.24.6"
2222
jsonrpsee-server = "0.24.6"
2323
notebook-types = { version = "0.1.0", path = "../notebook-types" }
2424
qubit = { version = "1.0.0-beta.0", features = ["ts-serde-json", "ts-uuid"] }
25+
rand = "0.8"
2526
regex = "1.11.1"
2627
samod = { git = "https://github.com/alexjg/samod", features = [
2728
"tokio",

packages/backend/src/app.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use sqlx::PgPool;
44
use std::collections::HashSet;
55
use std::sync::Arc;
66
use thiserror::Error;
7-
use tokio::sync::{RwLock, watch};
7+
use tokio::sync::RwLock;
88
use ts_rs::TS;
99
use uuid::Uuid;
1010

@@ -19,21 +19,10 @@ pub struct AppState {
1919
/// Automerge-repo provider
2020
pub repo: samod::Repo,
2121

22-
pub app_status: watch::Receiver<AppStatus>,
23-
2422
/// Tracks which ref_ids have active autosave listeners to prevent duplicates
2523
pub active_listeners: Arc<RwLock<HashSet<Uuid>>>,
2624
}
2725

28-
#[derive(Clone, Debug, PartialEq, Eq)]
29-
pub enum AppStatus {
30-
Starting,
31-
Migrating,
32-
Running,
33-
#[allow(dead_code)]
34-
Failed(String),
35-
}
36-
3726
/// Context available to RPC procedures.
3827
#[derive(Clone)]
3928
pub struct AppCtx {

packages/backend/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod storage;

packages/backend/src/main.rs

Lines changed: 16 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,14 @@ use axum::{Router, routing::get};
66
use axum::{extract::State, response::IntoResponse};
77
use clap::{Parser, Subcommand};
88
use firebase_auth::FirebaseAuth;
9-
use http::StatusCode;
109
use sqlx::postgres::PgPoolOptions;
11-
use sqlx::{PgPool, Postgres};
1210
use sqlx_migrator::cli::MigrationCommand;
1311
use sqlx_migrator::migrator::{Migrate, Migrator};
1412
use sqlx_migrator::{Info, Plan};
1513
use std::collections::HashSet;
1614
use std::path::Path;
1715
use std::sync::Arc;
18-
use tokio::sync::{RwLock, watch};
16+
use tokio::sync::RwLock;
1917
use tower::ServiceBuilder;
2018
use tower_http::cors::CorsLayer;
2119
use tracing::{error, info};
@@ -26,10 +24,9 @@ mod auth;
2624
mod automerge_json;
2725
mod document;
2826
mod rpc;
27+
mod storage;
2928
mod user;
3029

31-
use app::AppStatus;
32-
3330
/// Port for the web server providing the RPC API.
3431
fn web_port() -> String {
3532
dotenvy::var("PORT").unwrap_or("8000".to_string())
@@ -100,17 +97,21 @@ async fn main() {
10097
}
10198

10299
Command::Serve => {
103-
let (status_tx, status_rx) = watch::channel(AppStatus::Starting);
100+
info!("Applying database migrations...");
101+
let mut conn = db.acquire().await.expect("Failed to acquire DB connection");
102+
migrator
103+
.run(&mut conn, &Plan::apply_all())
104+
.await
105+
.expect("Failed to run migrations");
106+
info!("Migrations complete");
104107

105-
// Create samod repo
106108
let repo = samod::Repo::builder(tokio::runtime::Handle::current())
107-
.with_storage(samod::storage::InMemoryStorage::new())
109+
.with_storage(storage::PostgresStorage::new(db.clone()))
108110
.load()
109111
.await;
110112

111113
let state = app::AppState {
112114
db: db.clone(),
113-
app_status: status_rx.clone(),
114115
repo,
115116
active_listeners: Arc::new(RwLock::new(HashSet::new())),
116117
};
@@ -127,48 +128,10 @@ async fn main() {
127128
.await,
128129
);
129130

130-
tokio::try_join!(
131-
run_migrator_apply(db.clone(), migrator, status_tx.clone()),
132-
run_web_server(state.clone(), firebase_auth.clone()),
133-
)
134-
.unwrap();
135-
}
136-
}
137-
}
138-
139-
async fn run_migrator_apply(
140-
db: PgPool,
141-
migrator: Migrator<Postgres>,
142-
status_tx: watch::Sender<AppStatus>,
143-
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
144-
status_tx.send(AppStatus::Migrating)?;
145-
info!("Applying database migrations...");
146-
147-
let mut conn = db.acquire().await?;
148-
migrator.run(&mut conn, &Plan::apply_all()).await.unwrap();
131+
// Notify systemd we're ready
132+
sd_notify::notify(false, &[sd_notify::NotifyState::Ready]).ok();
149133

150-
status_tx.send(AppStatus::Running)?;
151-
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
152-
info!("Migrations complete");
153-
154-
Ok(())
155-
}
156-
157-
async fn app_status_gate(
158-
State(status_rx): State<watch::Receiver<AppStatus>>,
159-
req: Request,
160-
next: Next,
161-
) -> impl IntoResponse {
162-
// Combining the following 2 lines will anger the rust gods
163-
let status = status_rx.borrow().clone();
164-
match status {
165-
AppStatus::Running => next.run(req).await,
166-
AppStatus::Failed(reason) => {
167-
(StatusCode::INTERNAL_SERVER_ERROR, format!("App failed to start: {reason}"))
168-
.into_response()
169-
}
170-
AppStatus::Starting | AppStatus::Migrating => {
171-
(StatusCode::SERVICE_UNAVAILABLE, "Server not ready yet").into_response()
134+
run_web_server(state.clone(), firebase_auth.clone()).await.unwrap();
172135
}
173136
}
174137
}
@@ -191,13 +154,8 @@ async fn auth_middleware(
191154
next.run(req).await
192155
}
193156

194-
async fn status_handler(State(status_rx): State<watch::Receiver<AppStatus>>) -> String {
195-
match status_rx.borrow().clone() {
196-
AppStatus::Starting => "Starting".into(),
197-
AppStatus::Migrating => "Migrating".into(),
198-
AppStatus::Running => "Running".into(),
199-
AppStatus::Failed(reason) => format!("Failed: {reason}"),
200-
}
157+
async fn status_handler() -> &'static str {
158+
"Running"
201159
}
202160

203161
async fn websocket_handler(
@@ -223,20 +181,16 @@ async fn run_web_server(
223181
let (qubit_service, qubit_handle) = rpc_router.as_rpc(state.clone()).into_service();
224182

225183
let rpc_with_mw = ServiceBuilder::new()
226-
.layer(from_fn_with_state(state.app_status.clone(), app_status_gate))
227184
.layer(from_fn_with_state(firebase_auth.clone(), auth_middleware))
228185
.service(qubit_service);
229186

230187
let samod_router = Router::new()
231188
.layer(from_fn_with_state(firebase_auth, auth_middleware))
232-
.layer(from_fn_with_state(state.app_status.clone(), app_status_gate))
233189
.route("/repo-ws", get(websocket_handler))
234190
.with_state(state.repo.clone());
235191

236192
// used by tests to tell when the backend is ready
237-
let status_router = Router::new()
238-
.route("/status", get(status_handler))
239-
.with_state(state.app_status.clone());
193+
let status_router = Router::new().route("/status", get(status_handler));
240194

241195
let mut app = Router::new()
242196
.merge(status_router)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
//! Storage backends for the samod document repo.

mod postgres;
// NOTE(review): the `testing` module's contents are not shown in this diff;
// presumably test-support storage helpers — confirm against the module file.
pub mod testing;

// Re-export so callers can refer to `storage::PostgresStorage` directly.
pub use postgres::PostgresStorage;
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use samod::storage::{Storage, StorageKey};
2+
use sqlx::PgPool;
3+
use std::collections::HashMap;
4+
5+
/// A PostgreSQL-backed storage adapter for samod
6+
///
7+
/// ## Database Schema
8+
///
9+
/// The adapter requires a table with the following structure:
10+
/// ```sql
11+
/// CREATE TABLE storage (
12+
/// key text[] PRIMARY KEY,
13+
/// data bytea NOT NULL
14+
/// );
15+
/// ```
16+
#[derive(Clone)]
17+
pub struct PostgresStorage {
18+
pool: PgPool,
19+
}
20+
21+
impl PostgresStorage {
22+
pub fn new(pool: PgPool) -> Self {
23+
Self { pool }
24+
}
25+
}
26+
27+
impl Storage for PostgresStorage {
28+
async fn load(&self, key: StorageKey) -> Option<Vec<u8>> {
29+
let key_parts: Vec<String> = key.into_iter().collect();
30+
31+
let result = sqlx::query_scalar::<_, Vec<u8>>("SELECT data FROM storage WHERE key = $1")
32+
.bind(&key_parts)
33+
.fetch_optional(&self.pool)
34+
.await;
35+
36+
match result {
37+
Ok(data) => data,
38+
Err(e) => {
39+
tracing::error!("Failed to load from storage: {}", e);
40+
None
41+
}
42+
}
43+
}
44+
45+
async fn load_range(&self, prefix: StorageKey) -> HashMap<StorageKey, Vec<u8>> {
46+
let prefix_parts: Vec<String> = prefix.into_iter().collect();
47+
48+
let result = if prefix_parts.is_empty() {
49+
sqlx::query_as::<_, (Vec<String>, Vec<u8>)>("SELECT key, data FROM storage")
50+
.fetch_all(&self.pool)
51+
.await
52+
} else {
53+
sqlx::query_as::<_, (Vec<String>, Vec<u8>)>(
54+
"SELECT key, data FROM storage WHERE key[1:cardinality($1::text[])] = $1::text[]",
55+
)
56+
.bind(&prefix_parts)
57+
.fetch_all(&self.pool)
58+
.await
59+
};
60+
61+
match result {
62+
Ok(rows) => {
63+
let mut map = HashMap::new();
64+
for (key_parts, data) in rows {
65+
if let Ok(storage_key) = StorageKey::from_parts(key_parts) {
66+
map.insert(storage_key, data);
67+
}
68+
}
69+
map
70+
}
71+
Err(e) => {
72+
tracing::error!("Failed to load range from storage: {}", e);
73+
HashMap::new()
74+
}
75+
}
76+
}
77+
78+
async fn put(&self, key: StorageKey, data: Vec<u8>) {
79+
let key_parts: Vec<String> = key.into_iter().collect();
80+
81+
let result = sqlx::query(
82+
"
83+
INSERT INTO storage (key, data)
84+
VALUES ($1, $2)
85+
ON CONFLICT (key) DO UPDATE SET data = $2
86+
",
87+
)
88+
.bind(&key_parts)
89+
.bind(&data)
90+
.execute(&self.pool)
91+
.await;
92+
93+
if let Err(e) = result {
94+
tracing::error!("Failed to put to storage: {}", e);
95+
}
96+
}
97+
98+
async fn delete(&self, key: StorageKey) {
99+
let key_parts: Vec<String> = key.into_iter().collect();
100+
101+
let result = sqlx::query("DELETE FROM storage WHERE key = $1")
102+
.bind(&key_parts)
103+
.execute(&self.pool)
104+
.await;
105+
106+
if let Err(e) = result {
107+
tracing::error!("Failed to delete from storage: {}", e);
108+
}
109+
}
110+
}

0 commit comments

Comments (0)