@@ -11,7 +11,7 @@ use crate::odbc::{
1111#[ allow( unused_imports) ]
1212use crate :: row:: Row as SqlxRow ;
1313use either:: Either ;
14- use odbc_api:: { Cursor , CursorRow , ResultSetMetadata } ;
14+ use odbc_api:: { Cursor , CursorRow , IntoParameter , ResultSetMetadata } ;
1515
1616#[ derive( Debug ) ]
1717pub ( crate ) struct ConnectionWorker {
@@ -115,12 +115,10 @@ impl ConnectionWorker {
115115 Command :: Execute { sql, tx } => {
116116 with_conn ( & shared, |conn| execute_sql ( conn, & sql, & tx) ) ;
117117 }
118- Command :: ExecuteWithArgs {
119- sql,
120- args : _args,
121- tx,
122- } => {
123- with_conn ( & shared, |conn| execute_sql ( conn, & sql, & tx) ) ;
118+ Command :: ExecuteWithArgs { sql, args, tx } => {
119+ with_conn ( & shared, |conn| {
120+ execute_sql_with_params ( conn, & sql, args, & tx)
121+ } ) ;
124122 }
125123 }
126124 }
@@ -254,6 +252,65 @@ fn execute_sql(
254252 }
255253}
256254
255+ fn execute_sql_with_params (
256+ conn : & odbc_api:: Connection < ' static > ,
257+ sql : & str ,
258+ args : Vec < OdbcArgumentValue < ' static > > ,
259+ tx : & flume:: Sender < Result < Either < OdbcQueryResult , OdbcRow > , Error > > ,
260+ ) {
261+ if args. is_empty ( ) {
262+ dispatch_execute ( conn, sql, ( ) , tx) ;
263+ return ;
264+ }
265+
266+ let mut params: Vec < Box < dyn odbc_api:: parameter:: InputParameter > > =
267+ Vec :: with_capacity ( args. len ( ) ) ;
268+ for a in args {
269+ params. push ( to_param ( a) ) ;
270+ }
271+ dispatch_execute ( conn, sql, & params[ ..] , tx) ;
272+ }
273+
274+ fn to_param (
275+ arg : OdbcArgumentValue < ' static > ,
276+ ) -> Box < dyn odbc_api:: parameter:: InputParameter + ' static > {
277+ match arg {
278+ OdbcArgumentValue :: Int ( i) => Box :: new ( i. into_parameter ( ) ) ,
279+ OdbcArgumentValue :: Float ( f) => Box :: new ( f. into_parameter ( ) ) ,
280+ OdbcArgumentValue :: Text ( s) => Box :: new ( s. into_parameter ( ) ) ,
281+ OdbcArgumentValue :: Bytes ( b) => Box :: new ( b. into_parameter ( ) ) ,
282+ OdbcArgumentValue :: Null | OdbcArgumentValue :: Phantom ( _) => {
283+ Box :: new ( Option :: < i32 > :: None . into_parameter ( ) )
284+ }
285+ }
286+ }
287+
288+ fn dispatch_execute < P > (
289+ conn : & odbc_api:: Connection < ' static > ,
290+ sql : & str ,
291+ params : P ,
292+ tx : & flume:: Sender < Result < Either < OdbcQueryResult , OdbcRow > , Error > > ,
293+ ) where
294+ P : odbc_api:: ParameterCollectionRef ,
295+ {
296+ match conn. execute ( sql, params, None ) {
297+ Ok ( Some ( mut cursor) ) => {
298+ let columns = collect_columns ( & mut cursor) ;
299+ if let Err ( e) = stream_rows ( & mut cursor, & columns, tx) {
300+ let _ = tx. send ( Err ( e) ) ;
301+ return ;
302+ }
303+ let _ = tx. send ( Ok ( Either :: Left ( OdbcQueryResult { rows_affected : 0 } ) ) ) ;
304+ }
305+ Ok ( None ) => {
306+ let _ = tx. send ( Ok ( Either :: Left ( OdbcQueryResult { rows_affected : 0 } ) ) ) ;
307+ }
308+ Err ( e) => {
309+ let _ = tx. send ( Err ( Error :: from ( e) ) ) ;
310+ }
311+ }
312+ }
313+
257314fn collect_columns < C > ( cursor : & mut C ) -> Vec < OdbcColumn >
258315where
259316 C : ResultSetMetadata ,
0 commit comments