@@ -5,7 +5,7 @@ use futures_channel::oneshot;
55use futures_intrusive:: sync:: Mutex ;
66
77use crate :: error:: Error ;
8- use crate :: odbc:: { OdbcColumn , OdbcConnectOptions , OdbcQueryResult , OdbcRow , OdbcTypeInfo } ;
8+ use crate :: odbc:: { OdbcArgumentValue , OdbcColumn , OdbcConnectOptions , OdbcQueryResult , OdbcRow , OdbcTypeInfo } ;
99use either:: Either ;
1010use odbc_api:: Cursor ;
1111
@@ -40,6 +40,11 @@ enum Command {
4040 sql : Box < str > ,
4141 tx : flume:: Sender < Result < Either < OdbcQueryResult , OdbcRow > , Error > > ,
4242 } ,
43+ ExecuteWithArgs {
44+ sql : Box < str > ,
45+ args : Vec < OdbcArgumentValue < ' static > > ,
46+ tx : flume:: Sender < Result < Either < OdbcQueryResult , OdbcRow > , Error > > ,
47+ } ,
4348}
4449
4550impl ConnectionWorker {
@@ -204,6 +209,76 @@ impl ConnectionWorker {
204209 process ( & guard) ;
205210 }
206211 }
212+ Command :: ExecuteWithArgs { sql, args, tx } => {
213+ let process = |conn : & odbc_api:: Connection < ' static > | {
214+ // Fallback: if parameter API is unavailable, execute interpolated SQL directly
215+ match conn. execute ( & sql, ( ) , None ) {
216+ Ok ( Some ( mut cursor) ) => {
217+ use odbc_api:: ResultSetMetadata ;
218+ let mut columns: Vec < OdbcColumn > = Vec :: new ( ) ;
219+ if let Ok ( count) = cursor. num_result_cols ( ) {
220+ for i in 1 ..=count {
221+ let mut cd = odbc_api:: ColumnDescription :: default ( ) ;
222+ let _ = cursor. describe_col ( i as u16 , & mut cd) ;
223+ let name = String :: from_utf8 ( cd. name )
224+ . unwrap_or_else ( |_| format ! ( "col{}" , i - 1 ) ) ;
225+ columns. push ( OdbcColumn {
226+ name,
227+ type_info : OdbcTypeInfo {
228+ name : format ! ( "{:?}" , cd. data_type) ,
229+ is_null : false ,
230+ } ,
231+ ordinal : ( i - 1 ) as usize ,
232+ } ) ;
233+ }
234+ }
235+ while let Ok ( Some ( mut row) ) = cursor. next_row ( ) {
236+ let mut values: Vec < ( OdbcTypeInfo , Option < Vec < u8 > > ) > =
237+ Vec :: with_capacity ( columns. len ( ) ) ;
238+ for i in 1 ..=columns. len ( ) {
239+ let mut buf = Vec :: new ( ) ;
240+ // Try text first, then fallback to binary, then numeric
241+ if let Ok ( true ) = row. get_text ( i as u16 , & mut buf) {
242+ values. push ( ( OdbcTypeInfo { name : "TEXT" . into ( ) , is_null : false } , Some ( buf) ) ) ;
243+ } else if let Ok ( false ) = row. get_text ( i as u16 , & mut buf) {
244+ values. push ( ( OdbcTypeInfo { name : "TEXT" . into ( ) , is_null : true } , None ) ) ;
245+ } else if let Ok ( bytes) = row. get_binary ( i as u16 ) {
246+ values. push ( ( OdbcTypeInfo { name : "BLOB" . into ( ) , is_null : false } , Some ( bytes. unwrap_or_default ( ) ) ) ) ;
247+ } else if let Ok ( opt) = row. get_data :: < i64 > ( i as u16 ) {
248+ if let Some ( num) = opt {
249+ values. push ( ( OdbcTypeInfo { name : "INT" . into ( ) , is_null : false } , Some ( num. to_string ( ) . into_bytes ( ) ) ) ) ;
250+ } else {
251+ values. push ( ( OdbcTypeInfo { name : "INT" . into ( ) , is_null : true } , None ) ) ;
252+ }
253+ } else if let Ok ( opt) = row. get_data :: < f64 > ( i as u16 ) {
254+ if let Some ( num) = opt {
255+ values. push ( ( OdbcTypeInfo { name : "DOUBLE" . into ( ) , is_null : false } , Some ( num. to_string ( ) . into_bytes ( ) ) ) ) ;
256+ } else {
257+ values. push ( ( OdbcTypeInfo { name : "DOUBLE" . into ( ) , is_null : true } , None ) ) ;
258+ }
259+ } else {
260+ values. push ( ( OdbcTypeInfo { name : "UNKNOWN" . into ( ) , is_null : true } , None ) ) ;
261+ }
262+ }
263+ let _ = tx. send ( Ok ( Either :: Right ( OdbcRow { columns : columns. clone ( ) , values } ) ) ) ;
264+ }
265+ let _ = tx. send ( Ok ( Either :: Left ( OdbcQueryResult { rows_affected : 0 } ) ) ) ;
266+ }
267+ Ok ( None ) => {
268+ let _ = tx. send ( Ok ( Either :: Left ( OdbcQueryResult { rows_affected : 0 } ) ) ) ;
269+ }
270+ Err ( e) => {
271+ let _ = tx. send ( Err ( Error :: from ( e) ) ) ;
272+ }
273+ }
274+ } ;
275+ if let Some ( conn) = shared. conn . try_lock ( ) {
276+ process ( & conn) ;
277+ } else {
278+ let guard = futures_executor:: block_on ( shared. conn . lock ( ) ) ;
279+ process ( & guard) ;
280+ }
281+ }
207282 }
208283 }
209284 } ) ?;
@@ -273,4 +348,21 @@ impl ConnectionWorker {
273348 . map_err ( |_| Error :: WorkerCrashed ) ?;
274349 Ok ( rx)
275350 }
351+
352+ pub ( crate ) async fn execute_stream_with_args (
353+ & mut self ,
354+ sql : & str ,
355+ args : Vec < OdbcArgumentValue < ' static > > ,
356+ ) -> Result < flume:: Receiver < Result < Either < OdbcQueryResult , OdbcRow > , Error > > , Error > {
357+ let ( tx, rx) = flume:: bounded ( 64 ) ;
358+ self . command_tx
359+ . send_async ( Command :: ExecuteWithArgs {
360+ sql : sql. into ( ) ,
361+ args,
362+ tx,
363+ } )
364+ . await
365+ . map_err ( |_| Error :: WorkerCrashed ) ?;
366+ Ok ( rx)
367+ }
276368}
0 commit comments