Skip to content

Commit 7869e86

Browse files
cursoragentlovasoa
andcommitted
feat: Implement streaming execution for ODBC connections
Co-authored-by: contact <[email protected]>
1 parent ce5f82e commit 7869e86

File tree

2 files changed

+81
-3
lines changed

2 files changed

+81
-3
lines changed

sqlx-core/src/odbc/connection/executor.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,14 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
2121
'c: 'e,
2222
E: Execute<'q, Self::Database> + 'q,
2323
{
24-
let empty: Vec<Result<Either<OdbcQueryResult, OdbcRow>, Error>> = Vec::new();
25-
Box::pin(futures_util::stream::iter(empty))
24+
let sql = _query.sql().to_string();
25+
Box::pin(try_stream! {
26+
let rx = self.worker.execute_stream(&sql).await?;
27+
while let Ok(item) = rx.recv_async().await {
28+
r#yield!(item?);
29+
}
30+
Ok(())
31+
})
2632
}
2733

2834
fn fetch_optional<'e, 'q: 'e, E>(

sqlx-core/src/odbc/connection/worker.rs

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use futures_channel::oneshot;
55
use futures_intrusive::sync::Mutex;
66

77
use crate::error::Error;
8-
use crate::odbc::OdbcConnectOptions;
8+
use crate::odbc::{OdbcColumn, OdbcConnectOptions, OdbcQueryResult, OdbcRow, OdbcTypeInfo};
9+
use either::Either;
10+
use odbc_api::Cursor;
911

1012
#[derive(Debug)]
1113
pub(crate) struct ConnectionWorker {
@@ -34,6 +36,10 @@ enum Command {
3436
Rollback {
3537
tx: oneshot::Sender<Result<(), Error>>,
3638
},
39+
Execute {
40+
sql: Box<str>,
41+
tx: flume::Sender<Result<Either<OdbcQueryResult, OdbcRow>, Error>>,
42+
},
3743
}
3844

3945
impl ConnectionWorker {
@@ -119,6 +125,60 @@ impl ConnectionWorker {
119125
let _ = tx.send(());
120126
return;
121127
}
128+
Command::Execute { sql, tx } => {
129+
// Helper closure to process using a given connection reference
130+
let process = |conn: &odbc_api::Connection<'static>| {
131+
match conn.execute(&sql, (), None) {
132+
Ok(Some(mut cursor)) => {
133+
use odbc_api::ResultSetMetadata;
134+
let mut columns: Vec<OdbcColumn> = Vec::new();
135+
if let Ok(count) = cursor.num_result_cols() {
136+
for i in 1..=count {
137+
let mut cd = odbc_api::ColumnDescription::default();
138+
let _ = cursor.describe_col(i as u16, &mut cd);
139+
let name = String::from_utf8(cd.name)
140+
.unwrap_or_else(|_| format!("col{}", i - 1));
141+
columns.push(OdbcColumn {
142+
name,
143+
type_info: OdbcTypeInfo { name: format!("{:?}", cd.data_type), is_null: false },
144+
ordinal: (i - 1) as usize,
145+
});
146+
}
147+
}
148+
149+
while let Ok(Some(mut row)) = cursor.next_row() {
150+
let mut values: Vec<(OdbcTypeInfo, Option<Vec<u8>>)> = Vec::with_capacity(columns.len());
151+
for i in 1..=columns.len() {
152+
let mut buf = Vec::new();
153+
match row.get_text(i as u16, &mut buf) {
154+
Ok(true) => values.push((OdbcTypeInfo { name: "TEXT".into(), is_null: false }, Some(buf))),
155+
Ok(false) => values.push((OdbcTypeInfo { name: "TEXT".into(), is_null: true }, None)),
156+
Err(e) => {
157+
let _ = tx.send(Err(Error::from(e)));
158+
return;
159+
}
160+
}
161+
}
162+
let _ = tx.send(Ok(Either::Right(OdbcRow { columns: columns.clone(), values })));
163+
}
164+
let _ = tx.send(Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })));
165+
}
166+
Ok(None) => {
167+
let _ = tx.send(Ok(Either::Left(OdbcQueryResult { rows_affected: 0 })));
168+
}
169+
Err(e) => {
170+
let _ = tx.send(Err(Error::from(e)));
171+
}
172+
}
173+
};
174+
175+
if let Some(conn) = shared.conn.try_lock() {
176+
process(&conn);
177+
} else {
178+
let guard = futures_executor::block_on(shared.conn.lock());
179+
process(&guard);
180+
}
181+
}
122182
}
123183
}
124184
})?;
@@ -173,4 +233,16 @@ impl ConnectionWorker {
173233
rx.await.map_err(|_| Error::WorkerCrashed)??;
174234
Ok(())
175235
}
236+
237+
pub(crate) async fn execute_stream(
238+
&mut self,
239+
sql: &str,
240+
) -> Result<flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>>, Error> {
241+
let (tx, rx) = flume::bounded(64);
242+
self.command_tx
243+
.send_async(Command::Execute { sql: sql.into(), tx })
244+
.await
245+
.map_err(|_| Error::WorkerCrashed)?;
246+
Ok(rx)
247+
}
176248
}

0 commit comments

Comments
 (0)