Skip to content

Commit 328a558

Browse files
committed
refactor: Simplify argument handling in ODBC command execution
This commit refactors the ODBC command execution logic by consolidating the handling of SQL arguments. The `execute_stream` method now directly accepts an optional `OdbcArguments` type, streamlining the execution process and improving code clarity. Unused methods related to argument handling have been removed, enhancing maintainability.
1 parent 28852da commit 328a558

File tree

2 files changed

+11
-60
lines changed

2 files changed

+11
-60
lines changed

sqlx-core/src/odbc/connection/executor.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,9 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
2222
E: Execute<'q, Self::Database> + 'q,
2323
{
2424
let sql = query.sql().to_string();
25-
let mut args = query.take_arguments();
25+
let args = query.take_arguments();
2626
Box::pin(try_stream! {
27-
let rx = if let Some(a) = args.take() {
28-
self.worker.execute_stream_with_args(&sql, a.values).await?
29-
} else {
30-
self.worker.execute_stream(&sql).await?
31-
};
27+
let rx = self.worker.execute_stream(&sql, args).await?;
3228
while let Ok(item) = rx.recv_async().await {
3329
r#yield!(item?);
3430
}

sqlx-core/src/odbc/connection/worker.rs

Lines changed: 9 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ use futures_intrusive::sync::Mutex;
66

77
use crate::error::Error;
88
use crate::odbc::{
9-
OdbcArgumentValue, OdbcColumn, OdbcConnectOptions, OdbcQueryResult, OdbcRow, OdbcTypeInfo,
9+
OdbcArgumentValue, OdbcArguments, OdbcColumn, OdbcConnectOptions, OdbcQueryResult, OdbcRow,
10+
OdbcTypeInfo,
1011
};
1112
#[allow(unused_imports)]
1213
use crate::row::Row as SqlxRow;
@@ -42,11 +43,7 @@ enum Command {
4243
},
4344
Execute {
4445
sql: Box<str>,
45-
tx: flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
46-
},
47-
ExecuteWithArgs {
48-
sql: Box<str>,
49-
args: Vec<OdbcArgumentValue>,
46+
args: Option<OdbcArguments>,
5047
tx: flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
5148
},
5249
}
@@ -112,13 +109,8 @@ impl ConnectionWorker {
112109
let _ = tx.send(());
113110
return;
114111
}
115-
Command::Execute { sql, tx } => {
116-
with_conn(&shared, |conn| execute_sql(conn, &sql, &tx));
117-
}
118-
Command::ExecuteWithArgs { sql, args, tx } => {
119-
with_conn(&shared, |conn| {
120-
execute_sql_with_params(conn, &sql, args, &tx)
121-
});
112+
Command::Execute { sql, args, tx } => {
113+
with_conn(&shared, |conn| execute_sql(conn, &sql, args, &tx));
122114
}
123115
}
124116
}
@@ -178,26 +170,11 @@ impl ConnectionWorker {
178170
pub(crate) async fn execute_stream(
179171
&mut self,
180172
sql: &str,
173+
args: Option<OdbcArguments>,
181174
) -> Result<flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>>, Error> {
182175
let (tx, rx) = flume::bounded(64);
183176
self.command_tx
184177
.send_async(Command::Execute {
185-
sql: sql.into(),
186-
tx,
187-
})
188-
.await
189-
.map_err(|_| Error::WorkerCrashed)?;
190-
Ok(rx)
191-
}
192-
193-
pub(crate) async fn execute_stream_with_args(
194-
&mut self,
195-
sql: &str,
196-
args: Vec<OdbcArgumentValue>,
197-
) -> Result<flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>>, Error> {
198-
let (tx, rx) = flume::bounded(64);
199-
self.command_tx
200-
.send_async(Command::ExecuteWithArgs {
201178
sql: sql.into(),
202179
args,
203180
tx,
@@ -232,40 +209,18 @@ fn exec_simple(shared: &Shared, sql: &str) -> Result<(), Error> {
232209
fn execute_sql(
233210
conn: &odbc_api::Connection<'static>,
234211
sql: &str,
212+
args: Option<OdbcArguments>,
235213
tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
236214
) {
237-
match conn.execute(sql, (), None) {
238-
Ok(Some(mut cursor)) => {
239-
let columns = collect_columns(&mut cursor);
240-
if let Err(e) = stream_rows(&mut cursor, &columns, tx) {
241-
let _ = tx.send(Err(e));
242-
return;
243-
}
244-
let _ = tx.send(Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })));
245-
}
246-
Ok(None) => {
247-
let _ = tx.send(Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })));
248-
}
249-
Err(e) => {
250-
let _ = tx.send(Err(Error::from(e)));
251-
}
252-
}
253-
}
254-
255-
fn execute_sql_with_params(
256-
conn: &odbc_api::Connection<'static>,
257-
sql: &str,
258-
args: Vec<OdbcArgumentValue>,
259-
tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
260-
) {
215+
let args = args.map(|a| a.values).unwrap_or_default();
261216
if args.is_empty() {
262217
dispatch_execute(conn, sql, (), tx);
263218
return;
264219
}
265220

266221
let mut params: Vec<Box<dyn odbc_api::parameter::InputParameter>> =
267222
Vec::with_capacity(args.len());
268-
for a in dbg!(args) {
223+
for a in args {
269224
params.push(to_param(a));
270225
}
271226
dispatch_execute(conn, sql, &params[..], tx);

0 commit comments

Comments
 (0)