@@ -2,63 +2,18 @@ use crate::describe::Describe;
22use crate :: error:: Error ;
33use crate :: executor:: { Execute , Executor } ;
44use crate :: logger:: QueryLogger ;
5- use crate :: odbc:: { Odbc , OdbcColumn , OdbcConnection , OdbcQueryResult , OdbcRow , OdbcStatement , OdbcTypeInfo } ;
5+ use crate :: odbc:: {
6+ Odbc , OdbcColumn , OdbcConnection , OdbcQueryResult , OdbcRow , OdbcStatement , OdbcTypeInfo ,
7+ } ;
68use either:: Either ;
79use futures_core:: future:: BoxFuture ;
810use futures_core:: stream:: BoxStream ;
911use futures_util:: TryStreamExt ;
10- use std:: pin:: Pin ;
1112use odbc_api:: Cursor ;
1213use std:: borrow:: Cow ;
14+ use std:: pin:: Pin ;
1315
14- impl OdbcConnection {
15- async fn run < ' e , ' c : ' e > (
16- & ' c mut self ,
17- sql : & ' e str ,
18- ) -> Result < impl futures_core:: Stream < Item = Result < Either < OdbcQueryResult , OdbcRow > , Error > > + ' e , Error > {
19- let mut logger = QueryLogger :: new ( sql, self . log_settings . clone ( ) ) ;
20-
21- Ok ( Box :: pin ( try_stream ! {
22- let guard = self . worker. shared. conn. lock( ) . await ;
23- match guard. execute( sql, ( ) , None ) {
24- Ok ( Some ( mut cursor) ) => {
25- use odbc_api:: ResultSetMetadata ;
26- let mut columns = Vec :: new( ) ;
27- if let Ok ( count) = cursor. num_result_cols( ) {
28- for i in 1 ..=count { // ODBC columns are 1-based
29- let mut cd = odbc_api:: ColumnDescription :: default ( ) ;
30- let _ = cursor. describe_col( i as u16 , & mut cd) ;
31- let name = String :: from_utf8( cd. name) . unwrap_or_else( |_| format!( "col{}" , i-1 ) ) ;
32- columns. push( OdbcColumn { name, type_info: OdbcTypeInfo { name: format!( "{:?}" , cd. data_type) , is_null: false } , ordinal: ( i-1 ) as usize } ) ;
33- }
34- }
35- while let Some ( mut row) = cursor. next_row( ) . map_err( |e| Error :: from( e) ) ? {
36- let mut values = Vec :: with_capacity( columns. len( ) ) ;
37- for i in 1 ..=columns. len( ) {
38- let mut buf = Vec :: new( ) ;
39- let not_null = row. get_text( i as u16 , & mut buf) . map_err( |e| Error :: from( e) ) ?;
40- if not_null {
41- let ti = OdbcTypeInfo { name: "TEXT" . into( ) , is_null: false } ;
42- values. push( ( ti, Some ( buf) ) ) ;
43- } else {
44- let ti = OdbcTypeInfo { name: "TEXT" . into( ) , is_null: true } ;
45- values. push( ( ti, None ) ) ;
46- }
47- }
48- logger. increment_rows_returned( ) ;
49- r#yield!( Either :: Right ( OdbcRow { columns: columns. clone( ) , values } ) ) ;
50- }
51- r#yield!( Either :: Left ( OdbcQueryResult { rows_affected: 0 } ) ) ;
52- }
53- Ok ( None ) => {
54- r#yield!( Either :: Left ( OdbcQueryResult { rows_affected: 0 } ) ) ;
55- }
56- Err ( e) => return Err ( Error :: from( e) ) ,
57- }
58- Ok ( ( ) )
59- } ) )
60- }
61- }
16+ // run method removed; fetch_many implements streaming directly
6217
6318impl < ' c > Executor < ' c > for & ' c mut OdbcConnection {
6419 type Database = Odbc ;
@@ -127,7 +82,9 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
12782 let mut s = self . fetch_many ( query) ;
12883 Box :: pin ( async move {
12984 while let Some ( v) = s. try_next ( ) . await ? {
130- if let Either :: Right ( r) = v { return Ok ( Some ( r) ) ; }
85+ if let Either :: Right ( r) = v {
86+ return Ok ( Some ( r) ) ;
87+ }
13188 }
13289 Ok ( None )
13390 } )
@@ -143,7 +100,11 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
143100 {
144101 Box :: pin ( async move {
145102 // Basic statement metadata: no parameter/column info without executing
146- Ok ( OdbcStatement { sql : Cow :: Borrowed ( sql) , columns : Vec :: new ( ) , parameters : 0 } )
103+ Ok ( OdbcStatement {
104+ sql : Cow :: Borrowed ( sql) ,
105+ columns : Vec :: new ( ) ,
106+ parameters : 0 ,
107+ } )
147108 } )
148109 }
149110
0 commit comments