Skip to content

Commit 9d4e17a

Browse files
committed
refactor: Enhance ODBC command structure and utility functions
This commit introduces type aliases for commonly used types in the ODBC worker, streamlines command handling by replacing direct command sending with utility functions, and improves transaction command processing. The connection handling has also been updated to use a more consistent type definition, enhancing code clarity and maintainability.
1 parent 4f6adc1 commit 9d4e17a

File tree

1 file changed

+127
-89
lines changed

1 file changed

+127
-89
lines changed

sqlx-core/src/odbc/connection/worker.rs

Lines changed: 127 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@ use crate::row::Row as SqlxRow;
1212
use either::Either;
1313
use odbc_api::{Cursor, CursorRow, IntoParameter, ResultSetMetadata};
1414

15+
// Type aliases for commonly used types
16+
type OdbcConnection = odbc_api::Connection<'static>;
17+
type TransactionResult = Result<(), Error>;
18+
type TransactionSender = oneshot::Sender<TransactionResult>;
19+
type ExecuteResult = Result<Either<OdbcQueryResult, OdbcRow>, Error>;
20+
type ExecuteSender = flume::Sender<ExecuteResult>;
21+
type PrepareResult = Result<(u64, Vec<OdbcColumn>, usize), Error>;
22+
type PrepareSender = oneshot::Sender<PrepareResult>;
23+
1524
#[derive(Debug)]
1625
pub(crate) struct ConnectionWorker {
1726
command_tx: flume::Sender<Command>,
@@ -25,22 +34,22 @@ enum Command {
2534
tx: oneshot::Sender<()>,
2635
},
2736
Begin {
28-
tx: oneshot::Sender<Result<(), Error>>,
37+
tx: TransactionSender,
2938
},
3039
Commit {
31-
tx: oneshot::Sender<Result<(), Error>>,
40+
tx: TransactionSender,
3241
},
3342
Rollback {
34-
tx: oneshot::Sender<Result<(), Error>>,
43+
tx: TransactionSender,
3544
},
3645
Execute {
3746
sql: Box<str>,
3847
args: Option<OdbcArguments>,
39-
tx: flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
48+
tx: ExecuteSender,
4049
},
4150
Prepare {
4251
sql: Box<str>,
43-
tx: oneshot::Sender<Result<(u64, Vec<OdbcColumn>, usize), Error>>,
52+
tx: PrepareSender,
4453
},
4554
}
4655

@@ -59,35 +68,27 @@ impl ConnectionWorker {
5968

6069
pub(crate) async fn ping(&mut self) -> Result<(), Error> {
6170
let (tx, rx) = oneshot::channel();
62-
self.send_command(Command::Ping { tx }).await?;
63-
rx.await.map_err(|_| Error::WorkerCrashed)
71+
send_command_and_await(&self.command_tx, Command::Ping { tx }, rx).await
6472
}
6573

6674
pub(crate) async fn shutdown(&mut self) -> Result<(), Error> {
6775
let (tx, rx) = oneshot::channel();
68-
self.send_command(Command::Shutdown { tx }).await?;
69-
rx.await.map_err(|_| Error::WorkerCrashed)
76+
send_command_and_await(&self.command_tx, Command::Shutdown { tx }, rx).await
7077
}
7178

7279
pub(crate) async fn begin(&mut self) -> Result<(), Error> {
7380
let (tx, rx) = oneshot::channel();
74-
self.send_command(Command::Begin { tx }).await?;
75-
rx.await.map_err(|_| Error::WorkerCrashed)??;
76-
Ok(())
81+
send_transaction_command(&self.command_tx, Command::Begin { tx }, rx).await
7782
}
7883

7984
pub(crate) async fn commit(&mut self) -> Result<(), Error> {
8085
let (tx, rx) = oneshot::channel();
81-
self.send_command(Command::Commit { tx }).await?;
82-
rx.await.map_err(|_| Error::WorkerCrashed)??;
83-
Ok(())
86+
send_transaction_command(&self.command_tx, Command::Commit { tx }, rx).await
8487
}
8588

8689
pub(crate) async fn rollback(&mut self) -> Result<(), Error> {
8790
let (tx, rx) = oneshot::channel();
88-
self.send_command(Command::Rollback { tx }).await?;
89-
rx.await.map_err(|_| Error::WorkerCrashed)??;
90-
Ok(())
91+
send_transaction_command(&self.command_tx, Command::Rollback { tx }, rx).await
9192
}
9293

9394
pub(crate) async fn execute_stream(
@@ -96,12 +97,14 @@ impl ConnectionWorker {
9697
args: Option<OdbcArguments>,
9798
) -> Result<flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>>, Error> {
9899
let (tx, rx) = flume::bounded(64);
99-
self.send_command(Command::Execute {
100-
sql: sql.into(),
101-
args,
102-
tx,
103-
})
104-
.await?;
100+
self.command_tx
101+
.send_async(Command::Execute {
102+
sql: sql.into(),
103+
args,
104+
tx,
105+
})
106+
.await
107+
.map_err(|_| Error::WorkerCrashed)?;
105108
Ok(rx)
106109
}
107110

@@ -110,19 +113,15 @@ impl ConnectionWorker {
110113
sql: &str,
111114
) -> Result<(u64, Vec<OdbcColumn>, usize), Error> {
112115
let (tx, rx) = oneshot::channel();
113-
self.send_command(Command::Prepare {
114-
sql: sql.into(),
115-
tx,
116-
})
117-
.await?;
118-
rx.await.map_err(|_| Error::WorkerCrashed)?
119-
}
120-
121-
async fn send_command(&mut self, cmd: Command) -> Result<(), Error> {
122-
self.command_tx
123-
.send_async(cmd)
124-
.await
125-
.map_err(|_| Error::WorkerCrashed)
116+
send_command_and_await(
117+
&self.command_tx,
118+
Command::Prepare {
119+
sql: sql.into(),
120+
tx,
121+
},
122+
rx,
123+
)
124+
.await?
126125
}
127126
}
128127

@@ -160,26 +159,70 @@ fn worker_thread_main(
160159
}
161160
}
162161

163-
fn establish_connection(
164-
options: &OdbcConnectOptions,
165-
) -> Result<odbc_api::Connection<'static>, Error> {
162+
fn establish_connection(options: &OdbcConnectOptions) -> Result<OdbcConnection, Error> {
166163
// Create environment and connect. We leak the environment to extend its lifetime
167164
// to 'static, as ODBC connection borrows it. This is acceptable for long-lived
168165
// process and mirrors SQLite approach to background workers.
169166
let env = Box::leak(Box::new(
170167
odbc_api::Environment::new().map_err(|e| Error::Configuration(e.to_string().into()))?,
171168
));
172169

173-
env.connect_with_connection_string(options.connection_string(), Default::default())
174-
.map_err(|e| Error::Configuration(e.to_string().into()))
170+
let conn = env
171+
.connect_with_connection_string(options.connection_string(), Default::default())
172+
.map_err(|e| Error::Configuration(e.to_string().into()))?;
173+
174+
Ok(conn)
175+
}
176+
177+
// Utility functions for channel operations
178+
fn send_result<T: std::fmt::Debug>(tx: oneshot::Sender<T>, result: T) {
179+
tx.send(result).expect("The odbc worker thread has crashed");
180+
}
181+
182+
fn send_stream_result(tx: &ExecuteSender, result: ExecuteResult) {
183+
tx.send(result).expect("The odbc worker thread has crashed");
184+
}
185+
186+
async fn send_command_and_await<T>(
187+
command_tx: &flume::Sender<Command>,
188+
cmd: Command,
189+
rx: oneshot::Receiver<T>,
190+
) -> Result<T, Error> {
191+
command_tx
192+
.send_async(cmd)
193+
.await
194+
.map_err(|_| Error::WorkerCrashed)?;
195+
rx.await.map_err(|_| Error::WorkerCrashed)
196+
}
197+
198+
async fn send_transaction_command(
199+
command_tx: &flume::Sender<Command>,
200+
cmd: Command,
201+
rx: oneshot::Receiver<TransactionResult>,
202+
) -> Result<(), Error> {
203+
send_command_and_await(command_tx, cmd, rx).await??;
204+
Ok(())
175205
}
176206

177-
fn process_command(cmd: Command, conn: &odbc_api::Connection<'static>) -> bool {
207+
// Utility functions for transaction operations
208+
fn execute_transaction_operation<F>(
209+
conn: &OdbcConnection,
210+
operation: F,
211+
operation_name: &str,
212+
) -> TransactionResult
213+
where
214+
F: FnOnce(&OdbcConnection) -> Result<(), odbc_api::Error>,
215+
{
216+
operation(conn)
217+
.map_err(|e| Error::Protocol(format!("Failed to {} transaction: {}", operation_name, e)))
218+
}
219+
220+
fn process_command(cmd: Command, conn: &OdbcConnection) -> bool {
178221
match cmd {
179222
Command::Ping { tx } => handle_ping(conn, tx),
180-
Command::Begin { tx } => handle_transaction(conn, "BEGIN", tx),
181-
Command::Commit { tx } => handle_transaction(conn, "COMMIT", tx),
182-
Command::Rollback { tx } => handle_transaction(conn, "ROLLBACK", tx),
223+
Command::Begin { tx } => handle_begin(conn, tx),
224+
Command::Commit { tx } => handle_commit(conn, tx),
225+
Command::Rollback { tx } => handle_rollback(conn, tx),
183226
Command::Shutdown { tx } => {
184227
let _ = tx.send(());
185228
return false; // Signal to exit the loop
@@ -191,34 +234,44 @@ fn process_command(cmd: Command, conn: &odbc_api::Connection<'static>) -> bool {
191234
}
192235

193236
// Command handlers
194-
fn handle_ping(conn: &odbc_api::Connection<'static>, tx: oneshot::Sender<()>) {
237+
fn handle_ping(conn: &OdbcConnection, tx: oneshot::Sender<()>) {
195238
let _ = conn.execute("SELECT 1", (), None);
196-
let _ = tx.send(());
239+
send_result(tx, ());
197240
}
198241

199-
fn handle_transaction(
200-
conn: &odbc_api::Connection<'static>,
201-
sql: &str,
202-
tx: oneshot::Sender<Result<(), Error>>,
203-
) {
204-
let result = execute_simple(conn, sql);
205-
let _ = tx.send(result);
242+
fn handle_begin(conn: &OdbcConnection, tx: TransactionSender) {
243+
let result = execute_transaction_operation(conn, |c| c.set_autocommit(false), "begin");
244+
send_result(tx, result);
245+
}
246+
247+
fn handle_commit(conn: &OdbcConnection, tx: TransactionSender) {
248+
let result = execute_transaction_operation(
249+
conn,
250+
|c| c.commit().and_then(|_| c.set_autocommit(true)),
251+
"commit",
252+
);
253+
send_result(tx, result);
254+
}
255+
256+
fn handle_rollback(conn: &OdbcConnection, tx: TransactionSender) {
257+
let result = execute_transaction_operation(
258+
conn,
259+
|c| c.rollback().and_then(|_| c.set_autocommit(true)),
260+
"rollback",
261+
);
262+
send_result(tx, result);
206263
}
207264

208265
fn handle_execute(
209-
conn: &odbc_api::Connection<'static>,
266+
conn: &OdbcConnection,
210267
sql: Box<str>,
211268
args: Option<OdbcArguments>,
212-
tx: flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
269+
tx: ExecuteSender,
213270
) {
214271
execute_sql(conn, &sql, args, &tx);
215272
}
216273

217-
fn handle_prepare(
218-
conn: &odbc_api::Connection<'static>,
219-
sql: Box<str>,
220-
tx: oneshot::Sender<Result<(u64, Vec<OdbcColumn>, usize), Error>>,
221-
) {
274+
fn handle_prepare(conn: &OdbcConnection, sql: Box<str>, tx: PrepareSender) {
222275
let result = match conn.prepare(&sql) {
223276
Ok(mut prepared) => {
224277
let columns = collect_columns(&mut prepared);
@@ -228,24 +281,19 @@ fn handle_prepare(
228281
Err(e) => Err(Error::from(e)),
229282
};
230283

231-
let _ = tx.send(result);
284+
send_result(tx, result);
232285
}
233286

234287
// Helper functions
235-
fn execute_simple(conn: &odbc_api::Connection<'static>, sql: &str) -> Result<(), Error> {
288+
fn execute_simple(conn: &OdbcConnection, sql: &str) -> Result<(), Error> {
236289
match conn.execute(sql, (), None) {
237290
Ok(_) => Ok(()),
238291
Err(e) => Err(Error::Configuration(e.to_string().into())),
239292
}
240293
}
241294

242295
// SQL execution functions
243-
fn execute_sql(
244-
conn: &odbc_api::Connection<'static>,
245-
sql: &str,
246-
args: Option<OdbcArguments>,
247-
tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
248-
) {
296+
fn execute_sql(conn: &OdbcConnection, sql: &str, args: Option<OdbcArguments>, tx: &ExecuteSender) {
249297
let params = prepare_parameters(args);
250298

251299
if params.is_empty() {
@@ -273,12 +321,8 @@ fn to_param(arg: OdbcArgumentValue) -> Box<dyn odbc_api::parameter::InputParamet
273321
}
274322

275323
// Dispatch functions
276-
fn dispatch_execute<P>(
277-
conn: &odbc_api::Connection<'static>,
278-
sql: &str,
279-
params: P,
280-
tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
281-
) where
324+
fn dispatch_execute<P>(conn: &OdbcConnection, sql: &str, params: P, tx: &ExecuteSender)
325+
where
282326
P: odbc_api::ParameterCollectionRef,
283327
{
284328
match conn.execute(sql, params, None) {
@@ -288,10 +332,8 @@ fn dispatch_execute<P>(
288332
}
289333
}
290334

291-
fn handle_cursor<C>(
292-
cursor: &mut C,
293-
tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
294-
) where
335+
fn handle_cursor<C>(cursor: &mut C, tx: &ExecuteSender)
336+
where
295337
C: Cursor + ResultSetMetadata,
296338
{
297339
let columns = collect_columns(cursor);
@@ -304,12 +346,12 @@ fn handle_cursor<C>(
304346
send_empty_result(tx);
305347
}
306348

307-
fn send_empty_result(tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>) {
308-
let _ = tx.send(Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })));
349+
fn send_empty_result(tx: &ExecuteSender) {
350+
send_stream_result(tx, Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })));
309351
}
310352

311-
fn send_error(tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>, error: Error) {
312-
let _ = tx.send(Err(error));
353+
fn send_error(tx: &ExecuteSender, error: Error) {
354+
send_stream_result(tx, Err(error));
313355
}
314356

315357
// Metadata and row processing
@@ -342,11 +384,7 @@ fn decode_column_name(name_bytes: Vec<u8>, index: u16) -> String {
342384
String::from_utf8(name_bytes).unwrap_or_else(|_| format!("col{}", index - 1))
343385
}
344386

345-
fn stream_rows<C>(
346-
cursor: &mut C,
347-
columns: &[OdbcColumn],
348-
tx: &flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
349-
) -> Result<(), Error>
387+
fn stream_rows<C>(cursor: &mut C, columns: &[OdbcColumn], tx: &ExecuteSender) -> Result<(), Error>
350388
where
351389
C: Cursor,
352390
{

0 commit comments

Comments
 (0)