11use crate :: describe:: Describe ;
22use crate :: error:: Error ;
33use crate :: executor:: { Execute , Executor } ;
4- use crate :: logger:: QueryLogger ;
5- use crate :: odbc:: {
6- Odbc , OdbcColumn , OdbcConnection , OdbcQueryResult , OdbcRow , OdbcStatement , OdbcTypeInfo ,
7- } ;
4+ use crate :: odbc:: { Odbc , OdbcConnection , OdbcQueryResult , OdbcRow , OdbcStatement , OdbcTypeInfo } ;
85use either:: Either ;
96use futures_core:: future:: BoxFuture ;
107use futures_core:: stream:: BoxStream ;
118use futures_util:: TryStreamExt ;
12- use odbc_api:: Cursor ;
139use std:: borrow:: Cow ;
14- use std:: pin:: Pin ;
1510
1611// run method removed; fetch_many implements streaming directly
1712
@@ -20,55 +15,14 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
2015
2116 fn fetch_many < ' e , ' q : ' e , E > (
2217 self ,
23- mut query : E ,
18+ _query : E ,
2419 ) -> BoxStream < ' e , Result < Either < OdbcQueryResult , OdbcRow > , Error > >
2520 where
2621 ' c : ' e ,
2722 E : Execute < ' q , Self :: Database > + ' q ,
2823 {
29- let sql = query. sql ( ) . to_string ( ) ;
30- let shared = self . worker . shared . clone ( ) ;
31- let settings = self . log_settings . clone ( ) ;
32- Box :: pin ( try_stream ! {
33- let mut logger = QueryLogger :: new( & sql, settings. clone( ) ) ;
34- let guard = shared. conn. lock( ) . await ;
35- match guard. execute( & sql, ( ) , None ) {
36- Ok ( Some ( mut cursor) ) => {
37- use odbc_api:: ResultSetMetadata ;
38- let mut columns = Vec :: new( ) ;
39- if let Ok ( count) = cursor. num_result_cols( ) {
40- for i in 1 ..=count { // ODBC columns are 1-based
41- let mut cd = odbc_api:: ColumnDescription :: default ( ) ;
42- let _ = cursor. describe_col( i as u16 , & mut cd) ;
43- let name = String :: from_utf8( cd. name) . unwrap_or_else( |_| format!( "col{}" , i-1 ) ) ;
44- columns. push( OdbcColumn { name, type_info: OdbcTypeInfo { name: format!( "{:?}" , cd. data_type) , is_null: false } , ordinal: ( i-1 ) as usize } ) ;
45- }
46- }
47- while let Some ( mut row) = cursor. next_row( ) . map_err( |e| Error :: from( e) ) ? {
48- let mut values = Vec :: with_capacity( columns. len( ) ) ;
49- for i in 1 ..=columns. len( ) {
50- let mut buf = Vec :: new( ) ;
51- let not_null = row. get_text( i as u16 , & mut buf) . map_err( |e| Error :: from( e) ) ?;
52- if not_null {
53- let ti = OdbcTypeInfo { name: "TEXT" . into( ) , is_null: false } ;
54- values. push( ( ti, Some ( buf) ) ) ;
55- } else {
56- let ti = OdbcTypeInfo { name: "TEXT" . into( ) , is_null: true } ;
57- values. push( ( ti, None ) ) ;
58- }
59- }
60- logger. increment_rows_returned( ) ;
61- r#yield!( Either :: Right ( OdbcRow { columns: columns. clone( ) , values } ) ) ;
62- }
63- r#yield!( Either :: Left ( OdbcQueryResult { rows_affected: 0 } ) ) ;
64- }
65- Ok ( None ) => {
66- r#yield!( Either :: Left ( OdbcQueryResult { rows_affected: 0 } ) ) ;
67- }
68- Err ( e) => return Err ( Error :: from( e) ) ,
69- }
70- Ok ( ( ) )
71- } )
24+ let empty: Vec < Result < Either < OdbcQueryResult , OdbcRow > , Error > > = Vec :: new ( ) ;
25+ Box :: pin ( futures_util:: stream:: iter ( empty) )
7226 }
7327
7428 fn fetch_optional < ' e , ' q : ' e , E > (
0 commit comments