@@ -24,6 +24,25 @@ pub struct OdbcConnection {
2424}
2525
2626impl OdbcConnection {
27+ #[ inline]
28+ async fn with_conn < T , F > ( & self , f : F ) -> Result < T , Error >
29+ where
30+ T : Send + ' static ,
31+ F : FnOnce ( & mut OdbcConn ) -> Result < T , Error > + Send + ' static ,
32+ {
33+ let inner = self . inner . clone ( ) ;
34+ run_blocking ( move || {
35+ let mut conn = inner. lock ( ) . unwrap ( ) ;
36+ f ( & mut conn)
37+ } )
38+ . await
39+ }
40+
41+ #[ inline]
42+ fn odbc_err < T , E : std:: fmt:: Display > ( res : Result < T , E > , ctx : & ' static str ) -> Result < T , Error > {
43+ res. map_err ( |e| Error :: Protocol ( format ! ( "{}: {}" , ctx, e) ) )
44+ }
45+
2746 pub ( crate ) async fn establish ( options : & OdbcConnectOptions ) -> Result < Self , Error > {
2847 let conn = run_blocking ( {
2948 let options = options. clone ( ) ;
@@ -40,56 +59,44 @@ impl OdbcConnection {
4059 /// Returns the name of the actual Database Management System (DBMS) this
4160 /// connection is talking to as reported by the ODBC driver.
4261 pub async fn dbms_name ( & mut self ) -> Result < String , Error > {
43- let inner = self . inner . clone ( ) ;
44- run_blocking ( move || {
45- let conn = inner . lock ( ) . unwrap ( ) ;
46- conn . database_management_system_name ( )
47- . map_err ( |e| Error :: Protocol ( format ! ( "Failed to get DBMS name: {}" , e ) ) )
62+ self . with_conn ( |conn| {
63+ Self :: odbc_err (
64+ conn . database_management_system_name ( ) ,
65+ "Failed to get DBMS name" ,
66+ )
4867 } )
4968 . await
5069 }
5170
5271 pub ( crate ) async fn ping_blocking ( & mut self ) -> Result < ( ) , Error > {
53- let inner = self . inner . clone ( ) ;
54- run_blocking ( move || {
55- let conn = inner. lock ( ) . unwrap ( ) ;
56- let res = conn. execute ( "SELECT 1" , ( ) , None ) ;
57- match res {
58- Ok ( _) => Ok ( ( ) ) ,
59- Err ( e) => Err ( Error :: Protocol ( format ! ( "Ping failed: {}" , e) ) ) ,
60- }
72+ self . with_conn ( |conn| {
73+ Self :: odbc_err ( conn. execute ( "SELECT 1" , ( ) , None ) , "Ping failed" ) ?;
74+ Ok ( ( ) )
6175 } )
6276 . await
6377 }
6478
6579 pub ( crate ) async fn begin_blocking ( & mut self ) -> Result < ( ) , Error > {
66- let inner = self . inner . clone ( ) ;
67- run_blocking ( move || {
68- let conn = inner. lock ( ) . unwrap ( ) ;
69- conn. set_autocommit ( false )
70- . map_err ( |e| Error :: Protocol ( format ! ( "Failed to begin transaction: {}" , e) ) )
80+ self . with_conn ( |conn| {
81+ Self :: odbc_err ( conn. set_autocommit ( false ) , "Failed to begin transaction" )
7182 } )
7283 . await
7384 }
7485
7586 pub ( crate ) async fn commit_blocking ( & mut self ) -> Result < ( ) , Error > {
76- let inner = self . inner . clone ( ) ;
77- run_blocking ( move || {
78- let conn = inner. lock ( ) . unwrap ( ) ;
79- conn. commit ( )
80- . and_then ( |_| conn. set_autocommit ( true ) )
81- . map_err ( |e| Error :: Protocol ( format ! ( "Failed to commit transaction: {}" , e) ) )
87+ self . with_conn ( |conn| {
88+ Self :: odbc_err ( conn. commit ( ) , "Failed to commit transaction" ) ?;
89+ Self :: odbc_err ( conn. set_autocommit ( true ) , "Failed to commit transaction" ) ?;
90+ Ok ( ( ) )
8291 } )
8392 . await
8493 }
8594
8695 pub ( crate ) async fn rollback_blocking ( & mut self ) -> Result < ( ) , Error > {
87- let inner = self . inner . clone ( ) ;
88- run_blocking ( move || {
89- let conn = inner. lock ( ) . unwrap ( ) ;
90- conn. rollback ( )
91- . and_then ( |_| conn. set_autocommit ( true ) )
92- . map_err ( |e| Error :: Protocol ( format ! ( "Failed to rollback transaction: {}" , e) ) )
96+ self . with_conn ( |conn| {
97+ Self :: odbc_err ( conn. rollback ( ) , "Failed to rollback transaction" ) ?;
98+ Self :: odbc_err ( conn. set_autocommit ( true ) , "Failed to rollback transaction" ) ?;
99+ Ok ( ( ) )
93100 } )
94101 . await
95102 }
@@ -100,11 +107,10 @@ impl OdbcConnection {
100107 args : Option < OdbcArguments > ,
101108 ) -> Result < flume:: Receiver < Result < Either < OdbcQueryResult , OdbcRow > , Error > > , Error > {
102109 let ( tx, rx) = flume:: bounded ( 64 ) ;
103- let inner = self . inner . clone ( ) ;
104110 let sql = sql. to_string ( ) ;
105- run_blocking ( move || {
106- let mut guard = inner . lock ( ) . unwrap ( ) ;
107- if let Err ( e) = execute_sql ( & mut guard , & sql, args , & tx) {
111+ let args_move = args ;
112+ self . with_conn ( move |conn| {
113+ if let Err ( e) = execute_sql ( conn , & sql, args_move , & tx) {
108114 let _ = tx. send ( Err ( e) ) ;
109115 }
110116 Ok ( ( ) )
@@ -117,9 +123,9 @@ impl OdbcConnection {
117123 & mut self ,
118124 sql : & str ,
119125 ) -> Result < ( u64 , Vec < OdbcColumn > , usize ) , Error > {
120- let inner = self . inner . clone ( ) ;
121126 let sql = sql. to_string ( ) ;
122- run_blocking ( move || do_prepare ( & mut inner. lock ( ) . unwrap ( ) , sql. into ( ) ) ) . await
127+ self . with_conn ( move |conn| do_prepare ( conn, sql. into ( ) ) )
128+ . await
123129 }
124130}
125131
0 commit comments