Skip to content

Commit 19e66fc

Browse files
committed
refactor(odbc): simplify execute_stream implementation
This commit refactors the execute_stream method in OdbcConnection to directly return the receiver from the execute_stream function, eliminating unnecessary complexity. Additionally, the method signature is updated to reflect the change in return type, enhancing clarity in the codebase.
1 parent e6ffb26 commit 19e66fc

File tree

2 files changed

+10
-17
lines changed

2 files changed

+10
-17
lines changed

sqlx-core/src/odbc/connection/executor.rs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,8 @@ impl<'c> Executor<'c> for &'c mut OdbcConnection {
2020
'c: 'e,
2121
E: Execute<'q, Self::Database> + 'q,
2222
{
23-
let sql = query.sql().to_string();
2423
let args = query.take_arguments();
25-
Box::pin(try_stream! {
26-
let rx = self.execute_stream(&sql, args).await?;
27-
while let Ok(item) = rx.recv_async().await {
28-
r#yield!(item?);
29-
}
30-
Ok(())
31-
})
24+
Box::pin(self.execute_stream(query.sql(), args).into_stream())
3225
}
3326

3427
fn fetch_optional<'e, 'q: 'e, E>(

sqlx-core/src/odbc/connection/mod.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -138,29 +138,29 @@ impl OdbcConnection {
138138
.await
139139
}
140140

141-
pub(crate) async fn execute_stream(
141+
/// Launches a background task to execute the SQL statement and send the results to the returned channel.
142+
pub(crate) fn execute_stream(
142143
&mut self,
143144
sql: &str,
144145
args: Option<OdbcArguments>,
145-
) -> Result<flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>>, Error> {
146+
) -> flume::Receiver<Result<Either<OdbcQueryResult, OdbcRow>, Error>> {
146147
let (tx, rx) = flume::bounded(64);
147148

148-
// !!TODO!!!: Put back the prepared statement after usage
149149
let maybe_prepared = if let Some(prepared) = self.stmt_cache.get(sql) {
150150
MaybePrepared::Prepared(Arc::clone(prepared))
151151
} else {
152152
MaybePrepared::NotPrepared(sql.to_string())
153153
};
154154

155-
self.with_conn("execute_stream", move |conn| {
156-
if let Err(e) = execute_sql(conn, maybe_prepared, args, &tx) {
155+
let conn = Arc::clone(&self.conn);
156+
sqlx_rt::spawn(sqlx_rt::spawn_blocking(move || {
157+
let mut conn = conn.lock().expect("failed to lock connection");
158+
if let Err(e) = execute_sql(&mut conn, maybe_prepared, args, &tx) {
157159
let _ = tx.send(Err(e));
158160
}
159-
Ok(())
160-
})
161-
.await?;
161+
}));
162162

163-
Ok(rx)
163+
rx
164164
}
165165

166166
pub(crate) async fn clear_cached_statements(&mut self) -> Result<(), Error> {

0 commit comments

Comments
 (0)