Skip to content

Commit 9fb8ab0

Browse files
committed
refactor(odbc): enhance command processing and error handling
- Updated command processing to utilize a more structured approach with CommandControlFlow for better clarity and error handling. - Improved logging for command execution and error scenarios to aid in debugging. - Refactored SQL execution functions to streamline error handling and result sending. - Enhanced tests to assert specific error types for connection and SQL syntax issues.
1 parent 04a2df3 commit 9fb8ab0

File tree

3 files changed

+168
-114
lines changed

3 files changed

+168
-114
lines changed

sqlx-core/src/odbc/arguments.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::encode::Encode;
33
use crate::odbc::Odbc;
44
use crate::types::Type;
55

6-
#[derive(Default)]
6+
#[derive(Default, Debug)]
77
pub struct OdbcArguments {
88
pub(crate) values: Vec<OdbcArgumentValue>,
99
}

sqlx-core/src/odbc/connection/worker.rs

Lines changed: 104 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub(crate) struct ConnectionWorker {
3333
join_handle: Option<thread::JoinHandle<()>>,
3434
}
3535

36+
#[derive(Debug)]
3637
enum Command {
3738
Ping {
3839
tx: oneshot::Sender<()>,
@@ -194,12 +195,19 @@ fn worker_thread_main(
194195

195196
// Process commands
196197
while let Ok(cmd) = command_rx.recv() {
197-
if let Some(shutdown_tx) = process_command(cmd, &conn, &mut stmt_manager) {
198-
log::debug!("Shutting down ODBC worker thread");
199-
drop(stmt_manager);
200-
drop(conn);
201-
let _ = shutdown_tx.send(());
202-
break;
198+
log::trace!("Processing command: {:?}", cmd);
199+
match process_command(cmd, &conn, &mut stmt_manager) {
200+
Ok(CommandControlFlow::Continue) => {}
201+
Ok(CommandControlFlow::Shutdown(shutdown_tx)) => {
202+
log::debug!("Shutting down ODBC worker thread");
203+
drop(stmt_manager);
204+
drop(conn);
205+
send_oneshot(shutdown_tx, (), "shutdown");
206+
break;
207+
}
208+
Err(()) => {
209+
log::error!("ODBC worker error while processing command");
210+
}
203211
}
204212
}
205213
// Channel disconnected or shutdown command received, worker thread exits
@@ -270,9 +278,11 @@ impl<'conn> StatementManager<'conn> {
270278
}
271279
}
272280
}
273-
// Utility functions for channel operations (deprecated - use send_result_safe)
274-
fn send_result<T: std::fmt::Debug>(tx: oneshot::Sender<T>, result: T) {
275-
send_result_safe(tx, result, "unknown");
281+
// Helper function to send results through oneshot channels with consistent error handling
282+
fn send_oneshot<T>(tx: oneshot::Sender<T>, result: T, operation: &str) {
283+
if tx.send(result).is_err() {
284+
log::warn!("Failed to send {} result: receiver dropped", operation);
285+
}
276286
}
277287

278288
fn send_stream_result(
@@ -312,151 +322,141 @@ fn execute_transaction_operation<F>(
312322
where
313323
F: FnOnce(&OdbcConnection) -> Result<(), odbc_api::Error>,
314324
{
325+
log::trace!("{} odbc transaction", operation_name);
315326
operation(conn)
316327
.map_err(|e| Error::Protocol(format!("Failed to {} transaction: {}", operation_name, e)))
317328
}
318329

330+
#[derive(Debug)]
331+
enum CommandControlFlow {
332+
Shutdown(oneshot::Sender<()>),
333+
Continue,
334+
}
335+
336+
type CommandResult = Result<CommandControlFlow, ()>;
337+
319338
// Returns a shutdown tx if the command is a shutdown command
320339
fn process_command<'conn>(
321340
cmd: Command,
322341
conn: &'conn OdbcConnection,
323342
stmt_manager: &mut StatementManager<'conn>,
324-
) -> Option<oneshot::Sender<()>> {
343+
) -> CommandResult {
325344
match cmd {
326345
Command::Ping { tx } => handle_ping(conn, tx),
327346
Command::Begin { tx } => handle_begin(conn, tx),
328347
Command::Commit { tx } => handle_commit(conn, tx),
329348
Command::Rollback { tx } => handle_rollback(conn, tx),
330-
Command::Shutdown { tx } => return Some(tx),
349+
Command::Shutdown { tx } => Ok(CommandControlFlow::Shutdown(tx)),
331350
Command::Execute { sql, args, tx } => handle_execute(stmt_manager, sql, args, tx),
332351
Command::Prepare { sql, tx } => handle_prepare(stmt_manager, sql, tx),
333352
Command::GetDbmsName { tx } => handle_get_dbms_name(conn, tx),
334353
}
335-
None
336354
}
337355

338356
// Command handlers
339-
fn handle_ping(conn: &OdbcConnection, tx: oneshot::Sender<()>) {
340-
let _ = conn.execute("SELECT 1", (), None);
341-
send_result(tx, ());
357+
fn handle_ping(conn: &OdbcConnection, tx: oneshot::Sender<()>) -> CommandResult {
358+
match conn.execute("SELECT 1", (), None) {
359+
Ok(_) => send_oneshot(tx, (), "ping"),
360+
Err(e) => log::error!("Ping failed: {}", e),
361+
}
362+
Ok(CommandControlFlow::Continue)
342363
}
343364

344-
fn handle_begin(conn: &OdbcConnection, tx: TransactionSender) {
345-
log::debug!("Beginning transaction");
365+
fn handle_begin(conn: &OdbcConnection, tx: TransactionSender) -> CommandResult {
346366
let result = execute_transaction_operation(conn, |c| c.set_autocommit(false), "begin");
347-
send_result_safe(tx, result, "begin transaction");
367+
send_oneshot(tx, result, "begin transaction");
368+
Ok(CommandControlFlow::Continue)
348369
}
349370

350-
fn handle_commit(conn: &OdbcConnection, tx: TransactionSender) {
351-
log::debug!("Committing transaction");
371+
fn handle_commit(conn: &OdbcConnection, tx: TransactionSender) -> CommandResult {
352372
let result = execute_transaction_operation(
353373
conn,
354374
|c| c.commit().and_then(|_| c.set_autocommit(true)),
355375
"commit",
356376
);
357-
send_result_safe(tx, result, "commit transaction");
377+
send_oneshot(tx, result, "commit transaction");
378+
Ok(CommandControlFlow::Continue)
358379
}
359380

360-
fn handle_rollback(conn: &OdbcConnection, tx: TransactionSender) {
361-
log::debug!("Rolling back transaction");
381+
fn handle_rollback(conn: &OdbcConnection, tx: TransactionSender) -> CommandResult {
362382
let result = execute_transaction_operation(
363383
conn,
364384
|c| c.rollback().and_then(|_| c.set_autocommit(true)),
365385
"rollback",
366386
);
367-
send_result_safe(tx, result, "rollback transaction");
368-
}
369-
370-
fn handle_execute<'conn>(
371-
stmt_manager: &mut StatementManager<'conn>,
372-
sql: Box<str>,
373-
args: Option<OdbcArguments>,
374-
tx: ExecuteSender,
375-
) {
376-
execute_sql(stmt_manager, &sql, args, &tx);
387+
send_oneshot(tx, result, "rollback transaction");
388+
Ok(CommandControlFlow::Continue)
377389
}
378-
379390
fn handle_prepare<'conn>(
380391
stmt_manager: &mut StatementManager<'conn>,
381392
sql: Box<str>,
382393
tx: PrepareSender,
383-
) {
384-
log::debug!(
385-
"Preparing statement: {}",
386-
sql.chars().take(100).collect::<String>()
387-
);
394+
) -> CommandResult {
395+
let result = do_prepare(stmt_manager, sql);
396+
send_oneshot(tx, result, "prepare");
397+
Ok(CommandControlFlow::Continue)
398+
}
388399

400+
fn do_prepare<'conn>(stmt_manager: &mut StatementManager<'conn>, sql: Box<str>) -> PrepareResult {
401+
log::trace!("Preparing statement: {}", sql);
389402
// Use the statement manager to get or create the prepared statement
390-
let result = match stmt_manager.get_or_create_prepared(&sql) {
391-
Ok(prepared) => {
392-
let columns = collect_columns(prepared);
393-
let params = prepared.num_params().unwrap_or(0) as usize;
394-
log::debug!(
395-
"Prepared statement with {} columns and {} parameters",
396-
columns.len(),
397-
params
398-
);
399-
Ok((0, columns, params))
400-
}
401-
Err(e) => Err(e),
402-
};
403-
404-
send_result_safe(tx, result, "prepare statement");
403+
let prepared = stmt_manager.get_or_create_prepared(&sql)?;
404+
let columns = collect_columns(prepared);
405+
let params = usize::from(prepared.num_params().unwrap_or(0));
406+
log::debug!(
407+
"Prepared statement with {} columns and {} parameters",
408+
columns.len(),
409+
params
410+
);
411+
Ok((0, columns, params))
405412
}
406413

407-
fn handle_get_dbms_name(conn: &OdbcConnection, tx: oneshot::Sender<Result<String, Error>>) {
414+
fn handle_get_dbms_name(
415+
conn: &OdbcConnection,
416+
tx: oneshot::Sender<Result<String, Error>>,
417+
) -> CommandResult {
408418
log::debug!("Getting DBMS name");
409419
let result = conn
410420
.database_management_system_name()
411421
.map_err(|e| Error::Protocol(format!("Failed to get DBMS name: {}", e)));
412-
send_result_safe(tx, result, "get DBMS name");
422+
send_oneshot(tx, result, "DBMS name");
423+
Ok(CommandControlFlow::Continue)
413424
}
414425

415-
// SQL execution functions
416-
fn execute_sql<'conn>(
426+
fn handle_execute<'conn>(
417427
stmt_manager: &mut StatementManager<'conn>,
418-
sql: &str,
428+
sql: Box<str>,
419429
args: Option<OdbcArguments>,
420-
tx: &ExecuteSender,
421-
) {
422-
let params = prepare_parameters(args);
423-
let has_params = !params.is_empty();
424-
425-
let result = if has_params {
426-
execute_with_prepared_statement(stmt_manager, sql, &params[..], tx)
427-
} else {
428-
execute_with_direct_statement(stmt_manager, sql, tx)
429-
};
430+
tx: ExecuteSender,
431+
) -> CommandResult {
432+
log::debug!(
433+
"Executing SQL: {}",
434+
sql.chars().take(100).collect::<String>()
435+
);
430436

431-
if let Err(e) = result {
432-
let _ = send_error(tx, e);
433-
}
437+
let result = execute_sql(stmt_manager, &sql, args, &tx);
438+
with_result_send_error(result, &tx, |_| {});
439+
Ok(CommandControlFlow::Continue)
434440
}
435441

436-
fn execute_with_direct_statement<'conn>(
442+
// SQL execution functions
443+
fn execute_sql<'conn>(
437444
stmt_manager: &mut StatementManager<'conn>,
438445
sql: &str,
446+
args: Option<OdbcArguments>,
439447
tx: &ExecuteSender,
440448
) -> Result<(), Error> {
449+
let params = prepare_parameters(args);
441450
let stmt = stmt_manager.get_or_create_direct_stmt()?;
442-
execute_statement(stmt.execute(sql, ()), tx)
443-
}
444-
445-
fn execute_with_prepared_statement<'conn, P>(
446-
stmt_manager: &mut StatementManager<'conn>,
447-
sql: &str,
448-
params: P,
449-
tx: &ExecuteSender,
450-
) -> Result<(), Error>
451-
where
452-
P: odbc_api::ParameterCollectionRef,
453-
{
454-
let prepared = stmt_manager.get_or_create_prepared(sql)?;
455-
execute_statement(prepared.execute(params), tx)
451+
log::trace!("Starting execution of SQL: {}", sql);
452+
let cursor_result = stmt.execute(sql, &params[..]);
453+
log::trace!("Received cursor result for SQL: {}", sql);
454+
send_exec_result(cursor_result, tx)?;
455+
Ok(())
456456
}
457457

458458
// Unified execution logic for both direct and prepared statements
459-
fn execute_statement<C>(
459+
fn send_exec_result<C>(
460460
execution_result: Result<Option<C>, odbc_api::Error>,
461461
tx: &ExecuteSender,
462462
) -> Result<(), Error>
@@ -509,7 +509,7 @@ where
509509
log::trace!("Row streaming stopped early (receiver closed)");
510510
}
511511
Err(e) => {
512-
let _ = send_error(tx, e);
512+
send_error(tx, e);
513513
}
514514
}
515515
}
@@ -519,19 +519,25 @@ fn send_done(tx: &ExecuteSender, rows_affected: u64) -> Result<(), SendError<Exe
519519
send_stream_result(tx, Ok(Either::Left(OdbcQueryResult { rows_affected })))
520520
}
521521

522-
fn send_error(tx: &ExecuteSender, error: Error) -> Result<(), SendError<ExecuteResult>> {
523-
send_stream_result(tx, Err(error))
522+
fn with_result_send_error<T>(
523+
result: Result<T, Error>,
524+
tx: &ExecuteSender,
525+
handler: impl FnOnce(T),
526+
) {
527+
match result {
528+
Ok(result) => handler(result),
529+
Err(error) => send_error(tx, error),
530+
}
524531
}
525532

526-
fn send_row(tx: &ExecuteSender, row: OdbcRow) -> Result<(), SendError<ExecuteResult>> {
527-
send_stream_result(tx, Ok(Either::Right(row)))
533+
fn send_error(tx: &ExecuteSender, error: Error) {
534+
if let Err(e) = send_stream_result(tx, Err(error)) {
535+
log::error!("Failed to send error from ODBC worker thread: {}", e);
536+
}
528537
}
529538

530-
// Helper function for safe result sending with logging
531-
fn send_result_safe<T: std::fmt::Debug>(tx: oneshot::Sender<T>, result: T, operation: &str) {
532-
if tx.send(result).is_err() {
533-
log::warn!("Failed to send {} result: receiver dropped", operation);
534-
}
539+
fn send_row(tx: &ExecuteSender, row: OdbcRow) -> Result<(), SendError<ExecuteResult>> {
540+
send_stream_result(tx, Ok(Either::Right(row)))
535541
}
536542

537543
// Metadata and row processing

0 commit comments

Comments
 (0)