Skip to content

Commit 4cdf84f

Browse files
committed
fix(odbc): implement Drop for ConnectionWorker to ensure proper shutdown of worker threads
1 parent 2c3c9f5 commit 4cdf84f

File tree

2 files changed

+48
-42
lines changed

2 files changed

+48
-42
lines changed

sqlx-core/src/odbc/connection/mod.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,3 @@ impl Connection for OdbcConnection {
6464
false
6565
}
6666
}
67-
68-
impl Drop for OdbcConnection {
69-
fn drop(&mut self) {
70-
// Send shutdown command to worker thread to prevent resource leak
71-
self.worker.shutdown_sync();
72-
}
73-
}

sqlx-core/src/odbc/connection/worker.rs

Lines changed: 48 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::thread;
22

3+
use flume::TrySendError;
34
use futures_channel::oneshot;
45

56
use crate::error::Error;
@@ -24,6 +25,7 @@ type PrepareSender = oneshot::Sender<PrepareResult>;
2425
#[derive(Debug)]
2526
pub(crate) struct ConnectionWorker {
2627
command_tx: flume::Sender<Command>,
28+
join_handle: Option<thread::JoinHandle<()>>,
2729
}
2830

2931
enum Command {
@@ -53,15 +55,25 @@ enum Command {
5355
},
5456
}
5557

58+
impl Drop for ConnectionWorker {
59+
fn drop(&mut self) {
60+
self.shutdown_sync();
61+
}
62+
}
63+
5664
impl ConnectionWorker {
5765
pub async fn establish(options: OdbcConnectOptions) -> Result<Self, Error> {
58-
let (establish_tx, establish_rx) = oneshot::channel();
59-
60-
thread::Builder::new()
66+
let (command_tx, command_rx) = flume::bounded(64);
67+
let (conn_tx, conn_rx) = oneshot::channel();
68+
let thread = thread::Builder::new()
6169
.name("sqlx-odbc-conn".into())
62-
.spawn(move || worker_thread_main(options, establish_tx))?;
70+
.spawn(move || worker_thread_main(options, command_rx, conn_tx))?;
6371

64-
establish_rx.await.map_err(|_| Error::WorkerCrashed)?
72+
conn_rx.await.map_err(|_| Error::WorkerCrashed)??;
73+
Ok(ConnectionWorker {
74+
command_tx,
75+
join_handle: Some(thread),
76+
})
6577
}
6678

6779
pub(crate) async fn ping(&mut self) -> Result<(), Error> {
@@ -77,11 +89,24 @@ impl ConnectionWorker {
7789
pub(crate) fn shutdown_sync(&mut self) {
7890
// Send shutdown command without waiting for response
7991
// Use try_send to avoid any potential blocking in Drop
80-
let (tx, _rx) = oneshot::channel();
81-
let _ = self.command_tx.try_send(Command::Shutdown { tx });
8292

83-
// Don't aggressively drop the channel to avoid SendError panics
84-
// The worker thread will exit when it processes the Shutdown command
93+
if let Some(join_handle) = self.join_handle.take() {
94+
let (mut tx, _rx) = oneshot::channel();
95+
while let Err(TrySendError::Full(Command::Shutdown { tx: t })) =
96+
self.command_tx.try_send(Command::Shutdown { tx })
97+
{
98+
tx = t;
99+
log::warn!("odbc worker thread queue is full, retrying...");
100+
thread::sleep(std::time::Duration::from_millis(10));
101+
}
102+
if let Err(e) = join_handle.join() {
103+
let err = e.downcast_ref::<std::io::Error>();
104+
log::error!(
105+
"failed to join worker thread while shutting down: {:?}",
106+
err
107+
);
108+
}
109+
}
85110
}
86111

87112
pub(crate) async fn begin(&mut self) -> Result<(), Error> {
@@ -136,32 +161,22 @@ impl ConnectionWorker {
136161
// Worker thread implementation
137162
fn worker_thread_main(
138163
options: OdbcConnectOptions,
139-
establish_tx: oneshot::Sender<Result<ConnectionWorker, Error>>,
164+
command_rx: flume::Receiver<Command>,
165+
conn_tx: oneshot::Sender<Result<(), Error>>,
140166
) {
141-
let (tx, rx) = flume::bounded(64);
142-
143167
// Establish connection
144168
let conn = match establish_connection(&options) {
145-
Ok(conn) => conn,
146-
Err(e) => {
147-
let _ = establish_tx.send(Err(e));
148-
return;
169+
Ok(conn) => {
170+
conn_tx.send(Ok(())).unwrap();
171+
conn
149172
}
173+
Err(e) => return conn_tx.send(Err(e)).unwrap(),
150174
};
151-
152-
// Send back the worker handle
153-
if establish_tx
154-
.send(Ok(ConnectionWorker {
155-
command_tx: tx.clone(),
156-
}))
157-
.is_err()
158-
{
159-
return;
160-
}
161-
162175
// Process commands
163-
while let Ok(cmd) = rx.recv() {
164-
if !process_command(cmd, &conn) {
176+
while let Ok(cmd) = command_rx.recv() {
177+
if let Some(shutdown_tx) = process_command(cmd, &conn) {
178+
drop(conn);
179+
shutdown_tx.send(()).unwrap();
165180
break;
166181
}
167182
}
@@ -223,20 +238,18 @@ where
223238
.map_err(|e| Error::Protocol(format!("Failed to {} transaction: {}", operation_name, e)))
224239
}
225240

226-
fn process_command(cmd: Command, conn: &OdbcConnection) -> bool {
241+
// Returns a shutdown tx if the command is a shutdown command
242+
fn process_command(cmd: Command, conn: &OdbcConnection) -> Option<oneshot::Sender<()>> {
227243
match cmd {
228244
Command::Ping { tx } => handle_ping(conn, tx),
229245
Command::Begin { tx } => handle_begin(conn, tx),
230246
Command::Commit { tx } => handle_commit(conn, tx),
231247
Command::Rollback { tx } => handle_rollback(conn, tx),
232-
Command::Shutdown { tx } => {
233-
let _ = tx.send(());
234-
return false; // Signal to exit the loop
235-
}
248+
Command::Shutdown { tx } => return Some(tx),
236249
Command::Execute { sql, args, tx } => handle_execute(conn, sql, args, tx),
237250
Command::Prepare { sql, tx } => handle_prepare(conn, sql, tx),
238251
}
239-
true
252+
None
240253
}
241254

242255
// Command handlers

0 commit comments

Comments
 (0)