1010
1111use crate :: graph:: ConnectionPoolManager :: Direct ;
1212use crate :: pool:: ManagedConnection ;
13+ use crate :: query:: RetryableQuery ;
14+ use crate :: retry:: Retry ;
1315use crate :: RunResult ;
1416use crate :: {
1517 config:: { Config , ConfigBuilder , Database , LiveConfig } ,
@@ -20,27 +22,27 @@ use crate::{
2022 txn:: Txn ,
2123 Operation ,
2224} ;
23- use backoff :: { Error , ExponentialBackoff } ;
25+ use backon :: { ExponentialBuilder , RetryableWithContext } ;
2426use std:: time:: Duration ;
2527
2628#[ derive( Clone ) ]
27- enum ConnectionPoolManager {
29+ pub ( crate ) enum ConnectionPoolManager {
2830 #[ cfg( feature = "unstable-bolt-protocol-impl-v2" ) ]
2931 Routed ( RoutedConnectionManager ) ,
3032 Direct ( ConnectionPool ) ,
3133}
3234
3335impl ConnectionPoolManager {
3436 #[ allow( unused_variables) ]
35- async fn get ( & self , operation : Option < Operation > ) -> Result < ManagedConnection > {
37+ pub ( crate ) async fn get ( & self , operation : Option < Operation > ) -> Result < ManagedConnection > {
3638 match self {
3739 #[ cfg( feature = "unstable-bolt-protocol-impl-v2" ) ]
3840 Routed ( manager) => manager. get ( operation) . await ,
3941 Direct ( pool) => pool. get ( ) . await . map_err ( crate :: Error :: from) ,
4042 }
4143 }
4244
43- fn backoff ( & self ) -> ExponentialBackoff {
45+ fn backoff ( & self ) -> ExponentialBuilder {
4446 match self {
4547 #[ cfg( feature = "unstable-bolt-protocol-impl-v2" ) ]
4648 Routed ( manager) => manager. backoff ( ) ,
@@ -164,7 +166,7 @@ impl Graph {
164166 operation : Operation ,
165167 bookmarks : & [ String ] ,
166168 ) -> Result < Txn > {
167- let connection = self . pool . get ( Some ( operation. clone ( ) ) ) . await ?;
169+ let connection = self . pool . get ( Some ( operation) ) . await ?;
168170 #[ cfg( feature = "unstable-bolt-protocol-impl-v2" ) ]
169171 {
170172 Txn :: new ( db, self . config . fetch_size , connection, operation, bookmarks) . await
@@ -222,29 +224,18 @@ impl Graph {
222224 async fn impl_run_on (
223225 & self ,
224226 db : Option < Database > ,
225- q : Query ,
227+ query : Query ,
226228 operation : Operation ,
227229 ) -> Result < RunResult > {
228- let is_read = operation. is_read ( ) ;
229- let result = backoff:: future:: retry_notify (
230- self . pool . backoff ( ) ,
231- || {
232- let pool = & self . pool ;
233- let mut query = q. clone ( ) ;
234- let operation = operation. clone ( ) ;
235- if let Some ( db) = db. as_deref ( ) {
236- query = query. extra ( "db" , db) ;
237- }
238- query = query. extra ( "mode" , if is_read { "r" } else { "w" } ) ;
239- async move {
240- let mut connection =
241- pool. get ( Some ( operation) ) . await . map_err ( Error :: Permanent ) ?; // an error when retrieving a connection is considered permanent
242- query. run_retryable ( & mut connection) . await
243- }
244- } ,
245- Self :: log_retry,
246- )
247- . await ;
230+ let query = query. into_retryable ( db, operation, & self . pool , None ) ;
231+
232+ let ( query, result) = RetryableQuery :: retry_run
233+ . retry ( self . pool . backoff ( ) )
234+ . sleep ( tokio:: time:: sleep)
235+ . context ( query)
236+ . when ( |e| matches ! ( e, Retry :: Yes ( _) ) )
237+ . notify ( Self :: log_retry)
238+ . await ;
248239
249240 match result {
250241 Ok ( result) => {
@@ -257,7 +248,7 @@ impl Graph {
257248 }
258249 Direct ( _) => { }
259250 }
260- } else if is_read {
251+ } else if query . is_read ( ) {
261252 match & self . pool {
262253 Routed ( routed) => {
263254 debug ! ( "No bookmark received after a read operation, discarding all bookmarks" ) ;
@@ -269,7 +260,7 @@ impl Graph {
269260 }
270261 Ok ( result)
271262 }
272- Err ( e) => Err ( e) ,
263+ Err ( e) => Err ( e. into_inner ( ) ) ,
273264 }
274265 }
275266
@@ -331,32 +322,23 @@ impl Graph {
331322 async fn impl_execute_on (
332323 & self ,
333324 db : Option < Database > ,
334- q : Query ,
325+ query : Query ,
335326 operation : Operation ,
336327 ) -> Result < DetachedRowStream > {
337- backoff:: future:: retry_notify (
338- self . pool . backoff ( ) ,
339- || {
340- let pool = & self . pool ;
341- let mut query = q. clone ( ) ;
342- let operation = operation. clone ( ) ;
343- let fetch_size = self . config . fetch_size ;
344- if let Some ( db) = db. as_deref ( ) {
345- query = query. extra ( "db" , db) ;
346- }
347- let operation = operation. clone ( ) ;
348- query = query. param ( "mode" , if operation. is_read ( ) { "r" } else { "w" } ) ;
349- async move {
350- let connection = pool. get ( Some ( operation) ) . await . map_err ( Error :: Permanent ) ?; // an error when retrieving a connection is considered permanent
351- query. execute_retryable ( fetch_size, connection) . await
352- }
353- } ,
354- Self :: log_retry,
355- )
356- . await
328+ let query = query. into_retryable ( db, operation, & self . pool , Some ( self . config . fetch_size ) ) ;
329+
330+ let ( query, result) = RetryableQuery :: retry_execute
331+ . retry ( self . pool . backoff ( ) )
332+ . sleep ( tokio:: time:: sleep)
333+ . context ( query)
334+ . when ( |e| matches ! ( e, Retry :: Yes ( _) ) )
335+ . notify ( Self :: log_retry)
336+ . await ;
337+
338+ result. map_err ( Retry :: into_inner)
357339 }
358340
359- fn log_retry ( e : crate :: Error , delay : Duration ) {
341+ fn log_retry ( e : & Retry < crate :: Error > , delay : Duration ) {
360342 let level = match delay. as_millis ( ) {
361343 0 ..=499 => log:: Level :: Debug ,
362344 500 ..=4999 => log:: Level :: Info ,
0 commit comments