@@ -12,8 +12,8 @@ use odbc_api::Cursor;
1212use std:: borrow:: Cow ;
1313
1414impl OdbcConnection {
15- async fn run < ' e > (
16- & ' e mut self ,
15+ async fn run < ' e , ' c : ' e > (
16+ & ' c mut self ,
1717 sql : & ' e str ,
1818 ) -> Result < impl futures_core:: Stream < Item = Result < Either < OdbcQueryResult , OdbcRow > , Error > > + ' e , Error > {
1919 let mut logger = QueryLogger :: new ( sql, self . log_settings . clone ( ) ) ;
@@ -71,11 +71,47 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
7171 ' c : ' e ,
7272 E : Execute < ' q , Self :: Database > + ' q ,
7373 {
74- let sql = query. sql ( ) ;
74+ let sql = query. sql ( ) . to_string ( ) ;
75+ let shared = self . worker . shared . clone ( ) ;
76+ let settings = self . log_settings . clone ( ) ;
7577 Box :: pin ( try_stream ! {
76- let s = self . run( sql) . await ?;
77- futures_util:: pin_mut!( s) ;
78- while let Some ( v) = s. try_next( ) . await ? { r#yield!( v) ; }
78+ let mut logger = QueryLogger :: new( & sql, settings. clone( ) ) ;
79+ let guard = shared. conn. lock( ) . await ;
80+ match guard. execute( & sql, ( ) , None ) {
81+ Ok ( Some ( mut cursor) ) => {
82+ use odbc_api:: ResultSetMetadata ;
83+ let mut columns = Vec :: new( ) ;
84+ if let Ok ( count) = cursor. num_result_cols( ) {
85+ for i in 1 ..=count { // ODBC columns are 1-based
86+ let mut cd = odbc_api:: ColumnDescription :: default ( ) ;
87+ let _ = cursor. describe_col( i as u16 , & mut cd) ;
88+ let name = String :: from_utf8( cd. name) . unwrap_or_else( |_| format!( "col{}" , i-1 ) ) ;
89+ columns. push( OdbcColumn { name, type_info: OdbcTypeInfo { name: format!( "{:?}" , cd. data_type) , is_null: false } , ordinal: ( i-1 ) as usize } ) ;
90+ }
91+ }
92+ while let Some ( mut row) = cursor. next_row( ) . map_err( |e| Error :: from( e) ) ? {
93+ let mut values = Vec :: with_capacity( columns. len( ) ) ;
94+ for i in 1 ..=columns. len( ) {
95+ let mut buf = Vec :: new( ) ;
96+ let not_null = row. get_text( i as u16 , & mut buf) . map_err( |e| Error :: from( e) ) ?;
97+ if not_null {
98+ let ti = OdbcTypeInfo { name: "TEXT" . into( ) , is_null: false } ;
99+ values. push( ( ti, Some ( buf) ) ) ;
100+ } else {
101+ let ti = OdbcTypeInfo { name: "TEXT" . into( ) , is_null: true } ;
102+ values. push( ( ti, None ) ) ;
103+ }
104+ }
105+ logger. increment_rows_returned( ) ;
106+ r#yield!( Either :: Right ( OdbcRow { columns: columns. clone( ) , values } ) ) ;
107+ }
108+ r#yield!( Either :: Left ( OdbcQueryResult { rows_affected: 0 } ) ) ;
109+ }
110+ Ok ( None ) => {
111+ r#yield!( Either :: Left ( OdbcQueryResult { rows_affected: 0 } ) ) ;
112+ }
113+ Err ( e) => return Err ( Error :: from( e) ) ,
114+ }
79115 Ok ( ( ) )
80116 } )
81117 }
0 commit comments