Skip to content

Commit 0073218

Browse files
committed
fix(odbc): correctly propagate errors when the client of the worker thread drops its channel
1 parent 56f6f37 commit 0073218

File tree

1 file changed

+20
-17
lines changed

1 file changed

+20
-17
lines changed

sqlx-core/src/odbc/connection/worker.rs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::thread;
22

3-
use flume::TrySendError;
3+
use flume::{SendError, TrySendError};
44
use futures_channel::oneshot;
55

66
use crate::error::Error;
@@ -203,8 +203,11 @@ fn send_result<T: std::fmt::Debug>(tx: oneshot::Sender<T>, result: T) {
203203
let _ = tx.send(result);
204204
}
205205

206-
fn send_stream_result(tx: &ExecuteSender, result: ExecuteResult) {
207-
let _ = tx.send(result);
206+
fn send_stream_result(
207+
tx: &ExecuteSender,
208+
result: ExecuteResult,
209+
) -> Result<(), SendError<ExecuteResult>> {
210+
tx.send(result)
208211
}
209212

210213
async fn send_command_and_await<T>(
@@ -349,8 +352,8 @@ where
349352
{
350353
match conn.execute(sql, params, None) {
351354
Ok(Some(mut cursor)) => handle_cursor(&mut cursor, tx),
352-
Ok(None) => send_empty_result(tx),
353-
Err(e) => send_error(tx, Error::from(e)),
355+
Ok(None) => send_empty_result(tx).unwrap_or_default(),
356+
Err(e) => send_error(tx, Error::from(e)).unwrap_or_default(),
354357
}
355358
}
356359

@@ -360,20 +363,19 @@ where
360363
{
361364
let columns = collect_columns(cursor);
362365

363-
if let Err(e) = stream_rows(cursor, &columns, tx) {
364-
send_error(tx, e);
365-
return;
366+
match stream_rows(cursor, &columns, tx) {
367+
Ok(true) => send_empty_result(tx).unwrap_or_default(),
368+
Ok(false) => {}
369+
Err(e) => send_error(tx, e).unwrap_or_default(),
366370
}
367-
368-
send_empty_result(tx);
369371
}
370372

371-
fn send_empty_result(tx: &ExecuteSender) {
372-
send_stream_result(tx, Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })));
373+
fn send_empty_result(tx: &ExecuteSender) -> Result<(), SendError<ExecuteResult>> {
374+
send_stream_result(tx, Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })))
373375
}
374376

375-
fn send_error(tx: &ExecuteSender, error: Error) {
376-
send_stream_result(tx, Err(error));
377+
fn send_error(tx: &ExecuteSender, error: Error) -> Result<(), SendError<ExecuteResult>> {
378+
send_stream_result(tx, Err(error))
377379
}
378380

379381
// Metadata and row processing
@@ -406,10 +408,11 @@ fn decode_column_name(name_bytes: Vec<u8>, index: u16) -> String {
406408
String::from_utf8(name_bytes).unwrap_or_else(|_| format!("col{}", index - 1))
407409
}
408410

409-
fn stream_rows<C>(cursor: &mut C, columns: &[OdbcColumn], tx: &ExecuteSender) -> Result<(), Error>
411+
fn stream_rows<C>(cursor: &mut C, columns: &[OdbcColumn], tx: &ExecuteSender) -> Result<bool, Error>
410412
where
411413
C: Cursor,
412414
{
415+
let mut receiver_open = true;
413416
while let Some(mut row) = cursor.next_row()? {
414417
let values = collect_row_values(&mut row, columns)?;
415418
let row_data = OdbcRow {
@@ -418,11 +421,11 @@ where
418421
};
419422

420423
if tx.send(Ok(Either::Right(row_data))).is_err() {
421-
// Receiver dropped, stop processing
424+
receiver_open = false;
422425
break;
423426
}
424427
}
425-
Ok(())
428+
Ok(receiver_open)
426429
}
427430

428431
fn collect_row_values(

0 commit comments

Comments
 (0)