1- //! PostgreSQL backend for cold storage.
1+ //! Unified SQL backend for cold storage.
2+ //!
3+ //! Supports both PostgreSQL and SQLite via [`sqlx::Any`]. The backend
4+ //! auto-detects the database type at construction time and runs the
5+ //! appropriate migration.
26
37use crate :: SqlColdError ;
48use crate :: convert:: {
@@ -7,43 +11,80 @@ use crate::convert::{
711} ;
812use alloy:: { consensus:: Header , primitives:: BlockNumber } ;
913use signet_cold:: {
10- BlockData , BlockTag , ColdResult , ColdStorage , ColdStorageError , Confirmed , HeaderSpecifier ,
14+ BlockData , ColdResult , ColdStorage , ColdStorageError , Confirmed , HeaderSpecifier ,
1115 ReceiptSpecifier , SignetEventsSpecifier , TransactionSpecifier , ZenithHeaderSpecifier ,
1216} ;
1317use signet_storage_types:: {
1418 ConfirmationMeta , DbSignetEvent , DbZenithHeader , Receipt , TransactionSigned ,
1519} ;
16- use sqlx:: { PgPool , Row } ;
20+ use sqlx:: { AnyPool , Row } ;
1721
18- /// PostgreSQL -based cold storage backend.
22+ /// SQL -based cold storage backend.
1923///
20- /// Uses an `sqlx::PgPool` for connection management and connection pooling.
24+ /// Uses [`sqlx::Any`] for database-agnostic access, supporting both
25+ /// PostgreSQL and SQLite through a single implementation. The backend
26+ /// is determined by the connection URL at construction time.
2127///
2228/// # Example
2329///
2430/// ```no_run
2531/// # async fn example() {
26- /// use signet_cold_sql::PostgresColdBackend;
27- /// use sqlx::PgPool;
32+ /// use signet_cold_sql::SqlColdBackend;
2833///
29- /// let pool = PgPool::connect("postgres://localhost/signet").await.unwrap();
30- /// let backend = PostgresColdBackend::new(pool).await.unwrap();
34+ /// // SQLite (in-memory)
35+ /// let backend = SqlColdBackend::connect("sqlite::memory:").await.unwrap();
36+ ///
37+ /// // PostgreSQL
38+ /// let backend = SqlColdBackend::connect("postgres://localhost/signet").await.unwrap();
3139/// # }
3240/// ```
3341#[ derive( Debug , Clone ) ]
34- pub struct PostgresColdBackend {
35- pool : PgPool ,
42+ pub struct SqlColdBackend {
43+ pool : AnyPool ,
3644}
3745
38- impl PostgresColdBackend {
39- /// Create a new PostgreSQL cold storage backend.
46+ impl SqlColdBackend {
47+ /// Create a new SQL cold storage backend from an existing [`AnyPool`] .
4048 ///
41- /// Creates all tables if they do not already exist.
42- pub async fn new ( pool : PgPool ) -> Result < Self , SqlColdError > {
43- sqlx:: raw_sql ( include_str ! ( "../migrations/001_initial_pg.sql" ) ) . execute ( & pool) . await ?;
49+ /// Auto-detects the database backend and creates all tables if they
50+ /// do not already exist. Callers must ensure
51+ /// [`sqlx::any::install_default_drivers`] has been called before
52+ /// constructing the pool.
53+ pub async fn new ( pool : AnyPool ) -> Result < Self , SqlColdError > {
54+ // Detect backend from a pooled connection.
55+ let conn = pool. acquire ( ) . await ?;
56+ let backend = conn. backend_name ( ) . to_owned ( ) ;
57+ drop ( conn) ;
58+
59+ let migration = match backend. as_str ( ) {
60+ "PostgreSQL" => include_str ! ( "../migrations/001_initial_pg.sql" ) ,
61+ "SQLite" => include_str ! ( "../migrations/001_initial.sql" ) ,
62+ other => {
63+ return Err ( SqlColdError :: Convert ( format ! (
64+ "unsupported database backend: {other}"
65+ ) ) ) ;
66+ }
67+ } ;
68+ // Execute via pool to ensure the migration uses the same
69+ // connection that subsequent queries will use.
70+ sqlx:: raw_sql ( migration) . execute ( & pool) . await ?;
4471 Ok ( Self { pool } )
4572 }
4673
74+ /// Connect to a database URL and create the backend.
75+ ///
76+ /// Installs the default sqlx drivers on the first call. The database
77+ /// type is inferred from the URL scheme (`sqlite:` or `postgres:`).
78+ ///
79+ /// For SQLite in-memory databases (`sqlite::memory:`), the pool is
80+ /// limited to one connection to ensure all operations share the same
81+ /// database.
82+ pub async fn connect ( url : & str ) -> Result < Self , SqlColdError > {
83+ sqlx:: any:: install_default_drivers ( ) ;
84+ let pool: AnyPool = sqlx:: pool:: PoolOptions :: new ( ) . max_connections ( 1 ) . connect ( url) . await ?;
85+ Self :: new ( pool) . await
86+ }
87+
4788 // ========================================================================
4889 // Specifier resolution
4990 // ========================================================================
@@ -62,24 +103,9 @@ impl PostgresColdBackend {
62103 . await ?;
63104 Ok ( row. map ( |r| from_i64 ( r. get :: < i64 , _ > ( "block_number" ) ) ) )
64105 }
65- HeaderSpecifier :: Tag ( tag) => self . resolve_tag ( tag) . await ,
66106 }
67107 }
68108
69- async fn resolve_tag ( & self , tag : BlockTag ) -> Result < Option < BlockNumber > , SqlColdError > {
70- let key = match tag {
71- BlockTag :: Latest => "latest_block" ,
72- BlockTag :: Finalized => "finalized_block" ,
73- BlockTag :: Safe => "safe_block" ,
74- BlockTag :: Earliest => "earliest_block" ,
75- } ;
76- let row = sqlx:: query ( "SELECT block_number FROM metadata WHERE key = $1" )
77- . bind ( key)
78- . fetch_optional ( & self . pool )
79- . await ?;
80- Ok ( row. map ( |r| from_i64 ( r. get :: < i64 , _ > ( "block_number" ) ) ) )
81- }
82-
83109 async fn resolve_tx_spec (
84110 & self ,
85111 spec : TransactionSpecifier ,
@@ -231,7 +257,7 @@ impl PostgresColdBackend {
231257 block_number : rr. get ( "block_number" ) ,
232258 tx_index : rr. get ( "tx_index" ) ,
233259 tx_type : rr. get :: < i32 , _ > ( "tx_type" ) as i16 ,
234- success : rr. get :: < bool , _ > ( "success" ) ,
260+ success : rr. get :: < i32 , _ > ( "success" ) != 0 ,
235261 cumulative_gas_used : rr. get ( "cumulative_gas_used" ) ,
236262 } ;
237263
@@ -329,7 +355,7 @@ impl PostgresColdBackend {
329355 . bind ( tr. tx_index )
330356 . bind ( & tr. tx_hash )
331357 . bind ( tr. tx_type as i32 )
332- . bind ( tr. sig_y_parity )
358+ . bind ( tr. sig_y_parity as i32 )
333359 . bind ( & tr. sig_r )
334360 . bind ( & tr. sig_s )
335361 . bind ( tr. chain_id )
@@ -359,7 +385,7 @@ impl PostgresColdBackend {
359385 . bind ( rr. block_number )
360386 . bind ( rr. tx_index )
361387 . bind ( rr. tx_type as i32 )
362- . bind ( rr. success )
388+ . bind ( rr. success as i32 )
363389 . bind ( rr. cumulative_gas_used )
364390 . execute ( & mut * tx)
365391 . await ?;
@@ -468,13 +494,13 @@ impl PostgresColdBackend {
468494}
469495
470496/// Convert a sqlx row to a TxRow.
471- fn row_to_tx_row ( r : & sqlx:: postgres :: PgRow ) -> TxRow {
497+ fn row_to_tx_row ( r : & sqlx:: any :: AnyRow ) -> TxRow {
472498 TxRow {
473499 block_number : r. get ( "block_number" ) ,
474500 tx_index : r. get ( "tx_index" ) ,
475501 tx_hash : r. get ( "tx_hash" ) ,
476502 tx_type : r. get :: < i32 , _ > ( "tx_type" ) as i16 ,
477- sig_y_parity : r. get ( "sig_y_parity" ) ,
503+ sig_y_parity : r. get :: < i32 , _ > ( "sig_y_parity" ) != 0 ,
478504 sig_r : r. get ( "sig_r" ) ,
479505 sig_s : r. get ( "sig_s" ) ,
480506 chain_id : r. get ( "chain_id" ) ,
@@ -493,7 +519,7 @@ fn row_to_tx_row(r: &sqlx::postgres::PgRow) -> TxRow {
493519 }
494520}
495521
496- fn row_to_signet_event_row ( r : & sqlx:: postgres :: PgRow ) -> SignetEventRow {
522+ fn row_to_signet_event_row ( r : & sqlx:: any :: AnyRow ) -> SignetEventRow {
497523 SignetEventRow {
498524 block_number : r. get ( "block_number" ) ,
499525 event_index : r. get ( "event_index" ) ,
@@ -512,7 +538,7 @@ fn row_to_signet_event_row(r: &sqlx::postgres::PgRow) -> SignetEventRow {
512538 }
513539}
514540
515- fn row_to_zenith_header_row ( r : & sqlx:: postgres :: PgRow ) -> ZenithHeaderRow {
541+ fn row_to_zenith_header_row ( r : & sqlx:: any :: AnyRow ) -> ZenithHeaderRow {
516542 ZenithHeaderRow {
517543 block_number : r. get ( "block_number" ) ,
518544 host_block_number : r. get ( "host_block_number" ) ,
@@ -523,7 +549,7 @@ fn row_to_zenith_header_row(r: &sqlx::postgres::PgRow) -> ZenithHeaderRow {
523549 }
524550}
525551
526- impl ColdStorage for PostgresColdBackend {
552+ impl ColdStorage for SqlColdBackend {
527553 async fn get_header ( & self , spec : HeaderSpecifier ) -> ColdResult < Option < Header > > {
528554 let Some ( block_num) = self . resolve_header_spec ( spec) . await ? else {
529555 return Ok ( None ) ;
@@ -616,7 +642,7 @@ impl ColdStorage for PostgresColdBackend {
616642 block_number : rr. get ( "block_number" ) ,
617643 tx_index : tx_idx,
618644 tx_type : rr. get :: < i32 , _ > ( "tx_type" ) as i16 ,
619- success : rr. get :: < bool , _ > ( "success" ) ,
645+ success : rr. get :: < i32 , _ > ( "success" ) != 0 ,
620646 cumulative_gas_used : rr. get ( "cumulative_gas_used" ) ,
621647 } ;
622648
@@ -738,7 +764,12 @@ impl ColdStorage for PostgresColdBackend {
738764 }
739765
740766 async fn get_latest_block ( & self ) -> ColdResult < Option < BlockNumber > > {
741- self . resolve_tag ( BlockTag :: Latest ) . await . map_err ( ColdStorageError :: from)
767+ let row = sqlx:: query ( "SELECT block_number FROM metadata WHERE key = $1" )
768+ . bind ( "latest_block" )
769+ . fetch_optional ( & self . pool )
770+ . await
771+ . map_err ( SqlColdError :: from) ?;
772+ Ok ( row. map ( |r| from_i64 ( r. get :: < i64 , _ > ( "block_number" ) ) ) )
742773 }
743774
744775 async fn append_block ( & self , data : BlockData ) -> ColdResult < ( ) > {
@@ -825,13 +856,18 @@ mod tests {
825856 use signet_cold:: conformance:: conformance;
826857
827858 #[ tokio:: test]
828- async fn pg_backend_conformance ( ) {
859+ async fn sqlite_conformance ( ) {
860+ let backend = SqlColdBackend :: connect ( "sqlite::memory:" ) . await . unwrap ( ) ;
861+ conformance ( & backend) . await . unwrap ( ) ;
862+ }
863+
864+ #[ tokio:: test]
865+ async fn pg_conformance ( ) {
829866 let Ok ( url) = std:: env:: var ( "DATABASE_URL" ) else {
830867 eprintln ! ( "skipping pg conformance: DATABASE_URL not set" ) ;
831868 return ;
832869 } ;
833- let pool = PgPool :: connect ( & url) . await . unwrap ( ) ;
834- let backend = PostgresColdBackend :: new ( pool) . await . unwrap ( ) ;
870+ let backend = SqlColdBackend :: connect ( & url) . await . unwrap ( ) ;
835871 conformance ( & backend) . await . unwrap ( ) ;
836872 }
837873}
0 commit comments