Skip to content

Commit b27505b

Browse files
committed
refactor(any): Implement fetch_optional in terms of fetch_many
Signed-off-by: Joshua Potts <[email protected]>
1 parent e3d86a3 commit b27505b

File tree

4 files changed

+50
-82
lines changed

4 files changed

+50
-82
lines changed

sqlx-core/src/any/connection/backend.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::sql_str::SqlStr;
44
use either::Either;
55
use futures_core::future::BoxFuture;
66
use futures_core::stream::BoxStream;
7+
use futures_util::TryStreamExt;
78
use std::fmt::Debug;
89

910
pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
@@ -106,7 +107,19 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
106107
query: SqlStr,
107108
persistent: bool,
108109
arguments: Option<AnyArguments>,
109-
) -> BoxFuture<'_, crate::Result<Option<AnyRow>>>;
110+
) -> BoxFuture<'_, crate::Result<Option<AnyRow>>> {
111+
let mut stream = self.fetch_many(query, persistent, arguments);
112+
113+
Box::pin(async move {
114+
while let Some(result) = stream.try_next().await? {
115+
if let Either::Right(row) = result {
116+
return Ok(Some(row));
117+
}
118+
}
119+
120+
Ok(None)
121+
})
122+
}
110123

111124
fn prepare_with<'c, 'q: 'c>(
112125
&'c mut self,

sqlx-mysql/src/any.rs

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
use either::Either;
77
use futures_core::future::BoxFuture;
88
use futures_core::stream::BoxStream;
9-
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
9+
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
1010
use sqlx_core::any::{
1111
Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
1212
AnyStatement, AnyTypeInfo, AnyTypeInfoKind,
@@ -17,7 +17,7 @@ use sqlx_core::describe::Describe;
1717
use sqlx_core::executor::Executor;
1818
use sqlx_core::sql_str::SqlStr;
1919
use sqlx_core::transaction::TransactionManager;
20-
use std::{future, pin::pin};
20+
use std::future;
2121

2222
sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);
2323

@@ -103,32 +103,6 @@ impl AnyConnectionBackend for MySqlConnection {
103103
)
104104
}
105105

106-
fn fetch_optional(
107-
&mut self,
108-
query: SqlStr,
109-
persistent: bool,
110-
arguments: Option<AnyArguments>,
111-
) -> BoxFuture<'_, sqlx_core::Result<Option<AnyRow>>> {
112-
let persistent = persistent && arguments.is_some();
113-
let arguments = arguments
114-
.map(AnyArguments::convert_into)
115-
.transpose()
116-
.map_err(sqlx_core::Error::Encode);
117-
118-
Box::pin(async move {
119-
let arguments = arguments?;
120-
let mut stream = pin!(self.run(query, arguments, persistent).await?);
121-
122-
while let Some(result) = stream.try_next().await? {
123-
if let Either::Right(row) = result {
124-
return Ok(Some(AnyRow::try_from(&row)?));
125-
}
126-
}
127-
128-
Ok(None)
129-
})
130-
}
131-
132106
fn prepare_with<'c, 'q: 'c>(
133107
&'c mut self,
134108
sql: SqlStr,

sqlx-postgres/src/any.rs

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use crate::{
44
};
55
use futures_core::future::BoxFuture;
66
use futures_core::stream::BoxStream;
7-
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
7+
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
88
use sqlx_core::sql_str::SqlStr;
9-
use std::{future, pin::pin};
9+
use std::future;
1010

1111
use sqlx_core::any::{
1212
Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
@@ -105,30 +105,6 @@ impl AnyConnectionBackend for PgConnection {
105105
)
106106
}
107107

108-
fn fetch_optional(
109-
&mut self,
110-
query: SqlStr,
111-
persistent: bool,
112-
arguments: Option<AnyArguments>,
113-
) -> BoxFuture<sqlx_core::Result<Option<AnyRow>>> {
114-
let persistent = persistent && arguments.is_some();
115-
let arguments = arguments
116-
.map(AnyArguments::convert_into)
117-
.transpose()
118-
.map_err(sqlx_core::Error::Encode);
119-
120-
Box::pin(async move {
121-
let arguments = arguments?;
122-
let mut stream = pin!(self.run(query, arguments, persistent, None).await?);
123-
124-
if let Some(Either::Right(row)) = stream.try_next().await? {
125-
return Ok(Some(AnyRow::try_from(&row)?));
126-
}
127-
128-
Ok(None)
129-
})
130-
}
131-
132108
fn prepare_with<'c, 'q: 'c>(
133109
&'c mut self,
134110
sql: SqlStr,

sqlx-sqlite/src/any.rs

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use sqlx_core::database::Database;
1919
use sqlx_core::describe::Describe;
2020
use sqlx_core::executor::Executor;
2121
use sqlx_core::transaction::TransactionManager;
22-
use std::pin::pin;
2322
use std::sync::Arc;
2423

2524
sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite);
@@ -86,21 +85,7 @@ impl AnyConnectionBackend for SqliteConnection {
8685
persistent: bool,
8786
arguments: Option<AnyArguments>,
8887
) -> BoxStream<'_, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
89-
let persistent = persistent && arguments.is_some();
90-
let args = arguments.map(map_arguments);
91-
92-
Box::pin(
93-
self.worker
94-
.execute(query, args, self.row_channel_size, persistent, None)
95-
.map_ok(flume::Receiver::into_stream)
96-
.try_flatten_stream()
97-
.map(
98-
move |res: sqlx_core::Result<Either<SqliteQueryResult, SqliteRow>>| match res? {
99-
Either::Left(result) => Ok(Either::Left(map_result(result))),
100-
Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
101-
},
102-
),
103-
)
88+
self.fetch_with_limit(query, persistent, arguments, None)
10489
}
10590

10691
fn fetch_optional(
@@ -109,19 +94,13 @@ impl AnyConnectionBackend for SqliteConnection {
10994
persistent: bool,
11095
arguments: Option<AnyArguments>,
11196
) -> BoxFuture<'_, sqlx_core::Result<Option<AnyRow>>> {
112-
let persistent = persistent && arguments.is_some();
113-
let args = arguments.map(map_arguments);
97+
let mut stream = self.fetch_with_limit(query, persistent, arguments, Some(1));
11498

11599
Box::pin(async move {
116-
let mut stream = pin!(
117-
self.worker
118-
.execute(query, args, self.row_channel_size, persistent, Some(1))
119-
.map_ok(flume::Receiver::into_stream)
120-
.await?
121-
);
122-
123-
if let Some(Either::Right(row)) = stream.try_next().await? {
124-
return Ok(Some(AnyRow::try_from(&row)?));
100+
while let Some(result) = stream.try_next().await? {
101+
if let Either::Right(row) = result {
102+
return Ok(Some(row));
103+
}
125104
}
126105

127106
Ok(None)
@@ -145,6 +124,32 @@ impl AnyConnectionBackend for SqliteConnection {
145124
}
146125
}
147126

127+
impl SqliteConnection {
128+
fn fetch_with_limit(
129+
&mut self,
130+
query: SqlStr,
131+
persistent: bool,
132+
arguments: Option<AnyArguments>,
133+
limit: Option<usize>,
134+
) -> BoxStream<'_, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
135+
let persistent = persistent && arguments.is_some();
136+
let args = arguments.map(map_arguments);
137+
138+
Box::pin(
139+
self.worker
140+
.execute(query, args, self.row_channel_size, persistent, limit)
141+
.map_ok(flume::Receiver::into_stream)
142+
.try_flatten_stream()
143+
.map(
144+
move |res: sqlx_core::Result<Either<SqliteQueryResult, SqliteRow>>| match res? {
145+
Either::Left(result) => Ok(Either::Left(map_result(result))),
146+
Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
147+
},
148+
),
149+
)
150+
}
151+
}
152+
148153
impl<'a> TryFrom<&'a SqliteTypeInfo> for AnyTypeInfo {
149154
type Error = sqlx_core::Error;
150155

0 commit comments

Comments
 (0)