@@ -3,7 +3,7 @@ use std::{future::Future, time::Duration};
33use backon:: { ExponentialBuilder , Retryable } ;
44use sqlx:: { postgres:: PgPoolOptions , Pool , Postgres } ;
55
6- use crate :: retry:: RetryError ;
6+ use crate :: retry:: { RetryConfig , RetryError } ;
77
88#[ derive( Clone , Debug ) ]
99struct DbNode {
@@ -12,29 +12,45 @@ struct DbNode {
1212
1313pub struct DbOrchestartor {
1414 nodes : Vec < DbNode > ,
15+ retry_config : RetryConfig ,
16+ }
17+
18+ pub enum DbOrchestartorError {
19+ InvalidNumberOfConnectionUrls ,
20+ Sqlx ( sqlx:: Error ) ,
1521}
1622
1723impl DbOrchestartor {
18- pub fn try_new ( connection_urls : Vec < String > ) -> Result < Self , sqlx:: Error > {
19- // TODO: validate at least one connection url
24+ pub fn try_new (
25+ connection_urls : Vec < String > ,
26+ retry_config : RetryConfig ,
27+ ) -> Result < Self , DbOrchestartorError > {
28+ if connection_urls. is_empty ( ) {
29+ return Err ( DbOrchestartorError :: InvalidNumberOfConnectionUrls ) ;
30+ }
31+
2032 let nodes = connection_urls
2133 . into_iter ( )
2234 . map ( |url| {
2335 let pool = PgPoolOptions :: new ( ) . max_connections ( 5 ) . connect_lazy ( & url) ?;
2436
2537 Ok ( DbNode { pool } )
2638 } )
27- . collect :: < Result < Vec < _ > , sqlx:: Error > > ( ) ?;
39+ . collect :: < Result < Vec < _ > , sqlx:: Error > > ( )
40+ . map_err ( |e| DbOrchestartorError :: Sqlx ( e) ) ?;
2841
29- Ok ( Self { nodes } )
42+ Ok ( Self {
43+ nodes,
44+ retry_config,
45+ } )
3046 }
3147
3248 fn backoff_builder ( & self ) -> ExponentialBuilder {
3349 ExponentialBuilder :: default ( )
34- . with_min_delay ( Duration :: from_millis ( 0 ) )
35- . with_max_times ( 0 )
36- . with_factor ( 0.0 )
37- . with_max_delay ( Duration :: from_secs ( 0 ) )
50+ . with_min_delay ( Duration :: from_millis ( self . retry_config . min_delay_millis ) )
51+ . with_max_times ( self . retry_config . max_times )
52+ . with_factor ( self . retry_config . factor )
53+ . with_max_delay ( Duration :: from_secs ( self . retry_config . max_delay_seconds ) )
3854 }
3955
4056 pub async fn query < T , E , Q , Fut > ( & self , query_fn : Q ) -> Result < T , sqlx:: Error >
@@ -51,6 +67,7 @@ impl DbOrchestartor {
5167 Ok ( res) => return Ok ( res) ,
5268 Err ( err) => {
5369 if Self :: is_connection_error ( & err) {
70+ tracing:: warn!( node_index = idx, error = ?err, "database query failed; retrying" ) ;
5471 last_error = Some ( err) ;
5572 } else {
5673 return Err ( RetryError :: Permanent ( err) ) ;
@@ -81,6 +98,7 @@ impl DbOrchestartor {
8198 | sqlx:: Error :: PoolClosed
8299 | sqlx:: Error :: WorkerCrashed
83100 | sqlx:: Error :: BeginFailed
101+ | sqlx:: Error :: Database ( _)
84102 )
85103 }
86104}
0 commit comments