@@ -27,6 +27,22 @@ const KEY_COLUMN: &str = "key";
2727const VALUE_COLUMN : & str = "value" ;
2828const VERSION_COLUMN : & str = "version" ;
2929
30+ const DB_CHECK_STMT : & str = "SELECT 1 FROM pg_database WHERE datname = $1" ;
31+ const DB_INIT_CMD : & str = "CREATE DATABASE" ;
32+ const TABLE_CHECK_STMT : & str = "SELECT 1 FROM vss_db WHERE false" ;
33+ const TABLE_INIT_STMT : & str = "
34+ CREATE TABLE IF NOT EXISTS vss_db (
35+ user_token character varying(120) NOT NULL CHECK (user_token <> ''),
36+ store_id character varying(120) NOT NULL CHECK (store_id <> ''),
37+ key character varying(600) NOT NULL,
38+ value bytea NULL,
39+ version bigint NOT NULL,
40+ created_at TIMESTAMP WITH TIME ZONE,
41+ last_updated_at TIMESTAMP WITH TIME ZONE,
42+ PRIMARY KEY (user_token, store_id, key)
43+ );
44+ " ;
45+
3046/// The maximum number of key versions that can be returned in a single page.
3147///
3248/// This constant helps control memory and bandwidth usage for list operations,
@@ -46,17 +62,103 @@ pub struct PostgresBackendImpl {
4662 pool : Pool < PostgresConnectionManager < NoTls > > ,
4763}
4864
65+ async fn initialize_vss_database ( postgres_endpoint : & str , db_name : & str ) -> Result < ( ) , Error > {
66+ let postgres_dsn = format ! ( "{}/{}" , postgres_endpoint, "postgres" ) ;
67+ let ( client, connection) = tokio_postgres:: connect ( & postgres_dsn, NoTls ) . await
68+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Connection error: {}" , e) ) ) ?;
69+ // Connection must be driven on separate task, will be dropped on client dropped
70+ tokio:: spawn ( async move {
71+ if let Err ( e) = connection. await {
72+ eprintln ! ( "connection error: {}" , e) ;
73+ }
74+ } ) ;
75+
76+ // Check if the database already exists
77+ let num_rows = client. execute ( DB_CHECK_STMT , & [ & db_name] ) . await
78+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Query error: {}" , e) ) ) ?;
79+
80+ if num_rows == 0 {
81+ // Database does not exist, so create it
82+ let stmt = format ! ( "{} {}" , DB_INIT_CMD , db_name) ;
83+ client. execute ( & stmt, & [ ] ) . await
84+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Query error: {}" , e) ) ) ?;
85+ println ! ( "Database '{}' created successfully" , db_name) ;
86+ } else {
87+ println ! ( "Database '{}' already exists skipping creation" , db_name) ;
88+ }
89+
90+ Ok ( ( ) )
91+ }
92+
93+
4994impl PostgresBackendImpl {
5095 /// Constructs a [`PostgresBackendImpl`] using `dsn` for PostgreSQL connection information.
51- pub async fn new ( dsn : & str ) -> Result < Self , Error > {
52- let manager = PostgresConnectionManager :: new_from_stringlike ( dsn, NoTls ) . map_err ( |e| {
96+ pub async fn new ( postgres_endpoint : & str , db_name : & str , init_db : bool ) -> Result < Self , Error > {
97+ if init_db {
98+ tokio:: time:: timeout (
99+ tokio:: time:: Duration :: from_secs ( 3 ) ,
100+ initialize_vss_database ( postgres_endpoint, db_name) ,
101+ )
102+ . await
103+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Is the postgres endpoint online? {}" , e) ) ) ?
104+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Connection error: {}" , e) ) ) ?;
105+ }
106+ let vss_dsn = format ! ( "{}/{}" , postgres_endpoint, db_name) ;
107+ let manager = PostgresConnectionManager :: new_from_stringlike ( vss_dsn, NoTls ) . map_err ( |e| {
53108 Error :: new ( ErrorKind :: Other , format ! ( "Connection manager error: {}" , e) )
54109 } ) ?;
55110 let pool = Pool :: builder ( )
56111 . build ( manager)
57112 . await
58113 . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Pool build error: {}" , e) ) ) ?;
59- Ok ( PostgresBackendImpl { pool } )
114+ let ret = PostgresBackendImpl { pool } ;
115+ let touch_table = async {
116+ if init_db {
117+ ret. initialize_vss_table ( ) . await ?;
118+ }
119+ ret. check_health ( ) . await
120+ } ;
121+ tokio:: time:: timeout (
122+ tokio:: time:: Duration :: from_secs ( 3 ) ,
123+ touch_table,
124+ )
125+ . await
126+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Does the database exist? If not use --init-db {}" , e) ) ) ?
127+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Connection error: {}" , e) ) ) ?;
128+
129+ Ok ( ret)
130+ }
131+
132+ async fn initialize_vss_table ( & self ) -> Result < ( ) , Error > {
133+ let conn = self
134+ . pool
135+ . get ( )
136+ . await
137+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Connection error: {}" , e) ) ) ?;
138+ let num_rows = conn
139+ . execute ( TABLE_INIT_STMT , & [ ] )
140+ . await
141+ . map_err ( |e| {
142+ Error :: new ( ErrorKind :: Other , format ! ( "Database operation failed. {}" , e) )
143+ } ) ?;
144+ assert_eq ! ( num_rows, 0 ) ;
145+ Ok ( ( ) )
146+ }
147+
148+ async fn check_health ( & self ) -> Result < ( ) , Error > {
149+ let conn = self
150+ . pool
151+ . get ( )
152+ . await
153+ . map_err ( |e| Error :: new ( ErrorKind :: Other , format ! ( "Connection error: {}" , e) ) ) ?;
154+ let num_rows = conn
155+ . execute ( TABLE_CHECK_STMT , & [ ] )
156+ . await
157+ . map_err ( |e| {
158+ Error :: new ( ErrorKind :: Other , format ! ( "Does the table exist? If not use --init-db {}" , e) )
159+ } ) ?;
160+ assert_eq ! ( num_rows, 0 ) ;
161+ Ok ( ( ) )
60162 }
61163
62164 fn build_vss_record ( & self , user_token : String , store_id : String , kv : KeyValue ) -> VssDbRecord {
@@ -413,7 +515,7 @@ mod tests {
413515 define_kv_store_tests ! (
414516 PostgresKvStoreTest ,
415517 PostgresBackendImpl ,
416- PostgresBackendImpl :: new( "postgresql://postgres:postgres@localhost:5432/ postgres" )
518+ PostgresBackendImpl :: new( "postgresql://postgres:postgres@localhost:5432" , " postgres", false )
417519 . await
418520 . unwrap( )
419521 ) ;
0 commit comments