From 6b09b86829a37963f7004be3f1afce9366293f94 Mon Sep 17 00:00:00 2001 From: Leo Nash Date: Fri, 15 Aug 2025 06:24:19 +0000 Subject: [PATCH 1/3] init db and table, check health --- rust/impls/Cargo.toml | 1 + rust/impls/src/postgres_store.rs | 110 +++++++++++++++++++++++++++-- rust/server/src/main.rs | 10 ++- rust/server/src/util/config.rs | 6 +- rust/server/vss-server-config.toml | 2 +- 5 files changed, 118 insertions(+), 11 deletions(-) diff --git a/rust/impls/Cargo.toml b/rust/impls/Cargo.toml index 026bbda..37e3c47 100644 --- a/rust/impls/Cargo.toml +++ b/rust/impls/Cargo.toml @@ -10,6 +10,7 @@ chrono = "0.4.38" tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] } bb8-postgres = "0.7" bytes = "1.4.0" +tokio = { version = "1.38.0", default-features = false } [dev-dependencies] tokio = { version = "1.38.0", default-features = false, features = ["rt-multi-thread", "macros"] } diff --git a/rust/impls/src/postgres_store.rs b/rust/impls/src/postgres_store.rs index 96f9618..b43508a 100644 --- a/rust/impls/src/postgres_store.rs +++ b/rust/impls/src/postgres_store.rs @@ -27,6 +27,22 @@ const KEY_COLUMN: &str = "key"; const VALUE_COLUMN: &str = "value"; const VERSION_COLUMN: &str = "version"; +const DB_CHECK_STMT: &str = "SELECT 1 FROM pg_database WHERE datname = $1"; +const DB_INIT_CMD: &str = "CREATE DATABASE"; +const TABLE_CHECK_STMT: &str = "SELECT 1 FROM vss_db WHERE false"; +const TABLE_INIT_STMT: &str = " + CREATE TABLE IF NOT EXISTS vss_db ( + user_token character varying(120) NOT NULL CHECK (user_token <> ''), + store_id character varying(120) NOT NULL CHECK (store_id <> ''), + key character varying(600) NOT NULL, + value bytea NULL, + version bigint NOT NULL, + created_at TIMESTAMP WITH TIME ZONE, + last_updated_at TIMESTAMP WITH TIME ZONE, + PRIMARY KEY (user_token, store_id, key) + ); +"; + /// The maximum number of key versions that can be returned in a single page. /// /// This constant helps control memory and bandwidth usage for list operations, @@ -46,17 +62,103 @@ pub struct PostgresBackendImpl { pool: Pool>, } +async fn initialize_vss_database(postgres_endpoint: &str, db_name: &str) -> Result<(), Error> { + let postgres_dsn = format!("{}/{}", postgres_endpoint, "postgres"); + let (client, connection) = tokio_postgres::connect(&postgres_dsn, NoTls).await + .map_err(|e| Error::new(ErrorKind::Other, format!("Connection error: {}", e)))?; + // Connection must be driven on separate task, will be dropped on client dropped + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + // Check if the database already exists + let num_rows = client.execute(DB_CHECK_STMT, &[&db_name]).await + .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?; + + if num_rows == 0 { + // Database does not exist, so create it + let stmt = format!("{} {}", DB_INIT_CMD, db_name); + client.execute(&stmt, &[]).await + .map_err(|e| Error::new(ErrorKind::Other, format!("Query error: {}", e)))?; + println!("Database '{}' created successfully", db_name); + } else { + println!("Database '{}' already exists skipping creation", db_name); + } + + Ok(()) +} + + impl PostgresBackendImpl { /// Constructs a [`PostgresBackendImpl`] using `dsn` for PostgreSQL connection information. - pub async fn new(dsn: &str) -> Result { - let manager = PostgresConnectionManager::new_from_stringlike(dsn, NoTls).map_err(|e| { + pub async fn new(postgres_endpoint: &str, db_name: &str, init_db: bool) -> Result { + if init_db { + tokio::time::timeout( + tokio::time::Duration::from_secs(3), + initialize_vss_database(postgres_endpoint, db_name), + ) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Is the postgres endpoint online? {}", e)))? + .map_err(|e| Error::new(ErrorKind::Other, format!("Connection error: {}", e)))?; + } + let vss_dsn = format!("{}/{}", postgres_endpoint, db_name); + let manager = PostgresConnectionManager::new_from_stringlike(vss_dsn, NoTls).map_err(|e| { Error::new(ErrorKind::Other, format!("Connection manager error: {}", e)) })?; let pool = Pool::builder() .build(manager) .await .map_err(|e| Error::new(ErrorKind::Other, format!("Pool build error: {}", e)))?; - Ok(PostgresBackendImpl { pool }) + let ret = PostgresBackendImpl { pool }; + let touch_table = async { + if init_db { + ret.initialize_vss_table().await?; + } + ret.check_health().await + }; + tokio::time::timeout( + tokio::time::Duration::from_secs(3), + touch_table, + ) + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Does the database exist? If not use --init-db {}", e)))? + .map_err(|e| Error::new(ErrorKind::Other, format!("Connection error: {}", e)))?; + + Ok(ret) + } + + async fn initialize_vss_table(&self) -> Result<(), Error> { + let conn = self + .pool + .get() + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Connection error: {}", e)))?; + let num_rows = conn + .execute(TABLE_INIT_STMT, &[]) + .await + .map_err(|e| { + Error::new(ErrorKind::Other, format!("Database operation failed. {}", e)) + })?; + assert_eq!(num_rows, 0); + Ok(()) + } + + async fn check_health(&self) -> Result<(), Error> { + let conn = self + .pool + .get() + .await + .map_err(|e| Error::new(ErrorKind::Other, format!("Connection error: {}", e)))?; + let num_rows = conn + .execute(TABLE_CHECK_STMT, &[]) + .await + .map_err(|e| { + Error::new(ErrorKind::Other, format!("Does the table exist? If not use --init-db {}", e)) + })?; + assert_eq!(num_rows, 0); + Ok(()) } fn build_vss_record(&self, user_token: String, store_id: String, kv: KeyValue) -> VssDbRecord { @@ -413,7 +515,7 @@ mod tests { define_kv_store_tests!( PostgresKvStoreTest, PostgresBackendImpl, - PostgresBackendImpl::new("postgresql://postgres:postgres@localhost:5432/postgres") + PostgresBackendImpl::new("postgresql://postgres:postgres@localhost:5432", "postgres", false) .await .unwrap() ); diff --git a/rust/server/src/main.rs b/rust/server/src/main.rs index 849dcee..4f23de6 100644 --- a/rust/server/src/main.rs +++ b/rust/server/src/main.rs @@ -28,10 +28,11 @@ pub(crate) mod vss_service; fn main() { let args: Vec = std::env::args().collect(); - if args.len() != 2 { - eprintln!("Usage: {} ", args[0]); + if args.len() < 2 { + eprintln!("Usage: {} [--init-db]", args[0]); std::process::exit(1); } + let init_db = args.contains(&String::from("--init-db")); let config = match util::config::load_config(&args[1]) { Ok(cfg) => cfg, @@ -67,13 +68,16 @@ fn main() { }, }; let authorizer = Arc::new(NoopAuthorizer {}); + let postgresql_config = config.postgresql_config.expect("PostgreSQLConfig must be defined in config file."); let store = Arc::new( - PostgresBackendImpl::new(&config.postgresql_config.expect("PostgreSQLConfig must be defined in config file.").to_connection_string()) + PostgresBackendImpl::new(&postgresql_config.to_postgresql_endpoint(), &postgresql_config.database, init_db) .await .unwrap(), ); + println!("Loaded postgres!"); let rest_svc_listener = TcpListener::bind(&addr).await.expect("Failed to bind listening port"); + println!("Bound to {}", addr); loop { tokio::select! { res = rest_svc_listener.accept() => { diff --git a/rust/server/src/util/config.rs b/rust/server/src/util/config.rs index e75f972..295bf04 100644 --- a/rust/server/src/util/config.rs +++ b/rust/server/src/util/config.rs @@ -22,7 +22,7 @@ pub(crate) struct PostgreSQLConfig { } impl PostgreSQLConfig { - pub(crate) fn to_connection_string(&self) -> String { + pub(crate) fn to_postgresql_endpoint(&self) -> String { let username_env = std::env::var("VSS_POSTGRESQL_USERNAME"); let username = username_env.as_ref() .ok() @@ -35,8 +35,8 @@ impl PostgreSQLConfig { .expect("PostgreSQL database password must be provided in config or env var VSS_POSTGRESQL_PASSWORD must be set."); format!( - "postgresql://{}:{}@{}:{}/{}", - username, password, self.host, self.port, self.database + "postgresql://{}:{}@{}:{}", + username, password, self.host, self.port ) } } diff --git a/rust/server/vss-server-config.toml b/rust/server/vss-server-config.toml index 8a013b5..d73a6e0 100644 --- a/rust/server/vss-server-config.toml +++ b/rust/server/vss-server-config.toml @@ -7,4 +7,4 @@ username = "postgres" # Optional in TOML, can be overridden by env var `VSS_POS password = "postgres" # Optional in TOML, can be overridden by env var `VSS_POSTGRESQL_PASSWORD` host = "localhost" port = 5432 -database = "postgres" +database = "vss_db" From 14c39a91e1c49742908b9da712774f53e7fe020b Mon Sep 17 00:00:00 2001 From: Leo Nash Date: Fri, 15 Aug 2025 17:02:59 +0000 Subject: [PATCH 2/3] fixup: Remove `String` allocation --- rust/server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/server/src/main.rs b/rust/server/src/main.rs index 4f23de6..1239c11 100644 --- a/rust/server/src/main.rs +++ b/rust/server/src/main.rs @@ -32,7 +32,7 @@ fn main() { eprintln!("Usage: {} [--init-db]", args[0]); std::process::exit(1); } - let init_db = args.contains(&String::from("--init-db")); + let init_db = args.iter().any(|arg| arg == "--init-db"); let config = match util::config::load_config(&args[1]) { Ok(cfg) => cfg, From 837b300c32e63a717ddf3fa1d8bcd79b8c03da1e Mon Sep 17 00:00:00 2001 From: Leo Nash Date: Fri, 15 Aug 2025 17:03:04 +0000 Subject: [PATCH 3/3] fixup: Add required `time` tokio feature --- rust/impls/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/impls/Cargo.toml b/rust/impls/Cargo.toml index 37e3c47..d5ea0ef 100644 --- a/rust/impls/Cargo.toml +++ b/rust/impls/Cargo.toml @@ -10,7 +10,7 @@ chrono = "0.4.38" tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] } bb8-postgres = "0.7" bytes = "1.4.0" -tokio = { version = "1.38.0", default-features = false } +tokio = { version = "1.38.0", default-features = false, features = ["time"] } [dev-dependencies] tokio = { version = "1.38.0", default-features = false, features = ["rt-multi-thread", "macros"] }