Skip to content

Commit 0fb39af

Browse files
cursoragentlovasoa
andcommitted
Refactor: Move ODBC blocking helpers to inner module
This change reorganizes the ODBC connection implementation by moving blocking helper functions into a new `inner.rs` file. This improves code organization and maintainability. Additionally, the `prepare` method in `OdbcConnection` is renamed to `prepare_metadata` to better reflect its functionality. Co-authored-by: contact <[email protected]>
1 parent 66849b0 commit 0fb39af

File tree

6 files changed

+360
-369
lines changed

6 files changed

+360
-369
lines changed

sqlx-core/src/any/connection/executor.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,15 @@ impl<'c> Executor<'c> for &'c mut AnyConnection {
128128
AnyConnectionKind::Mssql(conn) => conn.prepare(sql).await.map(Into::into)?,
129129

130130
#[cfg(feature = "odbc")]
131-
AnyConnectionKind::Odbc(conn) => conn.prepare(sql).await.map(Into::into)?,
131+
AnyConnectionKind::Odbc(conn) => {
132+
let (_, columns, parameters) = conn.prepare_metadata(sql).await?;
133+
crate::odbc::OdbcStatement {
134+
sql: sql.into(),
135+
columns,
136+
parameters,
137+
}
138+
.into()
139+
}
132140
})
133141
})
134142
}

sqlx-core/src/odbc/blocking.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
use crate::error::Error;
2+
use sqlx_rt::spawn_blocking;
3+
4+
pub async fn run_blocking<R, F>(f: F) -> Result<R, Error>
5+
where
6+
R: Send + 'static,
7+
F: FnOnce() -> Result<R, Error> + Send + 'static,
8+
{
9+
let res = spawn_blocking(f).await.map_err(|_| Error::WorkerCrashed)?;
10+
res
11+
}

sqlx-core/src/odbc/connection/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
6060
'c: 'e,
6161
{
6262
Box::pin(async move {
63-
let (_, columns, parameters) = self.prepare(sql).await?;
63+
let (_, columns, parameters) = self.prepare_metadata(sql).await?;
6464
Ok(OdbcStatement {
6565
sql: Cow::Borrowed(sql),
6666
columns,
Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
use crate::error::Error;
2+
use crate::odbc::{OdbcArgumentValue, OdbcArguments, OdbcColumn, OdbcQueryResult, OdbcRow, OdbcTypeInfo};
3+
use either::Either;
4+
use flume::{SendError, Sender};
5+
use odbc_api::handles::StatementImpl;
6+
use odbc_api::{Cursor, CursorRow, IntoParameter, Nullable, Preallocated, ResultSetMetadata};
7+
8+
pub type OdbcConn = odbc_api::Connection<'static>;
9+
pub type ExecuteResult = Result<Either<OdbcQueryResult, OdbcRow>, Error>;
10+
pub type ExecuteSender = Sender<ExecuteResult>;
11+
12+
pub fn establish_connection(options: &crate::odbc::OdbcConnectOptions) -> Result<OdbcConn, Error> {
13+
let env = odbc_api::environment().map_err(|e| Error::Configuration(e.to_string().into()))?;
14+
let conn = env
15+
.connect_with_connection_string(options.connection_string(), Default::default())
16+
.map_err(|e| Error::Configuration(e.to_string().into()))?;
17+
Ok(conn)
18+
}
19+
20+
pub fn execute_sql(
21+
conn: &mut OdbcConn,
22+
sql: &str,
23+
args: Option<OdbcArguments>,
24+
tx: &ExecuteSender,
25+
) -> Result<(), Error> {
26+
let params = prepare_parameters(args);
27+
let stmt = &mut conn.preallocate().map_err(Error::from)?;
28+
29+
if let Some(mut cursor) = stmt.execute(sql, &params[..])? {
30+
handle_cursor(&mut cursor, tx);
31+
return Ok(());
32+
}
33+
34+
let affected = extract_rows_affected(stmt);
35+
let _ = send_done(tx, affected);
36+
Ok(())
37+
}
38+
39+
fn extract_rows_affected(stmt: &mut Preallocated<StatementImpl<'_>>) -> u64 {
40+
let count_opt = match stmt.row_count() {
41+
Ok(count_opt) => count_opt,
42+
Err(_) => {
43+
return 0;
44+
}
45+
};
46+
47+
let count = match count_opt {
48+
Some(count) => count,
49+
None => {
50+
return 0;
51+
}
52+
};
53+
54+
u64::try_from(count).unwrap_or_default()
55+
}
56+
57+
fn prepare_parameters(
58+
args: Option<OdbcArguments>,
59+
) -> Vec<Box<dyn odbc_api::parameter::InputParameter>> {
60+
let args = args.map(|a| a.values).unwrap_or_default();
61+
args.into_iter().map(to_param).collect()
62+
}
63+
64+
fn to_param(arg: OdbcArgumentValue) -> Box<dyn odbc_api::parameter::InputParameter + 'static> {
65+
match arg {
66+
OdbcArgumentValue::Int(i) => Box::new(i.into_parameter()),
67+
OdbcArgumentValue::Float(f) => Box::new(f.into_parameter()),
68+
OdbcArgumentValue::Text(s) => Box::new(s.into_parameter()),
69+
OdbcArgumentValue::Bytes(b) => Box::new(b.into_parameter()),
70+
OdbcArgumentValue::Null => Box::new(Option::<String>::None.into_parameter()),
71+
}
72+
}
73+
74+
fn handle_cursor<C>(cursor: &mut C, tx: &ExecuteSender)
75+
where
76+
C: Cursor + ResultSetMetadata,
77+
{
78+
let columns = collect_columns(cursor);
79+
80+
match stream_rows(cursor, &columns, tx) {
81+
Ok(true) => {
82+
let _ = send_done(tx, 0);
83+
}
84+
Ok(false) => {}
85+
Err(e) => {
86+
send_error(tx, e);
87+
}
88+
}
89+
}
90+
91+
fn send_done(tx: &ExecuteSender, rows_affected: u64) -> Result<(), SendError<ExecuteResult>> {
92+
tx.send(Ok(Either::Left(OdbcQueryResult { rows_affected })))
93+
}
94+
95+
fn send_error(tx: &ExecuteSender, error: Error) {
96+
let _ = tx.send(Err(error));
97+
}
98+
99+
fn send_row(tx: &ExecuteSender, row: OdbcRow) -> Result<(), SendError<ExecuteResult>> {
100+
tx.send(Ok(Either::Right(row)))
101+
}
102+
103+
fn collect_columns<C>(cursor: &mut C) -> Vec<OdbcColumn>
104+
where
105+
C: ResultSetMetadata,
106+
{
107+
let count = cursor.num_result_cols().unwrap_or(0);
108+
(1..=count).map(|i| create_column(cursor, i as u16)).collect()
109+
}
110+
111+
fn create_column<C>(cursor: &mut C, index: u16) -> OdbcColumn
112+
where
113+
C: ResultSetMetadata,
114+
{
115+
let mut cd = odbc_api::ColumnDescription::default();
116+
let _ = cursor.describe_col(index, &mut cd);
117+
118+
OdbcColumn {
119+
name: decode_column_name(cd.name, index),
120+
type_info: OdbcTypeInfo::new(cd.data_type),
121+
ordinal: usize::from(index.checked_sub(1).unwrap()),
122+
}
123+
}
124+
125+
fn decode_column_name(name_bytes: Vec<u8>, index: u16) -> String {
126+
String::from_utf8(name_bytes).unwrap_or_else(|_| format!("col{}", index - 1))
127+
}
128+
129+
fn stream_rows<C>(cursor: &mut C, columns: &[OdbcColumn], tx: &ExecuteSender) -> Result<bool, Error>
130+
where
131+
C: Cursor,
132+
{
133+
let mut receiver_open = true;
134+
135+
while let Some(mut row) = cursor.next_row()? {
136+
let values = collect_row_values(&mut row, columns)?;
137+
let row_data = OdbcRow {
138+
columns: columns.to_vec(),
139+
values: values.into_iter().map(|(_, value)| value).collect(),
140+
};
141+
142+
if send_row(tx, row_data).is_err() {
143+
receiver_open = false;
144+
break;
145+
}
146+
}
147+
Ok(receiver_open)
148+
}
149+
150+
fn collect_row_values(
151+
row: &mut CursorRow<'_>,
152+
columns: &[OdbcColumn],
153+
) -> Result<Vec<(OdbcTypeInfo, crate::odbc::OdbcValue)>, Error> {
154+
columns
155+
.iter()
156+
.enumerate()
157+
.map(|(i, column)| collect_column_value(row, i, column))
158+
.collect()
159+
}
160+
161+
fn collect_column_value(
162+
row: &mut CursorRow<'_>,
163+
index: usize,
164+
column: &OdbcColumn,
165+
) -> Result<(OdbcTypeInfo, crate::odbc::OdbcValue), Error> {
166+
use odbc_api::DataType;
167+
168+
let col_idx = (index + 1) as u16;
169+
let type_info = column.type_info.clone();
170+
let data_type = type_info.data_type();
171+
172+
let value = match data_type {
173+
DataType::TinyInt
174+
| DataType::SmallInt
175+
| DataType::Integer
176+
| DataType::BigInt
177+
| DataType::Bit => extract_int(row, col_idx, &type_info)?,
178+
179+
DataType::Real => extract_float::<f32>(row, col_idx, &type_info)?,
180+
DataType::Float { .. } | DataType::Double => extract_float::<f64>(row, col_idx, &type_info)?,
181+
182+
DataType::Char { .. }
183+
| DataType::Varchar { .. }
184+
| DataType::LongVarchar { .. }
185+
| DataType::WChar { .. }
186+
| DataType::WVarchar { .. }
187+
| DataType::WLongVarchar { .. }
188+
| DataType::Date
189+
| DataType::Time { .. }
190+
| DataType::Timestamp { .. }
191+
| DataType::Decimal { .. }
192+
| DataType::Numeric { .. } => extract_text(row, col_idx, &type_info)?,
193+
194+
DataType::Binary { .. } | DataType::Varbinary { .. } | DataType::LongVarbinary { .. } => {
195+
extract_binary(row, col_idx, &type_info)?
196+
}
197+
198+
DataType::Unknown | DataType::Other { .. } => match extract_text(row, col_idx, &type_info) {
199+
Ok(v) => v,
200+
Err(_) => extract_binary(row, col_idx, &type_info)?,
201+
},
202+
};
203+
204+
Ok((type_info, value))
205+
}
206+
207+
fn extract_int(
208+
row: &mut CursorRow<'_>,
209+
col_idx: u16,
210+
type_info: &OdbcTypeInfo,
211+
) -> Result<crate::odbc::OdbcValue, Error> {
212+
let mut nullable = Nullable::<i64>::null();
213+
row.get_data(col_idx, &mut nullable)?;
214+
215+
let (is_null, int) = match nullable.into_opt() {
216+
None => (true, None),
217+
Some(v) => (false, Some(v)),
218+
};
219+
220+
Ok(crate::odbc::OdbcValue {
221+
type_info: type_info.clone(),
222+
is_null,
223+
text: None,
224+
blob: None,
225+
int,
226+
float: None,
227+
})
228+
}
229+
230+
fn extract_float<T>(
231+
row: &mut CursorRow<'_>,
232+
col_idx: u16,
233+
type_info: &OdbcTypeInfo,
234+
) -> Result<crate::odbc::OdbcValue, Error>
235+
where
236+
T: Into<f64> + Default,
237+
odbc_api::Nullable<T>: odbc_api::parameter::CElement + odbc_api::handles::CDataMut,
238+
{
239+
let mut nullable = Nullable::<T>::null();
240+
row.get_data(col_idx, &mut nullable)?;
241+
242+
let (is_null, float) = match nullable.into_opt() {
243+
None => (true, None),
244+
Some(v) => (false, Some(v.into())),
245+
};
246+
247+
Ok(crate::odbc::OdbcValue {
248+
type_info: type_info.clone(),
249+
is_null,
250+
text: None,
251+
blob: None,
252+
int: None,
253+
float,
254+
})
255+
}
256+
257+
fn extract_text(
258+
row: &mut CursorRow<'_>,
259+
col_idx: u16,
260+
type_info: &OdbcTypeInfo,
261+
) -> Result<crate::odbc::OdbcValue, Error> {
262+
let mut buf = Vec::new();
263+
let is_some = row.get_text(col_idx, &mut buf)?;
264+
265+
let (is_null, text) = if !is_some {
266+
(true, None)
267+
} else {
268+
match String::from_utf8(buf) {
269+
Ok(s) => (false, Some(s)),
270+
Err(e) => return Err(Error::Decode(e.into())),
271+
}
272+
};
273+
274+
Ok(crate::odbc::OdbcValue {
275+
type_info: type_info.clone(),
276+
is_null,
277+
text,
278+
blob: None,
279+
int: None,
280+
float: None,
281+
})
282+
}
283+
284+
fn extract_binary(
285+
row: &mut CursorRow<'_>,
286+
col_idx: u16,
287+
type_info: &OdbcTypeInfo,
288+
) -> Result<crate::odbc::OdbcValue, Error> {
289+
let mut buf = Vec::new();
290+
let is_some = row.get_binary(col_idx, &mut buf)?;
291+
292+
let (is_null, blob) = if !is_some { (true, None) } else { (false, Some(buf)) };
293+
294+
Ok(crate::odbc::OdbcValue {
295+
type_info: type_info.clone(),
296+
is_null,
297+
text: None,
298+
blob,
299+
int: None,
300+
float: None,
301+
})
302+
}
303+
304+
pub fn do_prepare(
305+
conn: &mut OdbcConn,
306+
sql: Box<str>,
307+
) -> Result<(u64, Vec<OdbcColumn>, usize), Error> {
308+
let mut prepared = conn.prepare(&sql)?;
309+
let columns = collect_columns(&mut prepared);
310+
let params = usize::from(prepared.num_params().unwrap_or(0));
311+
Ok((0, columns, params))
312+
}
313+

0 commit comments

Comments
 (0)