@@ -5,7 +5,9 @@ use futures_channel::oneshot;
55use futures_intrusive:: sync:: Mutex ;
66
77use crate :: error:: Error ;
8- use crate :: odbc:: OdbcConnectOptions ;
8+ use crate :: odbc:: { OdbcColumn , OdbcConnectOptions , OdbcQueryResult , OdbcRow , OdbcTypeInfo } ;
9+ use either:: Either ;
10+ use odbc_api:: Cursor ;
911
1012#[ derive( Debug ) ]
1113pub ( crate ) struct ConnectionWorker {
@@ -34,6 +36,10 @@ enum Command {
3436 Rollback {
3537 tx : oneshot:: Sender < Result < ( ) , Error > > ,
3638 } ,
39+ Execute {
40+ sql : Box < str > ,
41+ tx : flume:: Sender < Result < Either < OdbcQueryResult , OdbcRow > , Error > > ,
42+ } ,
3743}
3844
3945impl ConnectionWorker {
@@ -119,6 +125,60 @@ impl ConnectionWorker {
119125 let _ = tx. send ( ( ) ) ;
120126 return ;
121127 }
128+ Command :: Execute { sql, tx } => {
129+ // Helper closure to process using a given connection reference
130+ let process = |conn : & odbc_api:: Connection < ' static > | {
131+ match conn. execute ( & sql, ( ) , None ) {
132+ Ok ( Some ( mut cursor) ) => {
133+ use odbc_api:: ResultSetMetadata ;
134+ let mut columns: Vec < OdbcColumn > = Vec :: new ( ) ;
135+ if let Ok ( count) = cursor. num_result_cols ( ) {
136+ for i in 1 ..=count {
137+ let mut cd = odbc_api:: ColumnDescription :: default ( ) ;
138+ let _ = cursor. describe_col ( i as u16 , & mut cd) ;
139+ let name = String :: from_utf8 ( cd. name )
140+ . unwrap_or_else ( |_| format ! ( "col{}" , i - 1 ) ) ;
141+ columns. push ( OdbcColumn {
142+ name,
143+ type_info : OdbcTypeInfo { name : format ! ( "{:?}" , cd. data_type) , is_null : false } ,
144+ ordinal : ( i - 1 ) as usize ,
145+ } ) ;
146+ }
147+ }
148+
149+ while let Ok ( Some ( mut row) ) = cursor. next_row ( ) {
150+ let mut values: Vec < ( OdbcTypeInfo , Option < Vec < u8 > > ) > = Vec :: with_capacity ( columns. len ( ) ) ;
151+ for i in 1 ..=columns. len ( ) {
152+ let mut buf = Vec :: new ( ) ;
153+ match row. get_text ( i as u16 , & mut buf) {
154+ Ok ( true ) => values. push ( ( OdbcTypeInfo { name : "TEXT" . into ( ) , is_null : false } , Some ( buf) ) ) ,
155+ Ok ( false ) => values. push ( ( OdbcTypeInfo { name : "TEXT" . into ( ) , is_null : true } , None ) ) ,
156+ Err ( e) => {
157+ let _ = tx. send ( Err ( Error :: from ( e) ) ) ;
158+ return ;
159+ }
160+ }
161+ }
162+ let _ = tx. send ( Ok ( Either :: Right ( OdbcRow { columns : columns. clone ( ) , values } ) ) ) ;
163+ }
164+ let _ = tx. send ( Ok ( Either :: Left ( OdbcQueryResult { rows_affected : 0 } ) ) ) ;
165+ }
166+ Ok ( None ) => {
167+ let _ = tx. send ( Ok ( Either :: Left ( OdbcQueryResult { rows_affected : 0 } ) ) ) ;
168+ }
169+ Err ( e) => {
170+ let _ = tx. send ( Err ( Error :: from ( e) ) ) ;
171+ }
172+ }
173+ } ;
174+
175+ if let Some ( conn) = shared. conn . try_lock ( ) {
176+ process ( & conn) ;
177+ } else {
178+ let guard = futures_executor:: block_on ( shared. conn . lock ( ) ) ;
179+ process ( & guard) ;
180+ }
181+ }
122182 }
123183 }
124184 } ) ?;
@@ -173,4 +233,16 @@ impl ConnectionWorker {
173233 rx. await . map_err ( |_| Error :: WorkerCrashed ) ??;
174234 Ok ( ( ) )
175235 }
236+
237+ pub ( crate ) async fn execute_stream (
238+ & mut self ,
239+ sql : & str ,
240+ ) -> Result < flume:: Receiver < Result < Either < OdbcQueryResult , OdbcRow > , Error > > , Error > {
241+ let ( tx, rx) = flume:: bounded ( 64 ) ;
242+ self . command_tx
243+ . send_async ( Command :: Execute { sql : sql. into ( ) , tx } )
244+ . await
245+ . map_err ( |_| Error :: WorkerCrashed ) ?;
246+ Ok ( rx)
247+ }
176248}
0 commit comments