Skip to content

Commit 5014661

Browse files
cursoragentlovasoa
andcommitted
Refactor: Improve async runtime compatibility for ODBC
Co-authored-by: contact <[email protected]>
1 parent 0fb39af commit 5014661

File tree

6 files changed

+51
-26
lines changed

6 files changed

+51
-26
lines changed

sqlx-core/src/odbc/blocking.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ where
66
R: Send + 'static,
77
F: FnOnce() -> Result<R, Error> + Send + 'static,
88
{
9-
let res = spawn_blocking(f).await.map_err(|_| Error::WorkerCrashed)?;
10-
res
9+
#[cfg(feature = "_rt-tokio")]
10+
{
11+
let join_result = spawn_blocking(f).await.map_err(|_| Error::WorkerCrashed)?;
12+
join_result
13+
}
14+
15+
#[cfg(feature = "_rt-async-std")]
16+
{
17+
spawn_blocking(f).await
18+
}
1119
}

sqlx-core/src/odbc/connection/inner.rs

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::error::Error;
2-
use crate::odbc::{OdbcArgumentValue, OdbcArguments, OdbcColumn, OdbcQueryResult, OdbcRow, OdbcTypeInfo};
2+
use crate::odbc::{
3+
OdbcArgumentValue, OdbcArguments, OdbcColumn, OdbcQueryResult, OdbcRow, OdbcTypeInfo,
4+
};
35
use either::Either;
46
use flume::{SendError, Sender};
57
use odbc_api::handles::StatementImpl;
@@ -105,7 +107,9 @@ where
105107
C: ResultSetMetadata,
106108
{
107109
let count = cursor.num_result_cols().unwrap_or(0);
108-
(1..=count).map(|i| create_column(cursor, i as u16)).collect()
110+
(1..=count)
111+
.map(|i| create_column(cursor, i as u16))
112+
.collect()
109113
}
110114

111115
fn create_column<C>(cursor: &mut C, index: u16) -> OdbcColumn
@@ -177,7 +181,9 @@ fn collect_column_value(
177181
| DataType::Bit => extract_int(row, col_idx, &type_info)?,
178182

179183
DataType::Real => extract_float::<f32>(row, col_idx, &type_info)?,
180-
DataType::Float { .. } | DataType::Double => extract_float::<f64>(row, col_idx, &type_info)?,
184+
DataType::Float { .. } | DataType::Double => {
185+
extract_float::<f64>(row, col_idx, &type_info)?
186+
}
181187

182188
DataType::Char { .. }
183189
| DataType::Varchar { .. }
@@ -195,10 +201,12 @@ fn collect_column_value(
195201
extract_binary(row, col_idx, &type_info)?
196202
}
197203

198-
DataType::Unknown | DataType::Other { .. } => match extract_text(row, col_idx, &type_info) {
199-
Ok(v) => v,
200-
Err(_) => extract_binary(row, col_idx, &type_info)?,
201-
},
204+
DataType::Unknown | DataType::Other { .. } => {
205+
match extract_text(row, col_idx, &type_info) {
206+
Ok(v) => v,
207+
Err(_) => extract_binary(row, col_idx, &type_info)?,
208+
}
209+
}
202210
};
203211

204212
Ok((type_info, value))
@@ -289,7 +297,11 @@ fn extract_binary(
289297
let mut buf = Vec::new();
290298
let is_some = row.get_binary(col_idx, &mut buf)?;
291299

292-
let (is_null, blob) = if !is_some { (true, None) } else { (false, Some(buf)) };
300+
let (is_null, blob) = if !is_some {
301+
(true, None)
302+
} else {
303+
(false, Some(buf))
304+
};
293305

294306
Ok(crate::odbc::OdbcValue {
295307
type_info: type_info.clone(),
@@ -310,4 +322,3 @@ pub fn do_prepare(
310322
let params = usize::from(prepared.num_params().unwrap_or(0));
311323
Ok((0, columns, params))
312324
}
313-

sqlx-core/src/odbc/connection/mod.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use crate::connection::{Connection, LogSettings};
22
use crate::error::Error;
3+
use crate::odbc::blocking::run_blocking;
34
use crate::odbc::{Odbc, OdbcArguments, OdbcColumn, OdbcConnectOptions, OdbcQueryResult, OdbcRow};
45
use crate::transaction::Transaction;
56
use either::Either;
6-
use crate::odbc::blocking::run_blocking;
77
mod inner;
8-
use inner::{do_prepare, establish_connection, execute_sql, OdbcConn};
98
use futures_core::future::BoxFuture;
109
use futures_util::future;
10+
use inner::{do_prepare, establish_connection, execute_sql, OdbcConn};
1111
// no direct spawn_blocking here; use run_blocking helper
1212
use std::sync::{Arc, Mutex};
1313

@@ -28,7 +28,8 @@ impl OdbcConnection {
2828
let conn = run_blocking({
2929
let options = options.clone();
3030
move || establish_connection(&options)
31-
}).await?;
31+
})
32+
.await?;
3233

3334
Ok(Self {
3435
inner: Arc::new(Mutex::new(conn)),
@@ -44,7 +45,8 @@ impl OdbcConnection {
4445
let conn = inner.lock().unwrap();
4546
conn.database_management_system_name()
4647
.map_err(|e| Error::Protocol(format!("Failed to get DBMS name: {}", e)))
47-
}).await
48+
})
49+
.await
4850
}
4951

5052
pub(crate) async fn ping_blocking(&mut self) -> Result<(), Error> {
@@ -56,7 +58,8 @@ impl OdbcConnection {
5658
Ok(_) => Ok(()),
5759
Err(e) => Err(Error::Protocol(format!("Ping failed: {}", e))),
5860
}
59-
}).await
61+
})
62+
.await
6063
}
6164

6265
pub(crate) async fn begin_blocking(&mut self) -> Result<(), Error> {
@@ -65,7 +68,8 @@ impl OdbcConnection {
6568
let conn = inner.lock().unwrap();
6669
conn.set_autocommit(false)
6770
.map_err(|e| Error::Protocol(format!("Failed to begin transaction: {}", e)))
68-
}).await
71+
})
72+
.await
6973
}
7074

7175
pub(crate) async fn commit_blocking(&mut self) -> Result<(), Error> {
@@ -75,7 +79,8 @@ impl OdbcConnection {
7579
conn.commit()
7680
.and_then(|_| conn.set_autocommit(true))
7781
.map_err(|e| Error::Protocol(format!("Failed to commit transaction: {}", e)))
78-
}).await
82+
})
83+
.await
7984
}
8085

8186
pub(crate) async fn rollback_blocking(&mut self) -> Result<(), Error> {
@@ -85,7 +90,8 @@ impl OdbcConnection {
8590
conn.rollback()
8691
.and_then(|_| conn.set_autocommit(true))
8792
.map_err(|e| Error::Protocol(format!("Failed to rollback transaction: {}", e)))
88-
}).await
93+
})
94+
.await
8995
}
9096

9197
pub(crate) async fn execute_stream(
@@ -102,7 +108,8 @@ impl OdbcConnection {
102108
let _ = tx.send(Err(e));
103109
}
104110
Ok(())
105-
}).await?;
111+
})
112+
.await?;
106113
Ok(rx)
107114
}
108115

sqlx-core/src/odbc/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use crate::executor::Executor;
2525

2626
mod arguments;
27+
mod blocking;
2728
mod column;
2829
mod connection;
2930
mod database;
@@ -36,7 +37,6 @@ mod transaction;
3637
mod type_info;
3738
pub mod types;
3839
mod value;
39-
mod blocking;
4040

4141
pub use arguments::{OdbcArgumentValue, OdbcArguments};
4242
pub use column::OdbcColumn;

sqlx-rt/src/rt_async_std.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
pub use async_std::{
22
self, fs, future::timeout, io::prelude::ReadExt as AsyncReadExt,
33
io::prelude::WriteExt as AsyncWriteExt, io::Read as AsyncRead, io::Write as AsyncWrite,
4-
net::TcpStream, sync::Mutex as AsyncMutex, task::sleep, task::spawn, task::yield_now,
5-
task::spawn_blocking,
4+
net::TcpStream, sync::Mutex as AsyncMutex, task::sleep, task::spawn, task::spawn_blocking,
5+
task::yield_now,
66
};
77

88
#[cfg(unix)]

sqlx-rt/src/rt_tokio.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
pub use tokio::{
22
self, fs, io::AsyncRead, io::AsyncReadExt, io::AsyncWrite, io::AsyncWriteExt, io::ReadBuf,
3-
net::TcpStream, runtime::Handle, sync::Mutex as AsyncMutex, task::spawn, task::yield_now,
4-
task::spawn_blocking,
5-
time::sleep, time::timeout,
3+
net::TcpStream, runtime::Handle, sync::Mutex as AsyncMutex, task::spawn, task::spawn_blocking,
4+
task::yield_now, time::sleep, time::timeout,
65
};
76

87
#[cfg(unix)]

0 commit comments

Comments
 (0)