Skip to content

Commit ebd481c

Browse files
cursoragentlovasoa
andcommitted
feat: Add ODBC driver support
Adds the ODBC driver to sqlx, enabling connections to databases via ODBC. Co-authored-by: contact <[email protected]>
1 parent 3c84fc2 commit ebd481c

File tree

20 files changed

+1369
-4
lines changed

20 files changed

+1369
-4
lines changed

Cargo.lock

Lines changed: 620 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ postgres = ["sqlx-core/postgres", "sqlx-macros/postgres"]
131131
mysql = ["sqlx-core/mysql", "sqlx-macros/mysql"]
132132
sqlite = ["sqlx-core/sqlite", "sqlx-macros/sqlite"]
133133
mssql = ["sqlx-core/mssql", "sqlx-macros/mssql"]
134+
odbc = ["sqlx-core/odbc"]
134135

135136
# types
136137
bigdecimal = ["sqlx-core/bigdecimal", "sqlx-macros/bigdecimal"]

sqlx-core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ mysql = [
4646
sqlite = ["libsqlite3-sys", "futures-executor", "flume"]
4747
mssql = ["uuid", "encoding_rs", "regex"]
4848
any = []
49+
odbc = ["odbc-api", "futures-executor", "flume"]
4950

5051
# types
5152
all-types = [
@@ -172,6 +173,7 @@ hkdf = { version = "0.12.0", optional = true }
172173
event-listener = "5.4.0"
173174

174175
dotenvy = "0.15"
176+
odbc-api = { version = "19.0.1", optional = true }
175177

176178
[dev-dependencies]
177179
sqlx = { package = "sqlx-oldapi", path = "..", features = ["postgres", "sqlite", "mysql", "runtime-tokio-rustls"] }

sqlx-core/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,10 @@ pub mod mysql;
105105
#[cfg_attr(docsrs, doc(cfg(feature = "mssql")))]
106106
pub mod mssql;
107107

108+
#[cfg(feature = "odbc")]
109+
#[cfg_attr(docsrs, doc(cfg(feature = "odbc")))]
110+
pub mod odbc;
111+
108112
// Implements test support with automatic DB management.
109113
#[cfg(feature = "migrate")]
110114
pub mod testing;

sqlx-core/src/odbc/arguments.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use crate::arguments::Arguments;
2+
use crate::encode::Encode;
3+
use crate::odbc::Odbc;
4+
use crate::types::Type;
5+
6+
#[derive(Default)]
7+
pub struct OdbcArguments<'q> {
8+
pub(crate) values: Vec<OdbcArgumentValue<'q>>,
9+
}
10+
11+
pub enum OdbcArgumentValue<'q> {
12+
Text(String),
13+
Bytes(Vec<u8>),
14+
Int(i64),
15+
Float(f64),
16+
Null,
17+
// Borrowed placeholder to satisfy lifetimes; not used for now
18+
Phantom(std::marker::PhantomData<&'q ()>),
19+
}
20+
21+
impl<'q> Arguments<'q> for OdbcArguments<'q> {
22+
type Database = Odbc;
23+
24+
fn reserve(&mut self, additional: usize, _size: usize) {
25+
self.values.reserve(additional);
26+
}
27+
28+
fn add<T>(&mut self, _value: T)
29+
where
30+
T: 'q + Send + Encode<'q, Self::Database> + Type<Self::Database>,
31+
{
32+
// Not implemented yet; ODBC backend currently executes direct SQL without binds
33+
// This stub allows query() without binds to compile.
34+
let _ = _value;
35+
}
36+
}

sqlx-core/src/odbc/column.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use crate::column::Column;
2+
use crate::odbc::{Odbc, OdbcTypeInfo};
3+
4+
#[derive(Debug, Clone)]
5+
pub struct OdbcColumn {
6+
pub(crate) name: String,
7+
pub(crate) type_info: OdbcTypeInfo,
8+
pub(crate) ordinal: usize,
9+
}
10+
11+
impl Column for OdbcColumn {
12+
type Database = Odbc;
13+
14+
fn ordinal(&self) -> usize { self.ordinal }
15+
fn name(&self) -> &str { &self.name }
16+
fn type_info(&self) -> &OdbcTypeInfo { &self.type_info }
17+
}
18+
19+
mod private {
20+
use crate::column::private_column::Sealed;
21+
use super::OdbcColumn;
22+
impl Sealed for OdbcColumn {}
23+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
use crate::describe::Describe;
2+
use crate::error::Error;
3+
use crate::executor::{Execute, Executor};
4+
use crate::logger::QueryLogger;
5+
use crate::odbc::{Odbc, OdbcColumn, OdbcConnection, OdbcQueryResult, OdbcRow, OdbcStatement, OdbcTypeInfo};
6+
use either::Either;
7+
use futures_core::future::BoxFuture;
8+
use futures_core::stream::BoxStream;
9+
use futures_util::TryStreamExt;
10+
use std::pin::Pin;
11+
use odbc_api::Cursor;
12+
use std::borrow::Cow;
13+
14+
impl OdbcConnection {
15+
async fn run<'e>(
16+
&'e mut self,
17+
sql: &'e str,
18+
) -> Result<impl futures_core::Stream<Item = Result<Either<OdbcQueryResult, OdbcRow>, Error>> + 'e, Error> {
19+
let mut logger = QueryLogger::new(sql, self.log_settings.clone());
20+
21+
Ok(Box::pin(try_stream! {
22+
let guard = self.worker.shared.conn.lock().await;
23+
match guard.execute(sql, (), None) {
24+
Ok(Some(mut cursor)) => {
25+
use odbc_api::ResultSetMetadata;
26+
let mut columns = Vec::new();
27+
if let Ok(count) = cursor.num_result_cols() {
28+
for i in 1..=count { // ODBC columns are 1-based
29+
let mut cd = odbc_api::ColumnDescription::default();
30+
let _ = cursor.describe_col(i as u16, &mut cd);
31+
let name = String::from_utf8(cd.name).unwrap_or_else(|_| format!("col{}", i-1));
32+
columns.push(OdbcColumn { name, type_info: OdbcTypeInfo { name: format!("{:?}", cd.data_type), is_null: false }, ordinal: (i-1) as usize });
33+
}
34+
}
35+
while let Some(mut row) = cursor.next_row().map_err(|e| Error::from(e))? {
36+
let mut values = Vec::with_capacity(columns.len());
37+
for i in 1..=columns.len() {
38+
let mut buf = Vec::new();
39+
let not_null = row.get_text(i as u16, &mut buf).map_err(|e| Error::from(e))?;
40+
if not_null {
41+
let ti = OdbcTypeInfo { name: "TEXT".into(), is_null: false };
42+
values.push((ti, Some(buf)));
43+
} else {
44+
let ti = OdbcTypeInfo { name: "TEXT".into(), is_null: true };
45+
values.push((ti, None));
46+
}
47+
}
48+
logger.increment_rows_returned();
49+
r#yield!(Either::Right(OdbcRow { columns: columns.clone(), values }));
50+
}
51+
r#yield!(Either::Left(OdbcQueryResult { rows_affected: 0 }));
52+
}
53+
Ok(None) => {
54+
r#yield!(Either::Left(OdbcQueryResult { rows_affected: 0 }));
55+
}
56+
Err(e) => return Err(Error::from(e)),
57+
}
58+
Ok(())
59+
}))
60+
}
61+
}
62+
63+
impl<'c> Executor<'c> for &'c mut OdbcConnection {
64+
type Database = Odbc;
65+
66+
fn fetch_many<'e, 'q: 'e, E>(
67+
self,
68+
mut query: E,
69+
) -> BoxStream<'e, Result<Either<OdbcQueryResult, OdbcRow>, Error>>
70+
where
71+
'c: 'e,
72+
E: Execute<'q, Self::Database> + 'q,
73+
{
74+
let sql = query.sql();
75+
Box::pin(try_stream! {
76+
let s = self.run(sql).await?;
77+
futures_util::pin_mut!(s);
78+
while let Some(v) = s.try_next().await? { r#yield!(v); }
79+
Ok(())
80+
})
81+
}
82+
83+
fn fetch_optional<'e, 'q: 'e, E>(
84+
self,
85+
query: E,
86+
) -> BoxFuture<'e, Result<Option<OdbcRow>, Error>>
87+
where
88+
'c: 'e,
89+
E: Execute<'q, Self::Database> + 'q,
90+
{
91+
let mut s = self.fetch_many(query);
92+
Box::pin(async move {
93+
while let Some(v) = s.try_next().await? {
94+
if let Either::Right(r) = v { return Ok(Some(r)); }
95+
}
96+
Ok(None)
97+
})
98+
}
99+
100+
fn prepare_with<'e, 'q: 'e>(
101+
self,
102+
sql: &'q str,
103+
_parameters: &'e [OdbcTypeInfo],
104+
) -> BoxFuture<'e, Result<OdbcStatement<'q>, Error>>
105+
where
106+
'c: 'e,
107+
{
108+
Box::pin(async move {
109+
// Basic statement metadata: no parameter/column info without executing
110+
Ok(OdbcStatement { sql: Cow::Borrowed(sql), columns: Vec::new(), parameters: 0 })
111+
})
112+
}
113+
114+
#[doc(hidden)]
115+
fn describe<'e, 'q: 'e>(self, _sql: &'q str) -> BoxFuture<'e, Result<Describe<Odbc>, Error>>
116+
where
117+
'c: 'e,
118+
{
119+
Box::pin(async move { Err(Error::Protocol("ODBC describe not implemented".into())) })
120+
}
121+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
use crate::connection::{Connection, LogSettings};
2+
use crate::error::Error;
3+
use crate::transaction::Transaction;
4+
use crate::odbc::{Odbc, OdbcConnectOptions};
5+
use futures_core::future::BoxFuture;
6+
use futures_util::future;
7+
8+
mod worker;
9+
mod executor;
10+
11+
pub(crate) use worker::ConnectionWorker;
12+
13+
/// A connection to an ODBC-accessible database.
14+
///
15+
/// ODBC uses a blocking C API, so we run all calls on a dedicated background thread
16+
/// and communicate over channels to provide async access.
17+
#[derive(Debug)]
18+
pub struct OdbcConnection {
19+
pub(crate) worker: ConnectionWorker,
20+
pub(crate) log_settings: LogSettings,
21+
}
22+
23+
impl OdbcConnection {
24+
pub(crate) async fn establish(options: &OdbcConnectOptions) -> Result<Self, Error> {
25+
let worker = ConnectionWorker::establish(options.clone()).await?;
26+
Ok(Self { worker, log_settings: LogSettings::default() })
27+
}
28+
}
29+
30+
impl Connection for OdbcConnection {
31+
type Database = Odbc;
32+
33+
type Options = OdbcConnectOptions;
34+
35+
fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
36+
Box::pin(async move { self.worker.shutdown().await })
37+
}
38+
39+
fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
40+
Box::pin(async move { Ok(()) })
41+
}
42+
43+
fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
44+
Box::pin(self.worker.ping())
45+
}
46+
47+
fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
48+
where
49+
Self: Sized,
50+
{
51+
Transaction::begin(self)
52+
}
53+
54+
#[doc(hidden)]
55+
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
56+
Box::pin(future::ok(()))
57+
}
58+
59+
#[doc(hidden)]
60+
fn should_flush(&self) -> bool {
61+
false
62+
}
63+
}

0 commit comments

Comments
 (0)